MATIH Platform is in active MVP development. Documentation reflects current implementation status.

Multi-Agent Orchestrator

Status: Production. Covers team creation, sequential/parallel/hierarchical execution, and LangGraph workflow integration.

The MultiAgentOrchestrator coordinates teams of specialized agents working together on complex tasks. It supports three collaboration modes (sequential, parallel, hierarchical) and integrates with LangGraph for declarative workflow definitions. It is defined in data-plane/ai-service/src/agents/orchestrator.py.


12.2.2.1 Class Architecture

class MultiAgentOrchestrator:
    """Orchestrates multiple agents working together."""
 
    def __init__(self, agent_orchestrator: AgentOrchestrator | None = None):
        self._orchestrator = agent_orchestrator or AgentOrchestrator()
        self._teams: dict[str, dict[str, Any]] = {}
        self._task_queues: dict[str, asyncio.Queue[MultiAgentTask]] = {}
        self._graphs: dict[str, CompiledGraph[dict[str, Any]]] = {}

The MultiAgentOrchestrator wraps a standard AgentOrchestrator and adds team management, task delegation, and workflow graph execution on top.


12.2.2.2 Team Management

Creating Teams

team_id = multi_orchestrator.create_team(
    name="Analytics Team",
    supervisor_id="supervisor",
    member_ids=["sql_agent", "analysis_agent", "viz_agent"],
    collaboration_mode="hierarchical",  # sequential | parallel | hierarchical
)

Each team has:

  • A supervisor agent that coordinates work
  • Member agents that execute specialized tasks
  • A collaboration mode that determines execution strategy
  • A task queue for managing work items
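The collaboration mode listed above selects one of three execution strategies (see 12.2.2.3). As a rough illustration of how such a selection can work, the sketch below maps mode names to handler coroutines. The names `HANDLERS`, `dispatch`, and the three `run_*` stubs are hypothetical and are not taken from orchestrator.py; tasks are plain dicts here only to keep the example self-contained.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical handler type: takes (team, task) and returns the updated task.
Handler = Callable[[dict[str, Any], dict[str, Any]], Awaitable[dict[str, Any]]]


async def run_sequential(team: dict[str, Any], task: dict[str, Any]) -> dict[str, Any]:
    task["ran_as"] = "sequential"  # stub: a real handler would call the agents
    return task


async def run_parallel(team: dict[str, Any], task: dict[str, Any]) -> dict[str, Any]:
    task["ran_as"] = "parallel"
    return task


async def run_hierarchical(team: dict[str, Any], task: dict[str, Any]) -> dict[str, Any]:
    task["ran_as"] = "hierarchical"
    return task


# Mode name -> handler. Unknown modes are rejected rather than silently ignored.
HANDLERS: dict[str, Handler] = {
    "sequential": run_sequential,
    "parallel": run_parallel,
    "hierarchical": run_hierarchical,
}


async def dispatch(team: dict[str, Any], task: dict[str, Any]) -> dict[str, Any]:
    """Pick the handler that matches the team's collaboration mode."""
    mode = team["collaboration_mode"]
    try:
        handler = HANDLERS[mode]
    except KeyError:
        raise ValueError(f"Unknown collaboration mode: {mode!r}") from None
    return await handler(team, task)
```

A lookup table keeps the mode names in one place, so adding a fourth mode would only require one new entry.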

Team Data Structure

{
    "id": "team-uuid",
    "name": "Analytics Team",
    "supervisor_id": "supervisor",
    "member_ids": ["sql_agent", "analysis_agent", "viz_agent"],
    "collaboration_mode": "hierarchical",
    "created_at": "2025-01-15T10:30:00Z"
}
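To make the record above concrete, here is a minimal sketch of how a team entry and its task queue could be created together. `TeamRegistry` is a hypothetical stand-in for the team-management part of MultiAgentOrchestrator; the validation step and the use of `uuid4` for the ID are assumptions, not confirmed behavior of `create_team`.

```python
import asyncio
import uuid
from datetime import datetime, timezone
from typing import Any

VALID_MODES = {"sequential", "parallel", "hierarchical"}


class TeamRegistry:
    """Hypothetical stand-in for the team bookkeeping in MultiAgentOrchestrator."""

    def __init__(self) -> None:
        self._teams: dict[str, dict[str, Any]] = {}
        self._task_queues: dict[str, asyncio.Queue] = {}

    def create_team(
        self,
        name: str,
        supervisor_id: str,
        member_ids: list[str],
        collaboration_mode: str,
    ) -> str:
        # Assumption: invalid modes are rejected up front.
        if collaboration_mode not in VALID_MODES:
            raise ValueError(f"Unknown collaboration mode: {collaboration_mode!r}")

        team_id = str(uuid.uuid4())
        self._teams[team_id] = {
            "id": team_id,
            "name": name,
            "supervisor_id": supervisor_id,
            "member_ids": list(member_ids),  # copy so later caller edits do not leak in
            "collaboration_mode": collaboration_mode,
            # UTC timestamp in the same format as the record shown above
            "created_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        # One queue per team, keyed by the same ID as the team record.
        self._task_queues[team_id] = asyncio.Queue()
        return team_id
```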

12.2.2.3 Collaboration Modes

Sequential Execution

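Members execute one after another, in the order listed in member_ids. Each member receives the same task description under a shared session ID (task:{task.id}), and the results are collected in a dictionary keyed by member ID.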

async def _execute_sequential(self, team, task):
    task.status = "in_progress"
    accumulated_result = {}
 
    for member_id in team["member_ids"]:
        response = await self._orchestrator.process_message(
            agent_id=member_id,
            session_id=f"task:{task.id}",
            tenant_id="system",
            message=task.description,
        )
        accumulated_result[member_id] = {
            "content": response.content,
            "tool_results": [{"name": r.name, "result": r.result} for r in response.tool_results],
        }
 
    task.result = accumulated_result
    task.status = "completed"
    return task
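The excerpt above does not catch exceptions, so a failing member would leave the task in the "in_progress" state. The sketch below shows one possible way to record a failure instead. It is an illustration only: `Task`, `StubOrchestrator`, `execute_sequential_safe`, the "failed" status, and the `error` field are all hypothetical and do not appear in the original code.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Task:
    """Hypothetical stand-in for MultiAgentTask."""
    id: str
    description: str
    status: str = "pending"
    result: dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None


class StubOrchestrator:
    """Hypothetical stand-in for AgentOrchestrator; one agent can be made to fail."""

    def __init__(self, failing_agent: Optional[str] = None) -> None:
        self.failing_agent = failing_agent

    async def process_message(self, agent_id: str, session_id: str,
                              tenant_id: str, message: str) -> str:
        if agent_id == self.failing_agent:
            raise RuntimeError(f"{agent_id} is unavailable")
        return f"{agent_id} handled: {message}"


async def execute_sequential_safe(orchestrator: StubOrchestrator,
                                  member_ids: list[str], task: Task) -> Task:
    task.status = "in_progress"
    for member_id in member_ids:
        try:
            content = await orchestrator.process_message(
                agent_id=member_id,
                session_id=f"task:{task.id}",
                tenant_id="system",
                message=task.description,
            )
        except Exception as exc:
            # Stop at the first failure; results gathered so far are kept.
            task.status = "failed"
            task.error = f"{member_id}: {exc}"
            return task
        task.result[member_id] = {"content": content}
    task.status = "completed"
    return task
```

Stopping at the first failure is a design choice; a pipeline whose members are independent might prefer to continue and report per-member errors.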

Parallel Execution

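All members execute concurrently via asyncio.gather, each under its own session ID (task:{task.id}:{member_id}). This mode suits independent subtasks whose results can be combined afterwards. Unlike the sequential path, only response.content is kept for each member; tool results are not included.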

async def _execute_parallel(self, team, task):
    async def process_agent(member_id):
        response = await self._orchestrator.process_message(
            agent_id=member_id,
            session_id=f"task:{task.id}:{member_id}",
            tenant_id="system",
            message=task.description,
        )
        return member_id, {"content": response.content}
 
    results = await asyncio.gather(
        *[process_agent(m) for m in team["member_ids"]]
    )
    task.result = dict(results)
    task.status = "completed"
    return task
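With its default settings, asyncio.gather propagates the first exception raised by any awaitable to the caller, so one failing member would lose the other members' results. The sketch below shows the `return_exceptions=True` variant, which returns exceptions in place of results and preserves input order. `fake_agent` and `run_parallel` are hypothetical helpers written for this example.

```python
import asyncio


async def fake_agent(member_id: str, delay: float, fail: bool = False) -> str:
    """Hypothetical agent call: sleeps, then returns text or raises."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{member_id} failed")
    return f"{member_id} done"


async def run_parallel(specs: list[tuple[str, float, bool]]):
    """Run all agents concurrently and split outcomes into results and errors.

    specs is a list of (member_id, delay_seconds, should_fail) tuples.
    """
    member_ids = [spec[0] for spec in specs]
    # gather returns outcomes in the order the awaitables were passed,
    # regardless of which one finishes first.
    outcomes = await asyncio.gather(
        *(fake_agent(*spec) for spec in specs),
        return_exceptions=True,
    )
    results: dict[str, dict[str, str]] = {}
    errors: dict[str, str] = {}
    for member_id, outcome in zip(member_ids, outcomes):
        if isinstance(outcome, BaseException):
            errors[member_id] = str(outcome)
        else:
            results[member_id] = {"content": outcome}
    return results, errors
```

Because outcomes come back in input order, pairing them with `member_ids` via `zip` is safe even when the agents finish in a different order.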

Hierarchical Execution

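The supervisor agent first analyzes the task and decides how to delegate it. In the current implementation the supervisor's response is not yet used to select members: after the analysis step, execution falls back to sequential mode across all team members.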

async def _execute_hierarchical(self, team, task):
    supervisor_response = await self._orchestrator.process_message(
        agent_id=team["supervisor_id"],
        session_id=f"task:{task.id}:supervisor",
        tenant_id="system",
        message=f"Analyze and delegate this task: {task.description}",
    )
    # NOTE: supervisor_response is not yet used to select members;
    # all members currently run sequentially.
    return await self._execute_sequential(team, task)
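One way the supervisor's analysis could eventually drive delegation is to look for member IDs in its reply and run only those members. The sketch below is purely illustrative: `select_members` is a hypothetical helper, and matching IDs in free text is an assumption about the supervisor's output format, not something the current implementation does.

```python
import re


def select_members(supervisor_text: str, member_ids: list[str]) -> list[str]:
    """Return the members named in the supervisor's reply, in team order.

    Falls back to the full member list when no member is mentioned,
    which mirrors the current run-everyone behavior.
    """
    selected = [
        member_id
        for member_id in member_ids
        # Match the ID as a whole token so "sql_agent" does not match "sql_agent2".
        if re.search(rf"(?<![\w-]){re.escape(member_id)}(?![\w-])", supervisor_text)
    ]
    return selected or list(member_ids)
```

For example, a reply of "Use sql_agent, then viz_agent." would select those two members and skip analysis_agent. A structured reply format (such as JSON) would be more robust than text matching, if the supervisor prompt can enforce it.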

12.2.2.4 Workflow Graphs

The MultiAgentOrchestrator supports LangGraph-style workflow definitions:

# Register a workflow
workflow = create_agent_graph()
graph_id = multi_orchestrator.register_workflow("default_workflow", workflow)
 
# Execute a workflow
result = await multi_orchestrator.execute_workflow(
    workflow_name="default_workflow",
    initial_state={"question": "Analyze Q3 sales trends"},
    session_id="session-123",
    tenant_id="acme-corp",
)

Workflows are compiled StateGraph instances that define nodes (agents) and edges (transitions):

from agents.langgraph import StateGraph, create_agent_graph
 
graph = StateGraph(dict)
graph.add_node("router", router_fn)
graph.add_node("sql_agent", sql_fn)
graph.add_node("analysis", analysis_fn)
graph.add_edge("router", "sql_agent")
graph.add_edge("sql_agent", "analysis")
compiled = graph.compile()
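To show what "nodes and edges" means at runtime, here is a toy linear graph runner in plain Python. `TinyGraph` is a hypothetical stand-in written for this example; it is neither LangGraph's API nor the agents.langgraph module, and it supports only one outgoing edge per node. Each node receives the current state dict and returns the keys it wants to update.

```python
from typing import Any, Callable

State = dict[str, Any]
NodeFn = Callable[[State], State]


class TinyGraph:
    """Toy graph runner for illustration; one outgoing edge per node."""

    def __init__(self) -> None:
        self._nodes: dict[str, NodeFn] = {}
        self._edges: dict[str, str] = {}

    def add_node(self, name: str, fn: NodeFn) -> None:
        self._nodes[name] = fn

    def add_edge(self, source: str, target: str) -> None:
        if source not in self._nodes or target not in self._nodes:
            raise KeyError(f"Unknown node in edge {source!r} -> {target!r}")
        self._edges[source] = target

    def run(self, entry: str, state: State) -> State:
        current = entry
        visited = set()
        while current is not None:
            if current in visited:
                raise RuntimeError(f"Cycle detected at {current!r}")
            visited.add(current)
            # Merge the node's partial update into the running state.
            state = {**state, **self._nodes[current](state)}
            current = self._edges.get(current)  # None when the node has no successor
        return state


def router_fn(state: State) -> State:
    return {"route": "sql"}


def sql_fn(state: State) -> State:
    return {"sql": f"-- query for: {state['question']}"}


def analysis_fn(state: State) -> State:
    return {"summary": f"analyzed via {state['route']}"}


graph = TinyGraph()
graph.add_node("router", router_fn)
graph.add_node("sql_agent", sql_fn)
graph.add_node("analysis", analysis_fn)
graph.add_edge("router", "sql_agent")
graph.add_edge("sql_agent", "analysis")
final_state = graph.run("router", {"question": "Analyze Q3 sales trends"})
```

After the run, `final_state` holds the original question plus the keys written by each node, which is the same accumulate-as-you-go pattern the workflow state follows.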

12.2.2.5 Default Multi-Agent System

The create_multi_agent_system() factory creates a pre-configured system:

def create_multi_agent_system() -> MultiAgentOrchestrator:
    orchestrator = AgentOrchestrator()
 
    # Specialized agents
    sql_agent = ConversationalAgent(
        id="sql_agent",
        config=AgentConfig(
            name="SQL Agent",
            description="Specialized in SQL query generation and optimization",
            agent_type=AgentType.SPECIALIST,
            role=AgentRole.ANALYST,
            system_prompt="You are an expert SQL analyst.",
        ),
    )
 
    analysis_agent = ConversationalAgent(
        id="analysis_agent",
        config=AgentConfig(
            name="Analysis Agent",
            agent_type=AgentType.SPECIALIST,
            system_prompt="You are a data analyst.",
        ),
    )
 
    viz_agent = ConversationalAgent(
        id="viz_agent",
        config=AgentConfig(
            name="Visualization Agent",
            agent_type=AgentType.SPECIALIST,
            system_prompt="You are a visualization expert.",
        ),
    )
 
    supervisor_agent = ConversationalAgent(
        id="supervisor",
        config=AgentConfig(
            name="Supervisor",
            agent_type=AgentType.SUPERVISOR,
            role=AgentRole.COORDINATOR,
            system_prompt="You coordinate a team of specialized agents.",
        ),
    )
 
    # Register all agents (registration calls not shown here) and create team
    multi_orchestrator = MultiAgentOrchestrator(orchestrator)
    multi_orchestrator.create_team(
        name="Analytics Team",
        supervisor_id="supervisor",
        member_ids=["sql_agent", "analysis_agent", "viz_agent"],
        collaboration_mode="hierarchical",
    )
    return multi_orchestrator

12.2.2.6 API Usage

Delegate a Task

curl -X POST http://localhost:8000/api/v1/agents/teams/{team_id}/delegate \
  -H "Content-Type: application/json" \
  -H "X-Tenant-ID: acme-corp" \
  -d '{
    "description": "Analyze the correlation between marketing spend and revenue growth",
    "priority": "high"
  }'

Response:

{
  "task_id": "task-uuid-789",
  "status": "completed",
  "result": {
    "sql_agent": {
      "content": "Generated SQL query for marketing/revenue data...",
      "tool_results": [{"name": "execute_sql", "result": {...}}]
    },
    "analysis_agent": {
      "content": "The correlation coefficient is 0.82, indicating strong positive correlation..."
    },
    "viz_agent": {
      "content": "Created a scatter plot with trend line..."
    }
  },
  "started_at": "2025-01-15T10:30:00Z",
  "completed_at": "2025-01-15T10:30:12Z"
}
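The same request can be issued from Python. The sketch below builds it with the standard library only, mirroring the URL, headers, and body of the curl example. `build_delegate_request` is a hypothetical helper; the base URL and team ID are placeholders supplied by the caller.

```python
import json
import urllib.request


def build_delegate_request(
    base_url: str,
    team_id: str,
    tenant_id: str,
    description: str,
    priority: str,
) -> urllib.request.Request:
    """Build (but do not send) the POST request for task delegation."""
    url = f"{base_url.rstrip('/')}/api/v1/agents/teams/{team_id}/delegate"
    body = json.dumps({"description": description, "priority": priority}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "X-Tenant-ID": tenant_id,
        },
    )
```

To send it, pass the returned object to `urllib.request.urlopen` and decode the response body with `json.load`.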