Multi-Agent Orchestrator
The MultiAgentOrchestrator coordinates teams of specialized agents working together on complex tasks. It supports three collaboration modes (sequential, parallel, hierarchical) and integrates with LangGraph for declarative workflow definitions. Defined in data-plane/ai-service/src/agents/orchestrator.py.
12.2.2.1Class Architecture
class MultiAgentOrchestrator:
"""Orchestrates multiple agents working together."""
def __init__(self, agent_orchestrator: AgentOrchestrator | None = None):
self._orchestrator = agent_orchestrator or AgentOrchestrator()
self._teams: dict[str, dict[str, Any]] = {}
self._task_queues: dict[str, asyncio.Queue[MultiAgentTask]] = {}
self._graphs: dict[str, CompiledGraph[dict[str, Any]]] = {}The MultiAgentOrchestrator wraps a standard AgentOrchestrator and adds team management, task delegation, and workflow graph execution on top.
12.2.2.2Team Management
Creating Teams
team_id = multi_orchestrator.create_team(
name="Analytics Team",
supervisor_id="supervisor",
member_ids=["sql_agent", "analysis_agent", "viz_agent"],
collaboration_mode="hierarchical", # sequential | parallel | hierarchical
)Each team has:
- A supervisor agent that coordinates work
- Member agents that execute specialized tasks
- A collaboration mode that determines execution strategy
- A task queue for managing work items
Team Data Structure
{
"id": "team-uuid",
"name": "Analytics Team",
"supervisor_id": "supervisor",
"member_ids": ["sql_agent", "analysis_agent", "viz_agent"],
"collaboration_mode": "hierarchical",
"created_at": "2025-01-15T10:30:00Z"
}12.2.2.3Collaboration Modes
Sequential Execution
Members execute one after another. Each member receives the same task description. Results accumulate across all members.
async def _execute_sequential(self, team, task):
task.status = "in_progress"
accumulated_result = {}
for member_id in team["member_ids"]:
response = await self._orchestrator.process_message(
agent_id=member_id,
session_id=f"task:{task.id}",
tenant_id="system",
message=task.description,
)
accumulated_result[member_id] = {
"content": response.content,
"tool_results": [{"name": r.name, "result": r.result} for r in response.tool_results],
}
task.result = accumulated_result
task.status = "completed"
return taskParallel Execution
All members execute simultaneously using asyncio.gather. Ideal for independent subtasks where results can be combined.
async def _execute_parallel(self, team, task):
async def process_agent(member_id):
response = await self._orchestrator.process_message(
agent_id=member_id,
session_id=f"task:{task.id}:{member_id}",
tenant_id="system",
message=task.description,
)
return member_id, {"content": response.content}
results = await asyncio.gather(
*[process_agent(m) for m in team["member_ids"]]
)
task.result = dict(results)
task.status = "completed"
return taskHierarchical Execution
The supervisor agent first analyzes the task and decides how to delegate. Currently falls back to sequential execution after supervisor analysis.
async def _execute_hierarchical(self, team, task):
supervisor_response = await self._orchestrator.process_message(
agent_id=team["supervisor_id"],
session_id=f"task:{task.id}:supervisor",
tenant_id="system",
message=f"Analyze and delegate this task: {task.description}",
)
# Execute through selected members based on supervisor decision
return await self._execute_sequential(team, task)12.2.2.4Workflow Graphs
The MultiAgentOrchestrator supports LangGraph-style workflow definitions:
# Register a workflow
workflow = create_agent_graph()
graph_id = multi_orchestrator.register_workflow("default_workflow", workflow)
# Execute a workflow
result = await multi_orchestrator.execute_workflow(
workflow_name="default_workflow",
initial_state={"question": "Analyze Q3 sales trends"},
session_id="session-123",
tenant_id="acme-corp",
)Workflows are compiled StateGraph instances that define nodes (agents) and edges (transitions):
from agents.langgraph import StateGraph, create_agent_graph
graph = StateGraph(dict)
graph.add_node("router", router_fn)
graph.add_node("sql_agent", sql_fn)
graph.add_node("analysis", analysis_fn)
graph.add_edge("router", "sql_agent")
graph.add_edge("sql_agent", "analysis")
compiled = graph.compile()12.2.2.5Default Multi-Agent System
The create_multi_agent_system() factory creates a pre-configured system:
def create_multi_agent_system() -> MultiAgentOrchestrator:
orchestrator = AgentOrchestrator()
# Specialized agents
sql_agent = ConversationalAgent(
id="sql_agent",
config=AgentConfig(
name="SQL Agent",
description="Specialized in SQL query generation and optimization",
agent_type=AgentType.SPECIALIST,
role=AgentRole.ANALYST,
system_prompt="You are an expert SQL analyst.",
),
)
analysis_agent = ConversationalAgent(
id="analysis_agent",
config=AgentConfig(
name="Analysis Agent",
agent_type=AgentType.SPECIALIST,
system_prompt="You are a data analyst.",
),
)
viz_agent = ConversationalAgent(
id="viz_agent",
config=AgentConfig(
name="Visualization Agent",
agent_type=AgentType.SPECIALIST,
system_prompt="You are a visualization expert.",
),
)
supervisor_agent = ConversationalAgent(
id="supervisor",
config=AgentConfig(
name="Supervisor",
agent_type=AgentType.SUPERVISOR,
role=AgentRole.COORDINATOR,
system_prompt="You coordinate a team of specialized agents.",
),
)
# Register all agents and create team
multi_orchestrator = MultiAgentOrchestrator(orchestrator)
multi_orchestrator.create_team(
name="Analytics Team",
supervisor_id="supervisor",
member_ids=["sql_agent", "analysis_agent", "viz_agent"],
collaboration_mode="hierarchical",
)
return multi_orchestrator12.2.2.6API Usage
Delegate a Task
curl -X POST http://localhost:8000/api/v1/agents/teams/{team_id}/delegate \
-H "Content-Type: application/json" \
-H "X-Tenant-ID: acme-corp" \
-d '{
"description": "Analyze the correlation between marketing spend and revenue growth",
"priority": "high"
}'Response:
{
"task_id": "task-uuid-789",
"status": "completed",
"result": {
"sql_agent": {
"content": "Generated SQL query for marketing/revenue data...",
"tool_results": [{"name": "execute_sql", "result": {...}}]
},
"analysis_agent": {
"content": "The correlation coefficient is 0.82, indicating strong positive correlation..."
},
"viz_agent": {
"content": "Created a scatter plot with trend line..."
}
},
"started_at": "2025-01-15T10:30:00Z",
"completed_at": "2025-01-15T10:30:12Z"
}