MATIH Platform is in active MVP development. Documentation reflects current implementation status.
11. Pipelines & Data Engineering
Event Sourcing

Event sourcing pipelines capture domain events as an immutable log and derive materialized views from the event stream. MATIH uses Kafka as the event store and supports event-driven pipeline triggers, state reconstruction, and event replay for reprocessing.


Event Sourcing Architecture

Producers ──> Kafka Event Log ──> Flink (stream) ──> Materialized Views
                    |
                    └──> Iceberg (archive) ──> Historical Queries

Core Concepts

| Concept | Description |
| --- | --- |
| Event | Immutable fact that something happened (e.g., OrderPlaced) |
| Event store | Kafka topic with log compaction disabled (full history) |
| Projection | Materialized view derived from events |
| Snapshot | Periodic state checkpoint for faster rebuilds |
| Replay | Reprocessing events from a given offset |

Pipeline Definition

metadata:
  name: order-events-pipeline
  version: "1.0.0"
  owner: domain-team
 
sources:
  order_events:
    type: kafka
    connection: ${KAFKA_CONNECTION}
    topic: domain.orders.events
    consumer_group: order-projection-v1
    format: json
    start_offset: earliest
    event_types:
      - OrderPlaced
      - OrderConfirmed
      - OrderShipped
      - OrderDelivered
      - OrderCancelled
 
transformations:
  - name: build_order_state
    type: event_fold
    input: order_events
    key_field: order_id
    state_type: order_aggregate
    fold_function: |
      def fold(state, event):
          if event.type == "OrderPlaced":
              return {"status": "placed", "amount": event.amount}
          elif event.type == "OrderCancelled":
              return {**state, "status": "cancelled"}
          return {**state, "status": event.type.lower().replace("order", "")}
 
sinks:
  order_view:
    type: iceberg
    table: domain.orders.current_state
    mode: merge_on_key
    merge_keys: [order_id]
 
orchestration:
  engine: flink
  checkpoint_interval: 30s
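The event_fold step above can be sketched in plain Python: state is reconstructed by folding the event history for one key, left to right. This is an illustrative standalone runner, not the Flink execution path; the event types and fields mirror the pipeline definition.

```python
from functools import reduce

# Sample event history for a single order_id, in log order.
EVENTS = [
    {"type": "OrderPlaced", "order_id": "order-456", "amount": 99.99},
    {"type": "OrderConfirmed", "order_id": "order-456"},
    {"type": "OrderShipped", "order_id": "order-456"},
]

def fold(state, event):
    """Same logic as the fold_function in the pipeline definition."""
    if event["type"] == "OrderPlaced":
        return {"status": "placed", "amount": event["amount"]}
    if event["type"] == "OrderCancelled":
        return {**state, "status": "cancelled"}
    # OrderConfirmed -> "confirmed", OrderShipped -> "shipped", etc.
    return {**state, "status": event["type"].lower().replace("order", "")}

# Reconstruct current state by folding the full event history.
state = reduce(fold, EVENTS, None)
print(state)  # {'status': 'shipped', 'amount': 99.99}
```

Because the fold is deterministic, replaying the same events always yields the same projection, which is what makes replay-based reprocessing safe.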

Event Schema

Events follow a standard envelope format:

{
  "event_id": "evt-abc-123",
  "event_type": "OrderPlaced",
  "entity_type": "Order",
  "entity_id": "order-456",
  "tenant_id": "tenant-789",
  "timestamp": "2026-02-12T10:30:00Z",
  "version": 1,
  "data": {
    "customer_id": "cust-100",
    "amount": 99.99,
    "currency": "USD"
  },
  "metadata": {
    "source": "order-service",
    "correlation_id": "req-xyz"
  }
}
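A minimal sketch of constructing this envelope before publishing, assuming producers generate `event_id` and `timestamp` themselves; `make_event` is a hypothetical helper, not a platform API.

```python
import json
import uuid
from datetime import datetime, timezone

def make_event(event_type, entity_type, entity_id, tenant_id, data, metadata=None):
    """Hypothetical helper that builds the standard envelope shown above."""
    return {
        "event_id": f"evt-{uuid.uuid4()}",
        "event_type": event_type,
        "entity_type": entity_type,
        "entity_id": entity_id,
        "tenant_id": tenant_id,
        # UTC timestamp in the ISO-8601 "Z" form used by the envelope.
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "version": 1,
        "data": data,
        "metadata": metadata or {},
    }

event = make_event(
    "OrderPlaced", "Order", "order-456", "tenant-789",
    data={"customer_id": "cust-100", "amount": 99.99, "currency": "USD"},
    metadata={"source": "order-service", "correlation_id": "req-xyz"},
)
print(json.dumps(event, indent=2))
```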

Event Replay

The Pipeline Service supports event replay for reprocessing historical data:

POST /v1/pipelines/:pipelineId/replay

Request:
{
  "from_offset": "earliest",
  "to_timestamp": "2026-02-10T00:00:00Z",
  "target_consumer_group": "order-projection-replay-v1",
  "parallel_consumers": 4
}
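A client-side sketch of issuing this call with the standard library, assuming a placeholder base URL for the Pipeline Service; the endpoint path and request body come from the spec above, and the final `urlopen` is deliberately omitted.

```python
import json
import urllib.request

# Placeholder host; substitute your Pipeline Service base URL.
BASE_URL = "https://pipeline-service.example.com"
pipeline_id = "order-events-pipeline"

# Replay request body as specified above.
payload = {
    "from_offset": "earliest",
    "to_timestamp": "2026-02-10T00:00:00Z",
    "target_consumer_group": "order-projection-replay-v1",
    "parallel_consumers": 4,
}

req = urllib.request.Request(
    url=f"{BASE_URL}/v1/pipelines/{pipeline_id}/replay",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(req) would send the request; omitted here.
print(req.method, req.full_url)
```

Replaying into a separate `target_consumer_group` keeps the live projection's offsets untouched while the replay consumers rebuild state in parallel.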

Snapshot Management

Periodic snapshots reduce replay time for state reconstruction:

| Setting | Default | Description |
| --- | --- | --- |
| snapshot_interval | 1 hour | Frequency of state snapshots |
| snapshot_store | Iceberg | Storage backend for snapshots |
| max_snapshots | 24 | Number of snapshots to retain |
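The rebuild path these settings enable can be sketched as follows: load the most recent snapshot, then fold only the events recorded after its offset instead of replaying the full log. The snapshot lookup is stubbed with in-memory data, and the fold is a simplified version of the pipeline's fold_function.

```python
from functools import reduce

# Stubbed snapshot: a periodic checkpoint of aggregate state at a Kafka offset.
SNAPSHOT = {"offset": 100, "state": {"status": "placed", "amount": 99.99}}

# Events appended to the log after the snapshot was taken.
EVENTS_AFTER = [
    {"offset": 101, "type": "OrderConfirmed"},
    {"offset": 102, "type": "OrderShipped"},
]

def fold(state, event):
    # Simplified version of the pipeline's fold_function.
    if event["type"] == "OrderCancelled":
        return {**state, "status": "cancelled"}
    return {**state, "status": event["type"].lower().replace("order", "")}

def rebuild(snapshot, events):
    """Fold only events newer than the snapshot, not the full history."""
    tail = [e for e in events if e["offset"] > snapshot["offset"]]
    return reduce(fold, tail, snapshot["state"])

state = rebuild(SNAPSHOT, EVENTS_AFTER)
print(state)  # {'status': 'shipped', 'amount': 99.99}
```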

MATIH Platform Events

The MATIH platform itself uses event sourcing for several subsystems:

| Topic | Entity | Purpose |
| --- | --- | --- |
| matih.ai.state-changes | Session, Agent | FSM state transition log |
| matih.ai.agent-traces | Agent | Agent execution traces |
| matih.ai.llm-ops | LLM Call | LLM operation tracking |

Related Pages