State Transition CDC
The State Transition CDC Flink job captures all FSM (Finite State Machine) state transitions from the PostgreSQL fsm_state_transitions table and archives them to an Iceberg table for long-term audit and analytics. Unlike the windowed aggregation jobs, this is a continuous CDC stream with no aggregation.
Source: infrastructure/flink/jobs/state-transition-cdc.sql
Job Configuration
| Property | Value |
|---|---|
| Source | PostgreSQL CDC (postgres-cdc connector) |
| Source table | public.fsm_state_transitions in ai_service database |
| Replication slot | flink_fsm_cdc |
| Decoding plugin | pgoutput |
| Sink table | polaris.matih_analytics.state_transition_log |
| Processing mode | Continuous (no windowing) |
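If the sink table does not yet exist in the Polaris catalog, a DDL along the following lines would match the job's output columns. This is a sketch only: partitioning and table properties are deployment-specific and not taken from the job source.

```sql
-- Sketch of the Iceberg sink table. Column names and types mirror the
-- job's output mapping; partitioning and table properties would be
-- chosen per deployment.
CREATE TABLE IF NOT EXISTS polaris.matih_analytics.state_transition_log (
    transition_id   STRING,
    entity_type     STRING,
    entity_id       STRING,
    tenant_id       STRING,
    from_state      STRING,
    to_state        STRING,
    triggered_by    STRING,
    trigger_reason  STRING,
    metadata        STRING,
    event_timestamp TIMESTAMP(3)
);
```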
Prerequisites
The following PostgreSQL configuration is required before running this job:
| Requirement | Command | Notes |
|---|---|---|
| Logical WAL level | ALTER SYSTEM SET wal_level = logical; | Requires PostgreSQL restart |
| Replication privilege | ALTER ROLE ai_service REPLICATION; | Grant to the CDC user |
| pgoutput plugin | Built-in on PostgreSQL 10+ | No installation needed |
The replication slot flink_fsm_cdc is created automatically by the Flink CDC connector on first run.
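These prerequisites can be verified from psql before submitting the job; the checks below use standard PostgreSQL system views:

```sql
-- Must return 'logical' (takes effect only after a restart
-- following ALTER SYSTEM):
SHOW wal_level;

-- The CDC role should have rolreplication = true:
SELECT rolname, rolreplication
FROM pg_roles
WHERE rolname = 'ai_service';

-- After the job's first run, the connector-created slot should appear:
SELECT slot_name, plugin, active
FROM pg_replication_slots
WHERE slot_name = 'flink_fsm_cdc';
```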
Source Schema
| Column | Type | Constraint | Description |
|---|---|---|---|
| id | STRING | PRIMARY KEY | Transition record ID |
| entity_type | STRING | NOT NULL | Entity type (session, agent, pipeline) |
| entity_id | STRING | NOT NULL | Entity identifier |
| tenant_id | STRING | NOT NULL | Tenant identifier |
| from_state | STRING | NOT NULL | Previous state |
| to_state | STRING | NOT NULL | New state |
| triggered_by | STRING | -- | User or system trigger |
| trigger_reason | STRING | -- | Reason for transition |
| metadata | STRING | -- | JSON metadata |
| created_at | TIMESTAMP(3) | NOT NULL | Transition timestamp |
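In the Flink job, this schema corresponds to a CDC source table declared with the postgres-cdc connector. The sketch below assembles the connection details from the tables above; the hostname is a placeholder, and the credential placeholders are substituted at submission time as described in the Credential Injection section:

```sql
-- CDC source over public.fsm_state_transitions (sketch).
-- The hostname is a placeholder; ${DATABASE_USER} and
-- ${DATABASE_PASSWORD} are injected before submission.
CREATE TABLE fsm_transitions_cdc (
    id STRING,
    entity_type STRING,
    entity_id STRING,
    tenant_id STRING,
    from_state STRING,
    to_state STRING,
    triggered_by STRING,
    trigger_reason STRING,
    metadata STRING,
    created_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'postgres.example.internal',  -- placeholder host
    'port' = '5432',
    'username' = '${DATABASE_USER}',
    'password' = '${DATABASE_PASSWORD}',
    'database-name' = 'ai_service',
    'schema-name' = 'public',
    'table-name' = 'fsm_state_transitions',
    'slot.name' = 'flink_fsm_cdc',
    'decoding.plugin.name' = 'pgoutput'
);
```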
Output Mapping
The CDC job performs a direct column mapping with no aggregation:
INSERT INTO polaris.matih_analytics.state_transition_log
SELECT
id AS transition_id,
entity_type,
entity_id,
tenant_id,
from_state,
to_state,
triggered_by,
trigger_reason,
metadata,
created_at AS event_timestamp
FROM fsm_transitions_cdc;
Credential Injection
Database credentials must be injected before submitting the job to the Flink SQL Gateway. Three methods are supported:
| Method | Description |
|---|---|
| envsubst | Replace ${DATABASE_USER} and ${DATABASE_PASSWORD} placeholders |
| ConfigMap + init-container | Use Kubernetes init-container with envsubst |
| SQL Gateway session SET | Set parameters in the Flink SQL Gateway session |
envsubst Example
export DATABASE_USER=ai_service
export DATABASE_PASSWORD=$(kubectl get secret db-credentials \
-n matih-data-plane -o jsonpath='{.data.password}' | base64 -d)
envsubst < state-transition-cdc.sql | \
curl -X POST http://flink-sql-gateway:8083/v1/statements
Audit Use Cases
The archived state transition log supports:
| Use Case | Query Pattern |
|---|---|
| Session audit trail | Filter by entity_type = 'session' and entity_id |
| Agent state history | Filter by entity_type = 'agent' |
| Compliance reporting | Full scan with date range on event_timestamp |
| State duration analysis | Compute time between consecutive transitions |
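As an example of the last pattern, time spent in each state can be derived with a LAG window over consecutive transitions per entity. This is a sketch against the sink table defined above; the date range is illustrative:

```sql
-- Per-session time-in-state, computed as the gap between each
-- transition and the previous one for the same entity.
SELECT
    entity_id,
    from_state,
    to_state,
    event_timestamp,
    event_timestamp - LAG(event_timestamp) OVER (
        PARTITION BY entity_id
        ORDER BY event_timestamp
    ) AS time_in_previous_state
FROM polaris.matih_analytics.state_transition_log
WHERE entity_type = 'session'
  AND event_timestamp >= TIMESTAMP '2024-01-01 00:00:00'  -- illustrative range
ORDER BY entity_id, event_timestamp;
```

The first transition for each entity returns NULL for time_in_previous_state, since there is no earlier row to compare against.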
Related Pages
- Change Data Capture -- CDC pipeline patterns
- Session Analytics -- Aggregated session metrics
- Flink Overview -- Flink architecture