MATIH Platform is in active MVP development. Documentation reflects current implementation status.
11. Pipelines & Data Engineering
Stream Ingestion

Stream ingestion pipelines consume real-time data from Apache Kafka topics and other event streams with exactly-once or at-least-once delivery guarantees. The Pipeline Service uses the KafkaExtractOperator for Kafka-based streaming and integrates with Apache Flink for continuous processing.


Supported Stream Sources

| Source | Connector | Delivery Guarantee |
|---|---|---|
| Apache Kafka | KafkaExtractOperator | Exactly-once (with Flink checkpoints) |
| Kafka (via Airflow) | KafkaExtractOperator (micro-batch) | At-least-once |
| Strimzi Kafka | Native Strimzi connector | Exactly-once |

Pipeline Definition

metadata:
  name: realtime-clickstream
  version: "1.0.0"
  owner: analytics-team
 
sources:
  clickstream:
    type: kafka
    connection: ${KAFKA_CONNECTION}
    topic: matih.web.clickstream
    consumer_group: pipeline-clickstream-v1
    format: json
    schema_registry: ${SCHEMA_REGISTRY_URL}
    start_offset: latest
 
transformations:
  - name: parse_events
    type: sql_transform
    input: clickstream
    sql: |
      SELECT
        JSON_VALUE(value, '$.user_id') AS user_id,
        JSON_VALUE(value, '$.event_type') AS event_type,
        JSON_VALUE(value, '$.page_url') AS page_url,
        event_timestamp
      FROM {{ input }}
 
sinks:
  analytics_lake:
    type: iceberg
    table: analytics.web.clickstream_events
    mode: append
    partition_by: [date]
 
orchestration:
  engine: flink
  checkpoint_interval: 60s
  parallelism: 4
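
Before a definition like the one above is scheduled, the Pipeline Service has to confirm the expected top-level sections are present. The following is an illustrative sketch of such structural checks, with the definition mirrored as a Python dict; `validate_pipeline` is a hypothetical helper, not the service's real API.

```python
# Structural checks a pipeline definition might pass before scheduling.
# Section and field names come from the example above; the validator
# itself is an assumption, not the real Pipeline Service code path.

REQUIRED_SECTIONS = {"metadata", "sources", "sinks", "orchestration"}

def validate_pipeline(spec: dict) -> list[str]:
    """Return a list of problems; an empty list means the spec passes."""
    problems = [f"missing section: {s}"
                for s in sorted(REQUIRED_SECTIONS - spec.keys())]
    for name, src in spec.get("sources", {}).items():
        if src.get("type") == "kafka":
            # Kafka sources need at least a topic, group, and format.
            for field in ("topic", "consumer_group", "format"):
                if field not in src:
                    problems.append(f"kafka source {name!r} missing {field}")
    return problems

spec = {
    "metadata": {"name": "realtime-clickstream", "version": "1.0.0"},
    "sources": {
        "clickstream": {
            "type": "kafka",
            "topic": "matih.web.clickstream",
            "consumer_group": "pipeline-clickstream-v1",
            "format": "json",
        }
    },
    "sinks": {"analytics_lake": {"type": "iceberg", "mode": "append"}},
    "orchestration": {"engine": "flink", "parallelism": 4},
}

assert validate_pipeline(spec) == []
```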

KafkaExtractOperator

Source: data-plane/pipeline-service/src/matih_pipeline/operators/kafka_extract.py

Configuration Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| topic | string | Yes | Kafka topic name |
| consumer_group | string | Yes | Consumer group ID |
| format | string | Yes | Message format (json, avro, protobuf) |
| start_offset | string | No | Start offset (earliest, latest, timestamp) |
| schema_registry | string | No | Schema Registry URL for Avro/Protobuf |
| max_records | int | No | Max records per micro-batch (Airflow mode) |
| checkpoint_interval | string | No | Flink checkpoint interval |
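
The required/optional split and the allowed `format` values in the table can be expressed as a small checker. This sketch mirrors the table only; `check_kafka_extract_config` is hypothetical, and the rule that Avro/Protobuf messages need a Schema Registry URL is an assumption inferred from the parameter description, not a documented constraint.

```python
# Hypothetical parameter check mirroring the table above; not the
# operator's real constructor.

REQUIRED = {"topic", "consumer_group", "format"}
OPTIONAL = {"start_offset", "schema_registry", "max_records", "checkpoint_interval"}
FORMATS = {"json", "avro", "protobuf"}

def check_kafka_extract_config(cfg: dict) -> None:
    """Raise ValueError if cfg violates the documented parameter table."""
    missing = REQUIRED - cfg.keys()
    if missing:
        raise ValueError(f"missing required parameters: {sorted(missing)}")
    unknown = cfg.keys() - REQUIRED - OPTIONAL
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    if cfg["format"] not in FORMATS:
        raise ValueError(f"format must be one of {sorted(FORMATS)}")
    # Assumption: schema-based formats require the registry URL.
    if cfg["format"] in ("avro", "protobuf") and "schema_registry" not in cfg:
        raise ValueError("avro/protobuf formats need a schema_registry URL")

check_kafka_extract_config({
    "topic": "matih.web.clickstream",
    "consumer_group": "pipeline-clickstream-v1",
    "format": "json",
    "start_offset": "latest",
})
```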

Processing Modes

Continuous Streaming (Flink)

For sub-second latency requirements, the pipeline delegates to Flink. The Pipeline Service generates a Flink SQL job and submits it to the Flink SQL Gateway:

CREATE TEMPORARY TABLE kafka_source (
    value STRING,
    event_timestamp TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'matih.web.clickstream',
    'properties.bootstrap.servers' = '...',
    'format' = 'json'
);
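
One way the Pipeline Service could produce the DDL above from a source definition is plain template rendering. Only the SQL shape is taken from the generated statement shown; `render_kafka_source_ddl` and its parameters are illustrative assumptions, not the service's real code.

```python
# Sketch: render the Flink Kafka source DDL shown above from pipeline
# source settings. Hypothetical helper, not the actual generator.

def render_kafka_source_ddl(topic: str, bootstrap_servers: str,
                            fmt: str = "json",
                            watermark_delay_s: int = 10) -> str:
    """Build the CREATE TEMPORARY TABLE statement for a Kafka source."""
    return (
        "CREATE TEMPORARY TABLE kafka_source (\n"
        "    value STRING,\n"
        "    event_timestamp TIMESTAMP(3) METADATA FROM 'timestamp',\n"
        "    WATERMARK FOR event_timestamp AS event_timestamp"
        f" - INTERVAL '{watermark_delay_s}' SECOND\n"
        ") WITH (\n"
        "    'connector' = 'kafka',\n"
        f"    'topic' = '{topic}',\n"
        f"    'properties.bootstrap.servers' = '{bootstrap_servers}',\n"
        f"    'format' = '{fmt}'\n"
        ");"
    )

ddl = render_kafka_source_ddl("matih.web.clickstream", "kafka:9092")
assert "'topic' = 'matih.web.clickstream'" in ddl
```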

Micro-Batch (Airflow)

For less time-sensitive workloads, the operator runs as an Airflow task that polls Kafka in batches:

# Consume up to max_records per task execution
records = consumer.poll(timeout_ms=30000, max_records=10000)
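
The poll call above sits inside a task loop that keeps draining batches until the source goes quiet. The sketch below shows one plausible shape of that loop, using a stub consumer so it runs standalone; `max_records` and `idle_timeout` mirror the operator's settings, but the loop structure itself is an assumption.

```python
# Illustrative micro-batch loop with an idle shutdown, using a stub
# consumer in place of a real Kafka client so the example is runnable.
import time

class StubConsumer:
    """Stands in for a Kafka consumer; serves records from a fixed queue."""
    def __init__(self, records):
        self._queue = list(records)

    def poll(self, timeout_ms: int, max_records: int):
        batch, self._queue = self._queue[:max_records], self._queue[max_records:]
        return batch

def run_micro_batches(consumer, max_records=10_000, idle_timeout_s=300.0,
                      now=time.monotonic):
    """Drain batches until the source has been idle for idle_timeout_s."""
    processed, last_data = 0, now()
    while True:
        batch = consumer.poll(timeout_ms=30_000, max_records=max_records)
        if batch:
            processed += len(batch)   # a real task would write to the sink here
            last_data = now()
        elif now() - last_data >= idle_timeout_s:
            return processed          # idle long enough: shut the task down

total = run_micro_batches(StubConsumer(range(25)), max_records=10,
                          idle_timeout_s=0.0)
assert total == 25
```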

Backpressure and Scaling

| Setting | Default | Description |
|---|---|---|
| parallelism | 4 | Number of Flink task slots |
| max_records | 10000 | Records per Airflow micro-batch |
| checkpoint_interval | 60s | State checkpoint frequency |
| idle_timeout | 300s | Shutdown after idle period (Airflow mode) |
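
These defaults put a rough ceiling on single-task micro-batch throughput, which is worth knowing when sizing pipelines. The arithmetic below assumes a per-poll budget equal to the 30-second poll timeout shown earlier; that budget is an assumption for illustration, not a documented setting.

```python
# Back-of-the-envelope throughput ceiling for Airflow micro-batch mode.
# poll_interval_s is an assumed per-poll budget, not a real parameter.

def max_batch_throughput(max_records: int, poll_interval_s: float) -> float:
    """Records per second if every poll returns a full batch."""
    return max_records / poll_interval_s

# With the 10,000-record default and a 30 s poll budget, one task tops
# out around 333 records/s; latency-sensitive topics belong on Flink.
rate = max_batch_throughput(10_000, 30.0)
assert 333 <= rate <= 334
```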

Related Pages