MATIH Platform is in active MVP development. Documentation reflects current implementation status.
11. Pipelines & Data Engineering

Flink Streaming

Apache Flink provides real-time stream processing capabilities for the MATIH platform. Four production Flink SQL jobs process continuous data streams for CDC archival, session analytics, agent performance aggregation, and LLM operations monitoring. All jobs read from Kafka topics and write to Iceberg tables via the Polaris REST Catalog.


Flink Architecture

+------------------+     CDC Events      +------------------+
| PostgreSQL       | ------------------> | Flink CDC        |
| (WAL Replication)|                     | Connector        |
+------------------+                     +--------+---------+
                                                  |
+------------------+     Kafka Events    +--------v---------+
| Kafka Topics     | ------------------> | Flink SQL        |
| - state-changes  |                     | Engine           |
| - agent-traces   |                     |                  |
| - llm-ops        |                     +--------+---------+
+------------------+                              |
                                         +--------v---------+
                                         | Iceberg Tables   |
                                         | (via Polaris)    |
                                         +------------------+

Production Flink Jobs

MATIH runs four Flink SQL jobs in production:

| Job | Source | Sink | Window | Purpose |
|-----|--------|------|--------|---------|
| State Transition CDC | PostgreSQL (CDC) | state_transition_log | None (streaming) | Archive FSM transitions to Iceberg |
| Session Analytics | Kafka (matih.ai.state-changes) | session_analytics | 15-min tumbling | Aggregate session metrics |
| Agent Performance | Kafka (matih.ai.agent-traces) | agent_performance_metrics | 5-min tumbling | Aggregate agent performance |
| LLM Operations | Kafka (matih.ai.llm-ops) | llm_operations_metrics | 15-min tumbling | Aggregate LLM usage metrics |

Job 1: State Transition CDC Archival

Source file: infrastructure/flink/jobs/state-transition-cdc.sql

This job captures all finite state machine (FSM) transitions from the AI Service's PostgreSQL database and archives them to an Iceberg table for long-term audit and analysis.

Source Configuration

CREATE TEMPORARY TABLE fsm_transitions_cdc (
    id              STRING,
    entity_type     STRING NOT NULL,
    entity_id       STRING NOT NULL,
    tenant_id       STRING NOT NULL,
    from_state      STRING NOT NULL,
    to_state        STRING NOT NULL,
    triggered_by    STRING,
    trigger_reason  STRING,
    metadata        STRING,
    created_at      TIMESTAMP(3) NOT NULL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'postgresql.matih-data-plane.svc.cluster.local',
    'port' = '5432',
    'username' = '${DATABASE_USER}',
    'password' = '${DATABASE_PASSWORD}',
    'database-name' = 'ai_service',
    'schema-name' = 'public',
    'table-name' = 'fsm_state_transitions',
    'slot.name' = 'flink_fsm_cdc',
    'decoding.plugin.name' = 'pgoutput'
);

Sink Query

INSERT INTO polaris.matih_analytics.state_transition_log
SELECT
    id              AS transition_id,
    entity_type,
    entity_id,
    tenant_id,
    from_state,
    to_state,
    triggered_by,
    trigger_reason,
    metadata,
    created_at      AS event_timestamp
FROM fsm_transitions_cdc;

Prerequisites

| Requirement | Configuration |
|-------------|---------------|
| PostgreSQL WAL level | ALTER SYSTEM SET wal_level = logical; (requires restart) |
| Replication plugin | pgoutput (built-in on PostgreSQL 10+) |
| User privilege | ALTER ROLE ai_service REPLICATION; |
| Replication slot | flink_fsm_cdc (created automatically by the connector) |

Credential Injection

Credentials are injected via environment variable substitution before submitting to the Flink SQL Gateway:

export DATABASE_USER=ai_service
export DATABASE_PASSWORD=xxx
envsubst < state-transition-cdc.sql | curl -X POST -d @- sql-gateway/v1/statements
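The same substitution can be done without envsubst, since the placeholders use the ${VAR} syntax that Python's string.Template understands. A minimal sketch (render_sql is an illustrative helper, not part of the platform):

```python
from string import Template

def render_sql(sql_text: str, env: dict) -> str:
    """Substitute ${VAR} placeholders in a Flink SQL file, mirroring envsubst.

    safe_substitute leaves unknown placeholders untouched instead of raising,
    so a missing variable is visible in the submitted SQL rather than fatal.
    """
    return Template(sql_text).safe_substitute(env)

sql = "'username' = '${DATABASE_USER}', 'password' = '${DATABASE_PASSWORD}'"
rendered = render_sql(sql, {"DATABASE_USER": "ai_service",
                            "DATABASE_PASSWORD": "s3cr3t"})
```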

Job 2: Session Analytics

Source file: infrastructure/flink/jobs/session-analytics.sql

This job aggregates session-level metrics from state change events in 15-minute tumbling windows.

Source Configuration

CREATE TEMPORARY TABLE state_changes_source (
    event_id        STRING,
    event_type      STRING,
    tenant_id       STRING NOT NULL,
    entity_type     STRING,
    entity_id       STRING,
    from_state      STRING,
    to_state        STRING,
    triggered_by    STRING,
    metadata        STRING,
    `timestamp`     TIMESTAMP(3),
    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'matih.ai.state-changes',
    'properties.bootstrap.servers' = 'strimzi-kafka-kafka-bootstrap.matih-data-plane.svc.cluster.local:9093',
    'properties.group.id' = 'flink-session-analytics',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

Aggregation Query

INSERT INTO polaris.matih_analytics.session_analytics
SELECT
    TUMBLE_START(`timestamp`, INTERVAL '15' MINUTE) AS window_start,
    TUMBLE_END(`timestamp`, INTERVAL '15' MINUTE)   AS window_end,
    tenant_id,
    COUNT(DISTINCT entity_id)                         AS total_sessions,
    AVG(CAST(JSON_VALUE(metadata, '$.duration_ms') AS DOUBLE)) AS avg_duration_ms,
    AVG(CAST(JSON_VALUE(metadata, '$.turn_count') AS DOUBLE))  AS avg_turns,
    CAST(SUM(CASE WHEN to_state = 'completed' THEN 1 ELSE 0 END) AS DOUBLE)
        / GREATEST(COUNT(DISTINCT entity_id), 1)     AS completion_rate,
    COUNT(DISTINCT triggered_by)                      AS unique_users,
    TUMBLE_END(`timestamp`, INTERVAL '15' MINUTE)    AS event_timestamp
FROM state_changes_source
WHERE entity_type = 'session_phase'
GROUP BY
    TUMBLE(`timestamp`, INTERVAL '15' MINUTE),
    tenant_id;

Computed Metrics

| Metric | Description | Derivation |
|--------|-------------|------------|
| total_sessions | Unique sessions in window | COUNT(DISTINCT entity_id) |
| avg_duration_ms | Average session duration | From metadata.duration_ms on completion events |
| avg_turns | Average conversation turns | From metadata.turn_count on completion events |
| completion_rate | Fraction of sessions reaching completion | Completed / total sessions |
| unique_users | Distinct users in window | COUNT(DISTINCT triggered_by) |
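The aggregation logic can be mirrored in a small batch sketch, useful for validating the SQL against sample events (session_window_metrics is an illustrative function, not the streaming job itself; it assumes the event shape from the source table above):

```python
import json

def session_window_metrics(events):
    """Aggregate one 15-minute window of session_phase state-change events,
    mirroring the Flink SQL aggregation above."""
    sessions = {e["entity_id"] for e in events}
    users = {e["triggered_by"] for e in events}
    # duration_ms is only present in metadata on completion events
    durations = [json.loads(e["metadata"]).get("duration_ms")
                 for e in events if e["to_state"] == "completed"]
    durations = [d for d in durations if d is not None]
    completed = sum(1 for e in events if e["to_state"] == "completed")
    return {
        "total_sessions": len(sessions),
        "unique_users": len(users),
        "avg_duration_ms": (sum(durations) / len(durations)) if durations else None,
        # max(..., 1) mirrors the GREATEST(..., 1) guard against division by zero
        "completion_rate": completed / max(len(sessions), 1),
    }
```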

Job 3: Agent Performance Aggregation

Source file: infrastructure/flink/jobs/agent-performance-agg.sql

This job aggregates agent-level performance metrics from trace events in 5-minute tumbling windows.

Source Configuration

CREATE TEMPORARY TABLE agent_traces_source (
    event_id        STRING,
    event_type      STRING,
    tenant_id       STRING NOT NULL,
    trace_id        STRING,
    agent_id        STRING NOT NULL,
    action          STRING,
    session_id      STRING,
    latency_ms      INT,
    tokens_input    INT,
    tokens_output   INT,
    cost_usd        DOUBLE,
    tools_used      ARRAY<STRING>,
    error_message   STRING,
    `timestamp`     TIMESTAMP(3),
    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'matih.ai.agent-traces',
    'properties.bootstrap.servers' = 'strimzi-kafka-kafka-bootstrap.matih-data-plane.svc.cluster.local:9093',
    'properties.group.id' = 'flink-agent-perf-agg',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

Aggregation Query

The query computes per-agent, per-tenant metrics:

INSERT INTO polaris.matih_analytics.agent_performance_metrics
SELECT
    TUMBLE_START(`timestamp`, INTERVAL '5' MINUTE)  AS window_start,
    TUMBLE_END(`timestamp`, INTERVAL '5' MINUTE)    AS window_end,
    tenant_id,
    agent_id,
    CAST(NULL AS STRING)                             AS agent_name,
    -- Success rate
    CAST(SUM(CASE WHEN action = 'completed' AND error_message IS NULL THEN 1 ELSE 0 END) AS DOUBLE)
        / GREATEST(COUNT(*), 1)                      AS success_rate,
    AVG(CAST(COALESCE(latency_ms, 0) AS DOUBLE))    AS avg_latency_ms,
    CAST(NULL AS BIGINT)                             AS p50_latency_ms,
    CAST(NULL AS BIGINT)                             AS p95_latency_ms,
    CAST(NULL AS BIGINT)                             AS p99_latency_ms,
    COUNT(*)                                          AS total_traces,
    SUM(CAST(COALESCE(tokens_input, 0) + COALESCE(tokens_output, 0) AS BIGINT)) AS total_tokens,
    SUM(COALESCE(cost_usd, 0.0))                     AS total_cost_usd,
    -- Error rate
    CAST(SUM(CASE WHEN error_message IS NOT NULL THEN 1 ELSE 0 END) AS DOUBLE)
        / GREATEST(COUNT(*), 1)                      AS error_rate,
    0.0                                               AS escalation_rate,
    TUMBLE_END(`timestamp`, INTERVAL '5' MINUTE)     AS event_timestamp
FROM agent_traces_source
WHERE action IN ('completed', 'failed')
GROUP BY
    TUMBLE(`timestamp`, INTERVAL '5' MINUTE),
    tenant_id,
    agent_id;

Computed Metrics

| Metric | Description |
|--------|-------------|
| success_rate | Completed without error / total traces |
| avg_latency_ms | Average trace latency |
| total_traces | Total completed + failed traces |
| total_tokens | Sum of input + output tokens |
| total_cost_usd | Sum of per-trace costs |
| error_rate | Traces with errors / total traces |

Note: Percentile latencies (p50/p95/p99) are computed as NULL in the streaming job because PERCENTILE_APPROX() requires a registered UDF. Downstream queries against the Iceberg table via Trino can compute these percentiles on demand.
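As an illustration of that downstream computation (in Python rather than Trino SQL; latency_percentiles is a hypothetical helper operating on archived per-trace latencies):

```python
import statistics

def latency_percentiles(latencies_ms):
    """Compute p50/p95/p99 over a batch of per-trace latencies.

    statistics.quantiles(n=100) returns the 99 percentile cut points,
    so p50/p95/p99 are the 50th, 95th, and 99th entries.
    """
    if len(latencies_ms) < 2:  # quantiles needs at least two data points
        return {"p50": None, "p95": None, "p99": None}
    q = statistics.quantiles(sorted(latencies_ms), n=100)
    return {"p50": q[49], "p95": q[94], "p99": q[98]}
```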


Job 4: LLM Operations Analytics

Source file: infrastructure/flink/jobs/llm-operations-agg.sql

This job aggregates LLM provider usage metrics in 15-minute tumbling windows, grouped by tenant, provider, and model. Its Kafka source table (llm_ops_source) follows the same pattern as the other jobs' sources: topic matih.ai.llm-ops, consumer group flink-llm-ops-agg, and a 30-second watermark.

Aggregation Query

INSERT INTO polaris.matih_analytics.llm_operations_metrics
SELECT
    TUMBLE_START(`timestamp`, INTERVAL '15' MINUTE) AS window_start,
    TUMBLE_END(`timestamp`, INTERVAL '15' MINUTE)   AS window_end,
    tenant_id,
    COALESCE(provider, 'unknown')                    AS provider,
    COALESCE(model, 'unknown')                       AS model,
    -- Cache hit rate
    CAST(SUM(CASE WHEN cache_hit = TRUE THEN 1 ELSE 0 END) AS DOUBLE)
        / GREATEST(COUNT(*), 1)                      AS cache_hit_rate,
    AVG(CAST(COALESCE(tokens_input, 0) AS DOUBLE))  AS avg_tokens_input,
    AVG(CAST(COALESCE(tokens_output, 0) AS DOUBLE)) AS avg_tokens_output,
    COUNT(*)                                          AS total_requests,
    SUM(COALESCE(cost_usd, 0.0))                     AS total_cost_usd,
    AVG(CAST(COALESCE(latency_ms, 0) AS DOUBLE))    AS avg_latency_ms,
    -- Error rate
    CAST(SUM(CASE WHEN success = FALSE THEN 1 ELSE 0 END) AS DOUBLE)
        / GREATEST(COUNT(*), 1)                      AS error_rate,
    TUMBLE_END(`timestamp`, INTERVAL '15' MINUTE)    AS event_timestamp
FROM llm_ops_source
GROUP BY
    TUMBLE(`timestamp`, INTERVAL '15' MINUTE),
    tenant_id,
    provider,
    model;

Computed Metrics

| Metric | Description |
|--------|-------------|
| cache_hit_rate | Fraction of requests served from the LLM cache |
| avg_tokens_input | Average input tokens per request |
| avg_tokens_output | Average output tokens per request |
| total_requests | Total LLM API calls in window |
| total_cost_usd | Total cost in window |
| avg_latency_ms | Average request latency |
| error_rate | Failed requests / total requests |

Flink Job Management

The Pipeline Service provides management APIs for Flink jobs:

Flink Job Service

class FlinkJobService:
    """Manages Flink SQL jobs."""
 
    async def submit_sql_job(self, sql: str, job_name: str) -> FlinkJob:
        """Submit a Flink SQL job to the SQL Gateway."""
 
    async def get_job_status(self, job_id: str) -> FlinkJobStatus:
        """Get the status of a running Flink job."""
 
    async def cancel_job(self, job_id: str) -> bool:
        """Cancel a running Flink job."""
 
    async def create_savepoint(self, job_id: str, path: str) -> str:
        """Create a savepoint for a Flink job."""
 
    async def resume_from_savepoint(self, job_id: str, savepoint_path: str) -> FlinkJob:
        """Resume a job from a savepoint."""

Flink Savepoint Service

The FlinkSavepointService manages savepoints for job upgrades and migrations:

class FlinkSavepointService:
    """Manages Flink savepoints for graceful upgrades."""
 
    async def trigger_savepoint(self, job_id: str) -> Savepoint:
        """1. Trigger a savepoint on the running job.
        2. Wait for savepoint completion.
        3. Store the savepoint path in metadata."""
 
    async def restore_from_savepoint(self, job_id: str, savepoint: Savepoint) -> FlinkJob:
        """1. Cancel the current job (if running).
        2. Submit the new job with the savepoint path.
        3. Verify the job is consuming from the savepoint offset."""
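Tying the two services together, a savepoint-based upgrade might be orchestrated as follows (upgrade_job and savepoint_dir are illustrative names; svc stands for any object exposing the FlinkJobService methods sketched above):

```python
import asyncio

async def upgrade_job(svc, job_id: str, savepoint_dir: str):
    """Savepoint-based upgrade flow: snapshot state, stop the old job,
    then resume from the snapshot so no Kafka offsets are lost."""
    path = await svc.create_savepoint(job_id, savepoint_dir)   # 1. snapshot state
    await svc.cancel_job(job_id)                               # 2. stop the old job
    return await svc.resume_from_savepoint(job_id, path)       # 3. restart from snapshot
```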

Kafka Integration

Topic Configuration

| Topic | Producers | Partition Count | Retention |
|-------|-----------|-----------------|-----------|
| matih.ai.state-changes | AI Service | 12 | 7 days |
| matih.ai.agent-traces | AI Service | 12 | 7 days |
| matih.ai.llm-ops | AI Service | 12 | 7 days |

Consumer Group Configuration

| Consumer Group | Job | Startup Mode |
|----------------|-----|--------------|
| flink-session-analytics | Session Analytics | latest-offset |
| flink-agent-perf-agg | Agent Performance | latest-offset |
| flink-llm-ops-agg | LLM Operations | latest-offset |

Watermark Strategy

All Kafka-sourced jobs use a 30-second watermark delay:

WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '30' SECOND

This allows events to arrive up to 30 seconds late before a window closes. Events that arrive after the watermark has passed the end of their window are dropped.
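The interaction between the watermark delay and window closing can be sketched as follows (window_start and is_dropped are illustrative helpers modeling Flink's event-time semantics, not platform code):

```python
from datetime import datetime, timedelta

DELAY = timedelta(seconds=30)   # matches INTERVAL '30' SECOND
WINDOW = timedelta(minutes=15)  # tumbling window size

EPOCH = datetime(1970, 1, 1)

def window_start(ts: datetime) -> datetime:
    """Assign an event to the start of its 15-minute tumbling window."""
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW

def is_dropped(event_ts: datetime, max_event_ts: datetime) -> bool:
    """An event is dropped once the watermark (max event time seen minus
    the 30-second delay) has passed the end of the event's window."""
    watermark = max_event_ts - DELAY
    return watermark >= window_start(event_ts) + WINDOW
```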


Iceberg Sink Configuration

All Flink jobs write to Iceberg tables in the polaris.matih_analytics namespace. Tables are pre-created via DDL scripts before the Flink jobs start.

Sink Tables

| Table | Partition | Compaction |
|-------|-----------|------------|
| state_transition_log | event_timestamp (daily) | Automatic by Iceberg |
| session_analytics | window_start (hourly) | Automatic by Iceberg |
| agent_performance_metrics | window_start (hourly) | Automatic by Iceberg |
| llm_operations_metrics | window_start (hourly) | Automatic by Iceberg |
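To illustrate how the daily and hourly partitions bucket rows: Iceberg's day and hour transforms map a timestamp to whole days or hours since the Unix epoch, so every row in a bucket shares one partition value (a sketch under that assumption, with hypothetical helper names):

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def day_partition(ts: datetime) -> int:
    """Iceberg-style days() transform: whole days since the Unix epoch."""
    return (ts - EPOCH).days

def hour_partition(ts: datetime) -> int:
    """Iceberg-style hours() transform: whole hours since the Unix epoch."""
    return int((ts - EPOCH).total_seconds() // 3600)
```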

Dev vs Production Configuration

| Aspect | Development | Production |
|--------|-------------|------------|
| Kafka port | 9092 (plaintext) | 9093 (TLS) |
| Parallelism | 1 | 4-8 |
| Checkpoint interval | 60s | 10s |
| Savepoint on cancel | Disabled | Enabled |
| State backend | Heap | RocksDB |

Related Sections

  • Pipeline Service -- Flink job management via Pipeline Service
  • Spark -- Complementary batch processing engine
  • Temporal -- Workflow orchestration for Flink job lifecycle
  • API Reference -- Flink management endpoints