Event Sourcing
Event sourcing pipelines capture domain events as an immutable log and derive materialized views from the event stream. MATIH uses Kafka as the event store and supports event-driven pipeline triggers, state reconstruction, and event replay for reprocessing.
Event Sourcing Architecture
```
Producers ──> Kafka Event Log ──> Flink (stream) ──> Materialized Views
                     |
                     └──> Iceberg (archive) ──> Historical Queries
```

Core Concepts
| Concept | Description |
|---|---|
| Event | Immutable fact that something happened (e.g., OrderPlaced) |
| Event store | Kafka topic with log compaction disabled (full history) |
| Projection | Materialized view derived from events |
| Snapshot | Periodic state checkpoint for faster rebuilds |
| Replay | Reprocessing events from a given offset |
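The relationship between events, projections, and replay can be sketched in a few lines of Python. The event shape here is illustrative, not the platform's actual types:

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)  # events are immutable facts
class Event:
    type: str
    entity_id: str
    data: dict = field(default_factory=dict)

def project(events):
    """Derive a materialized view (entity_id -> state) from an event stream."""
    view = {}
    for e in events:
        state = view.get(e.entity_id, {})
        view[e.entity_id] = {**state, **e.data, "last_event": e.type}
    return view

log = [
    Event("OrderPlaced", "order-1", {"amount": 99.99}),
    Event("OrderShipped", "order-1"),
]
# Replaying the same log always reproduces the same view.
view = project(log)
```

Because the log is append-only and the projection is a pure function of it, any view can be dropped and rebuilt by replaying the log from the beginning.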
Pipeline Definition
```yaml
metadata:
  name: order-events-pipeline
  version: "1.0.0"
  owner: domain-team

sources:
  order_events:
    type: kafka
    connection: ${KAFKA_CONNECTION}
    topic: domain.orders.events
    consumer_group: order-projection-v1
    format: json
    start_offset: earliest
    event_types:
      - OrderPlaced
      - OrderConfirmed
      - OrderShipped
      - OrderDelivered
      - OrderCancelled

transformations:
  - name: build_order_state
    type: event_fold
    input: order_events
    key_field: order_id
    state_type: order_aggregate
    fold_function: |
      def fold(state, event):
          if event.type == "OrderPlaced":
              return {"status": "placed", "amount": event.amount}
          elif event.type == "OrderCancelled":
              return {**state, "status": "cancelled"}
          # Other events map to a status derived from the type name,
          # e.g. OrderShipped -> "shipped".
          return {**state, "status": event.type.lower().replace("order", "")}

sinks:
  order_view:
    type: iceberg
    table: domain.orders.current_state
    mode: merge_on_key
    merge_keys: [order_id]

orchestration:
  engine: flink
  checkpoint_interval: 30s
```

Event Schema
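Outside the pipeline, the fold above behaves like an ordinary reduction over one order's event sequence. A runnable sketch, using `SimpleNamespace` as a minimal stand-in for the event objects the `event_fold` transformation passes in:

```python
from types import SimpleNamespace

def fold(state, event):
    # Mirrors the fold_function in the pipeline definition above.
    if event.type == "OrderPlaced":
        return {"status": "placed", "amount": event.amount}
    elif event.type == "OrderCancelled":
        return {**state, "status": "cancelled"}
    # OrderConfirmed -> "confirmed", OrderShipped -> "shipped", etc.
    return {**state, "status": event.type.lower().replace("order", "")}

events = [
    SimpleNamespace(type="OrderPlaced", amount=42.0),
    SimpleNamespace(type="OrderConfirmed"),
    SimpleNamespace(type="OrderShipped"),
]
state = {}
for e in events:
    state = fold(state, e)
# state == {"status": "shipped", "amount": 42.0}
```

Because the fold is keyed by `order_id`, each order's state evolves independently; the engine applies the function per key as events arrive.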
Events follow a standard envelope format:
```json
{
  "event_id": "evt-abc-123",
  "event_type": "OrderPlaced",
  "entity_type": "Order",
  "entity_id": "order-456",
  "tenant_id": "tenant-789",
  "timestamp": "2026-02-12T10:30:00Z",
  "version": 1,
  "data": {
    "customer_id": "cust-100",
    "amount": 99.99,
    "currency": "USD"
  },
  "metadata": {
    "source": "order-service",
    "correlation_id": "req-xyz"
  }
}
```

Event Replay
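A lightweight producer-side check for this envelope might look like the following. Field names come from the example above; treating exactly this set as required is an assumption:

```python
# Assumed required set, taken from the envelope example above.
REQUIRED = {"event_id", "event_type", "entity_type", "entity_id",
            "tenant_id", "timestamp", "version", "data"}

def validate_envelope(event: dict) -> list:
    """Return a list of problems; an empty list means the envelope looks valid."""
    problems = [f"missing field: {f}" for f in sorted(REQUIRED - event.keys())]
    if "version" in event and not isinstance(event["version"], int):
        problems.append("version must be an integer")
    return problems

good = {"event_id": "evt-abc-123", "event_type": "OrderPlaced",
        "entity_type": "Order", "entity_id": "order-456",
        "tenant_id": "tenant-789", "timestamp": "2026-02-12T10:30:00Z",
        "version": 1, "data": {"amount": 99.99, "currency": "USD"}}
```

Validating at produce time matters more than usual in event sourcing: a malformed event in an immutable log cannot be edited later, only compensated for.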
The Pipeline Service supports event replay for reprocessing historical data:
```
POST /v1/pipelines/:pipelineId/replay
```
Request:
```json
{
  "from_offset": "earliest",
  "to_timestamp": "2026-02-10T00:00:00Z",
  "target_consumer_group": "order-projection-replay-v1",
  "parallel_consumers": 4
}
```

Snapshot Management
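The replay body can be assembled programmatically. This sketch only builds and serializes the payload; the HTTP call itself is omitted, and the defaulting behavior shown is an assumption, not documented API semantics:

```python
import json

def build_replay_request(from_offset="earliest", to_timestamp=None,
                         target_consumer_group=None, parallel_consumers=1):
    """Assemble the JSON body for POST /v1/pipelines/:pipelineId/replay."""
    body = {"from_offset": from_offset,
            "parallel_consumers": parallel_consumers}
    if to_timestamp:
        body["to_timestamp"] = to_timestamp
    if target_consumer_group:
        # A fresh consumer group keeps replay offsets separate
        # from the live projection's offsets.
        body["target_consumer_group"] = target_consumer_group
    return json.dumps(body)

payload = build_replay_request(
    to_timestamp="2026-02-10T00:00:00Z",
    target_consumer_group="order-projection-replay-v1",
    parallel_consumers=4,
)
```

Replaying into a distinct consumer group (and usually a distinct sink table) lets the rebuilt projection be verified before cutting traffic over to it.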
Periodic snapshots reduce replay time for state reconstruction:
| Setting | Default | Description |
|---|---|---|
| snapshot_interval | 1 hour | Frequency of state snapshots |
| snapshot_store | Iceberg | Storage backend for snapshots |
| max_snapshots | 24 | Number of snapshots to retain |
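Why snapshots shorten rebuilds: instead of folding the whole log, restore the newest snapshot and fold only the events recorded after it. A minimal sketch with in-memory stand-ins (the `(offset, state)` snapshot layout is illustrative):

```python
def rebuild(snapshots, log, fold):
    """Restore the latest snapshot, then replay only the tail of the log."""
    if snapshots:
        offset, state = max(snapshots, key=lambda s: s[0])  # newest checkpoint
        tail = log[offset:]                                 # events after it
    else:
        state, tail = {}, log                               # no snapshot: full replay
    for event in tail:
        state = fold(state, event)
    return state

# Toy fold: count events and remember the most recent one.
fold = lambda s, e: {**s, "count": s.get("count", 0) + 1, "last": e}
log = ["a", "b", "c", "d"]
snapshots = [(2, {"count": 2, "last": "b"})]  # state after the first 2 events
state = rebuild(snapshots, log, fold)
# identical to folding the entire log from an empty state
```

Since a snapshot plus the log tail yields the same state as a full replay, snapshots are purely an optimization and can be discarded (hence the `max_snapshots` retention cap above).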
MATIH Platform Events
The MATIH platform itself uses event sourcing for several subsystems:
| Topic | Entity | Purpose |
|---|---|---|
| matih.ai.state-changes | Session, Agent | FSM state transition log |
| matih.ai.agent-traces | Agent | Agent execution traces |
| matih.ai.llm-ops | LLM Call | LLM operation tracking |
Related Pages
- Stream Ingestion -- Real-time Kafka consumption
- Change Data Capture -- Database change capture
- Flink Streaming Jobs -- Production streaming jobs