Agent Orchestrator
The AgentOrchestrator is the primary execution engine for all agent interactions in the AI Service. It coordinates LLM calls, tool execution, memory management, approval workflows, and streaming responses. Defined in data-plane/ai-service/src/agents/orchestrator.py, it is the single entry point for processing user messages through any registered agent.
12.2.1.1 Class Architecture
Key Dependencies
| Dependency | Type | Purpose |
|---|---|---|
| LLMClient | Required (warns if None) | Wraps OpenAI/Anthropic/vLLM providers for chat and streaming |
| ToolRegistry | Optional (defaults to built-in) | Manages available tools and their execution |
| MemoryManager | Optional (defaults to in-memory) | Manages per-session conversation memory |
| ApprovalHandler | Optional (defaults to global) | Routes tool calls through HITL approval when required |
| memory_guardrail_integration | Optional | Sanitizes content before writing to memory |
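The wiring this table implies can be sketched as follows. The stub classes and keyword argument names here are illustrative stand-ins, not the service's exact signatures; the point is the default-and-warn pattern for optional dependencies.

```python
# Hedged sketch of the optional-dependency wiring implied by the table
# above. ToolRegistry/MemoryManager/ApprovalHandler stand in for the
# real built-in, in-memory, and global defaults.
import logging

logger = logging.getLogger(__name__)

class ToolRegistry:      # stand-in for the built-in registry
    pass

class MemoryManager:     # stand-in for the in-memory default
    pass

class ApprovalHandler:   # stand-in for the global handler
    pass

class AgentOrchestrator:
    def __init__(
        self,
        llm_client=None,
        tool_registry=None,
        memory_manager=None,
        approval_handler=None,
        memory_guardrail_integration=None,
    ):
        if llm_client is None:
            # Required dependency: warn at construction rather than fail.
            logger.warning("AgentOrchestrator created without an LLM client")
        self._llm = llm_client
        self._tools = tool_registry or ToolRegistry()
        self._memory = memory_manager or MemoryManager()
        self._approval = approval_handler or ApprovalHandler()
        self._memory_guardrail = memory_guardrail_integration
```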
12.2.1.2 Message Processing Pipeline
The process_message method is the main entry point. It accepts an agent ID, session ID, tenant ID, and user message, then routes through either sync or streaming processing.
async def process_message(
    self,
    agent_id: str,
    session_id: str,
    tenant_id: str,
    message: str,
    user_id: str | None = None,
    stream: bool = False,
) -> AgentResponse | AsyncIterator[StreamChunk]:

Processing Steps
- Agent Lookup: Retrieves the registered ConversationalAgent by ID
- Context Creation: Gets or creates an AgentContext for the session
- Memory Guardrail: Sanitizes user input before storage (PII removal, injection defense)
- Message Storage: Adds the user message to HybridMemory
- Compliance Check: Every 10 messages, runs a policy compliance audit
- Thinking Trace: Starts a Context Graph thinking trace via orchestrator_hooks
- LLM Processing: Routes to _process_sync or _process_streaming
- Trace Completion: Records success/failure in the thinking trace
Sync Processing Loop
The _process_sync method implements the ReAct (Reason + Act) loop:
async def _process_sync(self, agent, context, memory, start_time):
    response = AgentResponse()
    iteration = 0
    max_iterations = agent.config.max_iterations  # Default: 10

    while iteration < max_iterations:
        iteration += 1
        context.status = AgentStatus.THINKING

        # Build messages from system prompt + memory + history
        messages = self._build_messages(agent, context, memory)
        tools = self._tools.get_tool_definitions()

        # Call LLM
        llm_response = await self._llm.chat(
            messages=messages,
            tools=tools if tools else None,
            temperature=agent.config.temperature,
            max_tokens=agent.config.max_tokens,
        )

        # If tool calls: execute and continue loop
        tool_calls = llm_response.get("tool_calls", [])
        if tool_calls:
            context.status = AgentStatus.EXECUTING
            for tc in tool_calls:
                # Check approval, execute tool, store result
                ...
            continue  # Next iteration with tool results

        # No tool calls: final text response
        response.content = llm_response.get("content", "")
        break

    return response

12.2.1.3 Tool Execution with Approval
When the LLM returns tool calls, each one passes through the approval pipeline:
for tc in tool_calls:
    tool_call = ToolCall(
        id=tc.get("id", str(uuid4())),
        name=tc.get("function", {}).get("name", ""),
        arguments=json.loads(tc.get("function", {}).get("arguments", "{}")),
    )

    # Check if approval required
    approval_request = await self._approval.check_and_request_approval(
        session_id=context.session_id,
        tenant_id=context.tenant_id,
        agent_id=agent.id,
        tool_call=tool_call,
    )

    if approval_request:
        response.requires_approval = True
        response.approval_request = approval_request
        context.status = AgentStatus.WAITING_APPROVAL

        # Block until approval decision
        await self._approval.workflow.wait_for_approval(approval_request.id)
        approval_request = self._approval.workflow.get_request(approval_request.id)

        if approval_request.status != ApprovalStatus.APPROVED:
            result = ToolResult(
                tool_call_id=tool_call.id,
                name=tool_call.name,
                error=f"Action rejected: {approval_request.status.value}",
                approved=False,
            )
            response.tool_results.append(result)
            continue

    # Execute approved tool
    result = await self._tools.execute_tool(tool_call, check_approval=False)
    response.tool_results.append(result)

12.2.1.4 LLM Client Abstraction
The LLMClient class wraps the underlying provider implementations:
class LLMClient:
    def __init__(self, provider: str = "openai", model: str | None = None, **kwargs):
        from agents.llm_providers import get_llm_client, LLMMessage
        self._client = get_llm_client(provider=provider, model=model, **kwargs)

    async def chat(self, messages, tools=None, temperature=0.7, max_tokens=4096, stream=False):
        llm_messages = [LLMMessage(role=m["role"], content=m["content"], ...) for m in messages]
        response = await self._client.chat(messages=llm_messages, tools=tools, ...)
        return {
            "content": response.content,
            "tool_calls": response.tool_calls,
            "usage": response.usage,
            "finish_reason": response.finish_reason,
        }

    async def stream_chat(self, messages, tools=None, temperature=0.7, max_tokens=4096):
        llm_messages = [LLMMessage(role=m["role"], content=m["content"], ...) for m in messages]
        async for token in self._client.stream_chat(messages=llm_messages, ...):
            yield {
                "type": token.type,  # "text", "tool_call", "done"
                "content": token.content,
                "tool_call": token.tool_call,
                "finish_reason": token.finish_reason,
            }

12.2.1.5 Streaming Response
The _process_streaming method yields StreamChunk objects for real-time delivery:
@dataclass
class StreamChunk:
    type: str  # text, tool_call, tool_result, status, error, done
    content: str = ""
    data: dict[str, Any] = field(default_factory=dict)
    is_final: bool = False

Stream chunk types:
| Type | Description |
|---|---|
| status | Agent state change (thinking, executing_tools) |
| text | Partial text content from LLM |
| tool_call | Tool call initiated |
| tool_result | Tool execution completed |
| done | Final chunk with accumulated content |
| error | Error occurred during processing |
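A Python-side consumer might dispatch on these types as follows. The StreamChunk dataclass mirrors the one above; fake_stream is a hard-coded stand-in for _process_streaming, not the real generator:

```python
# Dispatching on StreamChunk.type; the chunk sequence is hard-coded in
# place of a real LLM stream.
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator

@dataclass
class StreamChunk:
    type: str
    content: str = ""
    data: dict[str, Any] = field(default_factory=dict)
    is_final: bool = False

async def fake_stream() -> AsyncIterator[StreamChunk]:
    yield StreamChunk(type="status", data={"status": "thinking"})
    yield StreamChunk(type="text", content="The top product ")
    yield StreamChunk(type="tool_call", data={"name": "run_sql"})
    yield StreamChunk(type="tool_result", data={"name": "run_sql"})
    yield StreamChunk(type="text", content="was widgets.")
    yield StreamChunk(type="done", content="The top product was widgets.", is_final=True)

async def consume(stream: AsyncIterator[StreamChunk]) -> str:
    parts: list[str] = []
    async for chunk in stream:
        if chunk.type == "text":
            parts.append(chunk.content)        # partial tokens
        elif chunk.type == "error":
            raise RuntimeError(chunk.content)
        elif chunk.type == "done":
            # Prefer the accumulated content carried by the final chunk.
            return chunk.content or "".join(parts)
    return "".join(parts)

print(asyncio.run(consume(fake_stream())))     # The top product was widgets.
```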
WebSocket Streaming Example
const ws = new WebSocket('ws://localhost:8000/api/v1/ws/agent');

ws.onopen = () => {
  ws.send(JSON.stringify({
    type: 'message',
    agent_id: 'default-assistant',
    session_id: 'session-123',
    tenant_id: 'acme-corp',
    message: 'What were our top products last month?',
    stream: true
  }));
};

ws.onmessage = (event) => {
  const chunk = JSON.parse(event.data);
  switch (chunk.type) {
    case 'text':
      process.stdout.write(chunk.content);
      break;
    case 'tool_call':
      console.log('Tool:', chunk.data.name);
      break;
    case 'done':
      console.log('\nComplete.');
      break;
  }
};

12.2.1.6 Memory Guardrails
The orchestrator integrates memory guardrails at three points:
- Input sanitization: Before storing user messages
- Output sanitization: Before storing assistant responses
- Periodic compliance: Every N messages (configurable via _compliance_check_interval)
# Input guardrail
if self._memory_guardrail:
    write_result = await self._memory_guardrail.safe_memory_write(
        content=message,
        tenant_id=tenant_id,
        session_id=session_id,
        fact_type="conversation_turn",
    )
    if write_result.sanitized_content:
        safe_message = write_result.sanitized_content

# Periodic compliance check
session_count = self._message_count.get(session_id, 0) + 1
if session_count % self._compliance_check_interval == 0:
    compliance = await self._memory_guardrail.check_memory_policy_compliance(
        tenant_id=tenant_id,
        session_id=session_id,
    )
    if compliance.escalation_needed:
        logger.warning("memory_compliance_escalation", ...)

12.2.1.7 Context Graph Integration
The orchestrator hooks into the Context Graph to record agent thinking traces:
# Start trace
from context_graph.integration.orchestrator_hooks import get_orchestrator_hooks

_cg_hooks = get_orchestrator_hooks()
if _cg_hooks and _cg_hooks.enabled:
    _thinking_trace_id = await _cg_hooks.on_agent_message_start(
        session_id, tenant_id, message, agent_id,
    )

# After processing
if _thinking_trace_id and _cg_hooks:
    await _cg_hooks.on_agent_message_complete(
        _thinking_trace_id, success=bool(resp.content),
    )

This creates a persistent record in Dgraph of every agent reasoning step, enabling post-hoc analysis and debugging.
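The start/complete hook pattern can be illustrated with a toy stand-in. The real hooks persist to Dgraph; this stub only records calls so the pairing of trace start and completion is visible:

```python
# Toy version of the trace hook pattern; the class name and method
# signatures mirror the excerpt above but the implementation is a stub.
import asyncio
from uuid import uuid4

class OrchestratorHooks:
    def __init__(self, enabled=True):
        self.enabled = enabled
        self.events = []  # (kind, trace_id) pairs, in call order

    async def on_agent_message_start(self, session_id, tenant_id, message, agent_id):
        trace_id = str(uuid4())
        self.events.append(("start", trace_id))
        return trace_id

    async def on_agent_message_complete(self, trace_id, success):
        self.events.append(("complete", trace_id))

async def process_with_trace(hooks, message):
    trace_id = None
    if hooks and hooks.enabled:
        trace_id = await hooks.on_agent_message_start("s1", "t1", message, "a1")
    content = f"reply to: {message}"  # stand-in for the real processing
    if trace_id and hooks:
        await hooks.on_agent_message_complete(trace_id, success=bool(content))
    return content

hooks = OrchestratorHooks()
asyncio.run(process_with_trace(hooks, "hi"))
print([kind for kind, _ in hooks.events])  # ['start', 'complete']
```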
12.2.1.8 Default Agent Configuration
The create_default_orchestrator() factory creates a pre-configured orchestrator with a general-purpose assistant:
def create_default_orchestrator() -> AgentOrchestrator:
    orchestrator = AgentOrchestrator(memory_guardrail_integration=memory_guardrail)

    default_agent = ConversationalAgent(
        tenant_id="default",
        config=AgentConfig(
            name="assistant",
            description="General purpose AI assistant",
            agent_type=AgentType.CONVERSATIONAL,
            role=AgentRole.ANALYST,
            system_prompt="""You are a helpful AI assistant for data analysis...

You can help users:
- Query data using SQL
- Analyze data and find insights
- Create visualizations
- Search knowledge bases
- Answer questions about their data""",
            max_iterations=10,
            timeout_seconds=300,
            memory_type="hybrid",
        ),
    )

    orchestrator.register_agent(default_agent)
    return orchestrator

Singleton Access
from src.agents.orchestrator import get_orchestrator
orchestrator = get_orchestrator()  # Returns global singleton

12.2.1.9 Error Handling
The orchestrator implements comprehensive error handling:
| Error Type | Behavior |
|---|---|
| Agent not found | Raises ValueError with agent ID |
| No LLM client | Raises RuntimeError with configuration guidance |
| LLM call failure | Catches exception, sets AgentStatus.FAILED, returns error in AgentResponse |
| Tool execution failure | Captures error in ToolResult.error, continues processing |
| Memory save failure | Logs warning, does not fail the request |
| Streaming error | Yields StreamChunk(type="error") with exception details |
except Exception as e:
    logger.exception("agent_processing_error", agent_id=agent.id)
    context.status = AgentStatus.FAILED
    response.content = f"An error occurred: {str(e)}"
    response.metadata["error"] = str(e)
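The LLM-failure row of the table can be sketched end to end. The classes and the failing_llm_chat helper below are simplified stand-ins, showing only how an exception becomes a FAILED status plus an error entry in the response metadata rather than a raised exception:

```python
# Sketch of the failure path: catch, mark FAILED, return the error in
# the response. AgentStatus/AgentContext/AgentResponse are stand-ins.
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

class AgentStatus(Enum):
    THINKING = "thinking"
    FAILED = "failed"

@dataclass
class AgentContext:
    status: AgentStatus = AgentStatus.THINKING

@dataclass
class AgentResponse:
    content: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

async def failing_llm_chat(**kwargs):
    # Hypothetical upstream failure used to drive the error path.
    raise TimeoutError("upstream LLM timed out")

async def process_sync(context: AgentContext) -> AgentResponse:
    response = AgentResponse()
    try:
        await failing_llm_chat(messages=[])
    except Exception as e:
        # Mirrors the except block above: mark, record, return.
        context.status = AgentStatus.FAILED
        response.content = f"An error occurred: {e}"
        response.metadata["error"] = str(e)
    return response

ctx = AgentContext()
resp = asyncio.run(process_sync(ctx))
print(ctx.status.value, "|", resp.metadata["error"])
# failed | upstream LLM timed out
```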