Stream Ingestion
Stream ingestion pipelines consume real-time data from Apache Kafka topics and other event streams with exactly-once or at-least-once delivery guarantees. The Pipeline Service uses the KafkaExtractOperator for Kafka-based streaming and integrates with Apache Flink for continuous processing.
Supported Stream Sources
| Source | Connector | Delivery Guarantee |
|---|---|---|
| Apache Kafka | KafkaExtractOperator | Exactly-once (with Flink checkpoints) |
| Kafka (via Airflow) | KafkaExtractOperator (micro-batch) | At-least-once |
| Strimzi Kafka | Native Strimzi connector | Exactly-once |
Pipeline Definition
```yaml
metadata:
  name: realtime-clickstream
  version: "1.0.0"
  owner: analytics-team
sources:
  clickstream:
    type: kafka
    connection: ${KAFKA_CONNECTION}
    topic: matih.web.clickstream
    consumer_group: pipeline-clickstream-v1
    format: json
    schema_registry: ${SCHEMA_REGISTRY_URL}
    start_offset: latest
transformations:
  - name: parse_events
    type: sql_transform
    input: clickstream
    sql: |
      SELECT
        JSON_VALUE(value, '$.user_id') AS user_id,
        JSON_VALUE(value, '$.event_type') AS event_type,
        JSON_VALUE(value, '$.page_url') AS page_url,
        event_timestamp
      FROM {{ input }}
sinks:
  analytics_lake:
    type: iceberg
    table: analytics.web.clickstream_events
    mode: append
    partition_by: [date]
orchestration:
  engine: flink
  checkpoint_interval: 60s
  parallelism: 4
```

KafkaExtractOperator
Source: data-plane/pipeline-service/src/matih_pipeline/operators/kafka_extract.py
Configuration Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| topic | string | Yes | Kafka topic name |
| consumer_group | string | Yes | Consumer group ID |
| format | string | Yes | Message format (json, avro, protobuf) |
| start_offset | string | No | Start offset (earliest, latest, timestamp) |
| schema_registry | string | No | Schema Registry URL for Avro/Protobuf |
| max_records | int | No | Max records per micro-batch (Airflow mode) |
| checkpoint_interval | string | No | Flink checkpoint interval |
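A source block is validated against these parameters before a run starts. As a rough illustration of how the table maps onto a config object (the class, defaults, and validation rules below are assumptions for this sketch, not the operator's actual implementation), consider:

```python
from dataclasses import dataclass
from typing import Optional

# Illustrative mirror of the KafkaExtractOperator parameter table.
# The real operator lives in matih_pipeline/operators/kafka_extract.py.
VALID_FORMATS = {"json", "avro", "protobuf"}
VALID_OFFSETS = {"earliest", "latest", "timestamp"}

@dataclass
class KafkaExtractConfig:
    topic: str                             # required: Kafka topic name
    consumer_group: str                    # required: consumer group ID
    format: str                            # required: json, avro, or protobuf
    start_offset: str = "latest"           # earliest, latest, or timestamp
    schema_registry: Optional[str] = None  # Schema Registry URL
    max_records: int = 10000               # Airflow micro-batch size
    checkpoint_interval: str = "60s"       # Flink checkpoint interval

    def __post_init__(self) -> None:
        if self.format not in VALID_FORMATS:
            raise ValueError(f"unsupported format: {self.format}")
        if self.start_offset not in VALID_OFFSETS:
            raise ValueError(f"unsupported start_offset: {self.start_offset}")
        # Assumption: binary formats need a registry to resolve schemas.
        if self.format in {"avro", "protobuf"} and not self.schema_registry:
            raise ValueError("schema_registry is required for avro/protobuf")

cfg = KafkaExtractConfig(
    topic="matih.web.clickstream",
    consumer_group="pipeline-clickstream-v1",
    format="json",
)
```

The optional parameters fall back to the defaults listed under Backpressure and Scaling below.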
Processing Modes
Continuous Streaming (Flink)
For sub-second latency requirements, the pipeline delegates to Flink. The Pipeline Service generates a Flink SQL job and submits it to the Flink SQL Gateway:
```sql
CREATE TEMPORARY TABLE kafka_source (
    `value` STRING,
    event_timestamp TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'matih.web.clickstream',
    'properties.bootstrap.servers' = '...',
    'format' = 'json'
);
```

Micro-Batch (Airflow)
For less time-sensitive workloads, the operator runs as an Airflow task that polls Kafka in batches:
```python
# Consume up to max_records per task execution
records = consumer.poll(timeout_ms=30000, max_records=10000)
```

Backpressure and Scaling
| Setting | Default | Description |
|---|---|---|
| parallelism | 4 | Number of Flink task slots |
| max_records | 10000 | Records per Airflow micro-batch |
| checkpoint_interval | 60s | State checkpoint frequency |
| idle_timeout | 300s | Shutdown after idle period (Airflow mode) |
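In Airflow mode, max_records and idle_timeout together bound each task run: the task polls in batches and shuts itself down once no records have arrived for the idle window. A simplified sketch of that loop (the function name is illustrative, and a stand-in poll callable replaces the real Kafka consumer):

```python
import time
from typing import Callable, List

def run_micro_batch(
    poll: Callable[[int], List[dict]],  # stand-in for consumer.poll()
    max_records: int = 10000,           # table default
    idle_timeout_s: float = 300.0,      # table default
    poll_backoff_s: float = 30.0,
) -> int:
    """Poll in batches until the source stays idle for idle_timeout_s.

    Returns the total number of records consumed in this task run.
    """
    total = 0
    last_data = time.monotonic()
    while time.monotonic() - last_data < idle_timeout_s:
        batch = poll(max_records)
        if batch:
            total += len(batch)
            last_data = time.monotonic()
            # ... transform and write the batch, then commit offsets ...
        else:
            time.sleep(poll_backoff_s)  # back off while the topic is idle
    return total

# Stub source that yields two batches, then goes idle
batches = [[{"id": 1}, {"id": 2}], [{"id": 3}]]
consumed = run_micro_batch(
    lambda n: batches.pop(0) if batches else [],
    idle_timeout_s=0.05,
    poll_backoff_s=0.01,
)
# consumed == 3
```

Committing offsets only after the batch is written is what yields the at-least-once guarantee listed for this mode: a crash mid-batch replays records rather than dropping them.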
Related Pages
- Batch Ingestion -- Scheduled alternative
- Event Sourcing -- Event-driven patterns
- Flink Streaming Jobs -- Production Flink SQL jobs