MATIH Platform is in active MVP development. Documentation reflects current implementation status.
13. ML Service & MLOps
Streaming Features

The streaming feature pipeline computes features in real time using Flink SQL over Kafka event streams. Computed features are written directly to the online store so they are immediately available at inference time.


Streaming Feature Declaration

from src.features.unified_feature_store import (
    StreamingFeatureDeclaration,
    StreamingAggregationDefinition,
    StreamingWindowDefinition,
)
 
streaming_feature = StreamingFeatureDeclaration(
    name="realtime_transaction_stats",
    entity_keys=["customer_id"],
    kafka_topic="transactions",
    aggregations=[
        StreamingAggregationDefinition(
            name="txn_count_1h",
            source_field="amount",
            aggregation="count",
            window=StreamingWindowDefinition(
                window_type="tumbling",
                size_ms=3600000,  # 1 hour
            ),
        ),
        StreamingAggregationDefinition(
            name="txn_sum_1h",
            source_field="amount",
            aggregation="sum",
            window=StreamingWindowDefinition(
                window_type="tumbling",
                size_ms=3600000,
            ),
        ),
    ],
    output_to_online_store=True,
    output_to_offline_store=False,
    ttl_seconds=86400,
)
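Window sizes are declared in milliseconds and later rendered as Flink SQL interval literals. A minimal sketch of what that conversion might look like (the helper below is illustrative, not part of the documented API):

```python
# Illustrative helper: render a window size in milliseconds as a Flink SQL
# interval literal, preferring the coarsest unit that divides evenly.
# This is an assumption about the generator, not the actual implementation.
def ms_to_flink_interval(size_ms: int) -> str:
    if size_ms % 3_600_000 == 0:
        return f"INTERVAL '{size_ms // 3_600_000}' HOUR"
    if size_ms % 60_000 == 0:
        return f"INTERVAL '{size_ms // 60_000}' MINUTE"
    if size_ms % 1_000 == 0:
        return f"INTERVAL '{size_ms // 1_000}' SECOND"
    return f"INTERVAL '{size_ms}' MILLISECOND"
```

For example, the declared `size_ms=3600000` would render as `INTERVAL '1' HOUR`.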

Creating Streaming Pipelines

job_id = await store.create_streaming_feature(
    tenant_id="acme-corp",
    declaration=streaming_feature,
)

This generates a Flink SQL job and starts consuming from the specified Kafka topic.


Generated Flink SQL

The pipeline automatically generates Flink SQL DDL and queries:

CREATE TABLE kafka_source (
    customer_id STRING,
    amount DOUBLE,
    event_timestamp TIMESTAMP(3),
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

SELECT
    customer_id,
    COUNT(amount) AS txn_count_1h,
    SUM(amount) AS txn_sum_1h,
    window_start,
    window_end
FROM TABLE(
    TUMBLE(TABLE kafka_source, DESCRIPTOR(event_timestamp), INTERVAL '1' HOUR)
)
GROUP BY customer_id, window_start, window_end;
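The tumbling aggregation in the SQL above can be expressed in plain Python, which helps when reasoning about what the job computes. A self-contained sketch (not the actual Flink job):

```python
from collections import defaultdict

SIZE_MS = 3_600_000  # 1-hour tumbling window, matching the declaration

def aggregate_tumbling(events):
    """Group events into hour-aligned tumbling windows per customer.

    Each event carries an epoch-millisecond timestamp and is assigned to
    exactly one window: the one whose start is ts rounded down to SIZE_MS.
    """
    windows = defaultdict(lambda: {"txn_count_1h": 0, "txn_sum_1h": 0.0})
    for e in events:
        window_start = (e["ts_ms"] // SIZE_MS) * SIZE_MS
        key = (e["customer_id"], window_start)
        windows[key]["txn_count_1h"] += 1
        windows[key]["txn_sum_1h"] += e["amount"]
    return dict(windows)
```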

Window Types

Type       Description                      Parameters
tumbling   Fixed, non-overlapping windows   size_ms
sliding    Overlapping windows              size_ms, slide_ms
session    Gap-based windows                gap_ms
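The practical difference between tumbling and sliding windows is how many windows a single event lands in: exactly one for tumbling, several for sliding. A self-contained sketch of the window assignment (illustrative, not the Flink implementation):

```python
def tumbling_window(ts_ms: int, size_ms: int) -> int:
    # A tumbling window assigns each event to exactly one window:
    # the timestamp rounded down to a multiple of the window size.
    return (ts_ms // size_ms) * size_ms

def sliding_windows(ts_ms: int, size_ms: int, slide_ms: int) -> list:
    # A sliding window assigns an event to every window whose
    # [start, start + size_ms) span covers the timestamp, where starts
    # are multiples of slide_ms.
    s_max = (ts_ms // slide_ms) * slide_ms
    s_min = ((ts_ms - size_ms) // slide_ms + 1) * slide_ms
    return list(range(s_min, s_max + 1, slide_ms))
```

With `size_ms=60000` and `slide_ms=30000`, every event falls into two overlapping one-minute windows.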

Processing Events

Events can also be processed individually through the API:

await store.process_streaming_event(
    tenant_id="acme-corp",
    feature_name="realtime_transaction_stats",
    event={"customer_id": "c123", "amount": 99.50, "event_timestamp": "2026-02-12T10:00:00"},
)
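Conceptually, processing a single event updates per-entity, per-window aggregate state and writes the result through to the online store. A minimal in-memory sketch of that flow (the dict-backed store, UTC timestamp handling, and function body are assumptions, not the actual implementation):

```python
from collections import defaultdict
from datetime import datetime, timezone

SIZE_MS = 3_600_000  # 1-hour tumbling window, as declared above

online_store = {}  # stand-in for the real online store
_state = defaultdict(lambda: {"txn_count_1h": 0, "txn_sum_1h": 0.0})

def process_streaming_event(tenant_id: str, feature_name: str, event: dict) -> dict:
    # Parse the event timestamp (assumed UTC) and find its tumbling window.
    dt = datetime.fromisoformat(event["event_timestamp"]).replace(tzinfo=timezone.utc)
    ts_ms = int(dt.timestamp() * 1000)
    window_start = (ts_ms // SIZE_MS) * SIZE_MS
    # Update aggregate state keyed by tenant, feature, entity, and window.
    key = (tenant_id, feature_name, event["customer_id"], window_start)
    _state[key]["txn_count_1h"] += 1
    _state[key]["txn_sum_1h"] += event["amount"]
    # Write-through to the online store, keyed by entity.
    online_store[(tenant_id, event["customer_id"])] = dict(_state[key])
    return online_store[(tenant_id, event["customer_id"])]
```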

Source Files

File                         Path
Streaming Feature Service    data-plane/ml-service/src/features/streaming_feature_service.py
UnifiedFeatureStore          data-plane/ml-service/src/features/unified_feature_store.py