Agent Orchestrator
The Agent Orchestrator is the central coordination layer of the AI Service, responsible for routing user messages through specialized agents, managing conversation memory, executing tools, enforcing human-in-the-loop approval workflows, and integrating with LangGraph for complex multi-step workflows. This section provides a deep dive into the AgentOrchestrator, MultiAgentExecutor, and their supporting subsystems.
Core Data Models
Before examining the orchestrator itself, it is essential to understand the data models that define the agent ecosystem. These are defined in agents/models.py.
Agent Types and Roles
class AgentType(str, Enum):
    """Types of agents in the system."""
    CONVERSATIONAL = "conversational"  # Multi-turn conversation
    TASK = "task"                      # Single-task execution
    SUPERVISOR = "supervisor"          # Supervises other agents
    SPECIALIST = "specialist"          # Domain-specific expertise
    ROUTER = "router"                  # Routes to other agents

class AgentRole(str, Enum):
    """Agent roles in multi-agent system."""
    COORDINATOR = "coordinator"  # Coordinates multi-agent workflows
    ANALYST = "analyst"          # Data analysis
    RESEARCHER = "researcher"    # Information retrieval
    EXECUTOR = "executor"        # Action execution
    VALIDATOR = "validator"      # Output validation
    SUMMARIZER = "summarizer"    # Result summarization
Agent Status Lifecycle
Agents transition through well-defined states during execution:
IDLE --> THINKING --> EXECUTING --> COMPLETED
 |          |             |              |
 |          |             v              |
 |          |      WAITING_APPROVAL      |
 |          |             |              |
 |          v             v              v
 +------> FAILED <--------+--------------+

| Status | Description |
|---|---|
| IDLE | Agent is registered but not currently processing |
| THINKING | Agent is building context and calling the LLM |
| EXECUTING | Agent is executing tool calls |
| WAITING_APPROVAL | Execution paused pending human approval |
| COMPLETED | Processing finished successfully |
| FAILED | Processing encountered an unrecoverable error |
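The table implies an AgentStatus enum alongside AgentType and AgentRole; a sketch with assumed string values (the member names come from the diagram above, the values are an assumption):

```python
from enum import Enum

class AgentStatus(str, Enum):
    """Agent lifecycle states (a sketch; string values assumed)."""
    IDLE = "idle"
    THINKING = "thinking"
    EXECUTING = "executing"
    WAITING_APPROVAL = "waiting_approval"
    COMPLETED = "completed"
    FAILED = "failed"

# COMPLETED and FAILED are terminal: no outgoing edges in the diagram
TERMINAL = {AgentStatus.COMPLETED, AgentStatus.FAILED}
```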
Agent Configuration
Each agent is configured through an AgentConfig dataclass:
@dataclass
class AgentConfig:
    """Configuration for an agent."""
    name: str
    description: str
    system_prompt: str
    agent_type: AgentType = AgentType.CONVERSATIONAL
    role: AgentRole = AgentRole.ANALYST
    temperature: float = 0.7
    max_tokens: int = 4096
    max_iterations: int = 10  # Maximum tool-call loops
    tools_enabled: bool = True
    memory_enabled: bool = True
    approval_required: bool = False
Message Protocol
Messages flowing through the orchestrator use a structured format:
@dataclass
class AgentMessage:
    """A message in agent conversation."""
    id: str                          # Unique message ID
    role: MessageRole                # USER, ASSISTANT, SYSTEM, TOOL, HUMAN
    content: str                     # Message content
    name: str | None = None          # Tool name (for tool messages)
    tool_call_id: str | None = None  # Tool call reference
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.utcnow)
AgentOrchestrator
The AgentOrchestrator class in agents/orchestrator.py is the primary entry point for processing user messages. It integrates four core subsystems: LLM communication, tool execution, memory management, and HITL approval.
Constructor
class AgentOrchestrator:
    """Orchestrates agent execution with tools, memory, and HITL."""
    def __init__(
        self,
        llm_client: LLMClient | None = None,
        tool_registry: ToolRegistry | None = None,
        memory_manager: MemoryManager | None = None,
        approval_handler: ApprovalHandler | None = None,
        memory_guardrail_integration: Any | None = None,
    ) -> None:
        self._llm = llm_client
        self._tools = tool_registry or create_default_registry()
        self._memory = memory_manager or MemoryManager()
        self._approval = approval_handler or get_approval_handler()
        self._memory_guardrail = memory_guardrail_integration
        self._agents: dict[str, ConversationalAgent] = {}
        self._active_contexts: dict[str, AgentContext] = {}
        self._message_count: dict[str, int] = {}
        self._compliance_check_interval = 10
Agent Registration
Agents are registered with the orchestrator before they can process messages:
orchestrator = AgentOrchestrator(llm_client=llm)

# Register a data analyst agent
analyst = ConversationalAgent(
    id="data-analyst",
    config=AgentConfig(
        name="Data Analyst",
        description="Analyzes data and generates insights",
        system_prompt="You are a data analyst for enterprise data...",
        role=AgentRole.ANALYST,
        temperature=0.3,
        max_iterations=5,
    ),
)
orchestrator.register_agent(analyst)
Message Processing Pipeline
The process_message method is the main entry point. It executes a multi-step pipeline:
async def process_message(
    self,
    agent_id: str,
    session_id: str,
    tenant_id: str,
    message: str,
    user_id: str | None = None,
    stream: bool = False,
) -> AgentResponse | AsyncIterator[StreamChunk]:
The pipeline consists of these steps:
- Agent lookup: Find the registered agent by ID
- Context retrieval: Get or create the AgentContext for the session
- Memory loading: Retrieve conversation history from MemoryManager
- Guardrail check: Sanitize input through memory guardrails if configured
- Memory write: Add the user message to conversation memory
- Compliance check: Periodic policy compliance verification (every N messages)
- Context Graph trace start: Begin thinking trace capture via orchestrator hooks
- LLM processing: Send messages to the LLM with tool definitions
- Tool execution loop: Execute tool calls and feed results back to the LLM
- HITL check: Pause for approval if a tool call requires it
- Response assembly: Build the final AgentResponse
- Context Graph trace complete: Record thinking trace outcome
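The compliance-check cadence follows directly from `_compliance_check_interval = 10` in the constructor: the check fires on every tenth message in a session. A minimal sketch of the gating logic (the helper name is hypothetical):

```python
def should_run_compliance_check(message_count: int, interval: int = 10) -> bool:
    """True on every `interval`-th message in a session.

    Hypothetical helper mirroring the orchestrator's
    _compliance_check_interval = 10 default.
    """
    return message_count > 0 and message_count % interval == 0

# With the default interval, checks fire on messages 10, 20, 30, ...
fired = [n for n in range(1, 31) if should_run_compliance_check(n)]
```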
The Response Object
@dataclass
class AgentResponse:
    """Response from agent processing."""
    id: str                                      # Unique response ID
    content: str = ""                            # Text response
    tool_calls: list[ToolCall] = field(...)      # Tool calls made
    tool_results: list[ToolResult] = field(...)  # Tool execution results
    reasoning: str | None = None                 # Agent's reasoning (if exposed)
    requires_approval: bool = False              # Whether HITL approval needed
    approval_request: ApprovalRequest | None = None
    metadata: dict[str, Any] = field(...)
    execution_time_ms: float = 0.0
    tokens_used: int = 0
Synchronous Processing
The _process_sync method implements the core agent loop:
async def _process_sync(
    self,
    agent: ConversationalAgent,
    context: AgentContext,
    memory: Any,
    start_time: float,
) -> AgentResponse:
    response = AgentResponse()
    iteration = 0
    max_iterations = agent.config.max_iterations
    while iteration < max_iterations:
        iteration += 1
        context.status = AgentStatus.THINKING
        # Build LLM messages from system prompt + memory
        messages = self._build_messages(agent, context, memory)
        tools = self._tools.get_tool_definitions()
        # Call LLM
        llm_response = await self._llm.chat(
            messages=messages,
            tools=tools if tools else None,
            temperature=agent.config.temperature,
            max_tokens=agent.config.max_tokens,
        )
        response.tokens_used += llm_response.get("usage", {}).get(
            "total_tokens", 0
        )
        # Check for tool calls
        tool_calls = llm_response.get("tool_calls", [])
        if tool_calls:
            context.status = AgentStatus.EXECUTING
            for tc in tool_calls:
                # Build ToolCall object and record it on the response
                tool_call = ToolCall(
                    id=tc.get("id"),
                    name=tc["function"]["name"],
                    arguments=json.loads(tc["function"]["arguments"]),
                )
                response.tool_calls.append(tool_call)
                # Check HITL approval requirement
                approval = await self._approval.check_and_request_approval(
                    session_id=context.session_id,
                    tenant_id=context.tenant_id,
                    agent_id=agent.id,
                    tool_call=tool_call,
                )
                if approval:
                    context.status = AgentStatus.WAITING_APPROVAL
                    response.requires_approval = True
                    response.approval_request = approval
                    return response
                # Execute tool
                result = await self._tools.execute(
                    tool_call.name,
                    tool_call.arguments,
                )
                response.tool_results.append(result)
            # Loop again so the LLM can process the tool results
            continue
        # No tool calls - the LLM provided its final response
        response.content = llm_response.get("content", "")
        break
    return response
Iteration Safety
The max_iterations parameter on AgentConfig prevents infinite tool-call loops. Each iteration represents one LLM call plus optional tool execution. If the iteration count reaches the maximum, the orchestrator returns whatever partial response is available.
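The guard is easy to demonstrate with stubs: an LLM that requests a tool call on every turn never produces a final answer, so the loop exits at the cap with an empty partial response. A self-contained sketch (the stub class and `run_loop` are hypothetical simplifications of `_process_sync`, not the real implementation):

```python
import asyncio

class AlwaysToolLLM:
    """Hypothetical stub: requests a tool call on every turn."""
    async def chat(self, messages, **kwargs):
        return {
            "tool_calls": [{"id": "t1", "function": {
                "name": "execute_sql", "arguments": "{}"}}],
            "content": "",
        }

async def run_loop(llm, max_iterations: int = 5) -> tuple[int, str]:
    """Condensed version of the _process_sync while-loop."""
    iteration, content = 0, ""
    while iteration < max_iterations:
        iteration += 1
        resp = await llm.chat([])
        if resp.get("tool_calls"):
            continue  # would execute tools, then re-invoke the LLM
        content = resp.get("content", "")
        break
    return iteration, content

iterations, content = asyncio.run(run_loop(AlwaysToolLLM()))
# The loop terminates at the cap with only a partial (empty) response
```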
Tool Registry
The ToolRegistry in agents/tools.py manages the set of tools available to agents:
class ToolRegistry:
    """Registry of tools available to agents."""
    def __init__(self) -> None:
        self._tools: dict[str, ToolDefinition] = {}

    def register(
        self,
        name: str,
        description: str,
        parameters: dict[str, Any],
        handler: Callable,
        requires_approval: bool = False,
    ) -> None:
        """Register a tool."""
        self._tools[name] = ToolDefinition(
            name=name,
            description=description,
            parameters=parameters,
            handler=handler,
            requires_approval=requires_approval,
        )

    def get_tool_definitions(self) -> list[dict[str, Any]]:
        """Get OpenAI-compatible tool definitions."""
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters,
                },
            }
            for tool in self._tools.values()
        ]

    async def execute(
        self,
        name: str,
        arguments: dict[str, Any],
    ) -> ToolResult:
        """Execute a tool by name."""
        tool = self._tools.get(name)
        if not tool:
            return ToolResult(
                tool_call_id=name,
                output="",
                error=f"Tool not found: {name}",
            )
        result = await tool.handler(**arguments)
        return ToolResult(
            tool_call_id=name,
            output=str(result),
        )
Default Tool Registry
The create_default_registry() function builds a registry with the standard platform tools:
| Tool Name | Description | Approval Required |
|---|---|---|
| execute_sql | Execute SQL query against the Query Engine | No |
| search_schema | Search data catalog for tables and columns | No |
| get_table_info | Get detailed table metadata | No |
| create_visualization | Generate a chart from data | No |
| export_data | Export query results to file | Yes |
| modify_dashboard | Modify an existing dashboard | Yes |
| execute_action | Execute a data mutation action | Yes |
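A sketch of how create_default_registry() might assemble this table using the ToolRegistry API above. The minimal registry re-declaration, placeholder schema, and placeholder handler exist only to keep the sketch self-contained and runnable; the real handlers call the Query Engine and data catalog:

```python
from dataclasses import dataclass
from typing import Callable

# Minimal re-declaration so the sketch is self-contained
@dataclass
class ToolDefinition:
    name: str
    description: str
    parameters: dict
    handler: Callable
    requires_approval: bool = False

class ToolRegistry:
    def __init__(self) -> None:
        self._tools: dict[str, ToolDefinition] = {}

    def register(self, name: str, description: str, parameters: dict,
                 handler: Callable, requires_approval: bool = False) -> None:
        self._tools[name] = ToolDefinition(
            name, description, parameters, handler, requires_approval)

def create_default_registry() -> ToolRegistry:
    registry = ToolRegistry()
    schema = {"type": "object", "properties": {}}  # placeholder JSON schema

    async def placeholder(**kwargs):  # real handlers call platform services
        raise NotImplementedError

    # Read-only tools ship without the approval gate
    for name, desc in [
        ("execute_sql", "Execute SQL query against the Query Engine"),
        ("search_schema", "Search data catalog for tables and columns"),
        ("get_table_info", "Get detailed table metadata"),
        ("create_visualization", "Generate a chart from data"),
    ]:
        registry.register(name, desc, schema, placeholder)
    # Exporting and mutating tools require human approval
    for name, desc in [
        ("export_data", "Export query results to file"),
        ("modify_dashboard", "Modify an existing dashboard"),
        ("execute_action", "Execute a data mutation action"),
    ]:
        registry.register(name, desc, schema, placeholder,
                          requires_approval=True)
    return registry
```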
Memory Management
The MemoryManager in agents/memory_stores.py handles conversation history with dual-strategy memory:
ConversationBuffer Memory
Stores raw messages in a bounded buffer:
class ConversationBufferMemory:
    """Sliding window buffer of recent messages."""
    def __init__(self, max_messages: int = 50):
        self._messages: list[AgentMessage] = []
        self._max_messages = max_messages

    def add(self, message: AgentMessage) -> None:
        self._messages.append(message)
        if len(self._messages) > self._max_messages:
            self._messages.pop(0)

    def get_messages(self, limit: int | None = None) -> list[AgentMessage]:
        if limit:
            return self._messages[-limit:]
        return list(self._messages)
SummaryMemory
Maintains a running summary of older conversation history, using the LLM to compress messages beyond the buffer window:
class SummaryMemory:
    """Summarizes older conversation history."""
    def __init__(self, llm_client: LLMClient, max_summary_tokens: int = 500):
        self._llm = llm_client
        self._summary: str = ""
        self._max_tokens = max_summary_tokens

    async def update_summary(
        self,
        new_messages: list[AgentMessage],
    ) -> str:
        """Update summary with new messages."""
        messages_text = "\n".join(
            f"{m.role.value}: {m.content}" for m in new_messages
        )
        prompt = f"""Summarize this conversation, incorporating the previous summary.
Previous summary: {self._summary}
New messages:
{messages_text}
Updated summary:"""
        response = await self._llm.chat([{"role": "user", "content": prompt}])
        self._summary = response.get("content", "")
        return self._summary
HybridMemory
Combines buffer and summary memory for optimal context:
class HybridMemory:
    """Combines buffer and summary memory."""
    def __init__(
        self,
        buffer: ConversationBufferMemory,
        summary: SummaryMemory,
        buffer_threshold: int = 20,
    ):
        self._buffer = buffer
        self._summary = summary
        self._threshold = buffer_threshold

    async def get_context(self) -> list[dict[str, Any]]:
        """Get conversation context for LLM."""
        messages = self._buffer.get_messages()
        # If buffer is large, summarize older messages
        if len(messages) > self._threshold:
            older = messages[:-self._threshold]
            summary_text = await self._summary.update_summary(older)
            recent = messages[-self._threshold:]
            return [
                {"role": "system", "content": f"Summary: {summary_text}"},
                *[m.to_dict() for m in recent],
            ]
        return [m.to_dict() for m in messages]
Human-in-the-Loop (HITL)
The HITL subsystem in agents/hitl.py enables approval workflows for high-risk operations:
Approval Flow
User Message
     |
     v
Agent Processing
     |
     v
Tool Call Generated
     |
     v
Approval Check ------> Tool requires approval?
     |                         |
     No                       Yes
     |                         |
     v                         v
Execute Tool          Create ApprovalRequest
     |                         |
     v                         v
Return Response         Return to User
                               |
                               v
                    User Approves/Rejects
                               |
                        +------+------+
                        |             |
                    Approved      Rejected
                        |             |
                        v             v
                  Execute Tool   Cancel & Respond
ApprovalRequest
@dataclass
class ApprovalRequest:
    """Request for human approval of an action."""
    id: str
    session_id: str
    tenant_id: str
    agent_id: str
    tool_call: ToolCall
    reason: str      # Why approval is needed
    risk_level: str  # low, medium, high, critical
    status: ApprovalStatus = ApprovalStatus.PENDING
    created_at: datetime = field(default_factory=datetime.utcnow)
    expires_at: datetime | None = None
Approval Policies
Approval requirements can be configured per tool, per agent, or per tenant:
| Policy Level | Configuration |
|---|---|
| Tool-level | requires_approval=True in ToolRegistry registration |
| Agent-level | approval_required=True in AgentConfig |
| Tenant-level | Tenant configuration with approval policies |
| Risk-based | Dynamic approval based on risk level assessment |
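The policy levels combine naturally into a single decision: any static flag (tool-level or agent-level) forces approval, and otherwise the assessed risk is compared against a tenant threshold. A sketch as a pure policy function; the risk ladder ordering and the default threshold below are illustrative assumptions, not the shipped policy:

```python
# Risk ladder, ordered from least to most severe (values from the
# ApprovalRequest.risk_level comment; ordering is an assumption)
RISK_ORDER = ["low", "medium", "high", "critical"]

def needs_approval(tool_requires: bool, agent_requires: bool,
                   risk_level: str, tenant_threshold: str = "high") -> bool:
    """Combine the policy levels from the table above (hypothetical helper):
    any static flag, or risk at/above the tenant threshold, triggers an
    ApprovalRequest."""
    if tool_requires or agent_requires:
        return True
    return RISK_ORDER.index(risk_level) >= RISK_ORDER.index(tenant_threshold)
```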
MultiAgentExecutor
The MultiAgentExecutor in agents/multi_agent_router.py coordinates multiple specialized agents for complex queries that require different expertise.
Routing Strategies
class RoutingStrategy(str, Enum):
    """Strategy for routing tasks to agents."""
    SINGLE = "single"              # Route to single best agent
    PARALLEL = "parallel"          # Route to multiple agents in parallel
    SEQUENTIAL = "sequential"      # Route through agents sequentially
    HIERARCHICAL = "hierarchical"  # Supervisor decides routing
    CONSENSUS = "consensus"        # Multiple agents, aggregate results
Routing Decision
The AgentRouter uses the LLM to determine the optimal routing strategy:
class AgentRouter:
    """Routes tasks to appropriate specialized agents."""
    async def route(
        self,
        query: str,
        context: AgentContext,
    ) -> RoutingDecision:
        """Determine routing for a query."""
        # Get available agents and their capabilities
        agents = self._registry.list_agents()
        capabilities = [
            {
                "agent_id": a.id,
                "name": a.config.name,
                "capabilities": a.capabilities,
                "description": a.config.description,
            }
            for a in agents
        ]
        # Use LLM to decide routing
        response = await self._llm.chat([
            {"role": "system", "content": ROUTING_PROMPT},
            {"role": "user", "content": json.dumps({
                "query": query,
                "available_agents": capabilities,
            })},
        ])
        return RoutingDecision(
            query=query,
            strategy=RoutingStrategy(response["strategy"]),
            selected_agents=response["agents"],
            reasoning=response["reasoning"],
            confidence=response["confidence"],
        )
Execution Modes
Sequential Execution: Agents execute in order, each receiving the output of the previous:
async def _execute_sequential(
    self,
    agents: list[str],
    context: AgentContext,
) -> TaskExecution:
    all_results = []
    accumulated_context = context
    for agent_id in agents:
        result = await self._execute_agent(agent_id, accumulated_context)
        all_results.append(result)
        accumulated_context = self._merge_context(
            accumulated_context, result
        )
    return TaskExecution(results=all_results)
Parallel Execution: Agents execute concurrently, results aggregated:
async def _execute_parallel(
    self,
    agents: list[str],
    context: AgentContext,
) -> TaskExecution:
    tasks = [
        self._execute_agent(agent_id, context)
        for agent_id in agents
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return self._aggregate_results(results)
Hierarchical Execution: A supervisor agent decides which agents to invoke and how to combine their results.
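Hierarchical execution can be sketched as a supervisor loop that repeatedly picks the next agent and merges each result into a shared context until the supervisor signals completion. All names below are hypothetical stubs, not the real MultiAgentExecutor API:

```python
import asyncio

async def execute_hierarchical(supervisor, agents: dict, context: dict) -> dict:
    """Supervisor picks the next agent (or None to stop); each agent's
    result is merged back into the shared context (a sketch)."""
    while True:
        agent_id = await supervisor.decide_next(context)
        if agent_id is None:
            break
        result = await agents[agent_id](context)
        context = {**context, **result}
    return context

# Hypothetical stubs for illustration
class Supervisor:
    def __init__(self):
        self._plan = ["researcher", "summarizer", None]
    async def decide_next(self, context):
        return self._plan.pop(0)

async def researcher(ctx):
    return {"facts": ["f1"]}

async def summarizer(ctx):
    return {"summary": "one fact found"}

final = asyncio.run(execute_hierarchical(
    Supervisor(), {"researcher": researcher, "summarizer": summarizer}, {}))
```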
LangGraph Integration
The AI Service integrates with LangGraph for stateful, graph-based workflows in agents/langgraph.py:
class StateGraph:
    """Defines a state machine workflow for agents."""
    def __init__(self, state_class: type):
        self._state_class = state_class
        self._nodes: dict[str, NodeType] = {}
        self._edges: list[tuple[str, str]] = []
        self._conditional_edges: dict[str, tuple[Callable, dict[str, str]]] = {}
        self._entry_point: str | None = None

    def add_node(self, name: str, func: Callable) -> "StateGraph":
        """Add a processing node."""
        self._nodes[name] = NodeType(name=name, func=func)
        return self

    def add_edge(self, source: str, target: str) -> "StateGraph":
        """Add a directed edge between nodes."""
        self._edges.append((source, target))
        return self

    def add_conditional_edges(
        self,
        source: str,
        condition: Callable,
        mapping: dict[str, str],
    ) -> "StateGraph":
        """Add conditional routing from a node."""
        self._conditional_edges[source] = (condition, mapping)
        return self

    def set_entry_point(self, name: str) -> "StateGraph":
        """Set the entry point node."""
        self._entry_point = name
        return self

    def compile(self) -> "CompiledGraph":
        """Compile the graph for execution."""
        return CompiledGraph(self)
Example Workflow
# Define a multi-step analytics workflow
workflow = StateGraph(state_class=AnalyticsState)
workflow.add_node("classify", router_agent.classify)
workflow.add_node("generate_sql", sql_agent.generate)
workflow.add_node("execute_query", query_executor.execute)
workflow.add_node("analyze", analysis_agent.analyze)
workflow.add_node("visualize", viz_agent.generate)
workflow.set_entry_point("classify")
workflow.add_conditional_edges(
    "classify",
    router_agent.decide_next,
    {
        "query": "generate_sql",
        "analysis": "analyze",
        "help": "docs",  # "docs" node registration omitted in this excerpt
    },
)
workflow.add_edge("generate_sql", "execute_query")
workflow.add_edge("execute_query", "analyze")
workflow.add_edge("analyze", "visualize")
compiled = workflow.compile()
result = await compiled.invoke(initial_state)
Context Graph Integration
The orchestrator integrates with the Context Graph to capture agent thinking traces:
# Start thinking trace before processing
_thinking_trace_id = None
try:
    from context_graph.integration.orchestrator_hooks import (
        get_orchestrator_hooks,
    )
    _cg_hooks = get_orchestrator_hooks()
    if _cg_hooks and _cg_hooks.enabled:
        _thinking_trace_id = await _cg_hooks.on_agent_message_start(
            session_id, tenant_id, message, agent_id,
        )
except Exception:
    _cg_hooks = None

# ... process message ...

# Complete thinking trace after processing
try:
    if _thinking_trace_id and _cg_hooks:
        await _cg_hooks.on_agent_message_complete(
            _thinking_trace_id, success=bool(resp.content),
        )
except Exception:
    pass
This integration is entirely non-invasive. If the Context Graph subsystem is unavailable, the orchestrator continues processing without thinking trace capture.
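The import guard above can be factored into a reusable resolver that returns None whenever the subsystem is missing or disabled, so callers degrade to a no-op. A sketch; the injectable `importer` parameter is a testing convenience, not part of the real code:

```python
def resolve_optional_hooks(importer=None):
    """Return the hooks object if the optional subsystem imports and is
    enabled, else None (a sketch of the import-guard pattern above;
    the module path comes from the snippet in this section)."""
    try:
        if importer is None:
            # May raise ImportError when the subsystem is not installed
            from context_graph.integration.orchestrator_hooks import (
                get_orchestrator_hooks,
            )
            importer = get_orchestrator_hooks
        hooks = importer()
        return hooks if hooks and getattr(hooks, "enabled", False) else None
    except Exception:
        return None
```

Callers then write `if hooks := resolve_optional_hooks(): ...` and skip trace capture otherwise, which keeps the orchestrator's happy path free of Context Graph dependencies.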
Memory Guardrail Integration
The orchestrator supports an optional memory guardrail integration for PII protection and compliance:
# Sanitize user input before storing in memory
if self._memory_guardrail:
    write_result = await self._memory_guardrail.safe_memory_write(
        content=message,
        tenant_id=tenant_id,
        session_id=session_id,
        fact_type="conversation_turn",
    )
    if write_result.sanitized_content:
        safe_message = write_result.sanitized_content

# Periodic compliance check (every N messages)
if session_count % self._compliance_check_interval == 0:
    compliance = await self._memory_guardrail.check_memory_policy_compliance(
        tenant_id=tenant_id,
        session_id=session_id,
    )
    if compliance.escalation_needed:
        logger.warning(
            "memory_compliance_escalation",
            violations=compliance.violation_count,
        )
Performance Considerations
| Aspect | Strategy |
|---|---|
| LLM latency | Streaming responses to reduce perceived latency |
| Tool execution | Parallel tool execution when tools are independent |
| Memory retrieval | In-memory buffer with Redis fallback |
| Context window | Summary memory to compress long conversations |
| Iteration limits | max_iterations to prevent runaway loops |
| Token tracking | Per-response token counting for cost management |
| Connection pooling | Shared LLM client connections across requests |