# API Reference

This section documents all REST API endpoints provided by the Ingestion Service. The service runs on port 8113 and is accessible through the API Gateway under the `/api/v1/` path prefix.

All endpoints require the `X-Tenant-Id` header for tenant-scoped operations, except `GET /api/v1/sources/connector-types` and `GET /api/v1/files/formats`, which are tenant-independent.
## Authentication
All requests must include a valid JWT bearer token in the Authorization header and the tenant identifier in the X-Tenant-Id header.
```http
Authorization: Bearer <jwt-token>
X-Tenant-Id: <tenant-uuid>
```

## Sources API
Sources represent external data systems configured for ingestion. Each source maps to an Airbyte source instance.
### Create Source

Creates a new ingestion source and registers it with Airbyte.

```http
POST /api/v1/sources
```

Request Body:
```json
{
  "name": "production-postgres",
  "description": "Production PostgreSQL database for order data",
  "connectorType": "postgres",
  "connectionConfig": {
    "host": "orders-db.example.com",
    "port": 5432,
    "database": "orders",
    "username": "readonly_user",
    "password": "********",
    "ssl_mode": "require"
  }
}
```

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Human-readable source name |
| description | string | No | Description of the data source |
| connectorType | string | Yes | Airbyte connector type identifier (e.g., postgres, salesforce, s3) |
| connectionConfig | object | Yes | Connector-specific configuration map |
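Since `name`, `connectorType`, and `connectionConfig` are the required fields, a client can check a payload before sending it. A minimal sketch (the helper is illustrative, not part of the API):

```python
import json

# Required fields per the table above.
REQUIRED_FIELDS = {"name", "connectorType", "connectionConfig"}

def missing_fields(payload: dict) -> list[str]:
    """Return the required create-source fields absent from a payload."""
    return sorted(REQUIRED_FIELDS - payload.keys())

payload = {
    "name": "production-postgres",
    "connectorType": "postgres",
    "connectionConfig": {"host": "orders-db.example.com", "port": 5432},
}
missing = missing_fields(payload)
body = json.dumps(payload)  # this string would be sent as the POST body
```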
Response: 201 Created
```json
{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "name": "production-postgres",
  "description": "Production PostgreSQL database for order data",
  "connectorType": "postgres",
  "status": "INACTIVE",
  "lastTestedAt": null,
  "createdAt": "2024-03-15T10:00:00Z"
}
```

### Get Source
Retrieves a single source by ID.
```http
GET /api/v1/sources/{sourceId}
```

Response: 200 OK
```json
{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "name": "production-postgres",
  "description": "Production PostgreSQL database for order data",
  "connectorType": "postgres",
  "status": "ACTIVE",
  "lastTestedAt": "2024-03-15T10:05:00Z",
  "createdAt": "2024-03-15T10:00:00Z"
}
```

### List Sources
Lists all sources for the current tenant.
```http
GET /api/v1/sources
```

Response: 200 OK
```json
[
  {
    "id": "a1b2c3d4-...",
    "name": "production-postgres",
    "connectorType": "postgres",
    "status": "ACTIVE",
    "lastTestedAt": "2024-03-15T10:05:00Z",
    "createdAt": "2024-03-15T10:00:00Z"
  },
  {
    "id": "b2c3d4e5-...",
    "name": "salesforce-crm",
    "connectorType": "salesforce",
    "status": "ACTIVE",
    "lastTestedAt": "2024-03-14T08:00:00Z",
    "createdAt": "2024-03-10T14:30:00Z"
  }
]
```

### Update Source
Updates an existing source. Connection configuration changes require re-testing.
```http
PUT /api/v1/sources/{sourceId}
```

Request Body:
```json
{
  "name": "production-postgres-updated",
  "description": "Updated description",
  "connectionConfig": {
    "host": "new-host.example.com",
    "port": 5432,
    "database": "orders",
    "username": "readonly_user",
    "password": "********"
  }
}
```

Response: 200 OK with the updated SourceResponse.
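Because only connection-configuration changes require re-testing, a client can decide locally whether an update should be followed by a test call. A small sketch (the helper is illustrative, not part of the API):

```python
def needs_retest(old: dict, new: dict) -> bool:
    """connectionConfig changes require re-testing; name/description edits do not."""
    return old.get("connectionConfig") != new.get("connectionConfig")

old = {"name": "production-postgres",
       "connectionConfig": {"host": "orders-db.example.com", "port": 5432}}
new = {"name": "production-postgres-updated",
       "connectionConfig": {"host": "new-host.example.com", "port": 5432}}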
### Delete Source

Deletes a source and its Airbyte counterpart. Fails with 409 Conflict if the source has active connections.

```http
DELETE /api/v1/sources/{sourceId}
```

Response: 204 No Content
### Test Connection

Tests the source connection by delegating to Airbyte's connection check.

```http
POST /api/v1/sources/{sourceId}/test
```

Response: 200 OK
```json
{
  "id": "a1b2c3d4-...",
  "name": "production-postgres",
  "connectorType": "postgres",
  "status": "ACTIVE",
  "lastTestedAt": "2024-03-15T10:05:00Z",
  "createdAt": "2024-03-15T10:00:00Z"
}
```

On failure, `status` is set to ERROR.
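A client can gate further setup (for example, connection creation) on the tested status. A trivial sketch (the helper is illustrative, not part of the API; the dicts mirror the response shape above):

```python
def connection_ok(source: dict) -> bool:
    """A passing test leaves the source ACTIVE; a failing one sets status to ERROR."""
    return source.get("status") == "ACTIVE"

passing = {"id": "a1b2c3d4-...", "status": "ACTIVE", "lastTestedAt": "2024-03-15T10:05:00Z"}
failing = {"id": "a1b2c3d4-...", "status": "ERROR", "lastTestedAt": "2024-03-15T10:05:00Z"}
```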
### Discover Schema

Discovers the schema of a source, returning the available streams with their columns and supported sync modes.

```http
POST /api/v1/sources/{sourceId}/discover
```

Response: 200 OK
```json
{
  "streams": [
    {
      "name": "public.orders",
      "columns": [
        { "name": "order_id", "type": "INTEGER", "nullable": false, "sampleValues": [] },
        { "name": "customer_id", "type": "INTEGER", "nullable": false, "sampleValues": [] },
        { "name": "order_date", "type": "DATE", "nullable": false, "sampleValues": [] },
        { "name": "total_amount", "type": "DECIMAL", "nullable": false, "sampleValues": [] },
        { "name": "status", "type": "STRING", "nullable": true, "sampleValues": [] }
      ],
      "supportedSyncModes": ["FULL_REFRESH", "INCREMENTAL", "CDC"]
    },
    {
      "name": "public.customers",
      "columns": [
        { "name": "customer_id", "type": "INTEGER", "nullable": false, "sampleValues": [] },
        { "name": "name", "type": "STRING", "nullable": false, "sampleValues": [] },
        { "name": "email", "type": "STRING", "nullable": true, "sampleValues": [] }
      ],
      "supportedSyncModes": ["FULL_REFRESH", "INCREMENTAL", "CDC"]
    }
  ]
}
```

### List Connector Types
Lists all available connector type identifiers. This endpoint does not require X-Tenant-Id.
```http
GET /api/v1/sources/connector-types
```

Response: 200 OK
```json
[
  "postgres",
  "mysql",
  "mongodb",
  "mssql",
  "oracle",
  "salesforce",
  "hubspot",
  "stripe",
  "s3",
  "gcs",
  "azure-blob-storage",
  "sftp-bulk",
  "github",
  "jira",
  "zendesk"
]
```

## Connections API
Connections define a sync pipeline between a source and the Iceberg destination, including stream selection, sync mode, and schedule.
### Create Connection

Creates a new connection for a source.

```http
POST /api/v1/connections
```

Request Body:
```json
{
  "sourceId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "name": "orders-daily-sync",
  "destinationType": "iceberg",
  "syncSchedule": "0 0 * * *",
  "selectedStreams": ["public.orders", "public.customers"],
  "syncMode": "INCREMENTAL"
}
```

| Field | Type | Required | Description |
|---|---|---|---|
| sourceId | UUID | Yes | ID of the source to sync from |
| name | string | Yes | Human-readable connection name |
| destinationType | string | No | Destination type (defaults to iceberg) |
| syncSchedule | string | No | Cron expression for scheduled syncs (null for manual-only) |
| selectedStreams | string[] | No | Streams to sync (null for all discovered streams) |
| syncMode | enum | No | FULL_REFRESH, INCREMENTAL, or CDC (defaults to FULL_REFRESH) |
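The `syncSchedule` value is a standard five-field cron expression (minute, hour, day-of-month, month, day-of-week), so `"0 0 * * *"` means daily at midnight. A small sketch of reading the common daily case (illustrative only; the service just stores the string):

```python
def describe_schedule(expr: str) -> str:
    """Describe a five-field cron expression for the simple 'daily at HH:MM' case.
    Anything more elaborate is reported as a custom schedule."""
    minute, hour, dom, month, dow = expr.split()
    if (dom, month, dow) == ("*", "*", "*") and minute.isdigit() and hour.isdigit():
        return f"daily at {int(hour):02d}:{int(minute):02d}"
    return "custom schedule"

desc = describe_schedule("0 0 * * *")  # the sample syncSchedule above
```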
Response: 201 Created
```json
{
  "id": "c3d4e5f6-7890-abcd-ef12-34567890abcd",
  "sourceId": "a1b2c3d4-...",
  "sourceName": "production-postgres",
  "name": "orders-daily-sync",
  "destinationType": "iceberg",
  "syncSchedule": "0 0 * * *",
  "syncMode": "INCREMENTAL",
  "status": "PENDING",
  "lastSyncAt": null,
  "createdAt": "2024-03-15T10:10:00Z"
}
```

### Get Connection
```http
GET /api/v1/connections/{connectionId}
```

Response: 200 OK with a ConnectionResponse.
### List Connections

```http
GET /api/v1/connections
```

Response: 200 OK with an array of ConnectionResponse objects.
### Update Connection

```http
PUT /api/v1/connections/{connectionId}
```

Request Body: Same as CreateConnectionRequest.
Response: 200 OK with updated ConnectionResponse.
### Delete Connection

```http
DELETE /api/v1/connections/{connectionId}
```

Response: 204 No Content
## Syncs API
The Syncs API manages sync job execution and history.
### Trigger Sync

Triggers a manual sync for a connection.

```http
POST /api/v1/syncs/connections/{connectionId}/trigger
```

Response: 202 Accepted
```json
{
  "id": "d4e5f6a7-...",
  "connectionId": "c3d4e5f6-...",
  "connectionName": "orders-daily-sync",
  "status": "RUNNING",
  "recordsSynced": null,
  "bytesSynced": null,
  "startedAt": "2024-03-15T10:15:00Z",
  "completedAt": null,
  "durationMs": null
}
```

### Cancel Sync
Cancels a running sync job.
```http
POST /api/v1/syncs/{syncId}/cancel
```

Response: 204 No Content
### Get Sync Status

Retrieves the current status of a sync job.

```http
GET /api/v1/syncs/{syncId}
```

Response: 200 OK
```json
{
  "id": "d4e5f6a7-...",
  "connectionId": "c3d4e5f6-...",
  "connectionName": "orders-daily-sync",
  "status": "SUCCEEDED",
  "recordsSynced": 145230,
  "bytesSynced": 52428800,
  "startedAt": "2024-03-15T10:15:00Z",
  "completedAt": "2024-03-15T10:18:30Z",
  "durationMs": 210000
}
```

### List Sync History
Lists sync history with pagination and optional connection filter.
```http
GET /api/v1/syncs?connectionId={connectionId}&page=0&size=20&sort=startedAt,desc
```

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| connectionId | UUID | No | -- | Filter by connection ID |
| page | integer | No | 0 | Page number (zero-indexed) |
| size | integer | No | 20 | Page size |
| sort | string | No | startedAt | Sort field and direction |
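A client can build the query string with standard URL encoding and derive the page count from the paginated response fields (`totalPages` is the ceiling of `totalElements / size`). A minimal sketch:

```python
import math
from urllib.parse import urlencode

def total_pages(total_elements: int, size: int) -> int:
    """The response's totalPages equals ceil(totalElements / size)."""
    return math.ceil(total_elements / size) if size > 0 else 0

# Build the query string for a sorted first page.
params = {"page": 0, "size": 20, "sort": "startedAt,desc"}
query = urlencode(params)

pages = total_pages(47, 20)  # 47 elements at page size 20 -> 3 pages
```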
Response: 200 OK
```json
{
  "content": [
    {
      "id": "d4e5f6a7-...",
      "connectionId": "c3d4e5f6-...",
      "connectionName": "orders-daily-sync",
      "status": "SUCCEEDED",
      "recordsSynced": 145230,
      "bytesSynced": 52428800,
      "startedAt": "2024-03-15T10:15:00Z",
      "completedAt": "2024-03-15T10:18:30Z",
      "durationMs": 210000
    }
  ],
  "totalElements": 47,
  "totalPages": 3,
  "number": 0,
  "size": 20
}
```

## File Import API
The File Import API handles file upload, preview, schema configuration, and import execution.
### Upload File

Uploads a file for import. The file is stored in tenant-scoped object storage.

```http
POST /api/v1/files/upload
Content-Type: multipart/form-data
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| file | binary | Yes | The file to upload (multipart form field) |
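For illustration, a multipart/form-data body carrying the `file` field can be assembled by hand (a sketch; real clients typically use an HTTP library that does this for them):

```python
import io
import uuid

def build_multipart(field: str, filename: str, content: bytes) -> tuple[bytes, str]:
    """Build a minimal multipart/form-data body and its Content-Type header value."""
    boundary = uuid.uuid4().hex
    body = io.BytesIO()
    body.write(f"--{boundary}\r\n".encode())
    body.write(f'Content-Disposition: form-data; name="{field}"; '
               f'filename="{filename}"\r\n'.encode())
    body.write(b"Content-Type: application/octet-stream\r\n\r\n")
    body.write(content)
    body.write(f"\r\n--{boundary}--\r\n".encode())
    return body.getvalue(), f"multipart/form-data; boundary={boundary}"

payload, content_type = build_multipart(
    "file", "sales_2024.csv", b"order_id,customer_name,amount\n1001,Alice,150.00\n")
```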
Response: 201 Created
```json
{
  "id": "e5f6a7b8-...",
  "tenantId": "...",
  "fileName": "sales_2024.csv",
  "fileSize": 2456789,
  "fileFormat": "csv",
  "storagePath": "/tenant-abc/uploads/e5f6a7b8-.../sales_2024.csv",
  "status": "UPLOADED",
  "createdAt": "2024-03-15T10:30:00Z"
}
```

### Get File Preview
Returns the inferred schema and sample rows from an uploaded file.
```http
GET /api/v1/files/{fileId}/preview
```

Response: 200 OK
```json
{
  "fileId": "e5f6a7b8-...",
  "fileName": "sales_2024.csv",
  "columns": [
    { "name": "order_id", "type": "INTEGER", "nullable": false, "sampleValues": ["1001", "1002", "1003"] },
    { "name": "customer_name", "type": "STRING", "nullable": true, "sampleValues": ["Alice", "Bob"] },
    { "name": "amount", "type": "DECIMAL", "nullable": false, "sampleValues": ["150.00", "299.50"] }
  ],
  "previewRows": [
    { "order_id": 1001, "customer_name": "Alice", "amount": 150.00 },
    { "order_id": 1002, "customer_name": "Bob", "amount": 299.50 }
  ],
  "totalRows": 15423,
  "inferredTypes": {
    "order_id": "INTEGER",
    "customer_name": "STRING",
    "amount": "DECIMAL"
  }
}
```

### Update Schema
Updates the target table name, schema, and column mappings for an uploaded file before import.
```http
PUT /api/v1/files/{fileId}/schema
```

Request Body:
```json
{
  "targetTableName": "sales_orders_2024",
  "targetSchema": "raw_data",
  "columnMappings": {
    "order_id": "order_id",
    "customer_name": "customer",
    "amount": "total_amount"
  }
}
```

| Field | Type | Required | Description |
|---|---|---|---|
| targetTableName | string | Yes | Iceberg table name for the imported data |
| targetSchema | string | No | Iceberg schema/namespace (defaults to tenant default) |
| columnMappings | map | No | Source column name to target column name mappings |
Response: 200 OK with updated FileImportJob.
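The effect of `columnMappings` is a source-to-target column rename. A sketch of the renaming semantics as applied to one preview row (the helper is illustrative; the real mapping happens server-side during import):

```python
def apply_mappings(row: dict, mappings: dict) -> dict:
    """Rename source columns to their target names; unmapped columns keep their name."""
    return {mappings.get(col, col): value for col, value in row.items()}

# Values from the Update Schema and Get File Preview samples above.
mappings = {"order_id": "order_id", "customer_name": "customer", "amount": "total_amount"}
row = {"order_id": 1001, "customer_name": "Alice", "amount": 150.00}
mapped = apply_mappings(row, mappings)
```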
### Execute Import

Triggers the import of an uploaded and configured file into an Iceberg table.

```http
POST /api/v1/files/{fileId}/import
```

Response: 202 Accepted
```json
{
  "id": "e5f6a7b8-...",
  "fileName": "sales_2024.csv",
  "targetTableName": "sales_orders_2024",
  "targetSchema": "raw_data",
  "status": "IMPORTING",
  "createdAt": "2024-03-15T10:30:00Z"
}
```

### Get Import Status
Polls the status of a file import job.
```http
GET /api/v1/files/{fileId}/status
```

Response: 200 OK
```json
{
  "id": "e5f6a7b8-...",
  "fileName": "sales_2024.csv",
  "status": "COMPLETED",
  "recordsImported": 15423,
  "completedAt": "2024-03-15T10:31:45Z"
}
```

### List Supported Formats
Returns the list of supported file formats. This endpoint does not require X-Tenant-Id.
```http
GET /api/v1/files/formats
```

Response: 200 OK
```json
[
  "csv",
  "xlsx",
  "xls",
  "parquet",
  "json",
  "jsonl",
  "avro"
]
```

## Error Responses
All endpoints return standardized error responses.
```json
{
  "timestamp": "2024-03-15T10:30:00Z",
  "status": 400,
  "error": "Bad Request",
  "message": "Source name is required",
  "path": "/api/v1/sources"
}
```

### Common Error Codes
| HTTP Status | Description | Example |
|---|---|---|
| 400 Bad Request | Validation error in the request body | Missing required field, invalid connector type |
| 401 Unauthorized | Missing or invalid JWT token | Expired token, malformed Authorization header |
| 403 Forbidden | Insufficient permissions | User does not have data:ingestion:manage permission |
| 404 Not Found | Resource not found | Source ID does not exist or belongs to a different tenant |
| 409 Conflict | Resource conflict | Deleting a source that has active connections |
| 500 Internal Server Error | Server-side error | Airbyte API unreachable, database connection failure |
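Because every error body carries the same `status` and `message` fields, a client can translate them into exceptions with a single dispatcher. A sketch (the exception mapping is an illustrative client-side choice, not part of the API):

```python
import json

def raise_for_error(status: int, body: str) -> None:
    """Turn a standardized error response into a Python exception."""
    if status < 400:
        return
    message = json.loads(body).get("message", "unknown error")
    if status == 404:
        raise LookupError(message)
    if status == 409:
        raise RuntimeError(f"conflict: {message}")
    raise ValueError(f"HTTP {status}: {message}")

# The sample error body from above.
body = ('{"timestamp": "2024-03-15T10:30:00Z", "status": 400, '
        '"error": "Bad Request", "message": "Source name is required", '
        '"path": "/api/v1/sources"}')
```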