MATIH Platform is in active MVP development. Documentation reflects current implementation status.

API Reference

This section documents all REST API endpoints provided by the Ingestion Service. The service runs on port 8113 and is accessible through the API Gateway at the /api/v1/ path prefix.

All endpoints require the X-Tenant-Id header, except GET /api/v1/sources/connector-types and GET /api/v1/files/formats, which are tenant-independent.


Authentication

All requests must include a valid JWT bearer token in the Authorization header. Requests to tenant-scoped endpoints must also include the tenant identifier in the X-Tenant-Id header.

Authorization: Bearer <jwt-token>
X-Tenant-Id: <tenant-uuid>
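
As a minimal sketch, the two headers can be assembled in Python as shown here. The helper name build_headers and the sample values are illustrative assumptions, not part of the service.

```python
def build_headers(token, tenant_id=None):
    """Return the HTTP headers described in this section.

    build_headers is a hypothetical helper. tenant_id may be left out
    for endpoints that do not require X-Tenant-Id.
    """
    if not token:
        raise ValueError("a JWT bearer token is required")
    headers = {"Authorization": f"Bearer {token}"}
    if tenant_id is not None:
        headers["X-Tenant-Id"] = tenant_id
    return headers


if __name__ == "__main__":
    # Placeholder values, matching the header template above.
    print(build_headers("<jwt-token>", "<tenant-uuid>"))
```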

Sources API

Sources represent external data systems configured for ingestion. Each source maps to an Airbyte source instance.

Create Source

Creates a new ingestion source and registers it with Airbyte.

POST /api/v1/sources

Request Body:

{
  "name": "production-postgres",
  "description": "Production PostgreSQL database for order data",
  "connectorType": "postgres",
  "connectionConfig": {
    "host": "orders-db.example.com",
    "port": 5432,
    "database": "orders",
    "username": "readonly_user",
    "password": "********",
    "ssl_mode": "require"
  }
}

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Human-readable source name |
| description | string | No | Description of the data source |
| connectorType | string | Yes | Airbyte connector type identifier (e.g., postgres, salesforce, s3) |
| connectionConfig | object | Yes | Connector-specific configuration map |

Response: 201 Created

{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "name": "production-postgres",
  "description": "Production PostgreSQL database for order data",
  "connectorType": "postgres",
  "status": "INACTIVE",
  "lastTestedAt": null,
  "createdAt": "2024-03-15T10:00:00Z"
}
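
The following sketch shows one way a client might build the Create Source request with the Python standard library. The base URL (direct access to the service on its documented port), the function name create_source_request, and the client-side checks are assumptions for illustration. The request is only constructed here, not sent.

```python
import json
import urllib.request

# Assumption: the service is reached directly on its documented port.
# Replace with the API Gateway address used in your environment.
BASE_URL = "http://localhost:8113"


def create_source_request(token, tenant_id, name, connector_type,
                          connection_config, description=None):
    """Build (but do not send) a POST /api/v1/sources request."""
    # Mirror the required fields of the request body on the client side.
    if not name or not connector_type or connection_config is None:
        raise ValueError("name, connectorType and connectionConfig are required")
    body = {
        "name": name,
        "connectorType": connector_type,
        "connectionConfig": connection_config,
    }
    if description is not None:
        body["description"] = description
    return urllib.request.Request(
        f"{BASE_URL}/api/v1/sources",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "X-Tenant-Id": tenant_id,
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the returned object to urllib.request.urlopen would send it. A successful call is expected to return 201 Created with the source in INACTIVE status, as in the response above.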

Get Source

Retrieves a single source by ID.

GET /api/v1/sources/{sourceId}

Response: 200 OK

{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "name": "production-postgres",
  "description": "Production PostgreSQL database for order data",
  "connectorType": "postgres",
  "status": "ACTIVE",
  "lastTestedAt": "2024-03-15T10:05:00Z",
  "createdAt": "2024-03-15T10:00:00Z"
}

List Sources

Lists all sources for the current tenant.

GET /api/v1/sources

Response: 200 OK

[
  {
    "id": "a1b2c3d4-...",
    "name": "production-postgres",
    "connectorType": "postgres",
    "status": "ACTIVE",
    "lastTestedAt": "2024-03-15T10:05:00Z",
    "createdAt": "2024-03-15T10:00:00Z"
  },
  {
    "id": "b2c3d4e5-...",
    "name": "salesforce-crm",
    "connectorType": "salesforce",
    "status": "ACTIVE",
    "lastTestedAt": "2024-03-14T08:00:00Z",
    "createdAt": "2024-03-10T14:30:00Z"
  }
]

Update Source

Updates an existing source. Connection configuration changes require re-testing.

PUT /api/v1/sources/{sourceId}

Request Body:

{
  "name": "production-postgres-updated",
  "description": "Updated description",
  "connectionConfig": {
    "host": "new-host.example.com",
    "port": 5432,
    "database": "orders",
    "username": "readonly_user",
    "password": "********"
  }
}

Response: 200 OK with updated SourceResponse.

Delete Source

Deletes a source and its Airbyte counterpart. Fails with 409 Conflict if the source has active connections.

DELETE /api/v1/sources/{sourceId}

Response: 204 No Content

Test Connection

Tests the source connection by delegating to Airbyte's connection check.

POST /api/v1/sources/{sourceId}/test

Response: 200 OK

{
  "id": "a1b2c3d4-...",
  "name": "production-postgres",
  "connectorType": "postgres",
  "status": "ACTIVE",
  "lastTestedAt": "2024-03-15T10:05:00Z",
  "createdAt": "2024-03-15T10:00:00Z"
}

On failure, status is set to ERROR.

Discover Schema

Discovers the schema of a source, returning available streams with their columns and supported sync modes.

POST /api/v1/sources/{sourceId}/discover

Response: 200 OK

{
  "streams": [
    {
      "name": "public.orders",
      "columns": [
        { "name": "order_id", "type": "INTEGER", "nullable": false, "sampleValues": [] },
        { "name": "customer_id", "type": "INTEGER", "nullable": false, "sampleValues": [] },
        { "name": "order_date", "type": "DATE", "nullable": false, "sampleValues": [] },
        { "name": "total_amount", "type": "DECIMAL", "nullable": false, "sampleValues": [] },
        { "name": "status", "type": "STRING", "nullable": true, "sampleValues": [] }
      ],
      "supportedSyncModes": ["FULL_REFRESH", "INCREMENTAL", "CDC"]
    },
    {
      "name": "public.customers",
      "columns": [
        { "name": "customer_id", "type": "INTEGER", "nullable": false, "sampleValues": [] },
        { "name": "name", "type": "STRING", "nullable": false, "sampleValues": [] },
        { "name": "email", "type": "STRING", "nullable": true, "sampleValues": [] }
      ],
      "supportedSyncModes": ["FULL_REFRESH", "INCREMENTAL", "CDC"]
    }
  ]
}
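
A discovery response can be used to decide which streams to select for a connection. The sketch below filters streams by sync mode and lists the non-nullable columns of a stream. The function names are hypothetical, and the sample data is an abbreviated copy of the response above.

```python
def streams_supporting(discovery, sync_mode):
    """Return the names of streams that advertise the given sync mode."""
    return [
        stream["name"]
        for stream in discovery.get("streams", [])
        if sync_mode in stream.get("supportedSyncModes", [])
    ]


def required_columns(discovery, stream_name):
    """Return the non-nullable column names of one stream."""
    for stream in discovery.get("streams", []):
        if stream["name"] == stream_name:
            return [c["name"] for c in stream["columns"] if not c["nullable"]]
    raise KeyError(f"stream not found: {stream_name}")


# Abbreviated copy of the sample discovery response.
sample = {
    "streams": [
        {
            "name": "public.customers",
            "columns": [
                {"name": "customer_id", "type": "INTEGER", "nullable": False},
                {"name": "name", "type": "STRING", "nullable": False},
                {"name": "email", "type": "STRING", "nullable": True},
            ],
            "supportedSyncModes": ["FULL_REFRESH", "INCREMENTAL", "CDC"],
        }
    ]
}

if __name__ == "__main__":
    print(streams_supporting(sample, "CDC"))
    print(required_columns(sample, "public.customers"))
```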

List Connector Types

Lists all available connector type identifiers. This endpoint does not require X-Tenant-Id.

GET /api/v1/sources/connector-types

Response: 200 OK

[
  "postgres",
  "mysql",
  "mongodb",
  "mssql",
  "oracle",
  "salesforce",
  "hubspot",
  "stripe",
  "s3",
  "gcs",
  "azure-blob-storage",
  "sftp-bulk",
  "github",
  "jira",
  "zendesk"
]

Connections API

Connections define a sync pipeline between a source and the Iceberg destination, including stream selection, sync mode, and schedule.

Create Connection

Creates a new connection for a source.

POST /api/v1/connections

Request Body:

{
  "sourceId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "name": "orders-daily-sync",
  "destinationType": "iceberg",
  "syncSchedule": "0 0 * * *",
  "selectedStreams": ["public.orders", "public.customers"],
  "syncMode": "INCREMENTAL"
}

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| sourceId | UUID | Yes | ID of the source to sync from |
| name | string | Yes | Human-readable connection name |
| destinationType | string | No | Destination type (defaults to iceberg) |
| syncSchedule | string | No | Cron expression for scheduled syncs (null for manual only) |
| selectedStreams | string[] | No | Streams to sync (null for all discovered streams) |
| syncMode | enum | No | FULL_REFRESH, INCREMENTAL, or CDC (defaults to FULL_REFRESH) |

Response: 201 Created

{
  "id": "c3d4e5f6-7890-abcd-ef12-34567890abcd",
  "sourceId": "a1b2c3d4-...",
  "sourceName": "production-postgres",
  "name": "orders-daily-sync",
  "destinationType": "iceberg",
  "syncSchedule": "0 0 * * *",
  "syncMode": "INCREMENTAL",
  "status": "PENDING",
  "lastSyncAt": null,
  "createdAt": "2024-03-15T10:10:00Z"
}
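
As a rough illustration of the request fields, the helper below builds a Create Connection body and leaves out optional fields that were not supplied, so that the documented server-side defaults (iceberg, FULL_REFRESH, all streams, manual only) would apply. The function name and the client-side validation are assumptions, not part of the service.

```python
# The three sync modes listed for the syncMode field.
SYNC_MODES = ("FULL_REFRESH", "INCREMENTAL", "CDC")


def build_connection_payload(source_id, name, sync_mode=None,
                             sync_schedule=None, selected_streams=None,
                             destination_type=None):
    """Build a request body for POST /api/v1/connections."""
    if not source_id or not name:
        raise ValueError("sourceId and name are required")
    if sync_mode is not None and sync_mode not in SYNC_MODES:
        raise ValueError(f"syncMode must be one of {SYNC_MODES}")
    payload = {"sourceId": source_id, "name": name}
    optional = {
        "destinationType": destination_type,
        "syncSchedule": sync_schedule,
        "selectedStreams": selected_streams,
        "syncMode": sync_mode,
    }
    # Omit unset optional fields so the service can apply its defaults.
    payload.update({k: v for k, v in optional.items() if v is not None})
    return payload
```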

Get Connection

GET /api/v1/connections/{connectionId}

Response: 200 OK with ConnectionResponse.

List Connections

GET /api/v1/connections

Response: 200 OK with array of ConnectionResponse.

Update Connection

PUT /api/v1/connections/{connectionId}

Request Body: Same as CreateConnectionRequest.

Response: 200 OK with updated ConnectionResponse.

Delete Connection

DELETE /api/v1/connections/{connectionId}

Response: 204 No Content


Syncs API

The Syncs API manages sync job execution and history.

Trigger Sync

Triggers a manual sync for a connection.

POST /api/v1/syncs/connections/{connectionId}/trigger

Response: 202 Accepted

{
  "id": "d4e5f6a7-...",
  "connectionId": "c3d4e5f6-...",
  "connectionName": "orders-daily-sync",
  "status": "RUNNING",
  "recordsSynced": null,
  "bytesSynced": null,
  "startedAt": "2024-03-15T10:15:00Z",
  "completedAt": null,
  "durationMs": null
}

Cancel Sync

Cancels a running sync job.

POST /api/v1/syncs/{syncId}/cancel

Response: 204 No Content

Get Sync Status

Retrieves the current status of a sync job.

GET /api/v1/syncs/{syncId}

Response: 200 OK

{
  "id": "d4e5f6a7-...",
  "connectionId": "c3d4e5f6-...",
  "connectionName": "orders-daily-sync",
  "status": "SUCCEEDED",
  "recordsSynced": 145230,
  "bytesSynced": 52428800,
  "startedAt": "2024-03-15T10:15:00Z",
  "completedAt": "2024-03-15T10:18:30Z",
  "durationMs": 210000
}
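
Because a triggered sync runs asynchronously, a client typically polls this endpoint until the job leaves the RUNNING state. The sketch below assumes that RUNNING is the only in-progress status, which this reference does not state explicitly. fetch_status is a hypothetical callable that performs the GET request and returns the decoded JSON body.

```python
import time


def wait_for_sync(fetch_status, sync_id, interval_s=5.0, timeout_s=600.0,
                  sleep=time.sleep):
    """Poll a sync job until its status is no longer RUNNING.

    fetch_status(sync_id) is supplied by the caller and should return
    the decoded body of GET /api/v1/syncs/{syncId}.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        job = fetch_status(sync_id)
        if job["status"] != "RUNNING":
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"sync {sync_id} still RUNNING after {timeout_s}s")
        sleep(interval_s)
```

Injecting fetch_status and sleep keeps the loop independent of any particular HTTP client and makes it easy to exercise without a running service.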

List Sync History

Lists sync history with pagination and optional connection filter.

GET /api/v1/syncs?connectionId={connectionId}&page=0&size=20&sort=startedAt,desc
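
| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connectionId | UUID | No | -- | Filter by connection ID |
| page | integer | No | 0 | Page number (zero-indexed) |
| size | integer | No | 20 | Page size |
| sort | string | No | startedAt | Sort field and direction |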

Response: 200 OK

{
  "content": [
    {
      "id": "d4e5f6a7-...",
      "connectionId": "c3d4e5f6-...",
      "connectionName": "orders-daily-sync",
      "status": "SUCCEEDED",
      "recordsSynced": 145230,
      "bytesSynced": 52428800,
      "startedAt": "2024-03-15T10:15:00Z",
      "completedAt": "2024-03-15T10:18:30Z",
      "durationMs": 210000
    }
  ],
  "totalElements": 47,
  "totalPages": 3,
  "number": 0,
  "size": 20
}
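
The paginated response can be walked page by page using the totalPages field. The following is a sketch only: history_path and iter_sync_history are hypothetical names, and fetch_page is a caller-supplied function that requests one page and returns the decoded JSON body.

```python
from urllib.parse import urlencode


def history_path(page=0, size=20, sort="startedAt,desc", connection_id=None):
    """Build the request path for one page of sync history."""
    params = {"page": page, "size": size, "sort": sort}
    if connection_id is not None:
        params["connectionId"] = connection_id
    # Keep the comma in the sort parameter readable.
    return "/api/v1/syncs?" + urlencode(params, safe=",")


def iter_sync_history(fetch_page, size=20):
    """Yield every sync job, requesting pages until totalPages is reached.

    fetch_page(page, size) is supplied by the caller.
    """
    page = 0
    while True:
        result = fetch_page(page, size)
        yield from result["content"]
        page += 1
        if page >= result["totalPages"]:
            break
```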

File Import API

The File Import API handles file upload, preview, schema configuration, and import execution.

Upload File

Uploads a file for import. The file is stored in tenant-scoped object storage.

POST /api/v1/files/upload
Content-Type: multipart/form-data

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| file | binary | Yes | The file to upload (multipart form field) |

Response: 201 Created

{
  "id": "e5f6a7b8-...",
  "tenantId": "...",
  "fileName": "sales_2024.csv",
  "fileSize": 2456789,
  "fileFormat": "csv",
  "storagePath": "/tenant-abc/uploads/e5f6a7b8-.../sales_2024.csv",
  "status": "UPLOADED",
  "createdAt": "2024-03-15T10:30:00Z"
}

Get File Preview

Returns the inferred schema and sample rows from an uploaded file.

GET /api/v1/files/{fileId}/preview

Response: 200 OK

{
  "fileId": "e5f6a7b8-...",
  "fileName": "sales_2024.csv",
  "columns": [
    { "name": "order_id", "type": "INTEGER", "nullable": false, "sampleValues": ["1001", "1002", "1003"] },
    { "name": "customer_name", "type": "STRING", "nullable": true, "sampleValues": ["Alice", "Bob"] },
    { "name": "amount", "type": "DECIMAL", "nullable": false, "sampleValues": ["150.00", "299.50"] }
  ],
  "previewRows": [
    { "order_id": 1001, "customer_name": "Alice", "amount": 150.00 },
    { "order_id": 1002, "customer_name": "Bob", "amount": 299.50 }
  ],
  "totalRows": 15423,
  "inferredTypes": {
    "order_id": "INTEGER",
    "customer_name": "STRING",
    "amount": "DECIMAL"
  }
}

Update Schema

Updates the target table name, schema, and column mappings for an uploaded file before import.

PUT /api/v1/files/{fileId}/schema

Request Body:

{
  "targetTableName": "sales_orders_2024",
  "targetSchema": "raw_data",
  "columnMappings": {
    "order_id": "order_id",
    "customer_name": "customer",
    "amount": "total_amount"
  }
}

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| targetTableName | string | Yes | Iceberg table name for the imported data |
| targetSchema | string | No | Iceberg schema/namespace (defaults to tenant default) |
| columnMappings | map | No | Source column name to target column name mappings |

Response: 200 OK with updated FileImportJob.
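
To illustrate what columnMappings expresses, the sketch below renames the keys of a source row according to the mapping. It assumes that columns absent from the mapping keep their original names, which this reference does not confirm. The function name is hypothetical and the renaming itself is performed by the service, not the client.

```python
def apply_column_mappings(row, mappings):
    """Rename a row's keys from source column names to target column names.

    Assumption: columns without a mapping entry keep their source name.
    """
    return {mappings.get(column, column): value for column, value in row.items()}


if __name__ == "__main__":
    mappings = {
        "order_id": "order_id",
        "customer_name": "customer",
        "amount": "total_amount",
    }
    row = {"order_id": 1001, "customer_name": "Alice", "amount": 150.00}
    print(apply_column_mappings(row, mappings))
```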

Execute Import

Triggers the import of an uploaded and configured file into an Iceberg table.

POST /api/v1/files/{fileId}/import

Response: 202 Accepted

{
  "id": "e5f6a7b8-...",
  "fileName": "sales_2024.csv",
  "targetTableName": "sales_orders_2024",
  "targetSchema": "raw_data",
  "status": "IMPORTING",
  "createdAt": "2024-03-15T10:30:00Z"
}

Get Import Status

Polls the status of a file import job.

GET /api/v1/files/{fileId}/status

Response: 200 OK

{
  "id": "e5f6a7b8-...",
  "fileName": "sales_2024.csv",
  "status": "COMPLETED",
  "recordsImported": 15423,
  "completedAt": "2024-03-15T10:31:45Z"
}
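
Taken together, the schema, import, and status endpoints form a short workflow for a file that has already been uploaded. The sketch below strings them together under two assumptions: IMPORTING is the only in-progress status, and call is a hypothetical caller-supplied function call(method, path, body) that sends the request with the required headers and returns the decoded JSON body.

```python
import time


def import_file(call, file_id, target_table, target_schema=None,
                column_mappings=None, interval_s=2.0, max_polls=100,
                sleep=time.sleep):
    """Configure, start, and wait for the import of an uploaded file."""
    schema_body = {"targetTableName": target_table}
    if target_schema is not None:
        schema_body["targetSchema"] = target_schema
    if column_mappings is not None:
        schema_body["columnMappings"] = column_mappings

    # 1. Set the target table and column mappings.
    call("PUT", f"/api/v1/files/{file_id}/schema", schema_body)
    # 2. Start the import (the service answers 202 Accepted).
    call("POST", f"/api/v1/files/{file_id}/import", None)
    # 3. Poll until the job is no longer IMPORTING.
    for _ in range(max_polls):
        status = call("GET", f"/api/v1/files/{file_id}/status", None)
        if status["status"] != "IMPORTING":
            return status
        sleep(interval_s)
    raise TimeoutError(f"import of file {file_id} did not finish")
```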

List Supported Formats

Returns the list of supported file formats. This endpoint does not require X-Tenant-Id.

GET /api/v1/files/formats

Response: 200 OK

[
  "csv",
  "xlsx",
  "xls",
  "parquet",
  "json",
  "jsonl",
  "avro"
]

Error Responses

All endpoints return standardized error responses.

{
  "timestamp": "2024-03-15T10:30:00Z",
  "status": 400,
  "error": "Bad Request",
  "message": "Source name is required",
  "path": "/api/v1/sources"
}

Common Error Codes

| HTTP Status | Description | Example |
| --- | --- | --- |
| 400 Bad Request | Validation error in the request body | Missing required field, invalid connector type |
| 401 Unauthorized | Missing or invalid JWT token | Expired token, malformed Authorization header |
| 403 Forbidden | Insufficient permissions | User does not have data:ingestion:manage permission |
| 404 Not Found | Resource not found | Source ID does not exist or belongs to a different tenant |
| 409 Conflict | Resource conflict | Deleting a source that has active connections |
| 500 Internal Server Error | Server-side error | Airbyte API unreachable, database connection failure |
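
A client can turn the standardized error body shown in this section into a typed exception. The sketch below is one possible approach. The class name IngestionApiError, the function parse_error, and the rule that only 5xx responses are worth retrying are assumptions, not behavior defined by the service.

```python
import json


class IngestionApiError(Exception):
    """Hypothetical client-side wrapper for the standardized error body."""

    def __init__(self, status, error, message, path=None, timestamp=None):
        super().__init__(f"{status} {error}: {message}")
        self.status = status
        self.error = error
        self.message = message
        self.path = path
        self.timestamp = timestamp

    @property
    def retryable(self):
        # Assumption: only server-side (5xx) failures are worth retrying.
        return self.status >= 500


def parse_error(raw_body):
    """Decode an error response body into an IngestionApiError."""
    payload = json.loads(raw_body)
    return IngestionApiError(
        status=payload["status"],
        error=payload.get("error", ""),
        message=payload.get("message", ""),
        path=payload.get("path"),
        timestamp=payload.get("timestamp"),
    )
```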