# Kafka Producers and Consumers
The Context Graph uses Kafka for streaming agent thinking events and decision events between services. The ContextGraphKafkaConsumer subscribes to event topics and persists them to Dgraph, while the producer streams events from the capture service for downstream analytics.
## Overview
Kafka integration enables decoupled, asynchronous processing of context graph events. Thinking traces can be captured at the orchestrator level and consumed by multiple downstream services without blocking the agent execution path.
Source: `data-plane/ai-service/src/context_graph/integration/kafka_consumer.py`
## Topics
| Topic | Description | Producer | Consumer |
|---|---|---|---|
| agent-thinking-events | Agent thinking trace lifecycle events | Orchestrator Hooks | Context Graph Consumer |
| agent-decision-events | Decision capture events | Agent Decision Capture | Context Graph Consumer |
| agent-trace-events | Trace start/complete events | Agent Trace Service | Context Graph Consumer |
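As an illustration, events can be routed to these topics by type. The envelope fields and type names below are assumptions for the sketch, not the actual schema used by the capture services:

```python
import time
import uuid

# Hypothetical event envelope; these field and type names are illustrative
# assumptions, not the real schema used by the capture services.
def make_event(event_type: str, payload: dict) -> dict:
    return {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "timestamp": time.time(),
        "payload": payload,
    }

# Route each event type to its topic from the table above.
TOPIC_BY_EVENT_TYPE = {
    "thinking": "agent-thinking-events",
    "decision": "agent-decision-events",
    "trace": "agent-trace-events",
}

def topic_for(event: dict) -> str:
    return TOPIC_BY_EVENT_TYPE[event["event_type"]]

print(topic_for(make_event("decision", {"choice": "use_tool"})))  # agent-decision-events
```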
## Consumer Configuration
The consumer uses manual commit mode for at-least-once delivery:
```python
consumer = ContextGraphKafkaConsumer(
    dgraph_store=store,
    config=kafka_config,
    group_id="context-graph-dgraph-consumer",
)
await consumer.start()
```

| Parameter | Description | Default |
|---|---|---|
| group_id | Kafka consumer group | context-graph-dgraph-consumer |
| bootstrap_servers | Kafka broker addresses | From KAFKA_BOOTSTRAP_SERVERS env var |
| auto_offset_reset | Where to start consuming | earliest |
| enable_auto_commit | Auto-commit offsets | False (manual commit) |
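A minimal sketch of how these defaults might be assembled, assuming an aiokafka-style keyword interface; the actual `ContextGraphKafkaConsumer` constructor may differ:

```python
import os

def build_consumer_kwargs(group_id: str = "context-graph-dgraph-consumer") -> dict:
    """Assemble consumer kwargs matching the defaults in the table above.

    Assumes an aiokafka-compatible consumer; the "localhost:9092" fallback
    is an illustrative assumption, not a documented default.
    """
    return {
        "group_id": group_id,
        "bootstrap_servers": os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "auto_offset_reset": "earliest",
        "enable_auto_commit": False,  # manual commit -> at-least-once delivery
    }

kwargs = build_consumer_kwargs()
print(kwargs["enable_auto_commit"])  # False
```

Disabling auto-commit means the consumer commits offsets only after an event has been persisted, so a crash mid-processing replays the message rather than losing it.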
## Event Processing
The consumer processes events by type:
- Thinking Trace Events -- Upserts traces to Dgraph with all steps and metadata
- Decision Events -- Stores decisions in the bi-temporal store with actor chain
- Trace Events -- Updates trace status and completion metadata
Failed messages are logged with the error details and skipped (at-least-once semantics with manual retry).
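The dispatch-and-skip behavior can be sketched as follows; the handler and event fields are illustrative stand-ins for the real Dgraph and bi-temporal store writes:

```python
import logging

logger = logging.getLogger("context_graph.kafka")

# Hypothetical per-type handler standing in for the real Dgraph upsert.
def handle_thinking(event: dict) -> None:
    if "trace_id" not in event:
        raise ValueError("thinking event missing trace_id")

HANDLERS = {
    "thinking": handle_thinking,
    # "decision": handle_decision,  # bi-temporal store write with actor chain
    # "trace": handle_trace,        # trace status / completion update
}

def process_batch(events: list[dict]) -> tuple[int, int]:
    """Process events by type; log and skip failures (at-least-once, manual retry)."""
    ok = failed = 0
    for event in events:
        try:
            HANDLERS[event["event_type"]](event)
            ok += 1
        except Exception:
            logger.exception("failed to process event %s", event.get("event_id"))
            failed += 1  # skipped, not re-queued; retry is a manual operation
    return ok, failed

print(process_batch([{"event_type": "thinking", "trace_id": "t1"},
                     {"event_type": "thinking"}]))  # (1, 1)
```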
## Producer Integration
The producer is integrated into the AgentThinkingCaptureService and streams events as they are captured:
```python
service = AgentThinkingCaptureService(
    dgraph_store=store,
    kafka_producer=producer,
)
```

Events are published asynchronously without blocking the agent execution path.
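One way to sketch that fire-and-forget pattern, using an in-memory stand-in for the producer so the example runs without a broker:

```python
import asyncio

class InMemoryProducer:
    """Stand-in for the Kafka producer so this sketch needs no broker."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, dict]] = []

    async def send(self, topic: str, event: dict) -> None:
        await asyncio.sleep(0)  # pretend this is network I/O
        self.sent.append((topic, event))

async def main() -> list[tuple[str, dict]]:
    producer = InMemoryProducer()
    # Fire-and-forget: schedule the publish and keep executing the agent path.
    task = asyncio.create_task(producer.send("agent-thinking-events", {"step": 1}))
    # ... agent execution continues here without waiting on delivery ...
    await task  # a real service would instead drain pending sends on shutdown
    return producer.sent

print(asyncio.run(main()))  # [('agent-thinking-events', {'step': 1})]
```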
## Feature Flag Control
Kafka integration is controlled by the context_graph_kafka feature flag:
| Flag State | Behavior |
|---|---|
| Disabled | Events are stored directly to Dgraph only |
| Enabled | Events are published to Kafka and consumed asynchronously |
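A sketch of the flag-controlled routing; the flag lookup and the two sinks are hypothetical stand-ins for the real feature-flag service, Dgraph store, and Kafka producer:

```python
# Hypothetical in-memory flag table standing in for the feature-flag service.
FLAGS = {"context_graph_kafka": False}

def store_event(event: dict, dgraph_writes: list, kafka_publishes: list) -> str:
    if FLAGS["context_graph_kafka"]:
        kafka_publishes.append(event)  # consumed asynchronously downstream
        return "kafka"
    dgraph_writes.append(event)        # direct write to Dgraph only
    return "dgraph"

dgraph, kafka = [], []
print(store_event({"id": 1}, dgraph, kafka))  # dgraph
FLAGS["context_graph_kafka"] = True
print(store_event({"id": 2}, dgraph, kafka))  # kafka
```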
## Monitoring
| Metric | Description |
|---|---|
| context_graph_kafka_messages_processed | Total messages successfully processed |
| context_graph_kafka_messages_failed | Total messages that failed processing |
| context_graph_kafka_consumer_lag | Consumer group lag |
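A minimal in-process sketch of how these counters and the lag gauge might be maintained; the real service presumably exports them through a metrics backend such as Prometheus, which is an assumption here:

```python
# In-memory metric store standing in for a real metrics backend.
metrics = {
    "context_graph_kafka_messages_processed": 0,
    "context_graph_kafka_messages_failed": 0,
    "context_graph_kafka_consumer_lag": 0,
}

def on_message(success: bool) -> None:
    key = ("context_graph_kafka_messages_processed" if success
           else "context_graph_kafka_messages_failed")
    metrics[key] += 1

def update_lag(log_end_offset: int, committed_offset: int) -> None:
    # Lag for a partition: the newest offset minus the group's committed offset.
    metrics["context_graph_kafka_consumer_lag"] = log_end_offset - committed_offset

on_message(True)
on_message(True)
on_message(False)
update_lag(120, 100)
print(metrics["context_graph_kafka_consumer_lag"])  # 20
```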