MATIH Platform is in active MVP development. Documentation reflects current implementation status.

State Transition CDC

The State Transition CDC Flink job captures all FSM (Finite State Machine) state transitions from the PostgreSQL fsm_state_transitions table and archives them to an Iceberg table for long-term audit and analytics. Unlike the windowed aggregation jobs, this is a continuous CDC stream with no aggregation.

Source: `infrastructure/flink/jobs/state-transition-cdc.sql`


Job Configuration

| Property | Value |
|---|---|
| Source | PostgreSQL CDC (`postgres-cdc` connector) |
| Source table | `public.fsm_state_transitions` in the `ai_service` database |
| Replication slot | `flink_fsm_cdc` |
| Decoding plugin | `pgoutput` |
| Sink table | `polaris.matih_analytics.state_transition_log` |
| Processing mode | Continuous (no windowing) |
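
Putting the configuration above together, the Flink source table can be sketched as DDL. This is a sketch, not the file's verbatim contents: the hostname is an assumed in-cluster service name, the column list mirrors the Source Schema below, and the credential placeholders follow the `envsubst` convention described under Credential Injection.

```sql
-- Sketch of the CDC source table; hostname is an assumed service name.
CREATE TABLE fsm_transitions_cdc (
    id              STRING,
    entity_type     STRING,
    entity_id       STRING,
    tenant_id       STRING,
    from_state      STRING,
    to_state        STRING,
    triggered_by    STRING,
    trigger_reason  STRING,
    metadata        STRING,
    created_at      TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'postgres.matih-data-plane.svc',  -- assumption
    'port' = '5432',
    'username' = '${DATABASE_USER}',
    'password' = '${DATABASE_PASSWORD}',
    'database-name' = 'ai_service',
    'schema-name' = 'public',
    'table-name' = 'fsm_state_transitions',
    'slot.name' = 'flink_fsm_cdc',
    -- pgoutput is built into PostgreSQL 10+; the connector's default
    -- plugin is not, so this option must be set explicitly.
    'decoding.plugin.name' = 'pgoutput'
);
```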

Prerequisites

The following PostgreSQL configuration is required before running this job:

| Requirement | Command | Notes |
|---|---|---|
| Logical WAL level | `ALTER SYSTEM SET wal_level = logical;` | Requires a PostgreSQL restart |
| Replication privilege | `ALTER ROLE ai_service REPLICATION;` | Grant to the CDC user |
| `pgoutput` plugin | Built-in on PostgreSQL 10+ | No installation needed |

The replication slot flink_fsm_cdc is created automatically by the Flink CDC connector on first run.
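
The prerequisites can be verified from a `psql` session before (and after) the first job run; a minimal check, assuming the role name above:

```sql
-- Must return 'logical'; anything else means the restart has not taken effect.
SHOW wal_level;

-- The CDC role should show rolreplication = true.
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'ai_service';

-- After the first job run, the slot should exist and be active.
SELECT slot_name, plugin, active
FROM pg_replication_slots
WHERE slot_name = 'flink_fsm_cdc';
```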


Source Schema

| Column | Type | Constraint | Description |
|---|---|---|---|
| `id` | STRING | PRIMARY KEY | Transition record ID |
| `entity_type` | STRING | NOT NULL | Entity type (session, agent, pipeline) |
| `entity_id` | STRING | NOT NULL | Entity identifier |
| `tenant_id` | STRING | NOT NULL | Tenant identifier |
| `from_state` | STRING | NOT NULL | Previous state |
| `to_state` | STRING | NOT NULL | New state |
| `triggered_by` | STRING | – | User or system trigger |
| `trigger_reason` | STRING | – | Reason for transition |
| `metadata` | STRING | – | JSON metadata |
| `created_at` | TIMESTAMP(3) | NOT NULL | Transition timestamp |

Output Mapping

The CDC job performs a direct column mapping with no aggregation:

```sql
INSERT INTO polaris.matih_analytics.state_transition_log
SELECT
    id              AS transition_id,
    entity_type,
    entity_id,
    tenant_id,
    from_state,
    to_state,
    triggered_by,
    trigger_reason,
    metadata,
    created_at      AS event_timestamp
FROM fsm_transitions_cdc;
```
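
For reference, the sink table this mapping targets would look roughly like the following. This is a sketch only: it assumes the `polaris` Iceberg catalog is already registered in the Flink session, and any partitioning or table properties in the real DDL are omitted.

```sql
-- Sketch of the Iceberg sink; columns mirror the SELECT above.
CREATE TABLE IF NOT EXISTS polaris.matih_analytics.state_transition_log (
    transition_id    STRING,
    entity_type      STRING,
    entity_id        STRING,
    tenant_id        STRING,
    from_state       STRING,
    to_state         STRING,
    triggered_by     STRING,
    trigger_reason   STRING,
    metadata         STRING,
    event_timestamp  TIMESTAMP(3)
);
```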

Credential Injection

Database credentials must be injected before submitting the job to the Flink SQL Gateway. Three methods are supported:

| Method | Description |
|---|---|
| `envsubst` | Replace `${DATABASE_USER}` and `${DATABASE_PASSWORD}` placeholders |
| ConfigMap + init-container | Use a Kubernetes init-container with `envsubst` |
| SQL Gateway session `SET` | Set parameters in the Flink SQL Gateway session |

envsubst Example

```shell
export DATABASE_USER=ai_service
export DATABASE_PASSWORD=$(kubectl get secret db-credentials \
    -n matih-data-plane -o jsonpath='{.data.password}' | base64 -d)
envsubst < state-transition-cdc.sql | \
    curl -X POST http://flink-sql-gateway:8083/v1/statements --data-binary @-
```

Note the `--data-binary @-` flag: without it, `curl` ignores the piped SQL and sends an empty request body.

Audit Use Cases

The archived state transition log supports:

| Use Case | Query Pattern |
|---|---|
| Session audit trail | Filter by `entity_type = 'session'` and `entity_id` |
| Agent state history | Filter by `entity_type = 'agent'` |
| Compliance reporting | Full scan with date range on `event_timestamp` |
| State duration analysis | Compute time between consecutive transitions |
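
Two of these patterns can be sketched as queries against the archived table (run in batch mode); the session ID below is hypothetical, and the column names come from the Output Mapping above:

```sql
-- Session audit trail: full ordered history for one session.
SELECT event_timestamp, from_state, to_state, triggered_by, trigger_reason
FROM polaris.matih_analytics.state_transition_log
WHERE entity_type = 'session'
  AND entity_id = 'sess-1234'  -- hypothetical session ID
ORDER BY event_timestamp;

-- State duration analysis: the gap between consecutive transitions for an
-- entity is the time it spent in the state it was leaving (from_state).
SELECT
    entity_type,
    entity_id,
    from_state,
    TIMESTAMPDIFF(SECOND,
        LAG(event_timestamp) OVER (
            PARTITION BY entity_type, entity_id ORDER BY event_timestamp),
        event_timestamp) AS seconds_in_state
FROM polaris.matih_analytics.state_transition_log;
```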
