BI Agent Orchestration
The BI Agent Orchestration layer in bi/agents/orchestrator.py implements a state machine that coordinates five specialized agents for conversational data analytics within the BI workbench. Unlike the general-purpose AgentOrchestrator, the AnalyticsOrchestrator is specifically designed for dashboard-oriented queries and follows a deterministic routing pattern based on intent classification.
Architecture
The BI orchestration layer is organized under data-plane/ai-service/src/bi/:
bi/
    agents/
        orchestrator.py          # AnalyticsOrchestrator state machine
        state.py                 # AgentState for BI workflows
        router_agent.py          # RouterAgent with intent classification
        sql_agent.py             # SQLAgent for query generation
        analysis_agent.py        # AnalysisAgent for statistics
        viz_agent.py             # VisualizationAgent for charts
        docs_agent.py            # DocumentationAgent for help
    dashboard/                   # Dashboard management
        repository.py            # Dashboard persistence
        service.py               # Dashboard CRUD operations
        api/
            dashboard_routes.py  # Dashboard API endpoints
    visualization/               # Chart rendering
    widgets/                     # Widget configuration
    export/                      # Report export
    filters/                     # Dynamic filter management
    scheduling/                  # Scheduled report generation
    metrics/                     # BI metrics tracking
    selfservice/                 # Self-service analytics
AnalyticsOrchestrator
The AnalyticsOrchestrator implements a LangGraph-style state machine:
class AnalyticsOrchestrator:
"""Orchestrates the multi-agent analytics workflow.
The workflow typically follows:
1. Router Agent: Classify intent and route
2. SQL Agent: Generate and execute queries
3. Analysis Agent: Perform statistical analysis
4. Visualization Agent: Generate charts
5. Documentation Agent: Provide help
"""
MAX_AGENT_VISITS = 10 # Prevent infinite loops
def __init__(
self,
sql_generator: Any = None,
query_executor: Any = None,
llm_client: Any = None,
schema_provider: Any = None,
session_store: Any = None,
) -> None:
# Initialize agents
self._router = RouterAgent(llm_client=llm_client)
self._sql_agent = SQLAgent(
sql_generator=sql_generator,
query_executor=query_executor,
)
self._analysis_agent = AnalysisAgent(llm_client=llm_client)
self._viz_agent = VisualizationAgent()
self._docs_agent = DocumentationAgent(
schema_provider=schema_provider,
)
# Agent registry
self._agents: dict[str, AgentFunction] = {
"router": self._router.classify,
"sql": self._sql_agent.generate,
"analysis": self._analysis_agent.analyze,
"visualization": self._viz_agent.generate,
"docs": self._docs_agent.respond,
}
Conversation Session
Each conversation maintains a ConversationSession that holds the state machine state and session metadata:
@dataclass
class ConversationSession:
"""A conversation session with state and history."""
session_id: str
tenant_id: str
user_id: str | None
state: AgentState
created_at: datetime
last_active: datetime
message_count: int = 0
Session Persistence
Sessions are persisted to the session store (Redis-backed with PostgreSQL fallback):
async def _save_session(self, session: ConversationSession) -> bool:
"""Persist session to storage."""
store = self._get_session_store()
if store:
try:
await store.save(session.to_session_data())
return True
except Exception as e:
logger.warning("session_save_failed", error=str(e))
return False
The session store uses lazy initialization with graceful degradation. If Redis is unavailable, the orchestrator continues with in-memory session storage.
Agent State Machine
The AgentState tracks the current state of the analytics workflow:
@dataclass
class AgentState:
"""State passed between agents in the workflow."""
session_id: str
tenant_id: str
user_id: str | None = None
# Current request
user_message: str = ""
intent: Intent | None = None
# SQL context
generated_sql: str | None = None
sql_executed: bool = False
query_results: dict[str, Any] | None = None
# Analysis context
analysis_results: dict[str, Any] | None = None
insights: list[str] = field(default_factory=list)
# Visualization context
chart_config: dict[str, Any] | None = None
chart_type: str | None = None
# Conversation history
messages: list[ConversationMessage] = field(default_factory=list)
# Routing
next_agent: str | None = None
visited_agents: list[str] = field(default_factory=list)
error: str | None = None
State Flow
              +----------+
+------------>|  Router  |<-----------+
|             +----+-----+            |
|                  |                  |
|        +---------+---------+        |
|        |         |         |        |
|        v         v         v        |
|     +-----+  +------+   +-----+     |
|     | SQL |  | Docs |   | Viz |     |
|     +--+--+  +------+   +--+--+     |
|        |                   |        |
|        v                   |        |
|    +--------+              |        |
|    |Analysis|              |        |
|    +---+----+              |        |
|        |                   |        |
|        v                   v        |
|     +------+            +------+    |
|     | Viz  |            |Return|    |
|     +--+---+            +------+    |
|        |                            |
|        v                            |
|     +------+                        |
+-----+Return|                        |
      +------+                        |
                                      |
(Refinement) -------------------------+
Specialized Agents
Router Agent
The Router Agent classifies user intent and determines which agent should handle the request:
class RouterAgent:
"""Classifies user intent and routes to appropriate agent."""
async def classify(self, state: AgentState) -> AgentState:
"""Classify intent and set next_agent."""
intent = await self._classify_intent(state.user_message)
state.intent = intent
# Route based on intent
routing = {
Intent.QUERY_DATA: "sql",
Intent.AGGREGATE: "sql",
Intent.FILTER: "sql",
Intent.COMPARE: "sql",
Intent.TREND: "sql",
Intent.RANK: "sql",
Intent.ANALYZE: "analysis",
Intent.CORRELATE: "analysis",
Intent.FORECAST: "analysis",
Intent.ANOMALY: "analysis",
Intent.VISUALIZE: "visualization",
Intent.DASHBOARD: "visualization",
Intent.HELP: "docs",
Intent.SCHEMA: "docs",
Intent.CLARIFY: "docs",
Intent.REFINE: "sql",
Intent.UNDO: None, # Handle internally
Intent.RESET: None, # Handle internally
}
state.next_agent = routing.get(intent)
return state
SQL Agent
Generates and executes SQL queries using the Text-to-SQL pipeline:
class SQLAgent:
"""Generates and executes SQL for BI queries."""
async def generate(self, state: AgentState) -> AgentState:
"""Generate SQL from user message and execute."""
# Generate SQL
result = await self._sql_generator.generate(
question=state.user_message,
tenant_id=state.tenant_id,
)
if result.success:
state.generated_sql = result.query.sql
# Execute query
query_results = await self._query_executor.execute(
sql=result.query.sql,
tenant_id=state.tenant_id,
)
state.query_results = query_results
state.sql_executed = True
# Route to analysis for further processing
state.next_agent = "analysis"
else:
state.error = result.error
state.next_agent = None
return state
Analysis Agent
Performs statistical analysis on query results:
class AnalysisAgent:
"""Performs statistical analysis on query results."""
async def analyze(self, state: AgentState) -> AgentState:
"""Analyze query results and generate insights."""
if not state.query_results:
state.next_agent = None
return state
# Perform analysis types based on intent
analyses = {
Intent.ANALYZE: self._statistical_analysis,
Intent.CORRELATE: self._correlation_analysis,
Intent.FORECAST: self._trend_analysis,
Intent.ANOMALY: self._anomaly_detection,
Intent.TREND: self._trend_analysis,
}
analyze_fn = analyses.get(state.intent, self._basic_analysis)
analysis_result = await analyze_fn(state.query_results)
state.analysis_results = analysis_result
state.insights = analysis_result.get("insights", [])
# Route to visualization
state.next_agent = "visualization"
return state
| Analysis Type | Techniques | Output |
|---|---|---|
| Statistical | Mean, median, std dev, percentiles, distribution | Summary statistics |
| Trend | Linear regression, moving average, seasonality | Trend direction, growth rate |
| Anomaly | Z-score, IQR, isolation forest | Anomalous data points |
| Correlation | Pearson, Spearman, cross-correlation | Correlation coefficients |
Visualization Agent
Generates chart configurations from query results and analysis:
class VisualizationAgent:
"""Generates visualization configurations."""
async def generate(self, state: AgentState) -> AgentState:
"""Generate chart config from data and analysis."""
chart_type = self._recommend_chart_type(
intent=state.intent,
data=state.query_results,
analysis=state.analysis_results,
)
state.chart_type = chart_type
state.chart_config = self._build_chart_config(
chart_type=chart_type,
data=state.query_results,
insights=state.insights,
)
state.next_agent = None # Terminal state
return state
Documentation Agent
Handles help requests and schema exploration:
class DocumentationAgent:
"""Provides help and schema information."""
async def respond(self, state: AgentState) -> AgentState:
"""Respond to help and schema requests."""
if state.intent == Intent.SCHEMA:
# Retrieve and format schema information
schema_info = await self._schema_provider.get_schema(
tenant_id=state.tenant_id,
)
state.messages.append(
ConversationMessage(
role=MessageRole.ASSISTANT,
content=self._format_schema(schema_info),
)
)
elif state.intent == Intent.HELP:
state.messages.append(
ConversationMessage(
role=MessageRole.ASSISTANT,
content=self._get_help_text(),
)
)
state.next_agent = None
return state
Message Processing
The main processing loop executes the state machine:
async def process_message(
self,
session_id: str,
message: str,
tenant_id: str | None = None,
) -> dict[str, Any]:
"""Process a user message through the agent workflow."""
session = await self.get_session(session_id)
# Add user message to history
session.state.user_message = message
session.state.messages.append(
ConversationMessage(role=MessageRole.USER, content=message)
)
# Start with router
session.state.next_agent = "router"
session.state.visited_agents = []
# Execute state machine
visits = 0
while session.state.next_agent and visits < self.MAX_AGENT_VISITS:
agent_name = session.state.next_agent
agent_fn = self._agents.get(agent_name)
if not agent_fn:
break
session.state.visited_agents.append(agent_name)
session.state.next_agent = None # Reset
session.state = await agent_fn(session.state)
visits += 1
# Save session
await self._save_session(session)
return self._build_response(session.state)
The MAX_AGENT_VISITS = 10 safeguard caps the number of agent invocations per message, preventing infinite loops in the state machine.
Response Format
The orchestrator builds a structured response containing all artifacts from the workflow:
{
"session_id": "sess-abc-123",
"message": "Here are the total sales by region for Q4 2025.",
"intent": "AGGREGATE",
"sql": "SELECT region, SUM(amount) as total_sales FROM ...",
"data": {
"columns": ["region", "total_sales"],
"rows": [
["North America", 1250000],
["Europe", 890000],
["Asia Pacific", 670000]
]
},
"insights": [
"North America leads with 44% of total sales",
"Asia Pacific showed 15% growth vs Q3"
],
"visualization": {
"chart_type": "bar",
"x_axis": "region",
"y_axis": "total_sales",
"title": "Total Sales by Region - Q4 2025"
},
"agents_visited": ["router", "sql", "analysis", "visualization"]
}
Dashboard Integration
The BI agents integrate with the dashboard subsystem in bi/dashboard/:
| Component | File | Purpose |
|---|---|---|
| DashboardRepository | repository.py | PostgreSQL persistence for dashboard configs |
| DashboardService | service.py | CRUD operations, sharing, versioning |
| Dashboard routes | api/dashboard_routes.py | REST API for dashboard management |
Dashboard Operations
POST /api/v1/bi/dashboards # Create dashboard
GET /api/v1/bi/dashboards # List dashboards
GET /api/v1/bi/dashboards/{id} # Get dashboard
PUT /api/v1/bi/dashboards/{id} # Update dashboard
POST /api/v1/bi/dashboards/{id}/widgets # Add widget from conversation