Kafka Event Streaming
The AI Service uses Apache Kafka as its primary event streaming backbone for asynchronous communication with other platform services. Events cover feedback collection, agent thinking traces, learning signals, insights, and query lineage. The implementation uses aiokafka for non-blocking producer and consumer operations within the FastAPI async runtime.
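The non-blocking design relies on running long-lived consume loops as asyncio background tasks inside the application's event loop. A minimal stdlib-only sketch of that lifecycle (an `asyncio.Queue` stands in for the Kafka consumer; all names here are illustrative, not part of the service):

```python
import asyncio


async def consume_loop(queue: asyncio.Queue, handled: list) -> None:
    # Long-running loop, analogous to iterating an AIOKafkaConsumer.
    while True:
        handled.append(await queue.get())


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    handled: list = []
    task = asyncio.create_task(consume_loop(queue, handled))  # app startup
    await queue.put({"event_type": "feedback.submitted"})     # an event arrives
    await asyncio.sleep(0)                                    # yield so the loop runs
    task.cancel()                                             # app shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return handled


events = asyncio.run(main())
```

Cancelling the task on shutdown (rather than letting it linger) is what keeps the event loop from hanging when the process receives a termination signal.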
Kafka Configuration
Kafka connection and topic settings are managed through the KafkaConfig Pydantic model in src/feedback/integration/kafka/config.py:
```python
from pydantic import BaseModel


class KafkaConfig(BaseModel):
    bootstrap_servers: str = "localhost:9092"
    security_protocol: str = "PLAINTEXT"
    sasl_mechanism: str | None = None
    sasl_username: str | None = None
    sasl_password: str | None = None
    consumer_group: str | None = None  # group id, referenced by the consumer below
```

All values are sourced from environment variables with the KAFKA_ prefix.
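The KAFKA_ prefix mapping can be illustrated with a small sketch; `load_kafka_config` below is a hypothetical helper standing in for what pydantic's settings loading does, not part of the service:

```python
def load_kafka_config(env: dict[str, str]) -> dict[str, str]:
    # Strip the KAFKA_ prefix and lowercase the remainder to get field names,
    # e.g. KAFKA_BOOTSTRAP_SERVERS -> bootstrap_servers.
    prefix = "KAFKA_"
    return {
        key[len(prefix):].lower(): value
        for key, value in env.items()
        if key.startswith(prefix)
    }


cfg = load_kafka_config({
    "KAFKA_BOOTSTRAP_SERVERS": "broker-0:9092,broker-1:9092",
    "KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
    "HOME": "/home/app",  # ignored: no KAFKA_ prefix
})
```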
Topic Definitions
| Topic | Partitions | Replication | Retention | Purpose |
|---|---|---|---|---|
| ai-feedback-events | 6 | 3 | 7 days | User feedback on AI responses |
| ai-learning-signals | 3 | 3 | 7 days | Processed signals for model improvement |
| ai-insights | 3 | 3 | 7 days | Analytical insights for alerting |
| agent-thinking-events | 6 | 3 | 7 days | Agent reasoning traces for Context Graph |
| agent-decision-events | 6 | 3 | 7 days | Router and orchestrator decisions |
| agent-trace-events | 6 | 3 | 7 days | Full execution traces |
| agent-metrics-events | 3 | 3 | 7 days | Quality and performance metrics |
| query-lineage-events | 3 | 3 | 30 days | SQL lineage and provenance |
| audit-events | 6 | 3 | 90 days | Read-only audit trail |
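As an illustration, the first topic's settings map onto a `kafka-topics.sh` invocation along these lines (the broker address is an assumption; retention is expressed in milliseconds, 7 days = 604,800,000 ms):

```shell
# Hypothetical creation of ai-feedback-events with the settings above.
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic ai-feedback-events \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000
```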
Producer Pattern
The AI Service produces events using the KafkaEventProducer wrapper in src/context_graph/integration/kafka_producer.py:
```python
import json

from aiokafka import AIOKafkaProducer


class KafkaEventProducer:
    def __init__(self, config: KafkaConfig):
        self._producer = AIOKafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            security_protocol=config.security_protocol,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        )

    async def start(self) -> None:
        # An AIOKafkaProducer must be started before any send.
        await self._producer.start()

    async def stop(self) -> None:
        await self._producer.stop()

    async def publish(self, topic: str, event: dict, key: str | None = None):
        await self._producer.send_and_wait(
            topic=topic,
            value=event,
            key=key.encode("utf-8") if key else None,
        )
```

Consumer Pattern
The consumer runs as a background task for the lifetime of the application:
```python
import json
from collections.abc import Callable

from aiokafka import AIOKafkaConsumer


class KafkaEventConsumer:
    def __init__(self, config: KafkaConfig, topics: list[str]):
        self._consumer = AIOKafkaConsumer(
            *topics,
            bootstrap_servers=config.bootstrap_servers,
            group_id=config.consumer_group,
            auto_offset_reset="latest",
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        )

    async def start(self) -> None:
        await self._consumer.start()

    async def stop(self) -> None:
        await self._consumer.stop()

    async def consume(self, handler: Callable):
        async for message in self._consumer:
            await handler(message.topic, message.value)
```

Event Schemas
All events include a common envelope with metadata:
```json
{
  "event_id": "evt-abc123",
  "event_type": "feedback.submitted",
  "tenant_id": "acme-corp",
  "timestamp": "2025-03-15T10:00:00Z",
  "source": "ai-service",
  "data": {}
}
```

Security
In production, Kafka connections use SASL/SCRAM authentication with credentials stored in Kubernetes Secrets:
```yaml
env:
  - name: KAFKA_SASL_USERNAME
    valueFrom:
      secretKeyRef:
        name: kafka-credentials
        key: username
  - name: KAFKA_SASL_PASSWORD
    valueFrom:
      secretKeyRef:
        name: kafka-credentials
        key: password
```

Monitoring
Kafka producer and consumer metrics are exposed via Prometheus:
| Metric | Type | Description |
|---|---|---|
| ai_kafka_messages_produced_total | Counter | Total messages produced by topic |
| ai_kafka_messages_consumed_total | Counter | Total messages consumed by topic |
| ai_kafka_consumer_lag | Gauge | Consumer group lag by partition |
| ai_kafka_produce_latency_seconds | Histogram | Producer send latency |
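The produce-side metrics can be sketched without any Prometheus dependency. In the real service these would be prometheus_client Counter and Histogram objects with a `topic` label; the plain dictionaries below stand in for them purely for illustration:

```python
import time
from collections import defaultdict

# Stand-ins for ai_kafka_messages_produced_total and
# ai_kafka_produce_latency_seconds, keyed by the topic label.
messages_produced: dict[str, int] = defaultdict(int)
produce_latency: dict[str, list[float]] = defaultdict(list)


def record_produce(topic: str, send) -> None:
    # Time the send call (the Histogram observation) and bump the Counter.
    start = time.perf_counter()
    send()
    produce_latency[topic].append(time.perf_counter() - start)
    messages_produced[topic] += 1


record_produce("ai-feedback-events", lambda: None)
```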