10a. Data Ingestion

Architecture

This section describes the internal architecture of the Matih Data Ingestion subsystem, the data flows for both connector-based and file-based ingestion, the per-tenant deployment model, and how ingested data integrates with the rest of the platform.


System Architecture

The ingestion subsystem spans both the Control Plane and the Data Plane. The Ingestion Service lives in the Control Plane and provides the management API. Airbyte runs in the Data Plane and performs the actual data extraction and loading.

+===========================================================================+
|                               CONTROL PLANE                               |
|                                                                           |
|  +-------------------+      +----------------------+                     |
|  | Tenant Service    |----->| Ingestion Service    |                     |
|  | (Provisioning)    |      | (Java/Spring Boot)   |                     |
|  +-------------------+      | Port 8113            |                     |
|                             |                      |                     |
|                             | - Source CRUD        |                     |
|                             | - Connection CRUD    |                     |
|                             | - Sync orchestration |                     |
|                             | - File import        |                     |
|                             +----------+-----------+                     |
|                                        |                                 |
+========================================|==================================+
                                         |
                            Airbyte API  |  Pipeline Service API
                                         |
+========================================|==================================+
|                          DATA PLANE    |                                 |
|                                        |                                 |
|  +-------------------+      +----------v-----------+                    |
|  | Pipeline Service  |<-----| Airbyte              |                    |
|  | (FastAPI)         |      | (Per-Tenant)         |                    |
|  | Port 8112         |      | - Source connectors  |                    |
|  +--------+----------+      | - Destination:       |                    |
|           |                 |   Iceberg            |                    |
|           |                 +----------+-----------+                    |
|           |                            |                                |
|  +--------v----------+      +----------v-----------+                    |
|  | Apache Spark      |      | Apache Iceberg       |                    |
|  | (Transformations) |      | (Polaris Catalog)    |                    |
|  +-------------------+      +----------+-----------+                    |
|                                        |                                |
+========================================|==================================+
                                         |
                             +-----------+-----------+
                             |           |           |
                        +----v----+ +----v-----+ +---v--------+
                        | Trino   | |ClickHouse| | StarRocks  |
                        | v458    | |          | |            |
                        +---------+ +----------+ +------------+

Core Components

Ingestion Service (Control Plane)

The Ingestion Service is a Java 21 / Spring Boot 3.2 application running on port 8113. It serves as the management and orchestration layer for all ingestion operations.

  • Source management: SourceController + SourceService -- CRUD operations for external data sources with connection testing and schema discovery
  • Connection management: ConnectionController + ConnectionService -- configures sync pipelines between sources and the Iceberg destination
  • Sync orchestration: SyncController + SyncService -- triggers, cancels, and monitors Airbyte sync jobs
  • File import: FileImportController + FileImportService -- handles multipart upload, schema inference, preview, and import execution
  • Airbyte integration: AirbyteClient -- REST client for the Airbyte API (source creation, schema discovery, connection setup, job management)
  • Pipeline integration: PipelineServiceClient -- communicates with the Pipeline Service for post-ingestion transformations
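
For illustration, a trimmed sketch of what this CRUD surface might look like in Spring. The endpoint paths, DTO shapes, and service interface here are assumptions, not the actual implementation:

    import java.util.Map;
    import java.util.UUID;
    import org.springframework.web.bind.annotation.*;

    // Illustrative DTOs; field names mirror the entity model later on this page.
    record CreateSourceRequest(String name, String connectorType, Map<String, Object> connectionConfig) {}
    record SourceResponse(UUID id, String name, String status) {}
    record ConnectionTestResult(boolean success, String message) {}

    interface SourceService {
        SourceResponse create(String tenantId, CreateSourceRequest request);
        ConnectionTestResult testConnection(String tenantId, UUID sourceId);
    }

    @RestController
    @RequestMapping("/api/v1/sources") // hypothetical path
    class SourceController {
        private final SourceService sourceService;

        SourceController(SourceService sourceService) {
            this.sourceService = sourceService;
        }

        // Persist source metadata and register a matching source in Airbyte.
        @PostMapping
        SourceResponse create(@RequestHeader("X-Tenant-Id") String tenantId,
                              @RequestBody CreateSourceRequest request) {
            return sourceService.create(tenantId, request);
        }

        // Delegate a connectivity check to Airbyte (step 2 of the data flow below).
        @PostMapping("/{id}/test")
        ConnectionTestResult test(@RequestHeader("X-Tenant-Id") String tenantId,
                                  @PathVariable UUID id) {
            return sourceService.testConnection(tenantId, id);
        }
    }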

Airbyte (Data Plane)

Each tenant receives an isolated Airbyte deployment in the Data Plane. Airbyte provides:

  • 600+ source connectors for databases, SaaS apps, APIs, files, and cloud storage
  • Iceberg destination connector that writes extracted data into Apache Iceberg tables via the Polaris REST Catalog
  • Schema discovery that introspects source systems and reports available streams (tables, collections, API endpoints) with column types
  • Sync modes including Full Refresh (complete re-extraction), Incremental (cursor-based delta), and CDC (change data capture via database logs)
  • Job management with automatic retries, backpressure handling, and structured logging
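
The AirbyteClient mentioned in the table above can be pictured as a thin wrapper over these operations. A minimal sketch using Spring's RestClient; the request paths follow Airbyte's public Config API, while the class shape and base URL are assumptions:

    import java.util.Map;
    import org.springframework.http.MediaType;
    import org.springframework.web.client.RestClient;

    // Minimal sketch of a client for a tenant's Airbyte instance.
    class AirbyteClient {
        private final RestClient http;

        AirbyteClient(String baseUrl) { // e.g. the tenant's Airbyte server URL (hypothetical)
            this.http = RestClient.builder().baseUrl(baseUrl).build();
        }

        // Register a new source in the tenant workspace.
        Map<?, ?> createSource(Map<String, Object> body) {
            return http.post().uri("/api/v1/sources/create")
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(body).retrieve().body(Map.class);
        }

        // Ask Airbyte to verify connectivity with the stored credentials.
        Map<?, ?> checkSource(String sourceId) {
            return http.post().uri("/api/v1/sources/check_connection")
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(Map.of("sourceId", sourceId)).retrieve().body(Map.class);
        }

        // Introspect the source and list available streams.
        Map<?, ?> discoverSchema(String sourceId) {
            return http.post().uri("/api/v1/sources/discover_schema")
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(Map.of("sourceId", sourceId)).retrieve().body(Map.class);
        }

        // Start a sync job for an existing connection.
        Map<?, ?> triggerSync(String connectionId) {
            return http.post().uri("/api/v1/connections/sync")
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(Map.of("connectionId", connectionId)).retrieve().body(Map.class);
        }
    }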

Polaris REST Catalog (Data Plane)

All ingested data lands in Apache Iceberg tables managed by the Polaris REST Catalog. Polaris provides:

  • Namespace isolation per tenant (e.g., tenant_abc.raw_data)
  • ACID transactions ensuring readers never see partial sync results
  • Schema evolution allowing Airbyte to add columns as source schemas change
  • Time travel enabling queries against historical data snapshots
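
Any Iceberg-aware client can reach these tables through the REST catalog protocol. A minimal sketch with the Iceberg Java API; the Polaris URI and warehouse value are placeholders, and authentication properties are omitted for brevity:

    import java.util.Map;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.rest.RESTCatalog;

    public class PolarisCatalogExample {
        public static void main(String[] args) {
            RESTCatalog catalog = new RESTCatalog();
            catalog.initialize("polaris", Map.of(
                    CatalogProperties.URI, "https://polaris.example.internal/api/catalog", // placeholder endpoint
                    CatalogProperties.WAREHOUSE_LOCATION, "tenant_abc"));                  // placeholder warehouse

            // Ingested tables live under the tenant namespace, e.g. tenant_abc.raw_data.
            Table table = catalog.loadTable(TableIdentifier.of("tenant_abc", "raw_data", "orders"));
            System.out.println(table.schema());
            System.out.println("current snapshot: " + table.currentSnapshot()); // entry point for time travel
        }
    }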

Data Flow: Connector-Based Ingestion (Airbyte)

The following sequence describes the full lifecycle of a connector-based sync.

User                 Ingestion Service             Airbyte              Iceberg/Polaris
 |                           |                        |                       |
 |-- 1. Create Source ------>|                        |                       |
 |                           |-- 2. Create Airbyte -->|                       |
 |                           |      Source            |                       |
 |                           |<-- Airbyte Source ID --|                       |
 |<-- Source Created --------|                        |                       |
 |                           |                        |                       |
 |-- 3. Test Connection ---->|                        |                       |
 |                           |-- 4. Check Source ---->|                       |
 |                           |<-- Connection OK ------|                       |
 |<-- Test Result -----------|                        |                       |
 |                           |                        |                       |
 |-- 5. Discover Schema ---->|                        |                       |
 |                           |-- 6. Discover -------->|                       |
 |                           |<-- Streams + Columns --|                       |
 |<-- Schema Response -------|                        |                       |
 |                           |                        |                       |
 |-- 7. Create Connection -->|                        |                       |
 |   (select streams,        |-- 8. Create Airbyte -->|                       |
 |    set schedule,          |      Connection        |                       |
 |    choose sync mode)      |<-- Connection ID ------|                       |
 |<-- Connection Created ----|                        |                       |
 |                           |                        |                       |
 |-- 9. Trigger Sync ------->|                        |                       |
 |                           |-- 10. Trigger Job ---->|                       |
 |                           |<-- Job ID -------------|                       |
 |<-- Sync Started ----------|                        |                       |
 |                           |                        |-- 11. Extract ------->|
 |                           |                        |      & Load           |
 |                           |                        |<-- Write Confirmed ---|
 |                           |<-- Job Complete -------|                       |
 |<-- Sync Completed --------|                        |                       |

Step-by-Step

  1. Source creation. The user submits a CreateSourceRequest with connector type (e.g., postgres) and connection configuration (host, port, database, credentials). The Ingestion Service stores the source metadata and calls the Airbyte API to create a corresponding Airbyte source.

  2. Connection testing. The user triggers a connection test. The Ingestion Service delegates to Airbyte's checkSource endpoint, which attempts to connect using the provided credentials and reports success or failure.

  3. Schema discovery. The user triggers schema discovery. Airbyte introspects the source system and returns a list of available streams (tables, collections) with their columns, data types, and supported sync modes.

  4. Connection creation. The user selects which streams to sync, chooses a sync mode (Full Refresh, Incremental, or CDC), and optionally sets a cron schedule. The Ingestion Service creates an Airbyte connection linking the source to the tenant's Iceberg destination.

  5. Sync execution. On trigger (manual or scheduled), the Ingestion Service calls Airbyte to start a sync job. Airbyte extracts data from the source and writes it to Iceberg tables in the tenant's namespace. The Ingestion Service polls for job status and records sync history (records synced, bytes transferred, duration, errors).
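
A condensed sketch of step 5's trigger-and-poll loop. The interface is a simplification of the AirbyteClient sketched earlier, and the lowercase status strings assume Airbyte's job states; the real service also records records synced, bytes transferred, and duration into SyncHistory:

    import java.time.Duration;

    // Simplified job API; see the AirbyteClient sketch above for the endpoints.
    interface AirbyteJobs {
        long triggerSync(String connectionId); // POST /api/v1/connections/sync
        String getJobStatus(long jobId);       // POST /api/v1/jobs/get
    }

    class SyncRunner {
        private final AirbyteJobs airbyte;

        SyncRunner(AirbyteJobs airbyte) { this.airbyte = airbyte; }

        // Trigger a sync, then poll until the job reaches a terminal state.
        String awaitCompletion(String connectionId) throws InterruptedException {
            long jobId = airbyte.triggerSync(connectionId);
            while (true) {
                String status = airbyte.getJobStatus(jobId);
                switch (status) {
                    // Terminal states; map onto SyncHistory's SUCCEEDED/FAILED/CANCELLED.
                    case "succeeded", "failed", "cancelled" -> { return status; }
                    // Still running; a production service would poll on a scheduler with backoff.
                    default -> Thread.sleep(Duration.ofSeconds(10).toMillis());
                }
            }
        }
    }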


Data Flow: File Import

File import bypasses Airbyte entirely. The Ingestion Service handles the full lifecycle.

User                 Ingestion Service         Object Storage          Iceberg/Polaris
 |                           |                        |                       |
 |-- 1. Upload File -------->|                        |                       |
 |   (multipart/form-data)   |-- 2. Store File ------>|                       |
 |                           |<-- Storage Path -------|                       |
 |<-- FileImportJob ---------|                        |                       |
 |   (status: UPLOADED)      |                        |                       |
 |                           |                        |                       |
 |-- 3. Get Preview -------->|                        |                       |
 |                           |-- 4. Read & Parse ---->|                       |
 |                           |<-- File Content -------|                       |
 |<-- FilePreviewResponse ---|                        |                       |
 |   (columns, sample rows,  |                        |                       |
 |    inferred types)        |                        |                       |
 |                           |                        |                       |
 |-- 5. Update Schema ------>|                        |                       |
 |   (target table, column   |                        |                       |
 |    mappings)              |                        |                       |
 |<-- FileImportJob ---------|                        |                       |
 |                           |                        |                       |
 |-- 6. Execute Import ----->|                        |                       |
 |                           |-- 7. Read File ------->|                       |
 |                           |                        |                       |
 |                           |-- 8. Write to ---------|---------------------->|
 |                           |      Iceberg           |                       |
 |<-- FileImportJob ---------|                        |                       |
 |   (status: COMPLETED,     |                        |                       |
 |    records_imported: N)   |                        |                       |

Step-by-Step

  1. Upload. The user uploads a file via multipart/form-data. The Ingestion Service stores the file in object storage and creates a FileImportJob record with status UPLOADED.

  2. Preview. The user requests a preview. The Ingestion Service reads the file, infers column names and types, and returns sample rows along with the detected schema. For CSV files, delimiter and encoding are auto-detected. For Excel files, each sheet is available as a separate preview.

  3. Schema update. The user optionally adjusts the target table name, target schema, and column mappings (rename, retype, exclude columns).

  4. Import execution. The user triggers the import. The Ingestion Service reads the file from object storage, applies the column mappings, and writes the data into an Iceberg table in the tenant's namespace. The job status transitions through IMPORTING to COMPLETED (or FAILED), recording the number of records imported.
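
The preview step's type inference can be illustrated with a small self-contained sketch: for each column, keep the narrowest type that accepts every sampled value. This is a stand-in for the real FileImportService logic, which also handles dates, delimiters, and encodings:

    import java.util.List;

    class TypeInference {

        enum ColumnType { BIGINT, DOUBLE, BOOLEAN, STRING }

        // Pick the narrowest type that every non-blank sample fits into.
        static ColumnType infer(List<String> samples) {
            boolean allLong = true, allDouble = true, allBool = true;
            for (String v : samples) {
                if (v == null || v.isBlank()) continue; // blanks do not constrain the type
                allLong &= isLong(v);
                allDouble &= isDouble(v);
                allBool &= isBoolean(v);
            }
            if (allLong) return ColumnType.BIGINT;
            if (allDouble) return ColumnType.DOUBLE;
            if (allBool) return ColumnType.BOOLEAN;
            return ColumnType.STRING; // fallback: anything parses as a string
        }

        private static boolean isLong(String v) {
            try { Long.parseLong(v.trim()); return true; } catch (NumberFormatException e) { return false; }
        }

        private static boolean isDouble(String v) {
            try { Double.parseDouble(v.trim()); return true; } catch (NumberFormatException e) { return false; }
        }

        private static boolean isBoolean(String v) {
            return v.equalsIgnoreCase("true") || v.equalsIgnoreCase("false");
        }
    }

For example, TypeInference.infer(List.of("1", "2.5")) returns DOUBLE, while a mix of numbers and free text falls back to STRING.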


Platform Component Integration Map

Ingested data flows through the platform via the following integration points.

+------------------+         +------------------+         +------------------+
| External Sources |         | Airbyte          |         | Iceberg Tables   |
| (Databases, SaaS,|-------->| (Extract & Load) |-------->| (Polaris Catalog)|
|  Cloud Storage,  |         +------------------+         +--------+---------+
|  Files)          |                                               |
+------------------+                                               |
                                                                   |
         +--------------------+--------------------+---------------+
         |                    |                    |               |
  +------v------+    +--------v-------+    +-------v------+    +---v-----------+
  | Trino v458  |    | ClickHouse     |    | StarRocks    |    | Spark v4.1.1  |
  | (OLAP SQL)  |    | (Real-time)    |    | (MPP SQL)    |    | (Batch ETL)   |
  +------+------+    +--------+-------+    +-------+------+    +---+-----------+
         |                    |                    |                |
         +--------------------+--------------------+----------------+
                              |
                    +---------v---------+
                    |   Query Engine    |
                    |   (Port 8080)     |
                    +---------+---------+
                              |
         +--------------------+--------------------+
         |                    |                    |
  +------v------+     +-------v-------+    +-------v-------+
  | AI Service  |     | BI Service    |    | SQL Workbench |
  | (NL to SQL) |     | (Dashboards)  |    | (Direct SQL)  |
  +-------------+     +---------------+    +---------------+

Integration Details

  • Trino -- Polaris REST Catalog: Trino reads Iceberg tables via Polaris. Ingested data is queryable within seconds of sync completion.
  • ClickHouse -- Iceberg table functions: ClickHouse accesses Iceberg data for real-time analytics workloads.
  • StarRocks -- Iceberg external catalog: StarRocks queries Iceberg tables for MPP analytical workloads.
  • Spark -- Iceberg Spark integration: Spark reads and transforms Iceberg tables for batch ETL pipelines.
  • OpenMetadata -- Airbyte lineage events: Airbyte emits OpenLineage events that OpenMetadata captures for data lineage tracking.
  • dbt -- Iceberg tables as sources: dbt models reference ingested Iceberg tables as source data for transformations.
  • Kafka -- CDC events (via Flink): Flink CDC jobs consume change events from ingested tables for real-time streaming.
  • Data Quality Service -- Iceberg table validation: post-ingestion quality checks validate freshness, completeness, and schema conformance.
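
As a concrete example of the Trino integration point, ingested tables can be read with plain JDBC. The hostname, user, and namespace-to-schema mapping below are assumptions; the driver is io.trino:trino-jdbc:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class TrinoQueryExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical Trino endpoint; "iceberg" is the catalog backed by Polaris.
            String url = "jdbc:trino://trino.example.internal:8080/iceberg";
            try (Connection conn = DriverManager.getConnection(url, "analyst", null);
                 Statement stmt = conn.createStatement();
                 // Assumes the tenant's namespace is exposed as a Trino schema.
                 ResultSet rs = stmt.executeQuery("SELECT count(*) FROM tenant_abc.orders")) {
                while (rs.next()) {
                    System.out.println("rows ingested: " + rs.getLong(1));
                }
            }
        }
    }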

Per-Tenant Deployment Model

The ingestion subsystem enforces tenant isolation at multiple layers.

+-------------------------------------------------------------------+
|                        Tenant A                                    |
|                                                                    |
|  +------------------+    +------------------+    +---------------+ |
|  | Airbyte Instance |    | Iceberg Namespace|    | K8s Secrets   | |
|  | (Dedicated pods) |    | tenant_a.*       |    | (Credentials) | |
|  +------------------+    +------------------+    +---------------+ |
+-------------------------------------------------------------------+

+-------------------------------------------------------------------+
|                        Tenant B                                    |
|                                                                    |
|  +------------------+    +------------------+    +---------------+ |
|  | Airbyte Instance |    | Iceberg Namespace|    | K8s Secrets   | |
|  | (Dedicated pods) |    | tenant_b.*       |    | (Credentials) | |
|  +------------------+    +------------------+    +---------------+ |
+-------------------------------------------------------------------+

Isolation Guarantees

  • Compute: each tenant runs a dedicated Airbyte server and worker pods. Sync jobs do not share compute resources across tenants.
  • Storage: ingested data lands in tenant-specific Iceberg namespaces. Polaris enforces namespace-level access control.
  • Credentials: source connection credentials are stored in tenant-scoped Kubernetes secrets. No tenant can access another tenant's credentials.
  • Network: Airbyte pods run in tenant-specific Kubernetes namespaces with network policies preventing cross-tenant communication.
  • API: every Ingestion Service API call requires an X-Tenant-Id header. The service filters all database queries by tenant_id.
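
The API rule in the last item can be pictured as a servlet filter that rejects unidentified requests before they reach a controller. The filter class is a sketch; the X-Tenant-Id requirement itself is the documented contract:

    import java.io.IOException;
    import jakarta.servlet.FilterChain;
    import jakarta.servlet.ServletException;
    import jakarta.servlet.http.HttpServletRequest;
    import jakarta.servlet.http.HttpServletResponse;
    import org.springframework.stereotype.Component;
    import org.springframework.web.filter.OncePerRequestFilter;

    @Component
    class TenantHeaderFilter extends OncePerRequestFilter {

        @Override
        protected void doFilterInternal(HttpServletRequest request,
                                        HttpServletResponse response,
                                        FilterChain chain) throws ServletException, IOException {
            String tenantId = request.getHeader("X-Tenant-Id");
            if (tenantId == null || tenantId.isBlank()) {
                // No tenant context: refuse the request outright.
                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "X-Tenant-Id header is required");
                return;
            }
            // Expose the tenant id so downstream code can filter queries by tenant_id.
            request.setAttribute("tenantId", tenantId);
            chain.doFilter(request, response);
        }
    }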

Provisioning Flow

When a new tenant is created via the Tenant Service:

  1. The Tenant Service calls the Ingestion Service to provision ingestion infrastructure
  2. A dedicated Airbyte instance is deployed in the tenant's Data Plane namespace
  3. The Iceberg destination connector is pre-configured pointing to the tenant's Polaris namespace
  4. Kubernetes secrets are created for the Airbyte-to-Polaris connection
  5. The tenant's users can immediately begin configuring sources and running syncs
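
A hypothetical sketch of how this provisioning hook could be laid out; the endpoint path and the deployer interface are illustrative and simply mirror the numbered steps:

    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    // Collaborator abstracting the Data Plane operations (illustrative).
    interface DataPlaneDeployer {
        void deployAirbyte(String tenantId);               // step 2: dedicated Airbyte instance
        void configureIcebergDestination(String tenantId); // step 3: point at the Polaris namespace
        void createConnectionSecrets(String tenantId);     // step 4: Airbyte-to-Polaris secrets
    }

    @RestController
    @RequestMapping("/internal/tenants") // hypothetical internal path
    class ProvisioningController {
        private final DataPlaneDeployer deployer;

        ProvisioningController(DataPlaneDeployer deployer) { this.deployer = deployer; }

        // Step 1: invoked by the Tenant Service during tenant creation.
        @PostMapping("/{tenantId}/ingestion")
        void provision(@PathVariable String tenantId) {
            deployer.deployAirbyte(tenantId);
            deployer.configureIcebergDestination(tenantId);
            deployer.createConnectionSecrets(tenantId);
            // Step 5: once this returns, tenant users can configure sources and run syncs.
        }
    }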

Entity Relationship Model

The Ingestion Service manages four core entities.

+--------------------+        +------------------------+
| IngestionSource    |        | IngestionConnection    |
|--------------------|        |------------------------|
| id (UUID)          |<-------| id (UUID)              |
| tenantId           |        | tenantId               |
| name               |        | sourceId (FK)          |
| connectorType      |        | name                   |
| airbyteSourceId    |        | airbyteConnectionId    |
| connectionConfig   |        | destinationType        |
| status             |        | syncSchedule           |
| lastTestedAt       |        | selectedStreams        |
+--------------------+        | syncMode               |
                              | status                 |
                              | lastSyncAt             |
                              +----------+-------------+
                                         |
                                         | 1:N
                                         |
                              +----------v-------------+
                              | SyncHistory            |
                              |------------------------|
                              | id (UUID)              |
                              | tenantId               |
                              | connectionId (FK)      |
                              | airbyteJobId           |
                              | status                 |
                              | recordsSynced          |
                              | bytesSynced            |
                              | startedAt              |
                              | completedAt            |
                              | durationMs             |
                              | errorMessage           |
                              +------------------------+

+--------------------+
| FileImportJob      |
|--------------------|
| id (UUID)          |
| tenantId           |
| fileName           |
| fileSize           |
| fileFormat         |
| storagePath        |
| targetTableName    |
| targetSchema       |
| inferredSchema     |
| status             |
| recordsImported    |
| errorMessage       |
+--------------------+

Entity Status Enums

  • IngestionSource: ACTIVE, INACTIVE, ERROR, TESTING
  • IngestionConnection: ACTIVE, PAUSED, ERROR, PENDING
  • SyncHistory: RUNNING, SUCCEEDED, FAILED, CANCELLED
  • FileImportJob: UPLOADED, PREVIEWING, IMPORTING, COMPLETED, FAILED
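
For reference, FileImportJob and its status enum could map to JPA roughly as follows. Field names come straight from the entity diagram above; the annotations and column types are assumptions about the persistence layer:

    import java.util.UUID;
    import jakarta.persistence.Entity;
    import jakarta.persistence.EnumType;
    import jakarta.persistence.Enumerated;
    import jakarta.persistence.Id;

    @Entity
    class FileImportJob {

        // Statuses from the table above.
        enum Status { UPLOADED, PREVIEWING, IMPORTING, COMPLETED, FAILED }

        @Id
        private UUID id;
        private String tenantId;
        private String fileName;
        private Long fileSize;
        private String fileFormat;
        private String storagePath;
        private String targetTableName;
        private String targetSchema;
        private String inferredSchema; // serialized column list from the preview step (assumption)

        @Enumerated(EnumType.STRING)
        private Status status;

        private Long recordsImported;
        private String errorMessage;
    }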