MATIH Platform is in active MVP development. Documentation reflects current implementation status.

Kafka Producers and Consumers

The Context Graph uses Kafka for streaming agent thinking events and decision events between services. The ContextGraphKafkaConsumer subscribes to event topics and persists them to Dgraph, while the producer streams events from the capture service for downstream analytics.


Overview

Kafka integration enables decoupled, asynchronous processing of context graph events. Thinking traces can be captured at the orchestrator level and consumed by multiple downstream services without blocking the agent execution path.

Source: data-plane/ai-service/src/context_graph/integration/kafka_consumer.py


Topics

Topic                    Description                             Producer                  Consumer
agent-thinking-events    Agent thinking trace lifecycle events   Orchestrator Hooks        Context Graph Consumer
agent-decision-events    Decision capture events                 Agent Decision Capture    Context Graph Consumer
agent-trace-events       Trace start/complete events             Agent Trace Service       Context Graph Consumer
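As a rough illustration of how events are routed to these topics, the sketch below maps an event type to its topic and serializes the payload. The payload fields and the `route_event` helper are illustrative assumptions, not the service's actual schema.

```python
import json

# Topic names from the table above; the event-type keys and payload
# fields are assumptions for illustration only.
TOPIC_BY_EVENT_TYPE = {
    "thinking": "agent-thinking-events",
    "decision": "agent-decision-events",
    "trace": "agent-trace-events",
}

def route_event(event: dict) -> tuple[str, bytes]:
    """Pick the Kafka topic for an event and serialize it to bytes."""
    topic = TOPIC_BY_EVENT_TYPE[event["event_type"]]
    return topic, json.dumps(event).encode("utf-8")
```
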

Consumer Configuration

The consumer uses manual commit mode for at-least-once delivery:

consumer = ContextGraphKafkaConsumer(
    dgraph_store=store,
    config=kafka_config,
    group_id="context-graph-dgraph-consumer",
)
 
await consumer.start()

Parameter             Description                 Default
group_id              Kafka consumer group        context-graph-dgraph-consumer
bootstrap_servers     Kafka broker addresses      From KAFKA_BOOTSTRAP_SERVERS env var
auto_offset_reset     Where to start consuming    earliest
enable_auto_commit    Auto-commit offsets         False (manual commit)
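A minimal sketch of assembling these settings, mirroring the defaults in the table. The real config object consumed by ContextGraphKafkaConsumer is not shown here, so the dict shape and the localhost fallback are assumptions.

```python
import os

def build_consumer_config(group_id: str = "context-graph-dgraph-consumer") -> dict:
    """Assemble consumer settings matching the documented defaults (sketch)."""
    return {
        # Broker addresses come from the environment; fallback is illustrative.
        "bootstrap_servers": os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "group_id": group_id,
        "auto_offset_reset": "earliest",
        # Manual commit: offsets are committed only after successful processing.
        "enable_auto_commit": False,
    }
```
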

Event Processing

The consumer processes events by type:

  1. Thinking Trace Events -- Upserts traces to Dgraph with all steps and metadata
  2. Decision Events -- Stores decisions in the bi-temporal store with actor chain
  3. Trace Events -- Updates trace status and completion metadata

Failed messages are logged with their error details and skipped; because offsets are committed manually only after processing, delivery is at-least-once and failed events can be retried manually.
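The dispatch-and-skip behavior described above can be sketched as a small loop: each message is routed to a handler by event type, failures are logged and skipped, and the caller commits the batch offset only after the loop. The `handlers` mapping is a stand-in for the consumer's real per-type processing methods.

```python
import logging

logger = logging.getLogger("context-graph-consumer")

def process_batch(messages: list[dict], handlers: dict) -> int:
    """Dispatch each message to its type handler; log and skip failures.

    Returns the number of successfully processed messages. The offset is
    committed by the caller only after the batch completes (manual commit),
    so a crash before the commit replays the batch -- at-least-once semantics.
    """
    ok = 0
    for msg in messages:
        try:
            handlers[msg["event_type"]](msg)
            ok += 1
        except Exception:
            # Log the error details and skip; the event can be retried manually.
            logger.exception("failed to process %s event", msg.get("event_type"))
    return ok
```
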


Producer Integration

The producer is integrated into the AgentThinkingCaptureService and streams events as they are captured:

service = AgentThinkingCaptureService(
    dgraph_store=store,
    kafka_producer=producer,
)

Events are published asynchronously without blocking the agent execution path.
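One common way to get this non-blocking behavior is to schedule the send as a background task rather than awaiting the broker acknowledgement inline. The sketch below shows that pattern; `publish` is a stand-in for the real Kafka producer send, and the class name is hypothetical.

```python
import asyncio

class NonBlockingPublisher:
    """Fire-and-forget event publishing off the agent execution path (sketch)."""

    def __init__(self, publish):
        self._publish = publish          # async callable, e.g. a Kafka send
        self._pending: set[asyncio.Task] = set()

    def emit(self, event: dict) -> None:
        # Schedule the send without awaiting it; the caller returns immediately.
        task = asyncio.create_task(self._publish(event))
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def drain(self) -> None:
        # Wait for in-flight sends, e.g. during graceful shutdown.
        if self._pending:
            await asyncio.gather(*self._pending)
```
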


Feature Flag Control

Kafka integration is controlled by the context_graph_kafka feature flag:

Flag State    Behavior
Disabled      Events are stored directly to Dgraph only
Enabled       Events are published to Kafka and consumed asynchronously
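The flag-gated write path can be sketched as a simple branch. The flag lookup and both write callables are placeholders here; the real feature-flag service API is not shown in this page.

```python
def store_event(event: dict, dgraph_write, kafka_publish, flag_enabled: bool) -> str:
    """Route an event per the context_graph_kafka flag (sketch).

    flag_enabled stands in for a lookup of the context_graph_kafka flag.
    """
    if flag_enabled:
        kafka_publish(event)   # consumed asynchronously downstream
        return "kafka"
    dgraph_write(event)        # direct synchronous persistence to Dgraph
    return "dgraph"
```
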

Monitoring

Metric                                    Description
context_graph_kafka_messages_processed    Total messages successfully processed
context_graph_kafka_messages_failed       Total messages that failed processing
context_graph_kafka_consumer_lag          Consumer group lag
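A minimal sketch of tracking the first two counters in the consumer's processing loop; a real deployment would export these through a metrics client rather than a plain object, and the class name is illustrative.

```python
class ConsumerMetrics:
    """In-process counters mirroring the metrics above (sketch)."""

    def __init__(self):
        self.processed = 0   # context_graph_kafka_messages_processed
        self.failed = 0      # context_graph_kafka_messages_failed

    def record(self, success: bool) -> None:
        if success:
            self.processed += 1
        else:
            self.failed += 1
```

Consumer lag, by contrast, is computed from the broker side (latest offset minus committed offset) rather than counted in-process.
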