Change Data Capture
Change Data Capture (CDC) pipelines capture row-level changes from source databases in real time; for PostgreSQL, changes are read from the Write-Ahead Log (WAL) via logical replication. MATIH uses the Flink CDC connector for continuous streaming and Debezium for Kafka-based CDC patterns.
CDC Architecture
PostgreSQL (WAL) ──> Flink CDC Connector ──> Flink SQL Job ──> Iceberg Sink
                 ──> Debezium Connector ──> Kafka Topic ──> Consumer

Supported Source Databases
| Database | Connector | Replication Method |
|---|---|---|
| PostgreSQL 10+ | postgres-cdc | Logical replication (pgoutput) |
| MySQL 5.7+ | mysql-cdc | binlog |
| SQL Server | sqlserver-cdc | CDC tables (native change data capture) |
| MongoDB | mongodb-cdc | Change streams |
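Whatever the source database, connectors in the Debezium family emit change events with a common envelope: an `op` code, `before`/`after` row images, and a source timestamp. A minimal sketch of how a downstream consumer might apply such an event (the field names follow the standard Debezium envelope; the in-memory replica and handler are illustrative):

```python
import json

# Illustrative Debezium-style change event envelope: "op" is the change type
# (c = create, u = update, d = delete, r = snapshot read), "before"/"after"
# hold the row images, and "ts_ms" is the source transaction timestamp.
event_json = """
{
  "op": "u",
  "before": {"id": "42", "to_state": "PENDING"},
  "after":  {"id": "42", "to_state": "ACTIVE"},
  "ts_ms": 1700000000000
}
"""

def apply_change(event: dict, table: dict) -> None:
    """Apply a single change event to an in-memory replica of the table."""
    op = event["op"]
    if op in ("c", "r", "u"):   # insert, snapshot read, or update: upsert "after"
        row = event["after"]
        table[row["id"]] = row
    elif op == "d":             # delete: only "before" is populated
        table.pop(event["before"]["id"], None)

replica = {"42": {"id": "42", "to_state": "PENDING"}}
apply_change(json.loads(event_json), replica)
print(replica["42"]["to_state"])  # prints "ACTIVE"
```

A real consumer would apply these events transactionally and in source-commit order; this sketch only shows the envelope semantics.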
Prerequisites
For PostgreSQL CDC, the following database configuration is required:
-- 1. Enable logical replication (requires restart)
ALTER SYSTEM SET wal_level = logical;
-- 2. Grant replication privilege to the service user
ALTER ROLE ai_service REPLICATION;
-- 3. The pgoutput plugin is built-in on PostgreSQL 10+
-- No additional plugin installation required

Pipeline Definition
metadata:
  name: fsm-state-cdc
  version: "1.0.0"
  owner: platform-team

sources:
  state_transitions:
    type: cdc
    connector: postgres-cdc
    connection: ${DATABASE_CONNECTION}
    database: ai_service
    schema: public
    table: fsm_state_transitions
    slot_name: flink_fsm_cdc
    decoding_plugin: pgoutput

sinks:
  iceberg_archive:
    type: iceberg
    table: matih_analytics.state_transition_log
    mode: append

orchestration:
  engine: flink
  checkpoint_interval: 30s
  parallelism: 2

Production CDC Job
The state-transition CDC job is defined in infrastructure/flink/jobs/state-transition-cdc.sql. It captures all FSM state transitions from PostgreSQL and archives them to Iceberg for long-term audit:
CREATE TEMPORARY TABLE fsm_transitions_cdc (
    id STRING,
    entity_type STRING NOT NULL,
    entity_id STRING NOT NULL,
    tenant_id STRING NOT NULL,
    from_state STRING NOT NULL,
    to_state STRING NOT NULL,
    triggered_by STRING,
    trigger_reason STRING,
    metadata STRING,
    created_at TIMESTAMP(3) NOT NULL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'postgresql.matih-data-plane.svc.cluster.local',
    'port' = '5432',
    'database-name' = 'ai_service',
    'schema-name' = 'public',
    'table-name' = 'fsm_state_transitions',
    'slot.name' = 'flink_fsm_cdc',
    'username' = '${DATABASE_USER}',
    'password' = '${DATABASE_PASSWORD}',
    'decoding.plugin.name' = 'pgoutput'
);

Credential Injection
CDC job credentials are injected at deployment time using envsubst:
export DATABASE_USER=ai_service
export DATABASE_PASSWORD=$(kubectl get secret ...)
envsubst < state-transition-cdc.sql | curl -X POST sql-gateway/v1/statements -d @-

Monitoring
| Metric | Description |
|---|---|
| cdc.lag_ms | Replication lag between source and sink (milliseconds) |
| cdc.events_processed | Total CDC events processed |
| cdc.checkpoint_duration_ms | Flink checkpoint duration (milliseconds) |
Related Pages
- Database Replication -- Full replication patterns
- State Transition CDC -- Production Flink CDC job details
- Stream Ingestion -- Kafka-based streaming