MATIH Platform is in active MVP development. Documentation reflects current implementation status.

AI Service Architecture

The AI Service is a Python/FastAPI application comprising 65+ modules organized into distinct subsystems: agent orchestration, SQL generation, retrieval-augmented generation, BI analytics, streaming, session management, feedback, personalization, and context graph integration. This section examines the architectural decisions, module organization, dependency relationships, configuration management, and deployment topology that make the AI Service the most complex single microservice in the MATIH platform.


Module Organization

The AI Service source tree is organized under data-plane/ai-service/src/ with the following top-level structure:

src/
  main.py                    # FastAPI application entry point
  config/                    # Application settings and configuration
    settings.py              # Pydantic Settings with env var binding
  agents/                    # Core agent subsystem (25+ files)
    orchestrator.py          # AgentOrchestrator
    multi_agent_router.py    # MultiAgentExecutor, routing strategies
    models.py                # Agent data models
    llm_providers.py         # LLM client abstraction
    tools.py                 # ToolRegistry
    memory_stores.py         # MemoryManager, HybridMemory
    streaming.py             # StreamEvent, SSE/WebSocket adapters
    websocket_api.py         # WebSocket connection management
    hitl.py                  # Human-in-the-loop approval
    hitl_enhanced.py         # Guardrails, checkpoints, interrupts
    langgraph.py             # LangGraph workflow integration
    prompt_engineering.py    # Template versioning, A/B testing
    specialized_agents.py    # BaseSpecializedAgent, AgentRegistry
    guardrails/              # Content safety guardrails
    persona/                 # Agent persona management
    memory/                  # Advanced memory subsystem
    session/                 # Agent-level session management
    studio/                  # Agent Studio for configuration
    quality_metrics/         # Response quality tracking
    approval/                # Approval workflow stores
    drift_detection/         # Agent drift monitoring
    hallucination_classifier/ # Response hallucination detection
  sql_generation/            # Text-to-SQL subsystem
    generator.py             # SQLGenerator pipeline
    enhanced_generator.py    # EnhancedSQLGenerator
    query_decomposer.py      # QueryDecomposer
    sql_optimizer.py         # SQLOptimizer, DialectConverter
    validator.py             # SQLValidator
    suggestions.py           # SuggestionService
    benchmark/               # SQL generation benchmarking
  rag/                       # Retrieval-Augmented Generation
    advanced_rag.py          # Hybrid search, query expansion, self-reflection
  bi/                        # BI-specific analytics subsystem
    agents/                  # BI agent implementations
      orchestrator.py        # AnalyticsOrchestrator state machine
      router_agent.py        # RouterAgent with intent classification
      sql_agent.py           # SQLAgent for BI queries
      analysis_agent.py      # AnalysisAgent for statistics
      viz_agent.py           # VisualizationAgent
      docs_agent.py          # DocumentationAgent
    dashboard/               # Dashboard management
    api/                     # BI-specific API routes
    visualization/           # Chart generation
    widgets/                 # Widget configuration
    export/                  # Report export
  session/                   # Session management subsystem
    manager.py               # SessionManager
    store.py                 # SessionStore interface
    factory.py               # Store factory with Redis/Postgres backends
    summarization.py         # Conversation summarization
    privacy.py               # PII handling in sessions
  feedback/                  # Feedback and RLHF subsystem
    models.py                # FeedbackEvent, FeedbackType, FeedbackSource
    integration/             # Kafka integration for feedback events
    learning/                # Learning from feedback signals
    pipeline/                # Feedback processing pipeline
    insights/                # Feedback analytics
    collectors/              # Multi-source feedback collection
  personalization/           # Personalization engine
    engine.py                # PersonalizationEngine
    models.py                # User profiles, preferences
    feedback.py              # Feedback collector integration
    router.py                # Personalized routing
  context_graph/             # Context Graph integration
    integration/             # Orchestrator hooks, Kafka, metrics bridge
    services/                # Thinking capture, trace, feedback stores
    embeddings/              # Thinking embeddings
    security/                # Authorization, permissions
    config/                  # Tenant configuration
  llm/                       # LLM subsystem
    cache/                   # Response caching
    context/                 # Context management
    router/                  # Model routing
    validation/              # Response validation
    performance/             # Performance tracking
    infrastructure/          # LLM infrastructure management
  vector_store/              # Qdrant integration
  storage/                   # Database connection management
    database.py              # SQLAlchemy async engine
    pool.py                  # Connection pooling
  prompt_ab_testing/         # Prompt A/B testing infrastructure
  observability/             # Metrics, tracing, logging
  intent/                    # Intent classification
  knowledge/                 # Knowledge base management
  semantic/                  # Semantic processing
  evaluation/                # Model evaluation
  quality/                   # Quality assurance

Application Entry Point

The FastAPI application is defined in main.py and follows a layered initialization pattern:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
 
app = FastAPI(
    title="MATIH AI Service",
    version="1.0.0",
    description="Conversational Analytics Engine",
)
 
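# CORS middleware for frontend integration.
# Note: browsers reject a wildcard origin on credentialed requests, so
# production deployments should list explicit origins instead of "*".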
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
 
# Include route modules (the router imports are omitted from this excerpt)
app.include_router(conversation_router, prefix="/api/v1")
app.include_router(text_to_sql_router, prefix="/api/v1")
app.include_router(bi_router, prefix="/api/v1/bi")
app.include_router(feedback_router, prefix="/api/v1/feedback")
app.include_router(ws_router)  # WebSocket routes at /ws

Lifecycle Hooks

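The application uses FastAPI's startup and shutdown events for resource initialization and cleanup. Recent FastAPI releases deprecate on_event in favor of a lifespan handler, although the event decorators still work: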

@app.on_event("startup")
async def startup():
    # Initialize database connections
    await initialize_database()
    # Initialize Redis for sessions
    await initialize_session_store()
    # Initialize Qdrant vector store
    await initialize_vector_store()
    # Initialize Context Graph hooks
    await initialize_context_graph()
 
@app.on_event("shutdown")
async def shutdown():
    await close_session_store()
    await close_database()
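
The same sequence can also be written as a lifespan handler, which is an async context manager. The following is a minimal sketch of that style, assuming each resource can be wrapped as an async context manager. The `resource` helper and the `events` list are illustrative stand-ins, not part of the AI Service code.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []  # records init/close order, for illustration only


@asynccontextmanager
async def resource(name: str):
    """Hypothetical stand-in for a real connection (database, Redis, Qdrant)."""
    events.append(f"init:{name}")
    try:
        yield name
    finally:
        events.append(f"close:{name}")


@asynccontextmanager
async def lifespan(app):
    """Open resources in order; AsyncExitStack closes them in reverse order."""
    async with AsyncExitStack() as stack:
        for name in ("database", "session_store", "vector_store"):
            await stack.enter_async_context(resource(name))
        yield  # the application serves requests while suspended here


async def demo() -> list[str]:
    """Simulate one application run and return the recorded event order."""
    events.clear()
    async with lifespan(app=None):
        events.append("serving")
    return list(events)

# With FastAPI this would be wired as: app = FastAPI(lifespan=lifespan)
```

Because the exit stack unwinds in reverse order, every resource that was opened is also closed, including when a later initialization step fails.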

Configuration Management

The AI Service uses Pydantic Settings for configuration, binding environment variables to typed configuration fields:

# config/settings.py
from pydantic_settings import BaseSettings
 
class Settings(BaseSettings):
    # Service identity
    service_name: str = "ai-service"
    service_port: int = 8000
    environment: str = "development"
 
    # Database
    database_url: str = "postgresql+asyncpg://..."
    database_pool_size: int = 20
    database_max_overflow: int = 10
 
    # Redis
    redis_url: str = "redis://localhost:6379"
    redis_pool_size: int = 10
 
    # LLM Configuration
    openai_api_key: str | None = None
    anthropic_api_key: str | None = None
    default_llm_provider: str = "openai"
    default_model: str = "gpt-4"
    default_temperature: float = 0.7
    default_max_tokens: int = 4096
 
    # Qdrant
    qdrant_url: str = "http://localhost:6333"
    qdrant_collection: str = "schema_embeddings"
 
    # SQL Generation
    context_top_k: int = 5
    max_sql_retries: int = 3
 
    # Kafka
    kafka_bootstrap_servers: str = "localhost:29092"
    kafka_consumer_group: str = "ai-service"
 
    # Context Graph
    dgraph_url: str = "http://localhost:8080"
    context_graph_enabled: bool = True
 
    # Pydantic v2 style configuration (replaces the deprecated inner `class Config`)
    model_config = {"env_file": ".env", "env_prefix": "MATIH_"}

Environment-Specific Overrides

Configuration cascades from the class defaults through the .env file to environment variables. In Kubernetes, environment-specific values are injected as environment variables from ConfigMaps and Secrets:

| Configuration Source | Priority | Example |
|---|---|---|
| Default values in Settings class | Lowest | default_temperature: float = 0.7 |
| .env file (local development) | Medium | MATIH_OPENAI_API_KEY=sk-xxx |
| Kubernetes ConfigMap | High | Database URLs, feature flags |
| Kubernetes Secret (via secretKeyRef) | Highest | API keys, database passwords |
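
To make the cascade concrete, here is a small sketch of the lookup order for a single field. It is not how Pydantic Settings is implemented internally, and it omits type coercion. The `resolve_setting` function and its arguments are hypothetical names used only for illustration.

```python
from typing import Any, Mapping


def resolve_setting(
    field: str,
    default: Any,
    dotenv: Mapping[str, str],
    environ: Mapping[str, str],
    prefix: str = "MATIH_",
) -> Any:
    """Return the value for `field`: environment first, then .env, then default."""
    key = f"{prefix}{field}".upper()
    if key in environ:   # injected from a ConfigMap or Secret in Kubernetes
        return environ[key]
    if key in dotenv:    # local development file
        return dotenv[key]
    return default       # value declared on the Settings class


# Example: the environment variable wins over the .env entry and the default.
dotenv_values = {"MATIH_DEFAULT_MODEL": "gpt-4-turbo"}
env_values = {"MATIH_DEFAULT_MODEL": "gpt-3.5-turbo"}
chosen = resolve_setting("default_model", "gpt-4", dotenv_values, env_values)
```

In this example `chosen` is the value from `env_values`. Removing that entry falls back to the .env value, and removing both falls back to the class default.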

Dependency Injection

The AI Service uses a lightweight dependency injection pattern through FastAPI's Depends mechanism and singleton factories:

# Factory pattern for shared resources
_settings: Settings | None = None
 
def get_settings() -> Settings:
    global _settings
    if _settings is None:
        _settings = Settings()
    return _settings
 
# FastAPI dependency injection
from fastapi import Depends
 
@app.post("/api/v1/conversation")
async def conversation(
    request: ConversationRequest,
    settings: Settings = Depends(get_settings),
    session_store: SessionStore = Depends(get_session_store),
):
    ...
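
An equivalent way to express the singleton factory is `functools.lru_cache`, which also gives tests a `cache_clear()` hook for resetting state. A minimal sketch, using a stand-in dataclass in place of the real Settings class:

```python
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class Settings:
    """Stand-in for the Pydantic Settings class; only two fields are shown."""
    service_name: str = "ai-service"
    service_port: int = 8000


@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Build Settings on the first call and reuse the same instance afterwards."""
    return Settings()
```

Calling `get_settings.cache_clear()` in a test fixture forces the next call to rebuild the settings, which is useful after patching environment variables.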

Service Layer Architecture

The service is organized into five layers:

+---------------------------------------------------+
|                   API Layer                        |
|        FastAPI Routes, WebSocket Handlers          |
+---------------------------------------------------+
                        |
+---------------------------------------------------+
|               Orchestration Layer                  |
|    AgentOrchestrator, AnalyticsOrchestrator,       |
|    MultiAgentExecutor                              |
+---------------------------------------------------+
                        |
+---------------------------------------------------+
|                 Agent Layer                        |
|    RouterAgent, SQLAgent, AnalysisAgent,           |
|    VisualizationAgent, DocumentationAgent          |
+---------------------------------------------------+
                        |
+---------------------------------------------------+
|               Service Layer                        |
|    SQLGenerator, RAGPipeline, SessionManager,      |
|    PersonalizationEngine, FeedbackPipeline         |
+---------------------------------------------------+
                        |
+---------------------------------------------------+
|             Infrastructure Layer                   |
|    LLMClient, QdrantStore, Redis, PostgreSQL,      |
|    Kafka, Dgraph                                   |
+---------------------------------------------------+

LLM Provider Abstraction

The AI Service abstracts LLM providers behind a unified LLMClient interface, so callers can switch providers without changing their own code:

from typing import Any, AsyncIterator

class LLMClient:
    """LLM client interface wrapping llm_providers implementations."""
 
    def __init__(
        self,
        provider: str = "openai",
        model: str | None = None,
        **kwargs,
    ) -> None:
        from agents.llm_providers import get_llm_client
        self._client = get_llm_client(
            provider=provider,
            model=model,
            **kwargs,
        )
 
    async def chat(
        self,
        messages: list[dict[str, Any]],
        tools: list[dict[str, Any]] | None = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
        stream: bool = False,
    ) -> dict[str, Any]:
        """Send chat request to LLM."""
        ...
 
    async def stream_chat(
        self,
        messages: list[dict[str, Any]],
        tools: list[dict[str, Any]] | None = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> AsyncIterator[dict[str, Any]]:
        """Stream chat response from LLM."""
        ...

Supported Providers

| Provider | Model Examples | Special Features |
|---|---|---|
| OpenAI | gpt-4, gpt-4-turbo, gpt-3.5-turbo | Function calling, streaming |
| Anthropic | claude-3-opus, claude-3-sonnet | Extended thinking, large context |
| vLLM | Self-hosted open-source models | Low latency, cost optimization |

The provider selection happens at three levels:

  1. Global default: Set via MATIH_DEFAULT_LLM_PROVIDER environment variable
  2. Agent-level: Each agent can specify its preferred provider in AgentConfig
  3. Request-level: API callers can override the provider per request
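
The three levels can be sketched as a simple fallback chain. This assumes the most specific level wins (request over agent over global default), which the text implies but does not state outright. The provider identifiers "anthropic" and "vllm" and the reduced `AgentConfig` below are illustrative assumptions.

```python
from dataclasses import dataclass
from typing import Optional

# Assumed provider identifiers; only "openai" appears as a default in Settings.
SUPPORTED_PROVIDERS = {"openai", "anthropic", "vllm"}


@dataclass
class AgentConfig:
    """Hypothetical subset of the real AgentConfig."""
    name: str
    provider: Optional[str] = None


def resolve_provider(
    global_default: str,
    agent: AgentConfig,
    request_override: Optional[str] = None,
) -> str:
    """Pick the first provider that is set, from most to least specific."""
    for candidate in (request_override, agent.provider, global_default):
        if candidate:
            if candidate not in SUPPORTED_PROVIDERS:
                raise ValueError(f"unsupported provider: {candidate}")
            return candidate
    raise ValueError("no LLM provider configured")
```

For example, an agent configured with "anthropic" uses that provider unless a request passes an override, and an agent with no preference falls back to the global default.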

Multi-Tenancy

Every request to the AI Service carries a tenant_id that flows through all layers:

from dataclasses import dataclass, field
from typing import Any

@dataclass
class AgentContext:
    """Context for agent execution."""
    session_id: str
    tenant_id: str           # Tenant isolation boundary
    user_id: str | None
    current_input: str = ""
    status: AgentStatus = AgentStatus.IDLE
    metadata: dict[str, Any] = field(default_factory=dict)

Tenant isolation is enforced at multiple levels:

| Layer | Isolation Mechanism |
|---|---|
| Session storage | Redis keys prefixed with tenant:{tenant_id}:session: |
| Vector store | Qdrant payloads filtered by tenant_id |
| SQL generation | Schema context retrieved per tenant |
| Feedback | Events tagged and partitioned by tenant |
| Personalization | Profiles stored per (tenant_id, user_id) pair |
| Context Graph | Dgraph queries filtered by tenant URN |
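
As an illustration of the first two rows, the helpers below build a tenant-scoped Redis key and a tenant filter in Qdrant's JSON filter shape. This is a sketch rather than the service's own code. The function names are hypothetical, and rejecting ":" inside identifiers is an added assumption to keep one tenant's key from spilling into another namespace.

```python
from typing import Any


def session_key(tenant_id: str, session_id: str) -> str:
    """Build a Redis key under the tenant:{tenant_id}:session: prefix."""
    for part in (tenant_id, session_id):
        if not part or ":" in part:
            raise ValueError("identifiers must be non-empty and must not contain ':'")
    return f"tenant:{tenant_id}:session:{session_id}"


def tenant_filter(tenant_id: str) -> dict[str, Any]:
    """Return a filter that matches only points whose payload carries this tenant_id."""
    return {"must": [{"key": "tenant_id", "match": {"value": tenant_id}}]}
```

Keeping key construction in one helper means no call site assembles the prefix by hand, so a missing tenant_id fails loudly instead of producing a shared key.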

Error Handling

The AI Service implements a structured error handling strategy:

from fastapi import HTTPException
from fastapi.responses import JSONResponse
 
class AIServiceError(Exception):
    """Base exception for AI Service errors."""
    def __init__(self, message: str, code: str, status_code: int = 500):
        super().__init__(message)  # keeps str(exc) and tracebacks informative
        self.message = message
        self.code = code
        self.status_code = status_code
 
class LLMProviderError(AIServiceError):
    """Error from LLM provider."""
    pass
 
class SQLGenerationError(AIServiceError):
    """Error in SQL generation pipeline."""
    pass
 
class SessionNotFoundError(AIServiceError):
    """Session not found."""
    def __init__(self, session_id: str):
        super().__init__(
            message=f"Session {session_id} not found",
            code="SESSION_NOT_FOUND",
            status_code=404,
        )
 
@app.exception_handler(AIServiceError)
async def ai_service_error_handler(request, exc: AIServiceError):
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.code,
            "message": exc.message,
        },
    )
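
To show how the hierarchy maps to responses, the sketch below gives a subclass its own fixed error code and status, following the SessionNotFoundError pattern, and builds the payload the handler would return. The "LLM_PROVIDER_ERROR" code, the 502 status, and the `to_payload` helper are assumptions made for illustration.

```python
class AIServiceError(Exception):
    """Base exception carrying a machine-readable code and an HTTP status."""

    def __init__(self, message: str, code: str, status_code: int = 500):
        super().__init__(message)
        self.message = message
        self.code = code
        self.status_code = status_code


class LLMProviderError(AIServiceError):
    """Upstream LLM failure; the code and status here are assumed values."""

    def __init__(self, provider: str, detail: str):
        super().__init__(
            message=f"{provider} request failed: {detail}",
            code="LLM_PROVIDER_ERROR",
            status_code=502,
        )


def to_payload(exc: AIServiceError) -> tuple[int, dict[str, str]]:
    """Mirror the exception handler: return the status code and the JSON body."""
    return exc.status_code, {"error": exc.code, "message": exc.message}
```

Because every subclass funnels through the base constructor, a single exception handler can serialize all of them without per-type branches.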

Graceful Degradation

The AI Service follows a graceful degradation pattern for optional subsystems. If the Context Graph, Redis, or Kafka are unavailable, the service continues operating with reduced functionality:

# Context Graph hooks degrade gracefully (excerpt from inside an async function)
try:
    from context_graph.integration.orchestrator_hooks import (
        get_orchestrator_hooks,
    )
    _cg_hooks = get_orchestrator_hooks()
    if _cg_hooks and _cg_hooks.enabled:
        _thinking_trace_id = await _cg_hooks.on_agent_message_start(
            session_id, tenant_id, message, agent_id,
        )
except Exception:
    _cg_hooks = None  # Silently degrade
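
The same pattern can be factored into a reusable wrapper that logs the failure instead of swallowing it silently. The following is a minimal sketch under that assumption. `call_optional`, `start_trace`, and `handle_message` are hypothetical names, and the failing Context Graph call is simulated.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger("ai_service.degradation")


async def call_optional(
    name: str,
    func: Callable[..., Awaitable[Any]],
    *args: Any,
    default: Any = None,
) -> Any:
    """Await an optional subsystem call; on any failure, log it and return `default`."""
    try:
        return await func(*args)
    except Exception:
        logger.warning("optional subsystem %r unavailable, continuing", name, exc_info=True)
        return default


async def start_trace(session_id: str) -> str:
    """Simulated Context Graph call that fails while the graph is down."""
    raise ConnectionError("dgraph unreachable")


async def handle_message(session_id: str) -> dict[str, Any]:
    """The core request still succeeds when the optional trace cannot start."""
    trace_id = await call_optional("context_graph", start_trace, session_id)
    return {"session_id": session_id, "trace_id": trace_id, "answer": "ok"}
```

Logging the exception keeps the degradation visible in monitoring while the request path stays unaffected.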

Observability

The AI Service instruments all critical paths with structured logging via structlog, distributed tracing, and Prometheus metrics:

Structured Logging

import structlog
 
logger = structlog.get_logger()
 
logger.info(
    "processing_message",
    agent_id=agent_id,
    session_id=session_id,
    message_length=len(message),
    tenant_id=tenant_id,
)

Key Metrics

| Metric | Type | Labels |
|---|---|---|
| ai_service_request_duration_seconds | Histogram | endpoint, tenant_id, status |
| ai_service_llm_tokens_total | Counter | provider, model, direction |
| ai_service_llm_latency_seconds | Histogram | provider, model |
| ai_service_sql_generation_duration_seconds | Histogram | tenant_id, success |
| ai_service_agent_execution_count | Counter | agent_type, status |
| ai_service_active_sessions | Gauge | tenant_id |
| ai_service_websocket_connections | Gauge | tenant_id |
| ai_service_rag_retrieval_duration_seconds | Histogram | strategy |

Deployment Topology

The AI Service runs as a Kubernetes Deployment with horizontal pod autoscaling:

# infrastructure/helm/ai-service/values.yaml
replicaCount: 2
 
image:
  repository: matih/ai-service
  tag: latest
 
resources:
  requests:
    cpu: 500m
    memory: 1Gi
  limits:
    cpu: 2000m
    memory: 4Gi
 
autoscaling:
  enabled: true
  minReplicas: 2
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70
 
service:
  port: 8000
  type: ClusterIP
 
env:
  # Names carry the MATIH_ prefix so that they bind to the Settings fields
  - name: MATIH_DATABASE_URL
    valueFrom:
      secretKeyRef:
        name: ai-service-secrets
        key: database-url
  - name: MATIH_OPENAI_API_KEY
    valueFrom:
      secretKeyRef:
        name: ai-service-secrets
        key: openai-api-key
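
For intuition about the autoscaling values, the Kubernetes Horizontal Pod Autoscaler computes the desired replica count as ceil(currentReplicas * currentMetric / targetMetric), clamped to the configured bounds. The sketch below applies that formula with the defaults from the values file. It ignores the controller's tolerance band and stabilization window, so it approximates the real behavior.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_cpu_percent: float,
    target_cpu_percent: float = 70,
    min_replicas: int = 2,
    max_replicas: int = 10,
) -> int:
    """Apply the HPA scaling formula and clamp to the min/max bounds."""
    raw = math.ceil(current_replicas * current_cpu_percent / target_cpu_percent)
    return max(min_replicas, min(max_replicas, raw))
```

With two replicas averaging 90% CPU against the 70% target, the formula asks for three replicas. Sustained load far above the target is capped at maxReplicas.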

Health Checks

The service exposes standard health check endpoints:

| Endpoint | Purpose | Checks |
|---|---|---|
| GET /health | Liveness probe | Application is running |
| GET /health/ready | Readiness probe | Database, Redis, Qdrant connections |
| GET /health/startup | Startup probe | Initial model loading complete |
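
A readiness endpoint of this kind typically runs its dependency checks concurrently and reports 503 if any of them fails. The following is a minimal sketch of that aggregation, not the service's actual handler. The probe functions and the one-second timeout are assumptions.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple

Check = Callable[[], Awaitable[None]]


async def check_readiness(
    checks: Dict[str, Check], timeout: float = 1.0
) -> Tuple[int, Dict[str, str]]:
    """Run all checks concurrently; return (HTTP status, per-dependency result)."""

    async def run(name: str, check: Check) -> Tuple[str, str]:
        try:
            await asyncio.wait_for(check(), timeout)
            return name, "ok"
        except Exception as exc:  # timeouts and connection errors both mean "not ready"
            return name, f"failed: {type(exc).__name__}"

    results = dict(await asyncio.gather(*(run(n, c) for n, c in checks.items())))
    ready = all(value == "ok" for value in results.values())
    return (200 if ready else 503), results


async def database_ok() -> None:
    """Hypothetical probe that succeeds."""


async def redis_down() -> None:
    """Hypothetical probe that fails as if Redis refused the connection."""
    raise ConnectionError("connection refused")
```

Returning the per-dependency map alongside the status code makes a failed readiness probe easier to diagnose from the response body alone.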

Security

The AI Service enforces security at multiple layers:

  1. Authentication: JWT tokens validated on every request, extracting tenant_id and user_id
  2. Authorization: Role-based access control for administrative endpoints
  3. Input sanitization: User inputs sanitized before processing through agents
  4. Memory guardrails: PII detection and sanitization in conversation memory
  5. Content safety: Guardrail registry for blocking harmful content generation
  6. Network policy: Kubernetes NetworkPolicy restricting ingress/egress to known services

# infrastructure/helm/ai-service/templates/networkpolicy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: ai-service
spec:
  podSelector:
    matchLabels:
      app: ai-service
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: api-gateway
      ports:
        - port: 8000
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: query-engine
        - podSelector:
            matchLabels:
              app: redis
        - podSelector:
            matchLabels:
              app: qdrant