MATIH Platform is in active MVP development. Documentation reflects current implementation status.

Workflow Orchestrator

Production - Step-based workflow execution with checkpoints and resumption

The Workflow Orchestrator provides a step-based execution engine for complex, multi-step agent workflows. Unlike the MultiAgentOrchestrator, which focuses on team delegation, the Workflow Orchestrator manages ordered step execution with checkpointing and resumption. It is defined in data-plane/ai-service/src/workflow/.


12.2.3.1 Workflow Concepts

A workflow consists of a series of steps that execute in dependency order, each producing output that later steps can read. Workflows support:

  • Checkpointing: Save state after each step for crash recovery
  • Resumption: Resume from the last successful step after failure
  • Conditional branching: Skip or re-route steps based on intermediate results
  • Timeouts: Per-step and per-workflow timeout enforcement
  • Parallel steps: Execute independent steps concurrently
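Checkpointing and resumption can be illustrated with a minimal, self-contained sketch. It is not the platform's implementation: the helper `run_with_checkpoints`, the JSON file layout, and the step functions are all hypothetical, and the real engine may persist checkpoints differently. The sketch saves state after each successful step, simulates a crash in the second step, and shows that a second run skips the step that already completed.

```python
import json
import tempfile
from pathlib import Path


def run_with_checkpoints(steps, checkpoint_path):
    """Run (name, func) steps in order, saving state after each success.

    Hypothetical helper for illustration; not the platform's API.
    """
    path = Path(checkpoint_path)
    # Resume: load previously completed steps and their outputs, if any
    if path.exists():
        state = json.loads(path.read_text())
    else:
        state = {"completed": [], "context": {}}

    for name, func in steps:
        if name in state["completed"]:
            continue  # Finished in an earlier run; do not repeat it
        state["context"][name] = func(state["context"])
        state["completed"].append(name)
        path.write_text(json.dumps(state))  # Checkpoint after each step
    return state


calls = []  # Records which step functions actually ran


def parse(ctx):
    calls.append("parse")
    return "parsed"


def flaky(ctx):
    calls.append("flaky")
    if calls.count("flaky") == 1:
        raise RuntimeError("simulated crash")  # Fails on the first attempt only
    return ctx["parse"] + "+sql"


checkpoint_file = Path(tempfile.mkdtemp()) / "wf.json"
steps = [("parse", parse), ("generate", flaky)]

try:
    run_with_checkpoints(steps, checkpoint_file)
except RuntimeError:
    pass  # First run fails at the second step

# Second run resumes from the checkpoint: "parse" is not executed again
final_state = run_with_checkpoints(steps, checkpoint_file)
```

Because the checkpoint is written only after a step succeeds, a crash mid-step costs at most that one step's work on resume.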

Workflow Definition

from src.workflow.models import WorkflowDefinition, WorkflowStep
 
workflow = WorkflowDefinition(
    name="data_analysis_pipeline",
    description="End-to-end data analysis workflow",
    steps=[
        WorkflowStep(
            name="parse_question",
            agent_type="router",
            description="Classify the user's intent and extract entities",
            timeout_seconds=30,
        ),
        WorkflowStep(
            name="generate_sql",
            agent_type="sql",
            description="Generate SQL from the parsed question",
            depends_on=["parse_question"],
            timeout_seconds=60,
        ),
        WorkflowStep(
            name="execute_query",
            agent_type="executor",
            description="Execute the generated SQL against the query engine",
            depends_on=["generate_sql"],
            timeout_seconds=300,
        ),
        WorkflowStep(
            name="analyze_results",
            agent_type="analysis",
            description="Perform statistical analysis on query results",
            depends_on=["execute_query"],
            timeout_seconds=120,
        ),
        WorkflowStep(
            name="generate_visualization",
            agent_type="visualization",
            description="Create appropriate visualizations",
            depends_on=["execute_query"],  # Can run in parallel with analyze_results
            timeout_seconds=60,
        ),
    ],
)
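To see which steps in the definition above could run concurrently, the `depends_on` relationships can be grouped into batches. The following is a sketch under stated assumptions: `parallel_batches` is a hypothetical helper, not part of `src.workflow`, and it works on plain step names rather than `WorkflowStep` objects.

```python
def parallel_batches(dependencies):
    """Group step names into batches.

    Every step in a batch has all of its dependencies satisfied by
    earlier batches, so the steps within one batch could run concurrently.
    """
    remaining = {name: set(deps) for name, deps in dependencies.items()}
    batches = []
    while remaining:
        # Steps whose dependencies have all completed in earlier batches
        ready = sorted(name for name, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("Dependency cycle or unknown step in depends_on")
        batches.append(ready)
        for name in ready:
            del remaining[name]
        for deps in remaining.values():
            deps.difference_update(ready)
    return batches


# depends_on relationships copied from the definition above
deps = {
    "parse_question": [],
    "generate_sql": ["parse_question"],
    "execute_query": ["generate_sql"],
    "analyze_results": ["execute_query"],
    "generate_visualization": ["execute_query"],
}
batches = parallel_batches(deps)
# batches == [["parse_question"], ["generate_sql"], ["execute_query"],
#             ["analyze_results", "generate_visualization"]]
```

The first three steps form a chain, and only the last batch contains two steps, which matches the comment on `generate_visualization`.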

12.2.3.2 Execution Engine

The workflow engine processes steps in topological order:

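import asyncio
 
class WorkflowExecutor:
    async def execute(self, workflow: WorkflowDefinition, context: dict) -> WorkflowResult:
        # Restore progress from the last checkpoint, if one exists
        checkpoint = await self._load_checkpoint(workflow.id)
        completed_steps = set(checkpoint.completed_steps) if checkpoint else set()
 
        for step in self._topological_sort(workflow.steps):
            if step.name in completed_steps:
                continue  # Skip already-completed steps
 
            # In topological order every dependency should already be complete.
            # Report a failure instead of silently skipping the step, which
            # would otherwise let the workflow finish as "completed".
            missing = [dep for dep in step.depends_on if dep not in completed_steps]
            if missing:
                return WorkflowResult(
                    status="failed", step=step.name,
                    error=f"Unmet dependencies: {missing}",
                )
 
            # Execute the step and checkpoint only after it succeeds
            try:
                result = await self._execute_step(step, context)
                context[step.name] = result
                completed_steps.add(step.name)
                await self._save_checkpoint(workflow.id, completed_steps, context)
            except (TimeoutError, asyncio.TimeoutError):
                # asyncio.TimeoutError is distinct from TimeoutError before Python 3.11
                return WorkflowResult(status="timeout", step=step.name)
            except Exception as e:
                return WorkflowResult(status="failed", step=step.name, error=str(e))
 
        return WorkflowResult(status="completed", context=context)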
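The loop above runs one step at a time and leaves timeout enforcement to `_execute_step`, whose body is not shown. As a hedged sketch of how per-step timeouts and concurrent execution of independent steps might be combined, the example below uses `asyncio.wait_for` and `asyncio.gather`. The functions `run_step_with_timeout` and `run_batch`, and the `"timeout"` marker value, are assumptions made for illustration and are not taken from the platform code.

```python
import asyncio


async def run_step_with_timeout(step_factory, timeout_seconds):
    """Run one step, cancelling it if it exceeds its time budget."""
    return await asyncio.wait_for(step_factory(), timeout=timeout_seconds)


async def run_batch(batch):
    """Run independent steps concurrently.

    `batch` maps a step name to (coroutine factory, timeout_seconds).
    Returns a dict of step name -> result, using the string "timeout"
    for any step that overran its budget.
    """
    async def guarded(name, factory, timeout):
        try:
            return name, await run_step_with_timeout(factory, timeout)
        except asyncio.TimeoutError:
            return name, "timeout"

    pairs = await asyncio.gather(
        *(guarded(name, factory, timeout)
          for name, (factory, timeout) in batch.items())
    )
    return dict(pairs)


async def fast_step():
    await asyncio.sleep(0.01)
    return "ok"


async def slow_step():
    await asyncio.sleep(1.0)  # Longer than its 0.05 s budget below
    return "never returned"


results = asyncio.run(run_batch({
    "analyze_results": (fast_step, 0.5),
    "generate_visualization": (slow_step, 0.05),
}))
```

Catching the timeout inside `guarded` keeps one slow step from cancelling its siblings; a real engine would more likely record a structured result than a bare string.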

12.2.3.3 API Endpoints

# Create a workflow
curl -X POST http://localhost:8000/api/v1/workflows \
  -H "Content-Type: application/json" \
  -H "X-Tenant-ID: acme-corp" \
  -d '{
    "name": "quarterly_report",
    "steps": [
      {"name": "gather_data", "agent_type": "sql", "timeout_seconds": 120},
      {"name": "analyze", "agent_type": "analysis", "depends_on": ["gather_data"]},
      {"name": "visualize", "agent_type": "visualization", "depends_on": ["gather_data"]}
    ]
  }'
 
# Execute a workflow
curl -X POST http://localhost:8000/api/v1/workflows/{workflow_id}/execute \
  -H "Content-Type: application/json" \
  -H "X-Tenant-ID: acme-corp" \
  -d '{"question": "Generate Q3 2025 executive report"}'
 
# Check workflow status
curl "http://localhost:8000/api/v1/workflows/{workflow_id}/status?tenant_id=acme-corp"
 
# Resume a failed workflow
curl -X POST http://localhost:8000/api/v1/workflows/{workflow_id}/resume \
  -H "X-Tenant-ID: acme-corp"

Workflow Status Response

{
  "workflow_id": "wf-uuid-123",
  "status": "in_progress",
  "current_step": "analyze",
  "completed_steps": ["gather_data"],
  "pending_steps": ["analyze", "visualize"],
  "started_at": "2025-01-15T10:30:00Z",
  "elapsed_ms": 5420
}
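A client typically polls the status endpoint until the workflow finishes. The sketch below shows one way to do that, with the HTTP call abstracted behind a `fetch_status` callable so the example runs without a server. Several things here are assumptions: the helper names `wait_for_workflow` and `progress`, the set of terminal statuses (taken from the `WorkflowResult` statuses in the executor, which the API may or may not mirror), and the reading that `pending_steps` includes the current step, as in the sample response.

```python
import time

# Assumed terminal values, based on the statuses WorkflowResult uses
TERMINAL_STATUSES = {"completed", "failed", "timeout"}


def wait_for_workflow(fetch_status, poll_interval=1.0, max_polls=60):
    """Poll until the workflow reaches a terminal status.

    `fetch_status` is any callable returning the status payload as a dict,
    for example a wrapper around GET /api/v1/workflows/{workflow_id}/status.
    """
    for _ in range(max_polls):
        payload = fetch_status()
        if payload["status"] in TERMINAL_STATUSES:
            return payload
        time.sleep(poll_interval)
    raise TimeoutError(f"Workflow still running after {max_polls} polls")


def progress(payload):
    """Fraction of steps completed, based on the two step lists."""
    done = len(payload["completed_steps"])
    total = done + len(payload["pending_steps"])
    return done / total if total else 1.0


# Stubbed responses standing in for successive HTTP calls
in_progress = {
    "status": "in_progress",
    "completed_steps": ["gather_data"],
    "pending_steps": ["analyze", "visualize"],
}
finished = {
    "status": "completed",
    "completed_steps": ["gather_data", "analyze", "visualize"],
    "pending_steps": [],
}
responses = iter([in_progress, finished])
final = wait_for_workflow(lambda: next(responses), poll_interval=0.0)
```

Bounding the loop with `max_polls` keeps a stalled workflow from blocking the caller indefinitely.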