MATIH Platform is in active MVP development. Documentation reflects current implementation status.
11. Pipelines & Data Engineering
Change Data Capture

Change Data Capture (CDC) pipelines capture row-level changes from source databases in real time using the PostgreSQL Write-Ahead Log (WAL). MATIH uses the Flink CDC connector for continuous streaming and Debezium for Kafka-based CDC patterns.


CDC Architecture

PostgreSQL (WAL) ──> Flink CDC Connector ──> Flink SQL Job ──> Iceberg Sink
                 ──> Debezium Connector  ──> Kafka Topic   ──> Consumer
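Downstream of either path, consumers see row-level change events. A minimal sketch of how such an event can be applied to an in-memory replica — the `op` / `before` / `after` envelope shape follows Debezium's standard change-event format, and the sample rows and the `apply_change` helper are illustrative, not platform APIs:

```python
# Sketch: interpreting Debezium-style change events as they would arrive
# on a Kafka topic in the architecture above. Sample data is hypothetical.

def apply_change(table: dict, event: dict) -> None:
    """Apply one CDC event to an in-memory replica keyed by primary key."""
    op = event["op"]  # "c"=create, "u"=update, "d"=delete, "r"=snapshot read
    if op in ("c", "r", "u"):
        row = event["after"]
        table[row["id"]] = row
    elif op == "d":
        table.pop(event["before"]["id"], None)

replica: dict = {}
apply_change(replica, {"op": "c", "before": None,
                       "after": {"id": "t1", "to_state": "RUNNING"}})
apply_change(replica, {"op": "u", "before": {"id": "t1", "to_state": "RUNNING"},
                       "after": {"id": "t1", "to_state": "DONE"}})
print(replica["t1"]["to_state"])  # -> DONE
```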

Supported Source Databases

| Database       | Connector     | Replication Method             |
|----------------|---------------|--------------------------------|
| PostgreSQL 10+ | postgres-cdc  | Logical replication (pgoutput) |
| MySQL 5.7+     | mysql-cdc     | binlog                         |
| SQL Server     | sqlserver-cdc | Change tracking (CT)           |
| MongoDB        | mongodb-cdc   | Change streams                 |

Prerequisites

For PostgreSQL CDC, the following database configuration is required:

-- 1. Enable logical replication (requires restart)
ALTER SYSTEM SET wal_level = logical;
 
-- 2. Grant replication privilege to the service user
ALTER ROLE ai_service REPLICATION;
 
-- 3. The pgoutput plugin is built-in on PostgreSQL 10+
-- No additional plugin installation required
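These prerequisites can be verified before deploying a pipeline. A minimal pre-flight sketch, assuming the settings dict holds values you would fetch from the database (e.g. via `SHOW wal_level` and the `pg_roles` catalog); the `missing_cdc_prereqs` helper is ours, not a platform function:

```python
# Sketch: a deploy-time check for the CDC prerequisites above.
# The settings dict stands in for values queried from PostgreSQL.

def missing_cdc_prereqs(settings: dict) -> list[str]:
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical' (restart required)")
    if not settings.get("rolreplication", False):
        problems.append("service role needs the REPLICATION attribute")
    return problems

# A default PostgreSQL install fails both checks:
print(missing_cdc_prereqs({"wal_level": "replica", "rolreplication": False}))
```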

Pipeline Definition

metadata:
  name: fsm-state-cdc
  version: "1.0.0"
  owner: platform-team
 
sources:
  state_transitions:
    type: cdc
    connector: postgres-cdc
    connection: ${DATABASE_CONNECTION}
    database: ai_service
    schema: public
    table: fsm_state_transitions
    slot_name: flink_fsm_cdc
    decoding_plugin: pgoutput
 
sinks:
  iceberg_archive:
    type: iceberg
    table: matih_analytics.state_transition_log
    mode: append
 
orchestration:
  engine: flink
  checkpoint_interval: 30s
  parallelism: 2
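A pipeline definition like the one above can be sanity-checked for required fields before submission. A minimal structural validator, assuming the YAML has been parsed into a dict; the field set mirrors the source spec above, while `validate_cdc_source` itself is a hypothetical helper:

```python
# Sketch: structural validation of a cdc source entry from the pipeline
# definition. The validator is illustrative, not part of the platform.

REQUIRED_CDC_FIELDS = {"connector", "connection", "database", "schema",
                       "table", "slot_name", "decoding_plugin"}

def validate_cdc_source(spec: dict) -> list[str]:
    errors = [f"missing field: {f}"
              for f in sorted(REQUIRED_CDC_FIELDS - spec.keys())]
    if spec.get("type") != "cdc":
        errors.append("type must be 'cdc'")
    return errors

spec = {"type": "cdc", "connector": "postgres-cdc",
        "connection": "${DATABASE_CONNECTION}", "database": "ai_service",
        "schema": "public", "table": "fsm_state_transitions",
        "slot_name": "flink_fsm_cdc", "decoding_plugin": "pgoutput"}
print(validate_cdc_source(spec))  # -> []
```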

Production CDC Job

The state-transition CDC job is defined in infrastructure/flink/jobs/state-transition-cdc.sql. It captures all FSM state transitions from PostgreSQL and archives them to Iceberg for long-term audit:

CREATE TEMPORARY TABLE fsm_transitions_cdc (
    id              STRING,
    entity_type     STRING NOT NULL,
    entity_id       STRING NOT NULL,
    tenant_id       STRING NOT NULL,
    from_state      STRING NOT NULL,
    to_state        STRING NOT NULL,
    triggered_by    STRING,
    trigger_reason  STRING,
    metadata        STRING,
    created_at      TIMESTAMP(3) NOT NULL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'postgresql.matih-data-plane.svc.cluster.local',
    'port' = '5432',
    -- credentials are substituted at deploy time (see Credential Injection)
    'username' = '${DATABASE_USER}',
    'password' = '${DATABASE_PASSWORD}',
    'database-name' = 'ai_service',
    'schema-name' = 'public',
    'table-name' = 'fsm_state_transitions',
    'slot.name' = 'flink_fsm_cdc',
    'decoding.plugin.name' = 'pgoutput'
);

Credential Injection

CDC job credentials are injected at deployment time using envsubst:

export DATABASE_USER=ai_service
export DATABASE_PASSWORD=$(kubectl get secret ...)
envsubst < state-transition-cdc.sql | curl -X POST --data-binary @- sql-gateway/v1/statements
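The substitution step is plain `${VAR}` placeholder expansion. A minimal sketch of what it does, using Python's `string.Template`, which shares envsubst's `${VAR}` syntax; the sample values are hypothetical (in the real deployment they come from the exported environment variables):

```python
# Sketch: the envsubst step in miniature.
from string import Template

sql = "'username' = '${DATABASE_USER}', 'password' = '${DATABASE_PASSWORD}'"
env = {"DATABASE_USER": "ai_service", "DATABASE_PASSWORD": "s3cret"}
rendered = Template(sql).substitute(env)
print(rendered)  # -> 'username' = 'ai_service', 'password' = 's3cret'
```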

Monitoring

| Metric                     | Description                             |
|----------------------------|-----------------------------------------|
| cdc.lag_ms                 | Replication lag between source and sink |
| cdc.events_processed       | Total CDC events processed              |
| cdc.checkpoint_duration_ms | Flink checkpoint duration               |
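A per-event lag metric like `cdc.lag_ms` can be derived as the difference between the source commit timestamp and the time the sink processed the event. A minimal sketch, assuming epoch-millisecond timestamps; the function name is illustrative, not a platform API:

```python
# Sketch: deriving a replication-lag sample from one CDC event.

def lag_ms(source_commit_ms: int, sink_processed_ms: int) -> int:
    """Clock skew can make the raw difference negative, so clamp at zero."""
    return max(0, sink_processed_ms - source_commit_ms)

print(lag_ms(1_700_000_000_000, 1_700_000_000_450))  # -> 450
```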
