AI Service Architecture
The AI Service is a Python/FastAPI application comprising 65+ modules organized into distinct subsystems: agent orchestration, SQL generation, retrieval-augmented generation, BI analytics, streaming, session management, feedback, personalization, and context graph integration. This section examines the architectural decisions, module organization, dependency relationships, configuration management, and deployment topology that make the AI Service the most complex single microservice in the MATIH platform.
Module Organization
The AI Service source tree is organized under data-plane/ai-service/src/ with the following top-level structure:
src/
    main.py                          # FastAPI application entry point
    config/                          # Application settings and configuration
        settings.py                  # Pydantic Settings with env var binding
    agents/                          # Core agent subsystem (25+ files)
        orchestrator.py              # AgentOrchestrator
        multi_agent_router.py        # MultiAgentExecutor, routing strategies
        models.py                    # Agent data models
        llm_providers.py             # LLM client abstraction
        tools.py                     # ToolRegistry
        memory_stores.py             # MemoryManager, HybridMemory
        streaming.py                 # StreamEvent, SSE/WebSocket adapters
        websocket_api.py             # WebSocket connection management
        hitl.py                      # Human-in-the-loop approval
        hitl_enhanced.py             # Guardrails, checkpoints, interrupts
        langgraph.py                 # LangGraph workflow integration
        prompt_engineering.py        # Template versioning, A/B testing
        specialized_agents.py        # BaseSpecializedAgent, AgentRegistry
        guardrails/                  # Content safety guardrails
        persona/                     # Agent persona management
        memory/                      # Advanced memory subsystem
        session/                     # Agent-level session management
        studio/                      # Agent Studio for configuration
        quality_metrics/             # Response quality tracking
        approval/                    # Approval workflow stores
        drift_detection/             # Agent drift monitoring
        hallucination_classifier/    # Response hallucination detection
    sql_generation/                  # Text-to-SQL subsystem
        generator.py                 # SQLGenerator pipeline
        enhanced_generator.py        # EnhancedSQLGenerator
        query_decomposer.py          # QueryDecomposer
        sql_optimizer.py             # SQLOptimizer, DialectConverter
        validator.py                 # SQLValidator
        suggestions.py               # SuggestionService
        benchmark/                   # SQL generation benchmarking
    rag/                             # Retrieval-Augmented Generation
        advanced_rag.py              # Hybrid search, query expansion, self-reflection
    bi/                              # BI-specific analytics subsystem
        agents/                      # BI agent implementations
            orchestrator.py          # AnalyticsOrchestrator state machine
            router_agent.py          # RouterAgent with intent classification
            sql_agent.py             # SQLAgent for BI queries
            analysis_agent.py        # AnalysisAgent for statistics
            viz_agent.py             # VisualizationAgent
            docs_agent.py            # DocumentationAgent
        dashboard/                   # Dashboard management
        api/                         # BI-specific API routes
        visualization/               # Chart generation
        widgets/                     # Widget configuration
        export/                      # Report export
    session/                         # Session management subsystem
        manager.py                   # SessionManager
        store.py                     # SessionStore interface
        factory.py                   # Store factory with Redis/Postgres backends
        summarization.py             # Conversation summarization
        privacy.py                   # PII handling in sessions
    feedback/                        # Feedback and RLHF subsystem
        models.py                    # FeedbackEvent, FeedbackType, FeedbackSource
        integration/                 # Kafka integration for feedback events
        learning/                    # Learning from feedback signals
        pipeline/                    # Feedback processing pipeline
        insights/                    # Feedback analytics
        collectors/                  # Multi-source feedback collection
    personalization/                 # Personalization engine
        engine.py                    # PersonalizationEngine
        models.py                    # User profiles, preferences
        feedback.py                  # Feedback collector integration
        router.py                    # Personalized routing
    context_graph/                   # Context Graph integration
        integration/                 # Orchestrator hooks, Kafka, metrics bridge
        services/                    # Thinking capture, trace, feedback stores
        embeddings/                  # Thinking embeddings
        security/                    # Authorization, permissions
        config/                      # Tenant configuration
    llm/                             # LLM subsystem
        cache/                       # Response caching
        context/                     # Context management
        router/                      # Model routing
        validation/                  # Response validation
        performance/                 # Performance tracking
        infrastructure/              # LLM infrastructure management
    vector_store/                    # Qdrant integration
    storage/                         # Database connection management
        database.py                  # SQLAlchemy async engine
        pool.py                      # Connection pooling
    prompt_ab_testing/               # Prompt A/B testing infrastructure
    observability/                   # Metrics, tracing, logging
    intent/                          # Intent classification
    knowledge/                       # Knowledge base management
    semantic/                        # Semantic processing
    evaluation/                      # Model evaluation
    quality/                         # Quality assurance

Application Entry Point
The FastAPI application is defined in main.py and follows a layered initialization pattern:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(
    title="MATIH AI Service",
    version="1.0.0",
    description="Conversational Analytics Engine",
)

# CORS middleware for frontend integration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include route modules
app.include_router(conversation_router, prefix="/api/v1")
app.include_router(text_to_sql_router, prefix="/api/v1")
app.include_router(bi_router, prefix="/api/v1/bi")
app.include_router(feedback_router, prefix="/api/v1/feedback")
app.include_router(ws_router)  # WebSocket routes at /ws

Lifecycle Hooks
The application uses FastAPI's startup and shutdown events for resource initialization (newer FastAPI versions express the same lifecycle via the lifespan context manager):
@app.on_event("startup")
async def startup():
    # Initialize database connections
    await initialize_database()
    # Initialize Redis for sessions
    await initialize_session_store()
    # Initialize Qdrant vector store
    await initialize_vector_store()
    # Initialize Context Graph hooks
    await initialize_context_graph()

@app.on_event("shutdown")
async def shutdown():
    await close_session_store()
    await close_database()

Configuration Management
The AI Service uses Pydantic Settings for configuration, binding environment variables to typed configuration fields:
# config/settings.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    # Service identity
    service_name: str = "ai-service"
    service_port: int = 8000
    environment: str = "development"

    # Database
    database_url: str = "postgresql+asyncpg://..."
    database_pool_size: int = 20
    database_max_overflow: int = 10

    # Redis
    redis_url: str = "redis://localhost:6379"
    redis_pool_size: int = 10

    # LLM Configuration
    openai_api_key: str | None = None
    anthropic_api_key: str | None = None
    default_llm_provider: str = "openai"
    default_model: str = "gpt-4"
    default_temperature: float = 0.7
    default_max_tokens: int = 4096

    # Qdrant
    qdrant_url: str = "http://localhost:6333"
    qdrant_collection: str = "schema_embeddings"

    # SQL Generation
    context_top_k: int = 5
    max_sql_retries: int = 3

    # Kafka
    kafka_bootstrap_servers: str = "localhost:29092"
    kafka_consumer_group: str = "ai-service"

    # Context Graph
    dgraph_url: str = "http://localhost:8080"
    context_graph_enabled: bool = True

    class Config:
        env_file = ".env"
        env_prefix = "MATIH_"

Environment-Specific Overrides
Configuration cascades from class defaults through environment variables; in Kubernetes deployments, environment-specific values are injected via ConfigMaps and Secrets:
| Configuration Source | Priority | Example |
|---|---|---|
| Default values in Settings class | Lowest | default_temperature: float = 0.7 |
| .env file (local development) | Medium | MATIH_OPENAI_API_KEY=sk-xxx |
| Kubernetes ConfigMap | High | Database URLs, feature flags |
| Kubernetes Secret (via secretKeyRef) | Highest | API keys, database passwords |
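The cascade can be sketched with plain stdlib code. This is an illustrative sketch only: the DemoSettings class and the MATIH_TEMPERATURE variable are invented for the example, and pydantic-settings handles the real resolution in the service.

```python
import os
from dataclasses import dataclass

@dataclass
class DemoSettings:
    # Lowest priority: the class default, mirroring default_temperature above
    temperature: float = 0.7

    @classmethod
    def load(cls) -> "DemoSettings":
        # Higher priority: an environment variable, as a ConfigMap or Secret
        # would inject it in Kubernetes
        raw = os.environ.get("MATIH_TEMPERATURE")
        return cls(temperature=float(raw)) if raw is not None else cls()

print(DemoSettings.load().temperature)  # default wins: 0.7
os.environ["MATIH_TEMPERATURE"] = "0.2"
print(DemoSettings.load().temperature)  # env var wins: 0.2
```

With pydantic-settings the same precedence applies automatically to every field carrying the MATIH_ prefix.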
Dependency Injection
The AI Service uses a lightweight dependency injection pattern through FastAPI's Depends mechanism and singleton factories:
# Factory pattern for shared resources
_settings: Settings | None = None

def get_settings() -> Settings:
    global _settings
    if _settings is None:
        _settings = Settings()
    return _settings

# FastAPI dependency injection
from fastapi import Depends

@app.post("/api/v1/conversation")
async def conversation(
    request: ConversationRequest,
    settings: Settings = Depends(get_settings),
    session_store: SessionStore = Depends(get_session_store),
):
    ...

Service Layer Architecture
The service follows a clean layered architecture:
+---------------------------------------------------+
|                     API Layer                     |
|        FastAPI Routes, WebSocket Handlers         |
+---------------------------------------------------+
                          |
+---------------------------------------------------+
|                Orchestration Layer                |
|     AgentOrchestrator, AnalyticsOrchestrator,     |
|                MultiAgentExecutor                 |
+---------------------------------------------------+
                          |
+---------------------------------------------------+
|                    Agent Layer                    |
|       RouterAgent, SQLAgent, AnalysisAgent,       |
|      VisualizationAgent, DocumentationAgent       |
+---------------------------------------------------+
                          |
+---------------------------------------------------+
|                   Service Layer                   |
|    SQLGenerator, RAGPipeline, SessionManager,     |
|      PersonalizationEngine, FeedbackPipeline      |
+---------------------------------------------------+
                          |
+---------------------------------------------------+
|               Infrastructure Layer                |
|    LLMClient, QdrantStore, Redis, PostgreSQL,     |
|                   Kafka, Dgraph                   |
+---------------------------------------------------+

LLM Provider Abstraction
The AI Service abstracts LLM providers behind a unified LLMClient interface, allowing seamless switching between providers:
class LLMClient:
    """LLM client interface wrapping llm_providers implementations."""

    def __init__(
        self,
        provider: str = "openai",
        model: str | None = None,
        **kwargs,
    ) -> None:
        from agents.llm_providers import get_llm_client, LLMMessage

        self._client = get_llm_client(
            provider=provider,
            model=model,
            **kwargs,
        )

    async def chat(
        self,
        messages: list[dict[str, Any]],
        tools: list[dict[str, Any]] | None = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
        stream: bool = False,
    ) -> dict[str, Any]:
        """Send chat request to LLM."""
        ...

    async def stream_chat(
        self,
        messages: list[dict[str, Any]],
        tools: list[dict[str, Any]] | None = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> AsyncIterator[dict[str, Any]]:
        """Stream chat response from LLM."""
        ...

Supported Providers
| Provider | Model Examples | Special Features |
|---|---|---|
| OpenAI | gpt-4, gpt-4-turbo, gpt-3.5-turbo | Function calling, streaming |
| Anthropic | claude-3-opus, claude-3-sonnet | Extended thinking, large context |
| vLLM | Self-hosted open-source models | Low latency, cost optimization |
The provider selection happens at three levels:
- Global default: Set via the MATIH_DEFAULT_LLM_PROVIDER environment variable
- Agent-level: Each agent can specify its preferred provider in AgentConfig
- Request-level: API callers can override the provider per request
Multi-Tenancy
Every request to the AI Service carries a tenant_id that flows through all layers:
@dataclass
class AgentContext:
    """Context for agent execution."""
    session_id: str
    tenant_id: str  # Tenant isolation boundary
    user_id: str | None
    current_input: str = ""
    status: AgentStatus = AgentStatus.IDLE
    metadata: dict[str, Any] = field(default_factory=dict)

Tenant isolation is enforced at multiple levels:
| Layer | Isolation Mechanism |
|---|---|
| Session storage | Redis keys prefixed with tenant:{tenant_id}:session: |
| Vector store | Qdrant payloads filtered by tenant_id |
| SQL generation | Schema context retrieved per tenant |
| Feedback | Events tagged and partitioned by tenant |
| Personalization | Profiles stored per (tenant_id, user_id) pair |
| Context Graph | Dgraph queries filtered by tenant URN |
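Two of these mechanisms can be sketched concretely. The helper names are hypothetical; only the key prefix and the filter shape come from the table above.

```python
def session_key(tenant_id: str, session_id: str) -> str:
    # Redis key layout from the table: tenant:{tenant_id}:session:
    return f"tenant:{tenant_id}:session:{session_id}"

def tenant_payload_filter(tenant_id: str) -> dict:
    # Qdrant payload filter restricting vector search to one tenant
    # (the must/key/match structure follows Qdrant's filter format)
    return {"must": [{"key": "tenant_id", "match": {"value": tenant_id}}]}

print(session_key("acme", "s-42"))  # tenant:acme:session:s-42
```

Because the tenant appears in the key itself, a misrouted lookup for another tenant simply misses rather than leaking data.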
Error Handling
The AI Service implements a structured error handling strategy:
from fastapi import HTTPException
from fastapi.responses import JSONResponse
class AIServiceError(Exception):
    """Base exception for AI Service errors."""

    def __init__(self, message: str, code: str, status_code: int = 500):
        self.message = message
        self.code = code
        self.status_code = status_code

class LLMProviderError(AIServiceError):
    """Error from LLM provider."""

class SQLGenerationError(AIServiceError):
    """Error in SQL generation pipeline."""

class SessionNotFoundError(AIServiceError):
    """Session not found."""

    def __init__(self, session_id: str):
        super().__init__(
            message=f"Session {session_id} not found",
            code="SESSION_NOT_FOUND",
            status_code=404,
        )

@app.exception_handler(AIServiceError)
async def ai_service_error_handler(request, exc: AIServiceError):
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.code,
            "message": exc.message,
        },
    )

Graceful Degradation
The AI Service follows a graceful degradation pattern for optional subsystems. If the Context Graph, Redis, or Kafka are unavailable, the service continues operating with reduced functionality:
# Context Graph hooks degrade gracefully
try:
    from context_graph.integration.orchestrator_hooks import (
        get_orchestrator_hooks,
    )

    _cg_hooks = get_orchestrator_hooks()
    if _cg_hooks and _cg_hooks.enabled:
        _thinking_trace_id = await _cg_hooks.on_agent_message_start(
            session_id, tenant_id, message, agent_id,
        )
except Exception:
    _cg_hooks = None  # Silently degrade

Observability
The AI Service instruments all critical paths with structured logging via structlog, distributed tracing, and Prometheus metrics:
Structured Logging
import structlog

logger = structlog.get_logger()

logger.info(
    "processing_message",
    agent_id=agent_id,
    session_id=session_id,
    message_length=len(message),
    tenant_id=tenant_id,
)

Key Metrics
| Metric | Type | Labels |
|---|---|---|
| ai_service_request_duration_seconds | Histogram | endpoint, tenant_id, status |
| ai_service_llm_tokens_total | Counter | provider, model, direction |
| ai_service_llm_latency_seconds | Histogram | provider, model |
| ai_service_sql_generation_duration_seconds | Histogram | tenant_id, success |
| ai_service_agent_execution_count | Counter | agent_type, status |
| ai_service_active_sessions | Gauge | tenant_id |
| ai_service_websocket_connections | Gauge | tenant_id |
| ai_service_rag_retrieval_duration_seconds | Histogram | strategy |
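As an illustration, the first histogram in the table could be declared and observed with the standard prometheus_client library (a sketch under that assumption; the service's actual registration code may differ):

```python
import time

from prometheus_client import Histogram

# Declare the request-latency histogram with the labels listed above
REQUEST_DURATION = Histogram(
    "ai_service_request_duration_seconds",
    "End-to-end request latency",
    ["endpoint", "tenant_id", "status"],
)

# Time a (simulated) request and record it under one label set
start = time.perf_counter()
time.sleep(0.01)  # stand-in for actual request handling
REQUEST_DURATION.labels(
    endpoint="/api/v1/conversation", tenant_id="acme", status="200",
).observe(time.perf_counter() - start)
```

Labeling by tenant_id keeps per-tenant latency visible at the cost of one time series per (endpoint, tenant, status) combination.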
Deployment Topology
The AI Service runs as a Kubernetes Deployment with horizontal pod autoscaling:
# infrastructure/helm/ai-service/values.yaml
replicaCount: 2

image:
  repository: matih/ai-service
  tag: latest

resources:
  requests:
    cpu: 500m
    memory: 1Gi
  limits:
    cpu: 2000m
    memory: 4Gi

autoscaling:
  enabled: true
  minReplicas: 2
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70

service:
  port: 8000
  type: ClusterIP

env:
  - name: DATABASE_URL
    valueFrom:
      secretKeyRef:
        name: ai-service-secrets
        key: database-url
  - name: OPENAI_API_KEY
    valueFrom:
      secretKeyRef:
        name: ai-service-secrets
        key: openai-api-key

Health Checks
The service exposes standard health check endpoints:
| Endpoint | Purpose | Checks |
|---|---|---|
| GET /health | Liveness probe | Application is running |
| GET /health/ready | Readiness probe | Database, Redis, Qdrant connections |
| GET /health/startup | Startup probe | Initial model loading complete |
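The readiness semantics in the table amount to an aggregate over dependency pings. A sketch with stub check functions (real ones would ping PostgreSQL, Redis, and Qdrant):

```python
import asyncio

async def check_database() -> bool:  # stub for a PostgreSQL ping
    return True

async def check_redis() -> bool:     # stub for a Redis PING
    return True

async def check_qdrant() -> bool:    # stub for a Qdrant health call
    return True

async def readiness() -> dict:
    # /health/ready reports ready only when every dependency answers
    names = ("database", "redis", "qdrant")
    results = await asyncio.gather(
        check_database(), check_redis(), check_qdrant()
    )
    return {
        "status": "ready" if all(results) else "unavailable",
        "checks": dict(zip(names, results)),
    }

print(asyncio.run(readiness()))
# {'status': 'ready', 'checks': {'database': True, 'redis': True, 'qdrant': True}}
```

Keeping the liveness probe independent of these checks prevents a transient Redis outage from triggering pod restarts.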
Security
The AI Service enforces security at multiple layers:
- Authentication: JWT tokens validated on every request, extracting tenant_id and user_id
- Authorization: Role-based access control for administrative endpoints
- Input sanitization: User inputs sanitized before processing through agents
- Memory guardrails: PII detection and sanitization in conversation memory
- Content safety: Guardrail registry for blocking harmful content generation
- Network policy: Kubernetes NetworkPolicy restricting ingress/egress to known services
# infrastructure/helm/ai-service/templates/networkpolicy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: ai-service
spec:
  podSelector:
    matchLabels:
      app: ai-service
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: api-gateway
      ports:
        - port: 8000
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: query-engine
        - podSelector:
            matchLabels:
              app: redis
        - podSelector:
            matchLabels:
              app: qdrant