MATIH Platform is in active MVP development. Documentation reflects current implementation status.
11. Pipelines & Data Engineering
Batch Ingestion

Batch ingestion pipelines extract data in scheduled bulk operations from relational databases, data warehouses, and flat files. The Pipeline Service uses the `DatabaseExtractOperator` for JDBC-based extraction, with support for full, incremental, partition, and snapshot load strategies.


Extraction Modes

| Mode | Description | Use Case |
| --- | --- | --- |
| `full` | Extracts all rows from the source table | Initial loads, small reference tables |
| `incremental` | Extracts rows modified since the last watermark | Large transactional tables with update timestamps |
| `partition` | Extracts specific date or key partitions | Partitioned fact tables, daily aggregates |
| `snapshot` | Captures a point-in-time snapshot with hash comparison | Slowly changing dimensions |
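As an illustration of how these modes could translate into SQL, the following hypothetical query builder (function and parameter names are assumptions, not the actual operator API) maps each mode to a SELECT statement:

```python
# Hedged sketch: how extraction mode might map to a query. Names are assumed.
def build_extract_query(table, mode, watermark_column=None, last_watermark=None,
                        partition_filter=None):
    """Build a SELECT statement for the given extraction mode."""
    base = f"SELECT * FROM {table}"
    if mode == "full":
        return base
    if mode == "incremental":
        # Only rows modified after the stored watermark
        return (f"{base} WHERE {watermark_column} > '{last_watermark}' "
                f"ORDER BY {watermark_column}")
    if mode == "partition":
        # Caller supplies a date/key partition predicate
        return f"{base} WHERE {partition_filter}"
    if mode == "snapshot":
        return base  # full read; rows are hash-compared downstream
    raise ValueError(f"unknown extraction mode: {mode}")
```

For example, `build_extract_query("raw_transactions", "full")` yields a plain full-table scan, while incremental mode reproduces the watermark-filtered query shown later in this page.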

Pipeline Definition

```yaml
metadata:
  name: daily-sales-batch
  version: "1.0.0"
  owner: data-engineering

sources:
  sales_db:
    type: jdbc
    connection: ${SALES_DB_CONNECTION}
    table: raw_transactions
    mode: incremental
    watermark_column: updated_at
    fetch_size: 10000
    partition_column: region_id
    num_partitions: 8

sinks:
  data_lake:
    type: iceberg
    table: analytics.sales.transactions
    mode: append
    partition_by: [date, region_id]

orchestration:
  engine: airflow
  schedule: "0 6 * * *"
  retry_policy:
    max_attempts: 3
    initial_interval: 60s
```
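A minimal validation pass over a parsed pipeline definition might look like the sketch below. The function name and rules shown (e.g. that incremental sources must declare a `watermark_column`) are assumptions for illustration, not the service's actual validator:

```python
# Hypothetical sketch of pipeline-definition validation; rules are assumed.
def validate_pipeline_spec(spec):
    """Check that a parsed pipeline definition has the required sections."""
    for section in ("metadata", "sources", "sinks", "orchestration"):
        if section not in spec:
            raise ValueError(f"missing section: {section}")
    for name, src in spec["sources"].items():
        # An incremental source needs a column to track the watermark on
        if src.get("mode") == "incremental" and "watermark_column" not in src:
            raise ValueError(
                f"source {name}: incremental mode requires watermark_column")
    return True
```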

DatabaseExtractOperator

The operator is located at `data-plane/pipeline-service/src/matih_pipeline/operators/database_extract.py`.

Configuration Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `connection` | string | Yes | Connection reference name |
| `table` | string | Yes | Source table or view name |
| `mode` | string | Yes | Extraction mode (`full`, `incremental`, `partition`, `snapshot`) |
| `watermark_column` | string | No | Column for incremental tracking |
| `fetch_size` | int | No | JDBC fetch size (default: 5000) |
| `partition_column` | string | No | Column for parallel partition reads |
| `num_partitions` | int | No | Number of parallel partitions (default: 4) |
| `query_override` | string | No | Custom SQL query instead of a table scan |
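The parameters and defaults above can be modeled as a small config object. This is an illustrative sketch, not the operator's real constructor signature:

```python
# Illustrative config object mirroring the parameter table; not the real API.
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExtractConfig:
    """Configuration for a DatabaseExtractOperator run (sketch)."""
    connection: str
    table: str
    mode: str
    watermark_column: Optional[str] = None
    fetch_size: int = 5000          # default per the parameter table
    partition_column: Optional[str] = None
    num_partitions: int = 4         # default per the parameter table
    query_override: Optional[str] = None

    def __post_init__(self):
        if self.mode not in {"full", "incremental", "partition", "snapshot"}:
            raise ValueError(f"invalid mode: {self.mode}")
        if self.mode == "incremental" and not self.watermark_column:
            raise ValueError("incremental mode requires watermark_column")
```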

Incremental Watermark Tracking

The Pipeline Service stores watermark state in PostgreSQL per pipeline and source:

```
GET /v1/pipelines/:pipelineId/watermarks/:sourceName
```

Response:

```json
{
  "pipelineId": "pipeline-456",
  "sourceName": "sales_db",
  "lastWatermark": "2026-02-11T23:59:59Z",
  "rowsProcessed": 125000,
  "lastRunAt": "2026-02-12T06:00:00Z"
}
```

On each incremental run, the operator generates a WHERE clause:

```sql
SELECT * FROM raw_transactions
WHERE updated_at > '2026-02-11T23:59:59Z'
ORDER BY updated_at
```
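After a successful run, the stored watermark must advance to the highest value seen. The following sketch uses an in-memory dict in place of the PostgreSQL state table; the function name and key shape are assumptions:

```python
# Hedged sketch: watermark advance after a run. The dict stands in for the
# PostgreSQL state table; names are assumed, not the service's actual code.
def advance_watermark(store, key, rows, column="updated_at"):
    """Persist the highest watermark value seen in a successful run."""
    if rows:
        candidate = max(r[column] for r in rows)
        # ISO-8601 timestamps compare correctly as strings
        if key not in store or candidate > store[key]:
            store[key] = candidate
    return store.get(key)
```

An empty run leaves the watermark unchanged, so the next run re-queries from the same point rather than skipping rows.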

Parallel Extraction

When partition_column and num_partitions are set, the operator splits the extraction into parallel tasks. Each partition reads a distinct range of the partition column using boundary queries:

```sql
-- Partition 1 of 8
SELECT * FROM raw_transactions
WHERE region_id >= 1 AND region_id < 100

-- Partition 2 of 8
SELECT * FROM raw_transactions
WHERE region_id >= 100 AND region_id < 200
```
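The range splitting itself can be sketched as follows. These are assumed helpers for illustration, not the operator's actual implementation:

```python
# Hypothetical sketch of boundary splitting for parallel reads; names assumed.
def partition_ranges(min_val, max_val, num_partitions):
    """Split [min_val, max_val] into num_partitions contiguous half-open ranges."""
    span = max_val - min_val + 1
    size = -(-span // num_partitions)  # ceiling division
    ranges = []
    lo = min_val
    for _ in range(num_partitions):
        hi = min(lo + size, max_val + 1)
        ranges.append((lo, hi))
        lo = hi
    return ranges


def partition_predicates(column, ranges):
    """Render each half-open range as a SQL predicate on the partition column."""
    return [f"{column} >= {lo} AND {column} < {hi}" for lo, hi in ranges]
```

In practice the operator would first run a boundary query (`SELECT MIN(region_id), MAX(region_id) FROM raw_transactions`) to obtain `min_val` and `max_val` before splitting.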

Observability

| Metric | Type | Description |
| --- | --- | --- |
| `pipeline.batch.rows_extracted` | Counter | Total rows extracted per run |
| `pipeline.batch.duration_seconds` | Histogram | Extraction duration |
| `pipeline.batch.bytes_transferred` | Counter | Data volume transferred |
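For illustration, a minimal in-process recorder for these metrics might look like this. It is a sketch of the counter/histogram distinction only; the real service presumably exports to a metrics backend rather than holding values in memory:

```python
# Illustrative in-process metrics recorder; not the platform's actual client.
from collections import defaultdict


class BatchMetrics:
    """Minimal stand-in showing counter vs. histogram semantics."""
    def __init__(self):
        self.counters = defaultdict(int)     # monotonically increasing totals
        self.histograms = defaultdict(list)  # individual samples

    def inc(self, name, value=1):
        """Increment a counter metric (e.g. rows_extracted)."""
        self.counters[name] += value

    def observe(self, name, value):
        """Record one histogram sample (e.g. a run's duration)."""
        self.histograms[name].append(value)
```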
