MATIH Platform is in active MVP development. Documentation reflects current implementation status.
12. AI Service
Agent System
Agent Orchestrator


Status: Production. AgentOrchestrator with LLM client, tool execution, memory, human-in-the-loop (HITL) approval, and Context Graph integration.

The AgentOrchestrator is the primary execution engine for all agent interactions in the AI Service. It coordinates LLM calls, tool execution, memory management, approval workflows, and streaming responses. Defined in data-plane/ai-service/src/agents/orchestrator.py, it is the single entry point for processing user messages through any registered agent.


12.2.1.1 Class Architecture

AgentOrchestrator Constructor
src/agents/orchestrator.py
The orchestrator accepts optional dependencies for LLM, tools, memory, approval, and guardrails. All except the LLM client fall back to sensible defaults; a missing LLM client logs a warning at construction time and raises at call time.
class AgentOrchestrator:
    """Orchestrates agent execution with tools, memory, and HITL."""
 
    def __init__(
        self,
        llm_client: LLMClient | None = None,
        tool_registry: ToolRegistry | None = None,
        memory_manager: MemoryManager | None = None,
        approval_handler: ApprovalHandler | None = None,
        memory_guardrail_integration: Any | None = None,
    ) -> None:
        self._llm = llm_client
        self._tools = tool_registry or create_default_registry()
        self._memory = memory_manager or MemoryManager()
        self._approval = approval_handler or get_approval_handler()
        self._memory_guardrail = memory_guardrail_integration
        self._agents: dict[str, ConversationalAgent] = {}
        self._active_contexts: dict[str, AgentContext] = {}
        self._message_count: dict[str, int] = {}
        self._compliance_check_interval = 10

Key Dependencies

Dependency | Type | Purpose
LLMClient | Required (warns if None) | Wraps OpenAI/Anthropic/vLLM providers for chat and streaming
ToolRegistry | Optional (defaults to built-in registry) | Manages available tools and their execution
MemoryManager | Optional (defaults to in-memory) | Manages per-session conversation memory
ApprovalHandler | Optional (defaults to global handler) | Routes tool calls through HITL approval when required
memory_guardrail_integration | Optional | Sanitizes content before writing to memory
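The constructor wires these dependencies with a simple "injected or fallback" pattern. A minimal, self-contained sketch of that wiring, using stand-in classes rather than the real implementations:

```python
# Stand-in classes illustrating the constructor's "injected or default" wiring.
# Names mirror the documentation; these are not the real implementations.
class ToolRegistry:
    pass

class MemoryManager:
    pass

def create_default_registry() -> ToolRegistry:
    return ToolRegistry()

class AgentOrchestrator:
    def __init__(self, llm_client=None, tool_registry=None, memory_manager=None):
        # Only the LLM client has no fallback; everything else defaults.
        self._llm = llm_client
        self._tools = tool_registry or create_default_registry()
        self._memory = memory_manager or MemoryManager()

orchestrator = AgentOrchestrator()  # all defaults
assert orchestrator._llm is None    # must be configured before use
assert isinstance(orchestrator._tools, ToolRegistry)
```

The `or` fallback means passing an explicit registry or memory manager (for tests, or per-tenant configuration) overrides the default without any extra plumbing.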

12.2.1.2 Message Processing Pipeline

The process_message method is the main entry point. It accepts an agent ID, session ID, tenant ID, and user message, then routes through either sync or streaming processing.

async def process_message(
    self,
    agent_id: str,
    session_id: str,
    tenant_id: str,
    message: str,
    user_id: str | None = None,
    stream: bool = False,
) -> AgentResponse | AsyncIterator[StreamChunk]:

Processing Steps

  1. Agent Lookup: Retrieves the registered ConversationalAgent by ID
  2. Context Creation: Gets or creates an AgentContext for the session
  3. Memory Guardrail: Sanitizes user input before storage (PII removal, injection defense)
  4. Message Storage: Adds the user message to HybridMemory
  5. Compliance Check: Every 10 messages, runs a policy compliance audit
  6. Thinking Trace: Starts a Context Graph thinking trace via orchestrator_hooks
  7. LLM Processing: Routes to _process_sync or _process_streaming
  8. Trace Completion: Records success/failure in the thinking trace
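Step 5's cadence reduces to a per-session counter with a modulo check. A self-contained illustration of that logic (not the production code, which lives on the orchestrator instance):

```python
# Per-session message counter: a compliance audit fires on every 10th message.
COMPLIANCE_CHECK_INTERVAL = 10  # mirrors _compliance_check_interval

message_count: dict[str, int] = {}

def should_run_compliance_check(session_id: str) -> bool:
    count = message_count.get(session_id, 0) + 1
    message_count[session_id] = count
    return count % COMPLIANCE_CHECK_INTERVAL == 0

fired = [n for n in range(1, 21) if should_run_compliance_check("session-123")]
# messages 10 and 20 trigger the audit
assert fired == [10, 20]
```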

Sync Processing Loop

The _process_sync method implements the ReAct (Reason + Act) loop:

async def _process_sync(self, agent, context, memory, start_time):
    response = AgentResponse()
    iteration = 0
    max_iterations = agent.config.max_iterations  # Default: 10
 
    while iteration < max_iterations:
        iteration += 1
        context.status = AgentStatus.THINKING
 
        # Build messages from system prompt + memory + history
        messages = self._build_messages(agent, context, memory)
        tools = self._tools.get_tool_definitions()
 
        # Call LLM
        llm_response = await self._llm.chat(
            messages=messages,
            tools=tools if tools else None,
            temperature=agent.config.temperature,
            max_tokens=agent.config.max_tokens,
        )
 
        # If tool calls: execute and continue loop
        tool_calls = llm_response.get("tool_calls", [])
        if tool_calls:
            context.status = AgentStatus.EXECUTING
            for tc in tool_calls:
                # Check approval, execute tool, store result
                ...
            continue  # Next iteration with tool results
 
        # No tool calls: final text response
        response.content = llm_response.get("content", "")
        break
 
    return response
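The loop's control flow can be exercised end-to-end with a stubbed LLM that first requests a tool, then answers. A self-contained sketch of the reason/act cycle (stub classes and a hypothetical `run_sql` tool, for illustration only):

```python
import asyncio

# Stub LLM: first turn requests a tool call, second turn returns a final answer.
class StubLLM:
    def __init__(self):
        self._turn = 0

    async def chat(self, messages, tools=None):
        self._turn += 1
        if self._turn == 1:
            return {"tool_calls": [{"name": "run_sql", "arguments": {"query": "SELECT 1"}}]}
        return {"content": "The answer is 1.", "tool_calls": []}

async def react_loop(llm, max_iterations: int = 10) -> str:
    messages = [{"role": "user", "content": "What is 1?"}]
    for _ in range(max_iterations):
        resp = await llm.chat(messages)
        tool_calls = resp.get("tool_calls", [])
        if tool_calls:
            # Act: execute each tool and feed the result back into the conversation.
            for tc in tool_calls:
                messages.append({"role": "tool", "content": f"{tc['name']} -> 1"})
            continue  # next iteration reasons over the tool results
        return resp.get("content", "")  # no tool calls: final answer
    return "max iterations reached"

result = asyncio.run(react_loop(StubLLM()))
assert result == "The answer is 1."
```

The `max_iterations` bound plays the same role as `agent.config.max_iterations` above: it guarantees termination even if the model keeps requesting tools.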

12.2.1.3 Tool Execution with Approval

When the LLM returns tool calls, each one passes through the approval pipeline:

for tc in tool_calls:
    tool_call = ToolCall(
        id=tc.get("id", str(uuid4())),
        name=tc.get("function", {}).get("name", ""),
        arguments=json.loads(tc.get("function", {}).get("arguments", "{}")),
    )
 
    # Check if approval required
    approval_request = await self._approval.check_and_request_approval(
        session_id=context.session_id,
        tenant_id=context.tenant_id,
        agent_id=agent.id,
        tool_call=tool_call,
    )
 
    if approval_request:
        response.requires_approval = True
        response.approval_request = approval_request
        context.status = AgentStatus.WAITING_APPROVAL
 
        # Block until approval decision
        await self._approval.workflow.wait_for_approval(approval_request.id)
        approval_request = self._approval.workflow.get_request(approval_request.id)
 
        if approval_request.status != ApprovalStatus.APPROVED:
            result = ToolResult(
                tool_call_id=tool_call.id,
                name=tool_call.name,
                error=f"Action rejected: {approval_request.status.value}",
                approved=False,
            )
            response.tool_results.append(result)
            continue
 
    # Execute approved tool
    result = await self._tools.execute_tool(tool_call, check_approval=False)
    response.tool_results.append(result)
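The gate above reduces to: request approval, block for a decision, and execute only on APPROVED. A self-contained sketch with stub handlers (the handler class and its method names here are hypothetical stand-ins, not the real ApprovalHandler API):

```python
import asyncio
from enum import Enum

class ApprovalStatus(Enum):
    APPROVED = "approved"
    REJECTED = "rejected"

# Stub handler: write_* tools require approval and are rejected in this demo.
class StubApprovalHandler:
    async def check_and_request_approval(self, tool_name: str):
        return "req-1" if tool_name.startswith("write_") else None

    async def wait_for_decision(self, request_id: str) -> ApprovalStatus:
        return ApprovalStatus.REJECTED

async def run_tool(handler, tool_name: str) -> str:
    request_id = await handler.check_and_request_approval(tool_name)
    if request_id:
        # Block until a human decides; anything but APPROVED short-circuits.
        decision = await handler.wait_for_decision(request_id)
        if decision is not ApprovalStatus.APPROVED:
            return f"Action rejected: {decision.value}"
    return f"{tool_name} executed"

assert asyncio.run(run_tool(StubApprovalHandler(), "read_table")) == "read_table executed"
assert asyncio.run(run_tool(StubApprovalHandler(), "write_table")) == "Action rejected: rejected"
```

Note that, as in the real pipeline, a rejection produces a ToolResult-style error rather than an exception, so the loop continues with the remaining tool calls.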

12.2.1.4 LLM Client Abstraction

The LLMClient class wraps the underlying provider implementations:

class LLMClient:
    def __init__(self, provider: str = "openai", model: str | None = None, **kwargs):
        from agents.llm_providers import get_llm_client, LLMMessage
        self._client = get_llm_client(provider=provider, model=model, **kwargs)
 
    async def chat(self, messages, tools=None, temperature=0.7, max_tokens=4096, stream=False):
        llm_messages = [LLMMessage(role=m["role"], content=m["content"], ...) for m in messages]
        response = await self._client.chat(messages=llm_messages, tools=tools, ...)
        return {
            "content": response.content,
            "tool_calls": response.tool_calls,
            "usage": response.usage,
            "finish_reason": response.finish_reason,
        }
 
    async def stream_chat(self, messages, tools=None, temperature=0.7, max_tokens=4096):
        llm_messages = [LLMMessage(role=m["role"], content=m["content"], ...) for m in messages]
        async for token in self._client.stream_chat(messages=llm_messages, ...):
            yield {
                "type": token.type,       # "text", "tool_call", "done"
                "content": token.content,
                "tool_call": token.tool_call,
                "finish_reason": token.finish_reason,
            }
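Whatever the provider, chat() always returns the same dict shape to the orchestrator. A sketch of that normalization step with a stand-in response object (the real one comes from agents.llm_providers; the fields here just mirror the snippet above):

```python
from dataclasses import dataclass, field

# Stand-in for a provider response object (hypothetical, for illustration).
@dataclass
class ProviderResponse:
    content: str
    tool_calls: list = field(default_factory=list)
    usage: dict = field(default_factory=dict)
    finish_reason: str = "stop"

def normalize(response: ProviderResponse) -> dict:
    # One provider-agnostic shape for the orchestrator to consume.
    return {
        "content": response.content,
        "tool_calls": response.tool_calls,
        "usage": response.usage,
        "finish_reason": response.finish_reason,
    }

out = normalize(ProviderResponse(content="hi", usage={"total_tokens": 5}))
assert out["content"] == "hi"
assert out["finish_reason"] == "stop"
```

This is what lets the ReAct loop in _process_sync stay provider-agnostic: it only ever reads `content`, `tool_calls`, `usage`, and `finish_reason`.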

12.2.1.5 Streaming Response

The _process_streaming method yields StreamChunk objects for real-time delivery:

@dataclass
class StreamChunk:
    type: str       # text, tool_call, tool_result, status, error, done
    content: str = ""
    data: dict[str, Any] = field(default_factory=dict)
    is_final: bool = False

Stream chunk types:

Type | Description
status | Agent state change (thinking, executing_tools)
text | Partial text content from the LLM
tool_call | Tool call initiated
tool_result | Tool execution completed
done | Final chunk with accumulated content
error | Error occurred during processing
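A consumer typically accumulates text chunks until it sees done. A self-contained Python sketch using the StreamChunk dataclass from above and a simulated stream:

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator

@dataclass
class StreamChunk:
    type: str       # text, tool_call, tool_result, status, error, done
    content: str = ""
    data: dict[str, Any] = field(default_factory=dict)
    is_final: bool = False

async def fake_stream() -> AsyncIterator[StreamChunk]:
    # Simulated agent output: a status change, two text deltas, then the final chunk.
    yield StreamChunk(type="status", data={"status": "thinking"})
    yield StreamChunk(type="text", content="Top product: ")
    yield StreamChunk(type="text", content="widgets")
    yield StreamChunk(type="done", content="Top product: widgets", is_final=True)

async def collect(stream: AsyncIterator[StreamChunk]) -> str:
    parts = []
    async for chunk in stream:
        if chunk.type == "text":
            parts.append(chunk.content)
        elif chunk.type == "done":
            break  # "done" also carries the accumulated content
    return "".join(parts)

assert asyncio.run(collect(fake_stream())) == "Top product: widgets"
```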

WebSocket Streaming Example

// Node 22+ ships a global WebSocket; in a browser, swap process.stdout.write for a DOM update
const ws = new WebSocket('ws://localhost:8000/api/v1/ws/agent');
ws.onopen = () => {
  ws.send(JSON.stringify({
    type: 'message',
    agent_id: 'default-assistant',
    session_id: 'session-123',
    tenant_id: 'acme-corp',
    message: 'What were our top products last month?',
    stream: true
  }));
};
ws.onmessage = (event) => {
  const chunk = JSON.parse(event.data);
  switch (chunk.type) {
    case 'text':
      process.stdout.write(chunk.content);
      break;
    case 'tool_call':
      console.log('Tool:', chunk.data.name);
      break;
    case 'done':
      console.log('\nComplete.');
      break;
  }
};

12.2.1.6 Memory Guardrails

The orchestrator integrates memory guardrails at three points:

  1. Input sanitization: Before storing user messages
  2. Output sanitization: Before storing assistant responses
  3. Periodic compliance: Every N messages (configurable via _compliance_check_interval)

# Input guardrail
if self._memory_guardrail:
    write_result = await self._memory_guardrail.safe_memory_write(
        content=message,
        tenant_id=tenant_id,
        session_id=session_id,
        fact_type="conversation_turn",
    )
    if write_result.sanitized_content:
        safe_message = write_result.sanitized_content
 
# Periodic compliance check
session_count = self._message_count.get(session_id, 0) + 1
if session_count % self._compliance_check_interval == 0:
    compliance = await self._memory_guardrail.check_memory_policy_compliance(
        tenant_id=tenant_id,
        session_id=session_id,
    )
    if compliance.escalation_needed:
        logger.warning("memory_compliance_escalation", ...)

12.2.1.7 Context Graph Integration

The orchestrator hooks into the Context Graph to record agent thinking traces:

# Start trace
from context_graph.integration.orchestrator_hooks import get_orchestrator_hooks
_cg_hooks = get_orchestrator_hooks()
if _cg_hooks and _cg_hooks.enabled:
    _thinking_trace_id = await _cg_hooks.on_agent_message_start(
        session_id, tenant_id, message, agent_id,
    )
 
# After processing
if _thinking_trace_id and _cg_hooks:
    await _cg_hooks.on_agent_message_complete(
        _thinking_trace_id, success=bool(resp.content),
    )

This creates a persistent record in Dgraph of every agent reasoning step, enabling post-hoc analysis and debugging.


12.2.1.8 Default Agent Configuration

The create_default_orchestrator() factory creates a pre-configured orchestrator with a general-purpose assistant:

def create_default_orchestrator() -> AgentOrchestrator:
    # memory_guardrail: module-level guardrail integration (construction elided in this excerpt)
    orchestrator = AgentOrchestrator(memory_guardrail_integration=memory_guardrail)
 
    default_agent = ConversationalAgent(
        tenant_id="default",
        config=AgentConfig(
            name="assistant",
            description="General purpose AI assistant",
            agent_type=AgentType.CONVERSATIONAL,
            role=AgentRole.ANALYST,
            system_prompt="""You are a helpful AI assistant for data analysis...
You can help users:
- Query data using SQL
- Analyze data and find insights
- Create visualizations
- Search knowledge bases
- Answer questions about their data""",
            max_iterations=10,
            timeout_seconds=300,
            memory_type="hybrid",
        ),
    )
    orchestrator.register_agent(default_agent)
    return orchestrator

Singleton Access

from src.agents.orchestrator import get_orchestrator
 
orchestrator = get_orchestrator()  # Returns global singleton
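The accessor presumably follows the standard module-level cache pattern; a sketch of that shape (not the actual source):

```python
# Module-level cache: the first call constructs, later calls reuse the instance.
_orchestrator = None

class AgentOrchestrator:  # stand-in for the real class
    pass

def get_orchestrator() -> AgentOrchestrator:
    global _orchestrator
    if _orchestrator is None:
        _orchestrator = AgentOrchestrator()
    return _orchestrator

assert get_orchestrator() is get_orchestrator()  # same instance every time
```

Because the singleton holds per-session state (_active_contexts, _message_count), all API handlers must go through this accessor rather than constructing their own orchestrator.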

12.2.1.9 Error Handling

The orchestrator implements comprehensive error handling:

Error Type | Behavior
Agent not found | Raises ValueError with the agent ID
No LLM client | Raises RuntimeError with configuration guidance
LLM call failure | Catches the exception, sets AgentStatus.FAILED, returns the error in AgentResponse
Tool execution failure | Captures the error in ToolResult.error, continues processing
Memory save failure | Logs a warning, does not fail the request
Streaming error | Yields StreamChunk(type="error") with exception details

except Exception as e:
    logger.exception("agent_processing_error", agent_id=agent.id)
    context.status = AgentStatus.FAILED
    response.content = f"An error occurred: {str(e)}"
    response.metadata["error"] = str(e)