MATIH Platform is in active MVP development. Documentation reflects current implementation status.
12. AI Service / Integrations

Kafka Event Streaming
The AI Service uses Apache Kafka as its primary event streaming backbone for asynchronous communication with other platform services. Events cover feedback collection, agent thinking traces, learning signals, insights, and query lineage. The implementation uses aiokafka for non-blocking producer and consumer operations within the FastAPI async runtime.


Kafka Configuration

Kafka connection and topic settings are managed through the KafkaConfig Pydantic model in src/feedback/integration/kafka/config.py:

class KafkaConfig(BaseModel):
    bootstrap_servers: str = "localhost:9092"
    security_protocol: str = "PLAINTEXT"
    sasl_mechanism: str | None = None
    sasl_username: str | None = None
    sasl_password: str | None = None
    consumer_group: str = "ai-service"  # referenced by the consumer below; default name is illustrative

All values are sourced from environment variables with the KAFKA_ prefix.

Topic Definitions

| Topic | Partitions | Replication | Retention | Purpose |
| --- | --- | --- | --- | --- |
| ai-feedback-events | 6 | 3 | 7 days | User feedback on AI responses |
| ai-learning-signals | 3 | 3 | 7 days | Processed signals for model improvement |
| ai-insights | 3 | 3 | 7 days | Analytical insights for alerting |
| agent-thinking-events | 6 | 3 | 7 days | Agent reasoning traces for Context Graph |
| agent-decision-events | 6 | 3 | 7 days | Router and orchestrator decisions |
| agent-trace-events | 6 | 3 | 7 days | Full execution traces |
| agent-metrics-events | 3 | 3 | 7 days | Quality and performance metrics |
| query-lineage-events | 3 | 3 | 30 days | SQL lineage and provenance |
| audit-events | 6 | 3 | 90 days | Read-only audit trail |
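
For provisioning or validation scripts, the topic table can be expressed as data. The names and values below come from the table; the `TopicSpec` type itself is an illustrative assumption, not platform code.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class TopicSpec:
    partitions: int
    replication: int
    retention_days: int

# One entry per topic in the table above.
TOPICS: dict[str, TopicSpec] = {
    "ai-feedback-events": TopicSpec(6, 3, 7),
    "ai-learning-signals": TopicSpec(3, 3, 7),
    "ai-insights": TopicSpec(3, 3, 7),
    "agent-thinking-events": TopicSpec(6, 3, 7),
    "agent-decision-events": TopicSpec(6, 3, 7),
    "agent-trace-events": TopicSpec(6, 3, 7),
    "agent-metrics-events": TopicSpec(3, 3, 7),
    "query-lineage-events": TopicSpec(3, 3, 30),
    "audit-events": TopicSpec(6, 3, 90),
}

def retention_ms(spec: TopicSpec) -> int:
    """Convert a retention window in days to Kafka's retention.ms value."""
    return spec.retention_days * 24 * 60 * 60 * 1000
```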

Producer Pattern

The AI Service produces events through the KafkaEventProducer wrapper in src/context_graph/integration/kafka_producer.py:

import json

from aiokafka import AIOKafkaProducer

class KafkaEventProducer:
    def __init__(self, config: KafkaConfig):
        self._producer = AIOKafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            security_protocol=config.security_protocol,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        )

    async def start(self) -> None:
        # aiokafka requires an explicit start before the first send.
        await self._producer.start()

    async def stop(self) -> None:
        await self._producer.stop()

    async def publish(self, topic: str, event: dict, key: str | None = None):
        await self._producer.send_and_wait(
            topic=topic,
            value=event,
            key=key.encode("utf-8") if key else None,
        )
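
Because the producer and consumer pin matching JSON (de)serializers, the wire format can be sanity-checked without a broker. This sketch mirrors those two lambdas as named functions:

```python
import json

def serialize(value: dict) -> bytes:
    """Mirrors the value_serializer wired into AIOKafkaProducer."""
    return json.dumps(value).encode("utf-8")

def deserialize(raw: bytes) -> dict:
    """Mirrors the value_deserializer on the consumer side."""
    return json.loads(raw.decode("utf-8"))

# A round trip must be lossless for any JSON-serializable event.
event = {"event_type": "feedback.submitted", "data": {"rating": 5}}
assert deserialize(serialize(event)) == event
```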

Consumer Pattern

The consumer runs as a background task during application lifecycle:

import json
from collections.abc import Callable

from aiokafka import AIOKafkaConsumer

class KafkaEventConsumer:
    def __init__(self, config: KafkaConfig, topics: list[str]):
        self._consumer = AIOKafkaConsumer(
            *topics,
            bootstrap_servers=config.bootstrap_servers,
            group_id=config.consumer_group,
            auto_offset_reset="latest",
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        )

    async def start(self) -> None:
        # aiokafka requires an explicit start before iteration begins.
        await self._consumer.start()

    async def stop(self) -> None:
        await self._consumer.stop()

    async def consume(self, handler: Callable):
        async for message in self._consumer:
            await handler(message.topic, message.value)
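
Since one consumer can subscribe to several topics, the handler passed to consume() typically fans out by topic. A minimal dispatch sketch follows; the per-topic handlers are hypothetical stand-ins that just record what they saw, where the real ones would update the Context Graph, feedback store, and so on.

```python
import asyncio

# Records (handler name, event_id) pairs so the dispatch is observable.
processed: list[tuple[str, str]] = []

async def handle_feedback(event: dict) -> None:
    processed.append(("feedback", event["event_id"]))

async def handle_thinking(event: dict) -> None:
    processed.append(("thinking", event["event_id"]))

HANDLERS = {
    "ai-feedback-events": handle_feedback,
    "agent-thinking-events": handle_thinking,
}

async def dispatch(topic: str, event: dict) -> None:
    """Route a consumed message to its topic handler; drop unknown topics."""
    handler = HANDLERS.get(topic)
    if handler is not None:
        await handler(event)
```

The dispatch function has the `(topic, value)` signature that consume() expects, so it can be passed in directly.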

Event Schemas

All events include a common envelope with metadata:

{
  "event_id": "evt-abc123",
  "event_type": "feedback.submitted",
  "tenant_id": "acme-corp",
  "timestamp": "2025-03-15T10:00:00Z",
  "source": "ai-service",
  "data": {}
}
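
Producers can build this envelope with a small helper. The sketch below matches the schema above field for field; the helper name `make_envelope` and the event-id format are assumptions for illustration.

```python
import json
import uuid
from datetime import datetime, timezone

def make_envelope(event_type: str, tenant_id: str, data: dict) -> dict:
    """Wrap a payload in the common event envelope."""
    return {
        "event_id": f"evt-{uuid.uuid4().hex[:8]}",
        "event_type": event_type,
        "tenant_id": tenant_id,
        # ISO-8601 UTC timestamp with a trailing Z, as in the example above.
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "source": "ai-service",
        "data": data,
    }
```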

Security

In production, Kafka connections use SASL/SCRAM authentication with credentials stored in Kubernetes Secrets:

env:
  - name: KAFKA_SASL_USERNAME
    valueFrom:
      secretKeyRef:
        name: kafka-credentials
        key: username
  - name: KAFKA_SASL_PASSWORD
    valueFrom:
      secretKeyRef:
        name: kafka-credentials
        key: password

Monitoring

Kafka producer and consumer metrics are exposed via Prometheus:

| Metric | Type | Description |
| --- | --- | --- |
| ai_kafka_messages_produced_total | Counter | Total messages produced by topic |
| ai_kafka_messages_consumed_total | Counter | Total messages consumed by topic |
| ai_kafka_consumer_lag | Gauge | Consumer group lag by partition |
| ai_kafka_produce_latency_seconds | Histogram | Producer send latency |
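
These metrics could be declared with prometheus_client roughly as follows. The metric names and types come from the table above; the label names and module layout are assumptions. Note that prometheus_client appends the _total suffix to counters automatically, so the counters are declared without it.

```python
from prometheus_client import Counter, Gauge, Histogram

MESSAGES_PRODUCED = Counter(
    "ai_kafka_messages_produced",  # exported as ai_kafka_messages_produced_total
    "Total messages produced by topic",
    ["topic"],
)
MESSAGES_CONSUMED = Counter(
    "ai_kafka_messages_consumed",
    "Total messages consumed by topic",
    ["topic"],
)
CONSUMER_LAG = Gauge(
    "ai_kafka_consumer_lag",
    "Consumer group lag by partition",
    ["topic", "partition"],
)
PRODUCE_LATENCY = Histogram(
    "ai_kafka_produce_latency_seconds",
    "Producer send latency",
    ["topic"],
)

# Example instrumentation call after a successful send_and_wait.
MESSAGES_PRODUCED.labels(topic="ai-feedback-events").inc()
```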