MATIH Platform is in active MVP development. Documentation reflects current implementation status.

BI Agent Orchestration

The BI Agent Orchestration layer in bi/agents/orchestrator.py implements a state machine that coordinates five specialized agents for conversational data analytics within the BI workbench. Unlike the general-purpose AgentOrchestrator, the AnalyticsOrchestrator is specifically designed for dashboard-oriented queries and follows a deterministic routing pattern based on intent classification.


Architecture

The BI orchestration layer is organized under data-plane/ai-service/src/bi/:

bi/
  agents/
    orchestrator.py       # AnalyticsOrchestrator state machine
    state.py              # AgentState for BI workflows
    router_agent.py       # RouterAgent with intent classification
    sql_agent.py          # SQLAgent for query generation
    analysis_agent.py     # AnalysisAgent for statistics
    viz_agent.py          # VisualizationAgent for charts
    docs_agent.py         # DocumentationAgent for help
  dashboard/              # Dashboard management
    repository.py         # Dashboard persistence
    service.py            # Dashboard CRUD operations
  api/
    dashboard_routes.py   # Dashboard API endpoints
  visualization/          # Chart rendering
  widgets/                # Widget configuration
  export/                 # Report export
  filters/                # Dynamic filter management
  scheduling/             # Scheduled report generation
  metrics/                # BI metrics tracking
  selfservice/            # Self-service analytics

AnalyticsOrchestrator

The AnalyticsOrchestrator implements a LangGraph-style state machine:

class AnalyticsOrchestrator:
    """Orchestrates the multi-agent analytics workflow.
 
    The workflow typically follows:
    1. Router Agent: Classify intent and route
    2. SQL Agent: Generate and execute queries
    3. Analysis Agent: Perform statistical analysis
    4. Visualization Agent: Generate charts
    5. Documentation Agent: Provide help
    """
 
    MAX_AGENT_VISITS = 10  # Prevent infinite loops
 
    def __init__(
        self,
        sql_generator: Any = None,
        query_executor: Any = None,
        llm_client: Any = None,
        schema_provider: Any = None,
        session_store: Any = None,
    ) -> None:
        # Initialize agents
        self._router = RouterAgent(llm_client=llm_client)
        self._sql_agent = SQLAgent(
            sql_generator=sql_generator,
            query_executor=query_executor,
        )
        self._analysis_agent = AnalysisAgent(llm_client=llm_client)
        self._viz_agent = VisualizationAgent()
        self._docs_agent = DocumentationAgent(
            schema_provider=schema_provider,
        )
 
        # Agent registry
        self._agents: dict[str, AgentFunction] = {
            "router": self._router.classify,
            "sql": self._sql_agent.generate,
            "analysis": self._analysis_agent.analyze,
            "visualization": self._viz_agent.generate,
            "docs": self._docs_agent.respond,
        }

Conversation Session

Each conversation maintains a ConversationSession that holds the state machine state and session metadata:

@dataclass
class ConversationSession:
    """A conversation session with state and history."""
    session_id: str
    tenant_id: str
    user_id: str | None
    state: AgentState
    created_at: datetime
    last_active: datetime
    message_count: int = 0

Session Persistence

Sessions are persisted to the session store (Redis-backed with PostgreSQL fallback):

async def _save_session(self, session: ConversationSession) -> bool:
    """Persist session to storage."""
    store = self._get_session_store()
    if store:
        try:
            await store.save(session.to_session_data())
            return True
        except Exception as e:
            logger.warning("session_save_failed", error=str(e))
    return False

The session store uses lazy initialization with graceful degradation: a failed save is logged as a warning and reported through a False return value rather than raised. If Redis is unavailable, the orchestrator continues with in-memory session storage.
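The lazy initialization and fallback behavior described above could be sketched roughly as follows. This is an illustration under stated assumptions, not the service's actual implementation: `SessionPersistence`, `InMemorySessionStore`, and `unavailable_store` are hypothetical names, and the real store's interface is only assumed to expose an async `save` method as in `_save_session`.

```python
import asyncio
from typing import Any, Callable, Optional, Protocol


class SessionStore(Protocol):
    """Assumed minimal interface: an async save() taking serialized session data."""

    async def save(self, data: dict[str, Any]) -> None: ...


class InMemorySessionStore:
    """Process-local fallback; sessions are lost when the process restarts."""

    def __init__(self) -> None:
        self.sessions: dict[str, dict[str, Any]] = {}

    async def save(self, data: dict[str, Any]) -> None:
        self.sessions[data["session_id"]] = data


class SessionPersistence:
    """Builds the primary store on first use and degrades to memory on failure."""

    def __init__(self, primary_factory: Callable[[], SessionStore]) -> None:
        self._primary_factory = primary_factory
        self._primary: Optional[SessionStore] = None
        self._primary_failed = False
        self.fallback = InMemorySessionStore()

    def _get_primary(self) -> Optional[SessionStore]:
        # Lazy initialization: construct the primary store only when first needed,
        # and remember a failed attempt so it is not retried on every save.
        if self._primary is None and not self._primary_failed:
            try:
                self._primary = self._primary_factory()
            except Exception:
                self._primary_failed = True
        return self._primary

    async def save(self, data: dict[str, Any]) -> str:
        """Return which backend accepted the write: 'primary' or 'memory'."""
        primary = self._get_primary()
        if primary is not None:
            try:
                await primary.save(data)
                return "primary"
            except Exception:
                pass  # Fall through to the in-memory store.
        await self.fallback.save(data)
        return "memory"


def unavailable_store() -> SessionStore:
    # Stand-in for a Redis-backed store whose connection attempt fails.
    raise ConnectionError("redis unavailable")


persistence = SessionPersistence(unavailable_store)
backend = asyncio.run(persistence.save({"session_id": "sess-1", "message_count": 1}))
```

In this sketch the caller never sees the connection error; it only learns which backend took the write, which mirrors the boolean result of `_save_session`.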


Agent State Machine

The AgentState tracks the current state of the analytics workflow:

@dataclass
class AgentState:
    """State passed between agents in the workflow."""
    session_id: str
    tenant_id: str
    user_id: str | None = None
 
    # Current request
    user_message: str = ""
    intent: Intent | None = None
 
    # SQL context
    generated_sql: str | None = None
    sql_executed: bool = False
    query_results: dict[str, Any] | None = None
 
    # Analysis context
    analysis_results: dict[str, Any] | None = None
    insights: list[str] = field(default_factory=list)
 
    # Visualization context
    chart_config: dict[str, Any] | None = None
    chart_type: str | None = None
 
    # Conversation history
    messages: list[ConversationMessage] = field(default_factory=list)
 
    # Routing
    next_agent: str | None = None
    visited_agents: list[str] = field(default_factory=list)
    error: str | None = None

State Flow

                      +----------+
            +-------->| Router   |<--------+
            |         +----+-----+         |
            |              |               |
            |    +---------+---------+     |
            |    |         |         |     |
            |    v         v         v     |
            | +-----+  +------+  +-----+  |
            | | SQL |  | Docs |  | Viz  |  |
            | +--+--+  +------+  +--+--+  |
            |    |                   |     |
            |    v                   |     |
            | +--------+            |     |
            | |Analysis|            |     |
            | +---+----+            |     |
            |     |                 |     |
            |     v                 v     |
            |  +------+         +------+  |
            |  | Viz  |         |Return|  |
            |  +--+---+         +------+  |
            |     |                       |
            |     v                       |
            |  +------+                   |
            +--+Return|                   |
               +------+                   |
                                          |
            (Refinement) -----------------+
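The paths in the diagram can also be read as a small transition table. The sketch below is only an illustration: `NEXT_AGENT` and `trace_path` are hypothetical names, the table encodes the default hand-offs set by the agent code later in this section, and it ignores error cases and the refinement loop back to the router.

```python
from typing import Optional

# Default successor of each agent; None means "return to the caller".
NEXT_AGENT: dict[str, Optional[str]] = {
    "sql": "analysis",
    "analysis": "visualization",
    "visualization": None,
    "docs": None,
}


def trace_path(entry_agent: str) -> list[str]:
    """List the agents visited when the router hands off to entry_agent."""
    path = ["router"]
    current: Optional[str] = entry_agent
    while current is not None:
        path.append(current)
        current = NEXT_AGENT[current]
    return path


sql_path = trace_path("sql")
docs_path = trace_path("docs")
viz_path = trace_path("visualization")
```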

Specialized Agents

Router Agent

The Router Agent classifies user intent and determines which agent should handle the request:

class RouterAgent:
    """Classifies user intent and routes to appropriate agent."""
 
    async def classify(self, state: AgentState) -> AgentState:
        """Classify intent and set next_agent."""
        intent = await self._classify_intent(state.user_message)
        state.intent = intent
 
        # Route based on intent
        routing = {
            Intent.QUERY_DATA: "sql",
            Intent.AGGREGATE: "sql",
            Intent.FILTER: "sql",
            Intent.COMPARE: "sql",
            Intent.TREND: "sql",
            Intent.RANK: "sql",
            Intent.ANALYZE: "analysis",
            Intent.CORRELATE: "analysis",
            Intent.FORECAST: "analysis",
            Intent.ANOMALY: "analysis",
            Intent.VISUALIZE: "visualization",
            Intent.DASHBOARD: "visualization",
            Intent.HELP: "docs",
            Intent.SCHEMA: "docs",
            Intent.CLARIFY: "docs",
            Intent.REFINE: "sql",
            Intent.UNDO: None,       # Handle internally
            Intent.RESET: None,      # Handle internally
        }
 
        state.next_agent = routing.get(intent)
        return state
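The body of `_classify_intent` is not shown here. As a rough illustration of how a message ends up in the routing table, the following sketch uses a hypothetical keyword-based classifier; the keyword rules, the reduced `Intent` enum, and the `classify_intent` and `route` helpers are assumptions made for the example, and the real agent may rely on the LLM client instead.

```python
from enum import Enum
from typing import Optional


class Intent(Enum):
    # Subset of the intents that appear in the routing table above.
    QUERY_DATA = "query_data"
    TREND = "trend"
    ANOMALY = "anomaly"
    VISUALIZE = "visualize"
    HELP = "help"
    SCHEMA = "schema"
    RESET = "reset"


# Hypothetical keyword rules, checked in order; the first match wins.
KEYWORD_RULES: list[tuple[tuple[str, ...], Intent]] = [
    (("start over", "reset"), Intent.RESET),
    (("what tables", "schema", "columns"), Intent.SCHEMA),
    (("help", "how do i"), Intent.HELP),
    (("chart", "plot", "graph"), Intent.VISUALIZE),
    (("anomal", "outlier"), Intent.ANOMALY),
    (("trend", "over time"), Intent.TREND),
]

# Same targets as the routing table above, restricted to the subset of intents.
ROUTING: dict[Intent, Optional[str]] = {
    Intent.QUERY_DATA: "sql",
    Intent.TREND: "sql",
    Intent.ANOMALY: "analysis",
    Intent.VISUALIZE: "visualization",
    Intent.HELP: "docs",
    Intent.SCHEMA: "docs",
    Intent.RESET: None,  # Handled internally, no agent hand-off.
}


def classify_intent(message: str) -> Intent:
    """Return the first matching intent, defaulting to a plain data query."""
    text = message.lower()
    for keywords, intent in KEYWORD_RULES:
        if any(keyword in text for keyword in keywords):
            return intent
    return Intent.QUERY_DATA


def route(message: str) -> tuple[Intent, Optional[str]]:
    intent = classify_intent(message)
    return intent, ROUTING.get(intent)
```

A `None` target, as for `Intent.RESET`, ends the state machine loop immediately because `next_agent` is falsy after the router returns.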

SQL Agent

Generates and executes SQL queries using the Text-to-SQL pipeline:

class SQLAgent:
    """Generates and executes SQL for BI queries."""
 
    async def generate(self, state: AgentState) -> AgentState:
        """Generate SQL from user message and execute."""
        # Generate SQL
        result = await self._sql_generator.generate(
            question=state.user_message,
            tenant_id=state.tenant_id,
        )
 
        if result.success:
            state.generated_sql = result.query.sql
 
            # Execute query
            query_results = await self._query_executor.execute(
                sql=result.query.sql,
                tenant_id=state.tenant_id,
            )
            state.query_results = query_results
            state.sql_executed = True
 
            # Route to analysis for further processing
            state.next_agent = "analysis"
        else:
            state.error = result.error
            state.next_agent = None
 
        return state

Analysis Agent

Performs statistical analysis on query results:

class AnalysisAgent:
    """Performs statistical analysis on query results."""
 
    async def analyze(self, state: AgentState) -> AgentState:
        """Analyze query results and generate insights."""
        if not state.query_results:
            state.next_agent = None
            return state
 
        # Perform analysis types based on intent
        analyses = {
            Intent.ANALYZE: self._statistical_analysis,
            Intent.CORRELATE: self._correlation_analysis,
            Intent.FORECAST: self._trend_analysis,
            Intent.ANOMALY: self._anomaly_detection,
            Intent.TREND: self._trend_analysis,
        }
 
        analyze_fn = analyses.get(state.intent, self._basic_analysis)
        analysis_result = await analyze_fn(state.query_results)
 
        state.analysis_results = analysis_result
        state.insights = analysis_result.get("insights", [])
 
        # Route to visualization
        state.next_agent = "visualization"
        return state

Analysis Type | Techniques                                           | Output
Statistical   | Mean, median, std dev, percentiles, distribution     | Summary statistics
Trend         | Linear regression, moving average, seasonality       | Trend direction, growth rate
Anomaly       | Z-score, IQR, isolation forest                       | Anomalous data points
Correlation   | Pearson, Spearman, cross-correlation                 | Correlation coefficients
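
To make two of the anomaly techniques listed above concrete, here is a minimal sketch of Z-score and IQR outlier detection using only the Python standard library. The function names, thresholds, and sample data are assumptions for illustration; the internals of `_anomaly_detection` are not documented here.

```python
import statistics


def zscore_outliers(values: list[float], threshold: float = 3.0) -> list[int]:
    """Indices whose distance from the mean exceeds `threshold` standard deviations."""
    mean = statistics.fmean(values)
    stdev = statistics.pstdev(values)
    if stdev == 0:
        return []  # All values identical: nothing can be an outlier.
    return [i for i, v in enumerate(values) if abs(v - mean) / stdev > threshold]


def iqr_outliers(values: list[float], k: float = 1.5) -> list[int]:
    """Indices outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    spread = q3 - q1
    low, high = q1 - k * spread, q3 + k * spread
    return [i for i, v in enumerate(values) if v < low or v > high]


# Made-up sample: seven ordinary days and one spike at index 7.
daily_orders = [102, 98, 101, 99, 100, 103, 97, 250]

z_flagged = zscore_outliers(daily_orders, threshold=2.5)
iqr_flagged = iqr_outliers(daily_orders)
```

Both calls flag only the spike at index 7. The lowered Z-score threshold is deliberate: with eight points, no Z-score computed from the population standard deviation can exceed sqrt(7), about 2.65, so the common threshold of 3 would never fire on a sample this small. The IQR rule does not have that limitation, which is one reason to offer both.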

Visualization Agent

Generates chart configurations from query results and analysis:

class VisualizationAgent:
    """Generates visualization configurations."""
 
    async def generate(self, state: AgentState) -> AgentState:
        """Generate chart config from data and analysis."""
        chart_type = self._recommend_chart_type(
            intent=state.intent,
            data=state.query_results,
            analysis=state.analysis_results,
        )
 
        state.chart_type = chart_type
        state.chart_config = self._build_chart_config(
            chart_type=chart_type,
            data=state.query_results,
            insights=state.insights,
        )
 
        state.next_agent = None  # Terminal state
        return state
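
The logic inside `_recommend_chart_type` is not shown. One plausible heuristic, sketched below under assumptions, picks a chart from the intent and the shape of the result set. The function name, the rules, and the chart type strings are hypothetical, and the data layout follows the `columns`/`rows` structure used in the response example in this document.

```python
from numbers import Number
from typing import Any


def recommend_chart_type(intent: str, data: dict[str, Any]) -> str:
    """Pick a chart type from the intent and the column types of the first row."""
    rows = data.get("rows") or []
    columns = data.get("columns") or []
    if not rows or len(columns) < 2:
        return "table"  # Nothing meaningful to plot.
    if intent in {"TREND", "FORECAST"}:
        return "line"  # Time-oriented intents read best as a line.

    # Treat bool as non-numeric even though it subclasses int.
    numeric = [
        isinstance(value, Number) and not isinstance(value, bool)
        for value in rows[0]
    ]
    if len(columns) == 2 and numeric == [False, True]:
        return "bar"  # One category column plus one measure.
    if len(columns) == 2 and all(numeric):
        return "scatter"  # Two measures.
    return "table"


sales = {
    "columns": ["region", "total_sales"],
    "rows": [["North America", 1250000], ["Europe", 890000]],
}
chart = recommend_chart_type("AGGREGATE", sales)
```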

Documentation Agent

Handles help requests and schema exploration:

class DocumentationAgent:
    """Provides help and schema information."""
 
    async def respond(self, state: AgentState) -> AgentState:
        """Respond to help and schema requests."""
        if state.intent == Intent.SCHEMA:
            # Retrieve and format schema information
            schema_info = await self._schema_provider.get_schema(
                tenant_id=state.tenant_id,
            )
            state.messages.append(
                ConversationMessage(
                    role=MessageRole.ASSISTANT,
                    content=self._format_schema(schema_info),
                )
            )
        elif state.intent == Intent.HELP:
            state.messages.append(
                ConversationMessage(
                    role=MessageRole.ASSISTANT,
                    content=self._get_help_text(),
                )
            )
 
        state.next_agent = None
        return state

Message Processing

The main processing loop executes the state machine:

async def process_message(
    self,
    session_id: str,
    message: str,
    tenant_id: str | None = None,
) -> dict[str, Any]:
    """Process a user message through the agent workflow."""
    session = await self.get_session(session_id)
 
    # Add user message to history
    session.state.user_message = message
    session.state.messages.append(
        ConversationMessage(role=MessageRole.USER, content=message)
    )
 
    # Start with router
    session.state.next_agent = "router"
    session.state.visited_agents = []
 
    # Execute state machine
    visits = 0
    while session.state.next_agent and visits < self.MAX_AGENT_VISITS:
        agent_name = session.state.next_agent
        agent_fn = self._agents.get(agent_name)
 
        if not agent_fn:
            break
 
        session.state.visited_agents.append(agent_name)
        session.state.next_agent = None  # Reset
 
        session.state = await agent_fn(session.state)
        visits += 1
 
    # Save session
    await self._save_session(session)
 
    return self._build_response(session.state)

MAX_AGENT_VISITS = 10 caps the number of agent invocations per message. If the routing ever forms a cycle, the loop exits after ten visits instead of running indefinitely.
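
The effect of the visit cap can be seen with two deliberately misconfigured stub agents that keep handing off to each other. This is a self-contained sketch of the same loop shape as `process_message`; `MiniState`, `ping`, `pong`, and `run` are hypothetical names introduced only for the example.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Optional


@dataclass
class MiniState:
    """Trimmed stand-in for AgentState with only the routing fields."""

    next_agent: Optional[str] = None
    visited_agents: list[str] = field(default_factory=list)


AgentFn = Callable[[MiniState], Awaitable[MiniState]]
MAX_AGENT_VISITS = 10


async def ping(state: MiniState) -> MiniState:
    state.next_agent = "pong"  # Always hands off to pong.
    return state


async def pong(state: MiniState) -> MiniState:
    state.next_agent = "ping"  # Always hands back, forming a cycle.
    return state


async def run(agents: dict[str, AgentFn], start: str) -> MiniState:
    state = MiniState(next_agent=start)
    visits = 0
    while state.next_agent and visits < MAX_AGENT_VISITS:
        agent_fn = agents.get(state.next_agent)
        if agent_fn is None:
            break  # Unknown agent name: stop rather than fail.
        state.visited_agents.append(state.next_agent)
        state.next_agent = None  # Reset so the agent must opt in to a hand-off.
        state = await agent_fn(state)
        visits += 1
    return state


final = asyncio.run(run({"ping": ping, "pong": pong}, "ping"))
```

After the run, `final.visited_agents` holds exactly ten entries and `final.next_agent` is still set. A `next_agent` that is not None once the loop has ended is therefore a usable signal that the workflow was cut short by the cap rather than finishing normally.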


Response Format

The orchestrator builds a structured response containing all artifacts from the workflow:

{
    "session_id": "sess-abc-123",
    "message": "Here are the total sales by region for Q4 2025.",
    "intent": "AGGREGATE",
    "sql": "SELECT region, SUM(amount) as total_sales FROM ...",
    "data": {
        "columns": ["region", "total_sales"],
        "rows": [
            ["North America", 1250000],
            ["Europe", 890000],
            ["Asia Pacific", 670000]
        ]
    },
    "insights": [
        "North America leads with 44% of total sales",
        "Asia Pacific showed 15% growth vs Q3"
    ],
    "visualization": {
        "chart_type": "bar",
        "x_axis": "region",
        "y_axis": "total_sales",
        "title": "Total Sales by Region - Q4 2025"
    },
    "agents_visited": ["router", "sql", "analysis", "visualization"]
}
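
How `_build_response` assembles this payload is not shown. The sketch below is one way the state fields could map onto the response keys, under assumptions: `StateSnapshot` is a trimmed, hypothetical stand-in for `AgentState`, the `message` text is omitted because its source is not documented, and the `error` key is an assumption that does not appear in the example above.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class StateSnapshot:
    """Trimmed stand-in for AgentState with only the fields used below."""

    session_id: str
    intent: Optional[str] = None
    generated_sql: Optional[str] = None
    query_results: Optional[dict[str, Any]] = None
    insights: list[str] = field(default_factory=list)
    chart_type: Optional[str] = None
    chart_config: Optional[dict[str, Any]] = None
    visited_agents: list[str] = field(default_factory=list)
    error: Optional[str] = None


def build_response(state: StateSnapshot) -> dict[str, Any]:
    """Include only the artifacts that the visited agents actually produced."""
    response: dict[str, Any] = {
        "session_id": state.session_id,
        "intent": state.intent,
        "agents_visited": list(state.visited_agents),
    }
    if state.error:
        response["error"] = state.error  # Assumed key, not in the documented example.
    if state.generated_sql:
        response["sql"] = state.generated_sql
    if state.query_results is not None:
        response["data"] = state.query_results
    if state.insights:
        response["insights"] = list(state.insights)
    if state.chart_type:
        response["visualization"] = {
            "chart_type": state.chart_type,
            **(state.chart_config or {}),
        }
    return response
```

With this shape, a help request that only visits the router and the documentation agent yields a response without `sql`, `data`, or `visualization` keys, so clients should treat those keys as optional.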

Dashboard Integration

The BI agents integrate with the dashboard subsystem in bi/dashboard/:

Component           | File                    | Purpose
DashboardRepository | repository.py           | PostgreSQL persistence for dashboard configs
DashboardService    | service.py              | CRUD operations, sharing, versioning
Dashboard routes    | api/dashboard_routes.py | REST API for dashboard management

Dashboard Operations

POST /api/v1/bi/dashboards              # Create dashboard
GET  /api/v1/bi/dashboards              # List dashboards
GET  /api/v1/bi/dashboards/{id}         # Get dashboard
PUT  /api/v1/bi/dashboards/{id}         # Update dashboard
POST /api/v1/bi/dashboards/{id}/widgets  # Add widget from conversation