MATIH Platform is in active MVP development. Documentation reflects current implementation status.
12. AI Service
Agent Orchestrator

Agent Orchestrator

The Agent Orchestrator is the central coordination layer of the AI Service, responsible for routing user messages through specialized agents, managing conversation memory, executing tools, enforcing human-in-the-loop approval workflows, and integrating with LangGraph for complex multi-step workflows. This section provides a deep dive into the AgentOrchestrator, MultiAgentExecutor, and their supporting subsystems.


Core Data Models

Before examining the orchestrator itself, it is essential to understand the data models that define the agent ecosystem. These are defined in agents/models.py.

Agent Types and Roles

class AgentType(str, Enum):
    """Types of agents in the system."""
    CONVERSATIONAL = "conversational"  # Multi-turn conversation
    TASK = "task"                      # Single-task execution
    SUPERVISOR = "supervisor"          # Supervises other agents
    SPECIALIST = "specialist"          # Domain-specific expertise
    ROUTER = "router"                  # Routes to other agents
 
class AgentRole(str, Enum):
    """Agent roles in multi-agent system."""
    COORDINATOR = "coordinator"   # Coordinates multi-agent workflows
    ANALYST = "analyst"           # Data analysis
    RESEARCHER = "researcher"     # Information retrieval
    EXECUTOR = "executor"         # Action execution
    VALIDATOR = "validator"       # Output validation
    SUMMARIZER = "summarizer"     # Result summarization

Agent Status Lifecycle

Agents transition through well-defined states during execution:

IDLE --> THINKING --> EXECUTING --> COMPLETED
  |         |            |             |
  |         |            v             |
  |         |     WAITING_APPROVAL     |
  |         |            |             |
  |         v            v             v
  +-------> FAILED <-----+-------------+
StatusDescription
IDLEAgent is registered but not currently processing
THINKINGAgent is building context and calling the LLM
EXECUTINGAgent is executing tool calls
WAITING_APPROVALExecution paused pending human approval
COMPLETEDProcessing finished successfully
FAILEDProcessing encountered an unrecoverable error

Agent Configuration

Each agent is configured through an AgentConfig dataclass:

@dataclass
class AgentConfig:
    """Configuration for an agent."""
    name: str
    description: str
    system_prompt: str
    agent_type: AgentType = AgentType.CONVERSATIONAL
    role: AgentRole = AgentRole.ANALYST
    temperature: float = 0.7
    max_tokens: int = 4096
    max_iterations: int = 10     # Maximum tool-call loops
    tools_enabled: bool = True
    memory_enabled: bool = True
    approval_required: bool = False

Message Protocol

Messages flowing through the orchestrator use a structured format:

@dataclass
class AgentMessage:
    """A message in agent conversation."""
    id: str                              # Unique message ID
    role: MessageRole                    # USER, ASSISTANT, SYSTEM, TOOL, HUMAN
    content: str                         # Message content
    name: str | None = None              # Tool name (for tool messages)
    tool_call_id: str | None = None      # Tool call reference
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.utcnow)

AgentOrchestrator

The AgentOrchestrator class in agents/orchestrator.py is the primary entry point for processing user messages. It integrates four core subsystems: LLM communication, tool execution, memory management, and HITL approval.

Constructor

class AgentOrchestrator:
    """Orchestrates agent execution with tools, memory, and HITL."""
 
    def __init__(
        self,
        llm_client: LLMClient | None = None,
        tool_registry: ToolRegistry | None = None,
        memory_manager: MemoryManager | None = None,
        approval_handler: ApprovalHandler | None = None,
        memory_guardrail_integration: Any | None = None,
    ) -> None:
        self._llm = llm_client
        self._tools = tool_registry or create_default_registry()
        self._memory = memory_manager or MemoryManager()
        self._approval = approval_handler or get_approval_handler()
        self._memory_guardrail = memory_guardrail_integration
        self._agents: dict[str, ConversationalAgent] = {}
        self._active_contexts: dict[str, AgentContext] = {}
        self._message_count: dict[str, int] = {}
        self._compliance_check_interval = 10

Agent Registration

Agents are registered with the orchestrator before they can process messages:

orchestrator = AgentOrchestrator(llm_client=llm)
 
# Register a data analyst agent
analyst = ConversationalAgent(
    id="data-analyst",
    config=AgentConfig(
        name="Data Analyst",
        description="Analyzes data and generates insights",
        system_prompt="You are a data analyst for enterprise data...",
        role=AgentRole.ANALYST,
        temperature=0.3,
        max_iterations=5,
    ),
)
orchestrator.register_agent(analyst)

Message Processing Pipeline

The process_message method is the main entry point. It executes a multi-step pipeline:

async def process_message(
    self,
    agent_id: str,
    session_id: str,
    tenant_id: str,
    message: str,
    user_id: str | None = None,
    stream: bool = False,
) -> AgentResponse | AsyncIterator[StreamChunk]:

The pipeline consists of these steps:

  1. Agent lookup: Find the registered agent by ID
  2. Context retrieval: Get or create the AgentContext for the session
  3. Memory loading: Retrieve conversation history from MemoryManager
  4. Guardrail check: Sanitize input through memory guardrails if configured
  5. Memory write: Add the user message to conversation memory
  6. Compliance check: Periodic policy compliance verification (every N messages)
  7. Context Graph trace start: Begin thinking trace capture via orchestrator hooks
  8. LLM processing: Send messages to the LLM with tool definitions
  9. Tool execution loop: Execute tool calls and feed results back to LLM
  10. HITL check: Pause for approval if the tool call requires it
  11. Response assembly: Build the final AgentResponse
  12. Context Graph trace complete: Record thinking trace outcome

The Response Object

@dataclass
class AgentResponse:
    """Response from agent processing."""
    id: str                                    # Unique response ID
    content: str = ""                          # Text response
    tool_calls: list[ToolCall] = field(...)     # Tool calls made
    tool_results: list[ToolResult] = field(...) # Tool execution results
    reasoning: str | None = None               # Agent's reasoning (if exposed)
    requires_approval: bool = False            # Whether HITL approval needed
    approval_request: ApprovalRequest | None = None
    metadata: dict[str, Any] = field(...)
    execution_time_ms: float = 0.0
    tokens_used: int = 0

Synchronous Processing

The _process_sync method implements the core agent loop:

async def _process_sync(
    self,
    agent: ConversationalAgent,
    context: AgentContext,
    memory: Any,
    start_time: float,
) -> AgentResponse:
    response = AgentResponse()
    iteration = 0
    max_iterations = agent.config.max_iterations
 
    while iteration < max_iterations:
        iteration += 1
        context.status = AgentStatus.THINKING
 
        # Build LLM messages from system prompt + memory
        messages = self._build_messages(agent, context, memory)
        tools = self._tools.get_tool_definitions()
 
        # Call LLM
        llm_response = await self._llm.chat(
            messages=messages,
            tools=tools if tools else None,
            temperature=agent.config.temperature,
            max_tokens=agent.config.max_tokens,
        )
 
        response.tokens_used += llm_response.get("usage", {}).get(
            "total_tokens", 0
        )
 
        # Check for tool calls
        tool_calls = llm_response.get("tool_calls", [])
        if tool_calls:
            context.status = AgentStatus.EXECUTING
            for tc in tool_calls:
                # Build ToolCall object
                tool_call = ToolCall(
                    id=tc.get("id"),
                    name=tc["function"]["name"],
                    arguments=json.loads(tc["function"]["arguments"]),
                )
 
                # Check HITL approval requirement
                approval = await self._approval.check_and_request_approval(
                    session_id=context.session_id,
                    tenant_id=context.tenant_id,
                    agent_id=agent.id,
                    tool_call=tool_call,
                )
                if approval:
                    response.requires_approval = True
                    response.approval_request = approval
                    return response
 
                # Execute tool
                result = await self._tools.execute(
                    tool_call.name,
                    tool_call.arguments,
                )
                response.tool_results.append(result)
 
            # Continue loop to let LLM process tool results
            continue
 
        # No tool calls - LLM provided final response
        response.content = llm_response.get("content", "")
        break
 
    return response

Iteration Safety

The max_iterations parameter on AgentConfig prevents infinite tool-call loops. Each iteration represents one LLM call plus optional tool execution. If the iteration count reaches the maximum, the orchestrator returns whatever partial response is available.


Tool Registry

The ToolRegistry in agents/tools.py manages the set of tools available to agents:

class ToolRegistry:
    """Registry of tools available to agents."""
 
    def __init__(self) -> None:
        self._tools: dict[str, ToolDefinition] = {}
 
    def register(
        self,
        name: str,
        description: str,
        parameters: dict[str, Any],
        handler: Callable,
        requires_approval: bool = False,
    ) -> None:
        """Register a tool."""
        self._tools[name] = ToolDefinition(
            name=name,
            description=description,
            parameters=parameters,
            handler=handler,
            requires_approval=requires_approval,
        )
 
    def get_tool_definitions(self) -> list[dict[str, Any]]:
        """Get OpenAI-compatible tool definitions."""
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters,
                },
            }
            for tool in self._tools.values()
        ]
 
    async def execute(
        self,
        name: str,
        arguments: dict[str, Any],
    ) -> ToolResult:
        """Execute a tool by name."""
        tool = self._tools.get(name)
        if not tool:
            return ToolResult(
                tool_call_id=name,
                output="",
                error=f"Tool not found: {name}",
            )
        result = await tool.handler(**arguments)
        return ToolResult(
            tool_call_id=name,
            output=str(result),
        )

Default Tool Registry

The create_default_registry() function builds a registry with the standard platform tools:

Tool NameDescriptionApproval Required
execute_sqlExecute SQL query against the Query EngineNo
search_schemaSearch data catalog for tables and columnsNo
get_table_infoGet detailed table metadataNo
create_visualizationGenerate a chart from dataNo
export_dataExport query results to fileYes
modify_dashboardModify an existing dashboardYes
execute_actionExecute a data mutation actionYes

Memory Management

The MemoryManager in agents/memory_stores.py handles conversation history with dual-strategy memory:

ConversationBuffer Memory

Stores raw messages in a bounded buffer:

class ConversationBufferMemory:
    """Sliding window buffer of recent messages."""
 
    def __init__(self, max_messages: int = 50):
        self._messages: list[AgentMessage] = []
        self._max_messages = max_messages
 
    def add(self, message: AgentMessage) -> None:
        self._messages.append(message)
        if len(self._messages) > self._max_messages:
            self._messages.pop(0)
 
    def get_messages(self, limit: int | None = None) -> list[AgentMessage]:
        if limit:
            return self._messages[-limit:]
        return list(self._messages)

SummaryMemory

Maintains a running summary of older conversation history, using the LLM to compress messages beyond the buffer window:

class SummaryMemory:
    """Summarizes older conversation history."""
 
    def __init__(self, llm_client: LLMClient, max_summary_tokens: int = 500):
        self._llm = llm_client
        self._summary: str = ""
        self._max_tokens = max_summary_tokens
 
    async def update_summary(
        self,
        new_messages: list[AgentMessage],
    ) -> str:
        """Update summary with new messages."""
        messages_text = "\n".join(
            f"{m.role.value}: {m.content}" for m in new_messages
        )
        prompt = f"""Summarize this conversation, incorporating the previous summary.
 
Previous summary: {self._summary}
 
New messages:
{messages_text}
 
Updated summary:"""
 
        response = await self._llm.chat([{"role": "user", "content": prompt}])
        self._summary = response.get("content", "")
        return self._summary

HybridMemory

Combines buffer and summary memory for optimal context:

class HybridMemory:
    """Combines buffer and summary memory."""
 
    def __init__(
        self,
        buffer: ConversationBufferMemory,
        summary: SummaryMemory,
        buffer_threshold: int = 20,
    ):
        self._buffer = buffer
        self._summary = summary
        self._threshold = buffer_threshold
 
    async def get_context(self) -> list[dict[str, Any]]:
        """Get conversation context for LLM."""
        messages = self._buffer.get_messages()
 
        # If buffer is large, summarize older messages
        if len(messages) > self._threshold:
            older = messages[:-self._threshold]
            await self._summary.update_summary(older)
            recent = messages[-self._threshold:]
            return [
                {"role": "system", "content": f"Summary: {self._summary._summary}"},
                *[m.to_dict() for m in recent],
            ]
 
        return [m.to_dict() for m in messages]

Human-in-the-Loop (HITL)

The HITL subsystem in agents/hitl.py enables approval workflows for high-risk operations:

Approval Flow

User Message
    |
    v
Agent Processing
    |
    v
Tool Call Generated
    |
    v
Approval Check -----> Tool requires approval?
    |                      |
    No                    Yes
    |                      |
    v                      v
Execute Tool         Create ApprovalRequest
    |                      |
    v                      v
Return Response      Return to User
                          |
                          v
                    User Approves/Rejects
                          |
                    +-----+------+
                    |            |
                 Approved    Rejected
                    |            |
                    v            v
              Execute Tool   Cancel & Respond

ApprovalRequest

@dataclass
class ApprovalRequest:
    """Request for human approval of an action."""
    id: str
    session_id: str
    tenant_id: str
    agent_id: str
    tool_call: ToolCall
    reason: str                    # Why approval is needed
    risk_level: str                # low, medium, high, critical
    status: ApprovalStatus = ApprovalStatus.PENDING
    created_at: datetime = field(default_factory=datetime.utcnow)
    expires_at: datetime | None = None

Approval Policies

Approval requirements can be configured per tool, per agent, or per tenant:

Policy LevelConfiguration
Tool-levelrequires_approval=True in ToolRegistry registration
Agent-levelapproval_required=True in AgentConfig
Tenant-levelTenant configuration with approval policies
Risk-basedDynamic approval based on risk level assessment

MultiAgentExecutor

The MultiAgentExecutor in agents/multi_agent_router.py coordinates multiple specialized agents for complex queries that require different expertise.

Routing Strategies

class RoutingStrategy(str, Enum):
    """Strategy for routing tasks to agents."""
    SINGLE = "single"           # Route to single best agent
    PARALLEL = "parallel"       # Route to multiple agents in parallel
    SEQUENTIAL = "sequential"   # Route through agents sequentially
    HIERARCHICAL = "hierarchical"  # Supervisor decides routing
    CONSENSUS = "consensus"     # Multiple agents, aggregate results

Routing Decision

The AgentRouter uses the LLM to determine the optimal routing strategy:

class AgentRouter:
    """Routes tasks to appropriate specialized agents."""
 
    async def route(
        self,
        query: str,
        context: AgentContext,
    ) -> RoutingDecision:
        """Determine routing for a query."""
        # Get available agents and their capabilities
        agents = self._registry.list_agents()
        capabilities = [
            {
                "agent_id": a.id,
                "name": a.config.name,
                "capabilities": a.capabilities,
                "description": a.config.description,
            }
            for a in agents
        ]
 
        # Use LLM to decide routing
        response = await self._llm.chat([
            {"role": "system", "content": ROUTING_PROMPT},
            {"role": "user", "content": json.dumps({
                "query": query,
                "available_agents": capabilities,
            })},
        ])
 
        return RoutingDecision(
            query=query,
            strategy=RoutingStrategy(response["strategy"]),
            selected_agents=response["agents"],
            reasoning=response["reasoning"],
            confidence=response["confidence"],
        )

Execution Modes

Sequential Execution: Agents execute in order, each receiving the output of the previous:

async def _execute_sequential(
    self,
    agents: list[str],
    context: AgentContext,
) -> TaskExecution:
    accumulated_context = context
    for agent_id in agents:
        result = await self._execute_agent(agent_id, accumulated_context)
        accumulated_context = self._merge_context(
            accumulated_context, result
        )
    return TaskExecution(results=all_results)

Parallel Execution: Agents execute concurrently, results aggregated:

async def _execute_parallel(
    self,
    agents: list[str],
    context: AgentContext,
) -> TaskExecution:
    tasks = [
        self._execute_agent(agent_id, context)
        for agent_id in agents
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return self._aggregate_results(results)

Hierarchical Execution: A supervisor agent decides which agents to invoke and how to combine their results.


LangGraph Integration

The AI Service integrates with LangGraph for stateful, graph-based workflows in agents/langgraph.py:

class StateGraph:
    """Defines a state machine workflow for agents."""
 
    def __init__(self, state_class: type):
        self._state_class = state_class
        self._nodes: dict[str, NodeType] = {}
        self._edges: list[tuple[str, str]] = []
        self._conditional_edges: dict[str, Callable] = {}
        self._entry_point: str | None = None
 
    def add_node(self, name: str, func: Callable) -> "StateGraph":
        """Add a processing node."""
        self._nodes[name] = NodeType(name=name, func=func)
        return self
 
    def add_edge(self, source: str, target: str) -> "StateGraph":
        """Add a directed edge between nodes."""
        self._edges.append((source, target))
        return self
 
    def add_conditional_edges(
        self,
        source: str,
        condition: Callable,
        mapping: dict[str, str],
    ) -> "StateGraph":
        """Add conditional routing from a node."""
        self._conditional_edges[source] = (condition, mapping)
        return self
 
    def set_entry_point(self, name: str) -> "StateGraph":
        """Set the entry point node."""
        self._entry_point = name
        return self
 
    def compile(self) -> "CompiledGraph":
        """Compile the graph for execution."""
        return CompiledGraph(self)

Example Workflow

# Define a multi-step analytics workflow
workflow = StateGraph(state_class=AnalyticsState)
 
workflow.add_node("classify", router_agent.classify)
workflow.add_node("generate_sql", sql_agent.generate)
workflow.add_node("execute_query", query_executor.execute)
workflow.add_node("analyze", analysis_agent.analyze)
workflow.add_node("visualize", viz_agent.generate)
 
workflow.set_entry_point("classify")
workflow.add_conditional_edges(
    "classify",
    router_agent.decide_next,
    {
        "query": "generate_sql",
        "analysis": "analyze",
        "help": "docs",
    },
)
workflow.add_edge("generate_sql", "execute_query")
workflow.add_edge("execute_query", "analyze")
workflow.add_edge("analyze", "visualize")
 
compiled = workflow.compile()
result = await compiled.invoke(initial_state)

Context Graph Integration

The orchestrator integrates with the Context Graph to capture agent thinking traces:

# Start thinking trace before processing
_thinking_trace_id = None
try:
    from context_graph.integration.orchestrator_hooks import (
        get_orchestrator_hooks,
    )
    _cg_hooks = get_orchestrator_hooks()
    if _cg_hooks and _cg_hooks.enabled:
        _thinking_trace_id = await _cg_hooks.on_agent_message_start(
            session_id, tenant_id, message, agent_id,
        )
except Exception:
    _cg_hooks = None
 
# ... process message ...
 
# Complete thinking trace after processing
try:
    if _thinking_trace_id and _cg_hooks:
        await _cg_hooks.on_agent_message_complete(
            _thinking_trace_id, success=bool(resp.content),
        )
except Exception:
    pass

This integration is entirely non-invasive. If the Context Graph subsystem is unavailable, the orchestrator continues processing without thinking trace capture.


Memory Guardrail Integration

The orchestrator supports an optional memory guardrail integration for PII protection and compliance:

# Sanitize user input before storing in memory
if self._memory_guardrail:
    write_result = await self._memory_guardrail.safe_memory_write(
        content=message,
        tenant_id=tenant_id,
        session_id=session_id,
        fact_type="conversation_turn",
    )
    if write_result.sanitized_content:
        safe_message = write_result.sanitized_content
 
# Periodic compliance check (every N messages)
if session_count % self._compliance_check_interval == 0:
    compliance = await self._memory_guardrail.check_memory_policy_compliance(
        tenant_id=tenant_id,
        session_id=session_id,
    )
    if compliance.escalation_needed:
        logger.warning(
            "memory_compliance_escalation",
            violations=compliance.violation_count,
        )

Performance Considerations

AspectStrategy
LLM latencyStreaming responses to reduce perceived latency
Tool executionParallel tool execution when tools are independent
Memory retrievalIn-memory buffer with Redis fallback
Context windowSummary memory to compress long conversations
Iteration limitsmax_iterations to prevent runaway loops
Token trackingPer-response token counting for cost management
Connection poolingShared LLM client connections across requests