Agent System Overview
Status: Production. Scope: core agent orchestration, multi-agent teams, HITL approval, guardrails, Agent Studio, evaluation, and runtime lifecycle.
The Agent System is the central nervous system of the AI Service. It manages 80+ agent types across a layered architecture that handles everything from simple conversational interactions to complex multi-agent team coordination with human-in-the-loop approval workflows.
12.2.1 Agent Architecture
The agent system is organized into several tightly integrated subsystems:
- API Layer: Agent REST API, WebSocket API, Studio API, Evaluation API, Runtime API
- Orchestration Layer: AgentOrchestrator, MultiAgentOrchestrator, WorkflowOrchestrator, AnalyticsOrchestrator
- Agent Types: Conversational, Specialist, Supervisor, Persona, Tool-specific
- Memory & Context: HybridMemory, MemoryManager, ConversationContext, EpisodicMemory
- Safety & Quality: Guardrails, HITL Approval, Drift Detection, Hallucination Classifier, Quality Metrics
- Infrastructure: LLM Client, Tool Registry, Session Store, Vector Memory, Kafka Events
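As a rough illustration of how these layers fit together, the sketch below wires a minimal orchestrator to an LLM client and a tool registry. The class names (`AgentOrchestrator`, `LLMClient`, `ToolRegistry`) come from the components listed above, but their bodies and method signatures here are simplified assumptions for illustration; the real interfaces are documented in the subsystem pages below.

```python
from dataclasses import dataclass, field

class LLMClient:
    """Illustrative stand-in for the infrastructure-layer LLM client."""
    def complete(self, messages: list[dict]) -> dict:
        # A real client would call the model provider here.
        return {"role": "assistant", "content": "stub response"}

class ToolRegistry:
    """Illustrative stand-in for the tool registry."""
    def __init__(self) -> None:
        self._tools: dict[str, object] = {}

    def register(self, name: str, fn) -> None:
        self._tools[name] = fn

@dataclass
class AgentOrchestrator:
    """Orchestration layer: ties the LLM, tools, and memory together."""
    llm: LLMClient
    tools: ToolRegistry
    memory: list[dict] = field(default_factory=list)

    def process_message(self, message: str) -> str:
        # Append the user message, ask the LLM, and record the reply.
        self.memory.append({"role": "user", "content": message})
        reply = self.llm.complete(self.memory)
        self.memory.append(reply)
        return reply["content"]

orchestrator = AgentOrchestrator(llm=LLMClient(), tools=ToolRegistry())
print(orchestrator.process_message("hello"))  # stub response
```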
Core Agent Models
The agent system is built around a set of core data models defined in src/agents/models.py:
```python
from enum import Enum

class AgentType(str, Enum):
    """Types of agents in the system."""
    CONVERSATIONAL = "conversational"
    SPECIALIST = "specialist"
    SUPERVISOR = "supervisor"
    WORKER = "worker"

class AgentRole(str, Enum):
    """Roles an agent can assume."""
    ANALYST = "analyst"
    COORDINATOR = "coordinator"
    EXECUTOR = "executor"
    REVIEWER = "reviewer"

class AgentStatus(str, Enum):
    """Current status of an agent."""
    IDLE = "idle"
    THINKING = "thinking"
    EXECUTING = "executing"
    WAITING_APPROVAL = "waiting_approval"
    COMPLETED = "completed"
    FAILED = "failed"
```

Agent Configuration
Every agent is configured through AgentConfig:
```python
from dataclasses import dataclass, field

@dataclass
class AgentConfig:
    name: str
    description: str
    agent_type: AgentType
    role: AgentRole
    system_prompt: str
    max_iterations: int = 10      # Max tool-call loops
    timeout_seconds: int = 300    # Per-request timeout
    temperature: float = 0.7      # LLM sampling temperature
    max_tokens: int = 4096        # Max response tokens
    memory_type: str = "hybrid"   # Memory strategy
    tools: list[str] = field(default_factory=list)
```

12.2.2 Agent Subsystem Pages
| Page | Description |
|---|---|
| Agent Orchestrator | Core AgentOrchestrator class: LLM integration, tool execution loop, memory management, streaming, Context Graph hooks |
| Multi-Agent Orchestrator | MultiAgentOrchestrator for team creation, delegation modes (sequential, parallel, hierarchical), workflow graphs |
| Workflow Orchestrator | Step-based workflow execution with checkpointing and resumption |
| Agent Pools | AgentPool, PooledAgent, AgentPoolManager for task queuing and horizontal scaling |
| Agent Memory | AgentMemoryManager covering short-term buffers, long-term persistent memory, and episodic recall |
| Agent Studio | Agent creation UI/API, template library, preview sandbox, deployment pipeline |
| Guardrails | Content safety, action validation, PII detection, prompt injection defense |
| Approval Workflows | HITL approval: request creation, reviewer assignment, decision handling, audit trail |
| Drift Detection | Data drift monitoring, statistical tests, alerting, remediation recommendations |
| Hallucination Classifier | Hallucination detection, confidence scoring, feedback-driven improvement |
| Evaluation Runner | Automated evaluation: datasets, benchmark execution, metric calculation |
| Quality Metrics | Quality metric calculation, trending, threshold alerts, scoring models |
| Persona Agents | BiAnalyst, CustomerSupport, DataEngineer, Leadership, MLOps, PlatformAdmin personas |
| Specialized Agents | Airflow, Polaris, OpenMetadata, Ray, Spark integration agents |
| Tool Management | Tool registration, permission management, execution sandboxing |
| Runtime & Lifecycle | Agent runtime: feature flags, beta program, production rollout, GA readiness |
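To make the Tool Management row concrete, here is a hedged sketch of tool registration with a role-based permission check before execution. The class and method names, and the permission model itself, are illustrative assumptions rather than the service's actual API; the real design is covered on the Tool Management page.

```python
class ToolPermissionError(Exception):
    """Raised when a caller's role is not allowed to execute a tool."""

class ToolRegistry:
    """Illustrative registry: each tool is registered with an
    allow-list of roles that may execute it."""
    def __init__(self) -> None:
        self._tools = {}    # name -> callable
        self._allowed = {}  # name -> set of permitted roles

    def register(self, name, fn, allowed_roles):
        self._tools[name] = fn
        self._allowed[name] = set(allowed_roles)

    def execute(self, name, caller_role, **kwargs):
        # Permission check happens before the tool runs.
        if caller_role not in self._allowed.get(name, set()):
            raise ToolPermissionError(f"{caller_role!r} may not call {name!r}")
        return self._tools[name](**kwargs)

registry = ToolRegistry()
registry.register("execute_sql", lambda sql: f"ran: {sql}", allowed_roles={"analyst"})
print(registry.execute("execute_sql", caller_role="analyst", sql="SELECT 1"))  # ran: SELECT 1
```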
12.2.3 Message Flow
Every user message follows this flow through the agent system:
```
User Message
  |
  v
AgentOrchestrator.process_message()
  |
  +-- 1. Get/create AgentContext for session
  +-- 2. Memory guardrail check on input
  +-- 3. Add message to HybridMemory
  +-- 4. Start Context Graph thinking trace
  |
  v
_process_sync() or _process_streaming()
  |
  +-- Build messages (system prompt + memory context + recent messages)
  +-- Call LLM via LLMClient
  |
  +-- If tool_calls in response:
  |     +-- For each tool call:
  |     |     +-- Check approval via ApprovalHandler
  |     |     +-- If approval needed: wait for decision
  |     |     +-- Execute tool via ToolRegistry
  |     |     +-- Add ToolResult to memory
  |     +-- Continue loop (next LLM call with tool results)
  |
  +-- If text response (no tool calls):
  |     +-- Memory guardrail check on output
  |     +-- Add assistant message to memory
  |     +-- Break loop
  |
  v
Save memory + record action + complete thinking trace
  |
  v
AgentResponse (content, tool_calls, tool_results, metadata)
```

Example: Processing a Message
```bash
curl -X POST http://localhost:8000/api/v1/agents/process \
  -H "Content-Type: application/json" \
  -H "X-Tenant-ID: acme-corp" \
  -d '{
    "agent_id": "default-assistant",
    "session_id": "session-abc-123",
    "message": "Show me the top 10 customers by revenue last quarter",
    "stream": false
  }'
```

Response:
```json
{
  "id": "resp-uuid-456",
  "content": "Here are the top 10 customers by revenue for Q3 2025...",
  "tool_calls": [
    {
      "id": "tc-1",
      "name": "execute_sql",
      "arguments": {
        "sql": "SELECT customer_name, SUM(revenue) as total_revenue FROM orders WHERE order_date >= '2025-07-01' GROUP BY customer_name ORDER BY total_revenue DESC LIMIT 10"
      }
    }
  ],
  "tool_results": [
    {
      "tool_call_id": "tc-1",
      "name": "execute_sql",
      "result": { "rows": [...], "columns": ["customer_name", "total_revenue"] }
    }
  ],
  "requires_approval": false,
  "execution_time_ms": 2340.5,
  "tokens_used": 1847
}
```
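The same request can be issued from Python with only the standard library. The endpoint, headers, and payload mirror the curl example above; the network call itself is shown commented out, since it requires a running service.

```python
import json
import urllib.request

# Payload matching the curl example above.
payload = {
    "agent_id": "default-assistant",
    "session_id": "session-abc-123",
    "message": "Show me the top 10 customers by revenue last quarter",
    "stream": False,
}

req = urllib.request.Request(
    "http://localhost:8000/api/v1/agents/process",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json", "X-Tenant-ID": "acme-corp"},
    method="POST",
)

# With the service running, send the request and read the AgentResponse:
# with urllib.request.urlopen(req, timeout=300) as resp:
#     body = json.load(resp)
#     print(body["content"], body["tokens_used"])
```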