# Batch Ingestion
Batch ingestion pipelines extract data from relational databases, data warehouses, and flat files in scheduled bulk operations. The Pipeline Service uses the `DatabaseExtractOperator` for JDBC-based extraction, with support for full, incremental, partition, and snapshot load strategies.
## Extraction Modes
| Mode | Description | Use Case |
|---|---|---|
| `full` | Extracts all rows from the source table | Initial loads, small reference tables |
| `incremental` | Extracts rows modified since the last watermark | Large transactional tables with update timestamps |
| `partition` | Extracts specific date or key partitions | Partitioned fact tables, daily aggregates |
| `snapshot` | Captures a point-in-time snapshot with hash comparison | Slowly changing dimensions |
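To make the `snapshot` mode's hash comparison concrete, the sketch below shows the general technique: hash each current row and compare it against the hashes stored for the previous snapshot. This is an illustration only, not the operator's actual implementation; `row_hash` and `changed_rows` are hypothetical helpers.

```python
import hashlib


def row_hash(row: dict) -> str:
    """Deterministically hash a row's values (illustrative; the real
    operator's hashing scheme is not documented here)."""
    payload = "|".join(f"{k}={row[k]}" for k in sorted(row))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def changed_rows(previous: dict, current: list[dict], key: str) -> list[dict]:
    """Return rows that are new or whose hash differs from the stored
    snapshot, keyed by the given primary-key column."""
    return [r for r in current if previous.get(r[key]) != row_hash(r)]
```

Only the changed rows would then be written downstream, which is what makes snapshot mode suitable for slowly changing dimensions.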
## Pipeline Definition
```yaml
metadata:
  name: daily-sales-batch
  version: "1.0.0"
  owner: data-engineering
sources:
  sales_db:
    type: jdbc
    connection: ${SALES_DB_CONNECTION}
    table: raw_transactions
    mode: incremental
    watermark_column: updated_at
    fetch_size: 10000
    partition_column: region_id
    num_partitions: 8
sinks:
  data_lake:
    type: iceberg
    table: analytics.sales.transactions
    mode: append
    partition_by: [date, region_id]
orchestration:
  engine: airflow
  schedule: "0 6 * * *"
  retry_policy:
    max_attempts: 3
    initial_interval: 60s
```

## DatabaseExtractOperator
The operator is located at `data-plane/pipeline-service/src/matih_pipeline/operators/database_extract.py`.
### Configuration Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `connection` | string | Yes | Connection reference name |
| `table` | string | Yes | Source table or view name |
| `mode` | string | Yes | Extraction mode (`full`, `incremental`, `partition`, `snapshot`) |
| `watermark_column` | string | No | Column for incremental tracking |
| `fetch_size` | int | No | JDBC fetch size (default: 5000) |
| `partition_column` | string | No | Column for parallel partition reads |
| `num_partitions` | int | No | Number of parallel partitions (default: 4) |
| `query_override` | string | No | Custom SQL query instead of a table scan |
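As a rough illustration of how these parameters, defaults, and mode requirements fit together, the configuration could be modeled as below. This is a hedged sketch, not the operator's real class: `ExtractConfig` is a hypothetical name, and the validation rules shown are only those implied by the table above.

```python
from dataclasses import dataclass
from typing import Optional

VALID_MODES = {"full", "incremental", "partition", "snapshot"}


@dataclass
class ExtractConfig:
    connection: str
    table: str
    mode: str
    watermark_column: Optional[str] = None
    fetch_size: int = 5000            # default from the parameter table
    partition_column: Optional[str] = None
    num_partitions: int = 4           # default from the parameter table
    query_override: Optional[str] = None

    def __post_init__(self) -> None:
        if self.mode not in VALID_MODES:
            raise ValueError(f"unknown extraction mode: {self.mode}")
        if self.mode == "incremental" and not self.watermark_column:
            raise ValueError("incremental mode requires watermark_column")
```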
## Incremental Watermark Tracking
The Pipeline Service stores watermark state in PostgreSQL per pipeline and source:
```
GET /v1/pipelines/:pipelineId/watermarks/:sourceName
```

Response:

```json
{
  "pipelineId": "pipeline-456",
  "sourceName": "sales_db",
  "lastWatermark": "2026-02-11T23:59:59Z",
  "rowsProcessed": 125000,
  "lastRunAt": "2026-02-12T06:00:00Z"
}
```

On each incremental run, the operator generates a `WHERE` clause from the stored watermark:
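That clause generation can be sketched as follows. This is a simplified illustration, not the code in `database_extract.py`; in particular, a real implementation should bind the watermark as a JDBC query parameter rather than interpolating it into the SQL string.

```python
def incremental_query(table: str, watermark_column: str, last_watermark: str) -> str:
    """Render the incremental extraction query for a stored watermark.

    Illustrative sketch only: interpolating values into SQL is shown
    here for readability, but production code should use bind parameters.
    """
    return (
        f"SELECT * FROM {table}\n"
        f"WHERE {watermark_column} > '{last_watermark}'\n"
        f"ORDER BY {watermark_column}"
    )
```

For the watermark state above, this yields the query shown below.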
```sql
SELECT * FROM raw_transactions
WHERE updated_at > '2026-02-11T23:59:59Z'
ORDER BY updated_at
```

## Parallel Extraction
When `partition_column` and `num_partitions` are set, the operator splits the extraction into parallel tasks. Each partition reads a distinct range of the partition column using boundary queries:
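The range splitting itself might be computed as in the sketch below, which produces the kind of boundary queries shown next. This is an illustration of the half-open-range idea under the assumption that integer boundaries come from `MIN`/`MAX` queries on the partition column; it is not the operator's actual algorithm.

```python
def partition_ranges(min_val: int, max_val: int, num_partitions: int) -> list[tuple[int, int]]:
    """Split [min_val, max_val] into half-open [lo, hi) ranges for
    parallel reads; the last range may be narrower than the rest."""
    span = max_val - min_val + 1
    step = -(-span // num_partitions)  # ceiling division
    return [
        (lo, min(lo + step, max_val + 1))
        for lo in range(min_val, max_val + 1, step)
    ]
```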
```sql
-- Partition 1 of 8
SELECT * FROM raw_transactions
WHERE region_id >= 1 AND region_id < 100

-- Partition 2 of 8
SELECT * FROM raw_transactions
WHERE region_id >= 100 AND region_id < 200
```

## Observability
| Metric | Type | Description |
|---|---|---|
| `pipeline.batch.rows_extracted` | Counter | Total rows extracted per run |
| `pipeline.batch.duration_seconds` | Histogram | Extraction duration |
| `pipeline.batch.bytes_transferred` | Counter | Data volume transferred |
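These series could be emitted through any metrics client; as a minimal illustration of the counter-vs-histogram distinction above, an in-memory recorder might look like this. `BatchMetrics` is a hypothetical name, and a real deployment would presumably use a Prometheus or StatsD client instead.

```python
from collections import defaultdict


class BatchMetrics:
    """In-memory sketch of the three batch-ingestion metric series."""

    def __init__(self) -> None:
        self.counters: dict[str, int] = defaultdict(int)
        self.histograms: dict[str, list[float]] = defaultdict(list)

    def record_run(self, rows: int, nbytes: int, duration_s: float) -> None:
        # Counters accumulate across runs; the histogram keeps one
        # observation per run.
        self.counters["pipeline.batch.rows_extracted"] += rows
        self.counters["pipeline.batch.bytes_transferred"] += nbytes
        self.histograms["pipeline.batch.duration_seconds"].append(duration_s)
```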
## Related Pages
- Pipeline Service -- Architecture overview
- Stream Ingestion -- Real-time alternative
- Database Replication -- Full replication patterns