# Streaming Features

The streaming feature pipeline enables real-time feature computation using Flink SQL and Kafka. Streaming features are computed from event streams and written directly to the online store, so they are available for inference immediately.
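Conceptually, each incoming event is folded into running aggregates keyed by entity and time window, and those aggregates are what the online store serves. A minimal sketch of that idea in plain Python (the `TumblingAggregator` class is illustrative only, not the service's implementation; the real pipeline does this inside Flink):

```python
from collections import defaultdict


class TumblingAggregator:
    """Running count and sum per (entity, window_start) bucket.

    Illustrative only: demonstrates the windowed-aggregate idea,
    not the actual Flink-based implementation.
    """

    def __init__(self, size_ms: int):
        self.size_ms = size_ms
        self.state = defaultdict(lambda: {"count": 0, "sum": 0.0})

    def process(self, customer_id: str, amount: float, ts_ms: int) -> dict:
        # Tumbling windows: each timestamp maps to exactly one window start.
        window_start = (ts_ms // self.size_ms) * self.size_ms
        bucket = self.state[(customer_id, window_start)]
        bucket["count"] += 1
        bucket["sum"] += amount
        return bucket


agg = TumblingAggregator(size_ms=3_600_000)  # 1-hour windows
agg.process("c123", 99.50, ts_ms=0)
stats = agg.process("c123", 0.50, ts_ms=1_000)  # same hour -> same bucket
# stats == {"count": 2, "sum": 100.0}
```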
## Streaming Feature Declaration

```python
from src.features.unified_feature_store import (
    StreamingFeatureDeclaration,
    StreamingAggregationDefinition,
    StreamingWindowDefinition,
)

streaming_feature = StreamingFeatureDeclaration(
    name="realtime_transaction_stats",
    entity_keys=["customer_id"],
    kafka_topic="transactions",
    aggregations=[
        StreamingAggregationDefinition(
            name="txn_count_1h",
            source_field="amount",
            aggregation="count",
            window=StreamingWindowDefinition(
                window_type="tumbling",
                size_ms=3600000,  # 1 hour
            ),
        ),
        StreamingAggregationDefinition(
            name="txn_sum_1h",
            source_field="amount",
            aggregation="sum",
            window=StreamingWindowDefinition(
                window_type="tumbling",
                size_ms=3600000,  # 1 hour
            ),
        ),
    ],
    output_to_online_store=True,
    output_to_offline_store=False,
    ttl_seconds=86400,  # 24 hours
)
```

## Creating Streaming Pipelines
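Each window type accepts a different parameter set (tumbling: `size_ms`; sliding: `size_ms` and `slide_ms`; session: `gap_ms`), so a declaration can be sanity-checked before submission. A minimal client-side sketch; the `REQUIRED_WINDOW_PARAMS` mapping and `validate_window` helper are hypothetical, not part of the feature store API:

```python
# Hypothetical check: which parameters each window type requires.
REQUIRED_WINDOW_PARAMS = {
    "tumbling": {"size_ms"},
    "sliding": {"size_ms", "slide_ms"},
    "session": {"gap_ms"},
}


def validate_window(window_type: str, params: dict) -> None:
    """Raise ValueError if required window parameters are missing."""
    try:
        required = REQUIRED_WINDOW_PARAMS[window_type]
    except KeyError:
        raise ValueError(f"unknown window type: {window_type!r}") from None
    missing = required - params.keys()
    if missing:
        raise ValueError(f"{window_type} window missing: {sorted(missing)}")


validate_window("tumbling", {"size_ms": 3600000})  # passes silently
```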
```python
job_id = await store.create_streaming_feature(
    tenant_id="acme-corp",
    declaration=streaming_feature,
)
```

This generates a Flink SQL job and starts consuming from the specified Kafka topic.
## Generated Flink SQL

The pipeline automatically generates the Flink SQL DDL and aggregation queries:

```sql
CREATE TABLE kafka_source (
    customer_id STRING,
    amount DOUBLE,
    event_timestamp TIMESTAMP(3),
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

SELECT
    customer_id,
    COUNT(amount) AS txn_count_1h,
    window_start,
    window_end
FROM TABLE(
    TUMBLE(TABLE kafka_source, DESCRIPTOR(event_timestamp), INTERVAL '1' HOUR)
)
GROUP BY customer_id, window_start, window_end;
```

## Window Types
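As a rough illustration of how such a query could be templated from a declaration's fields, here is a sketch of a tumbling-window query generator. This is a hypothetical reconstruction for illustration, not the service's actual code path, and it assumes whole-hour window sizes:

```python
def tumbling_query(source_table: str, entity_key: str, agg_name: str,
                   agg_fn: str, source_field: str, size_ms: int) -> str:
    """Render a Flink TUMBLE table-valued-function query for one aggregation.

    Hypothetical sketch: assumes size_ms is a whole number of hours.
    """
    hours = size_ms // 3_600_000
    return (
        f"SELECT {entity_key}, "
        f"{agg_fn.upper()}({source_field}) AS {agg_name}, "
        f"window_start, window_end "
        f"FROM TABLE(TUMBLE(TABLE {source_table}, "
        f"DESCRIPTOR(event_timestamp), INTERVAL '{hours}' HOUR)) "
        f"GROUP BY {entity_key}, window_start, window_end;"
    )


sql = tumbling_query("kafka_source", "customer_id",
                     "txn_count_1h", "count", "amount", 3600000)
```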
| Type | Description | Parameters |
|---|---|---|
| `tumbling` | Fixed-size, non-overlapping windows | `size_ms` |
| `sliding` | Fixed-size, overlapping windows that advance by a slide interval | `size_ms`, `slide_ms` |
| `session` | Gap-based windows that close after a period of inactivity | `gap_ms` |
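To make the window semantics concrete, the assignment of event timestamps to windows can be sketched in plain Python (epoch-millisecond arithmetic only, independent of Flink):

```python
def tumbling_window(ts_ms: int, size_ms: int) -> tuple:
    """A timestamp belongs to exactly one fixed, non-overlapping window."""
    start = (ts_ms // size_ms) * size_ms
    return (start, start + size_ms)


def sliding_windows(ts_ms: int, size_ms: int, slide_ms: int) -> list:
    """A timestamp belongs to every window [s, s + size) whose start s is a
    multiple of slide_ms and satisfies s <= ts < s + size."""
    k_min = (ts_ms - size_ms) // slide_ms + 1
    k_max = ts_ms // slide_ms
    return [(k * slide_ms, k * slide_ms + size_ms)
            for k in range(k_min, k_max + 1)]


def session_windows(sorted_ts: list, gap_ms: int) -> list:
    """Gap-based windows: a new session starts when the gap since the
    previous event exceeds gap_ms."""
    sessions = []
    for ts in sorted_ts:
        if sessions and ts - sessions[-1][-1] <= gap_ms:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


# An event at t=5,000,000 ms with 1-hour windows:
tumbling_window(5_000_000, 3_600_000)          # one window
sliding_windows(5_000_000, 3_600_000, 1_800_000)  # two overlapping windows
```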
## Processing Events

Events can also be processed individually through the API:

```python
await store.process_streaming_event(
    tenant_id="acme-corp",
    feature_name="realtime_transaction_stats",
    event={
        "customer_id": "c123",
        "amount": 99.50,
        "event_timestamp": "2026-02-12T10:00:00",
    },
)
```

## Source Files
| File | Path |
|---|---|
| Streaming Feature Service | data-plane/ml-service/src/features/streaming_feature_service.py |
| UnifiedFeatureStore | data-plane/ml-service/src/features/unified_feature_store.py |