Workflow Orchestrator
Production: step-based workflow execution with checkpoints and resumption
The Workflow Orchestrator provides a step-based execution engine for complex, multi-step agent workflows. Unlike the MultiAgentOrchestrator, which focuses on delegating work across a team of agents, the Workflow Orchestrator executes steps in dependency order and adds checkpointing and resumption. It is defined in data-plane/ai-service/src/workflow/.
12.2.3.1 Workflow Concepts
A workflow consists of a series of steps that execute in dependency order; each step produces outputs that feed into the steps that depend on it. Workflows support:
- Checkpointing: Save state after each step for crash recovery
- Resumption: Resume from the last successful step after failure
- Conditional branching: Skip or re-route steps based on intermediate results
- Timeouts: Per-step and per-workflow timeout enforcement
- Parallel steps: Execute independent steps concurrently
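Checkpointing, resumption, and the two timeout levels can be illustrated with a small self-contained sketch. It is not the code in data-plane/ai-service/src/workflow/: InMemoryCheckpointStore, run_workflow, and the use of plain async callables as steps are hypothetical stand-ins, checkpoints live in a dict rather than durable storage, and steps run one at a time in the order given.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical step signature: an async callable that reads the shared context
StepFn = Callable[[dict], Awaitable[object]]


class InMemoryCheckpointStore:
    """Hypothetical store; a real deployment would persist checkpoints durably."""

    def __init__(self) -> None:
        self._data: dict = {}

    def save(self, workflow_id: str, completed: set, context: dict) -> None:
        # Store copies so later mutation of the live context can't alter the snapshot
        self._data[workflow_id] = (set(completed), dict(context))

    def load(self, workflow_id: str) -> tuple:
        completed, context = self._data.get(workflow_id, (set(), {}))
        return set(completed), dict(context)


async def run_workflow(
    workflow_id: str,
    steps: list,  # [(name, step_fn, timeout_seconds), ...] in dependency order
    store: InMemoryCheckpointStore,
    workflow_timeout: float = 600.0,
) -> dict:
    # Resumption: start from whatever the last checkpoint recorded
    completed, context = store.load(workflow_id)
    current: Optional[str] = None

    async def _run() -> None:
        nonlocal current
        for name, fn, timeout in steps:
            if name in completed:
                continue  # finished in an earlier run; its output is already in context
            current = name
            # Per-step timeout: wait_for cancels the step if it overruns
            context[name] = await asyncio.wait_for(fn(context), timeout=timeout)
            completed.add(name)
            store.save(workflow_id, completed, context)  # checkpoint after each step

    try:
        # Per-workflow timeout bounds the whole run
        await asyncio.wait_for(_run(), timeout=workflow_timeout)
    except asyncio.TimeoutError:
        return {"status": "timeout", "step": current}
    except Exception as exc:
        # The last checkpoint survives, so a later call resumes after the last success
        return {"status": "failed", "step": current, "error": str(exc)}
    return {"status": "completed", "context": context}
```

Calling run_workflow again with the same store after a failure re-runs only the failed step and those after it; steps recorded in the checkpoint are skipped and their saved outputs are reused.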
Workflow Definition
```python
from src.workflow.models import WorkflowDefinition, WorkflowStep

workflow = WorkflowDefinition(
    name="data_analysis_pipeline",
    description="End-to-end data analysis workflow",
    steps=[
        WorkflowStep(
            name="parse_question",
            agent_type="router",
            description="Classify the user's intent and extract entities",
            timeout_seconds=30,
        ),
        WorkflowStep(
            name="generate_sql",
            agent_type="sql",
            description="Generate SQL from the parsed question",
            depends_on=["parse_question"],
            timeout_seconds=60,
        ),
        WorkflowStep(
            name="execute_query",
            agent_type="executor",
            description="Execute the generated SQL against the query engine",
            depends_on=["generate_sql"],
            timeout_seconds=300,
        ),
        WorkflowStep(
            name="analyze_results",
            agent_type="analysis",
            description="Perform statistical analysis on query results",
            depends_on=["execute_query"],
            timeout_seconds=120,
        ),
        WorkflowStep(
            name="generate_visualization",
            agent_type="visualization",
            description="Create appropriate visualizations",
            depends_on=["execute_query"],  # Can run in parallel with analyze_results
            timeout_seconds=60,
        ),
    ],
)
```

12.2.3.2 Execution Engine
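Because each step declares its prerequisites in depends_on, a valid execution order can be derived from the definition alone. As an illustration of that ordering, the following minimal sketch uses Kahn's algorithm and groups steps into "waves": every step in a wave has all of its dependencies in earlier waves, so steps within one wave are candidates for concurrent execution. The execution_waves helper is hypothetical (not part of src.workflow) and takes a plain name-to-dependencies mapping instead of WorkflowStep objects.

```python
def execution_waves(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group steps into waves with Kahn's algorithm; raise on cycles or unknown deps."""
    for name, parents in deps.items():
        for parent in parents:
            if parent not in deps:
                raise ValueError(f"{name!r} depends on unknown step {parent!r}")

    # In-degree: number of unfinished dependencies for each step
    remaining = {name: len(set(parents)) for name, parents in deps.items()}
    # Reverse edges: which steps may become ready when a given step finishes
    children: dict[str, list[str]] = {name: [] for name in deps}
    for name, parents in deps.items():
        for parent in set(parents):
            children[parent].append(name)

    wave = [name for name, count in remaining.items() if count == 0]
    waves: list[list[str]] = []
    while wave:
        waves.append(wave)
        next_wave = []
        for name in wave:
            for child in children[name]:
                remaining[child] -= 1
                if remaining[child] == 0:
                    next_wave.append(child)
        wave = next_wave

    # Any step never reaching in-degree 0 sits on a cycle
    if sum(len(w) for w in waves) != len(deps):
        raise ValueError("dependency cycle detected")
    return waves


pipeline = {
    "parse_question": [],
    "generate_sql": ["parse_question"],
    "execute_query": ["generate_sql"],
    "analyze_results": ["execute_query"],
    "generate_visualization": ["execute_query"],
}
print(execution_waves(pipeline))
# [['parse_question'], ['generate_sql'], ['execute_query'], ['analyze_results', 'generate_visualization']]
```

For the data_analysis_pipeline dependencies, analyze_results and generate_visualization land in the same final wave, matching the comment in the definition that they can run in parallel.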
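The workflow engine processes steps in topological order, so a step runs only after every step in its depends_on list has completed. The simplified loop below runs one step at a time and saves a checkpoint after each success; on resume it reloads the checkpoint, skips completed steps, and restores their outputs so downstream steps can read them:

```python
import asyncio


class WorkflowExecutor:
    async def execute(self, workflow: WorkflowDefinition, context: dict) -> WorkflowResult:
        # Load the last checkpoint, if any, so a resumed run skips finished steps
        checkpoint = await self._load_checkpoint(workflow.id)
        completed_steps = set(checkpoint.completed_steps) if checkpoint else set()
        if checkpoint:
            # Restore earlier step outputs so downstream steps can read them
            # (assumes the checkpoint exposes the context passed to _save_checkpoint)
            context = {**checkpoint.context, **context}

        for step in self._topological_sort(workflow.steps):
            if step.name in completed_steps:
                continue  # Skip already-completed steps

            # Check dependencies (normally satisfied in topological order,
            # because a failed step ends the run below)
            if not all(dep in completed_steps for dep in step.depends_on):
                continue  # Dependencies not met

            try:
                result = await self._execute_step(step, context)
                # Expose this step's output to later steps, then checkpoint
                context[step.name] = result
                completed_steps.add(step.name)
                await self._save_checkpoint(workflow.id, completed_steps, context)
            except asyncio.TimeoutError:  # alias of TimeoutError on Python 3.11+
                return WorkflowResult(status="timeout", step=step.name)
            except Exception as e:
                return WorkflowResult(status="failed", step=step.name, error=str(e))

        return WorkflowResult(status="completed", context=context)
```

12.2.3.3 API Endpoints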
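The endpoints in this section can also be called from Python. The sketch below uses only the standard library's urllib.request and urllib.parse; the build_request helper and the BASE_URL value are hypothetical, and it assumes the paths, headers, and JSON bodies used in this section's curl commands (including the tenant_id query parameter on the status call). It only constructs requests; pass one to urllib.request.urlopen to send it.

```python
import json
import urllib.parse
import urllib.request
from typing import Optional

BASE_URL = "http://localhost:8000/api/v1/workflows"  # assumed local deployment
ACTIONS = {"execute", "status", "resume"}


def build_request(
    tenant_id: str,
    workflow_id: Optional[str] = None,
    action: Optional[str] = None,  # None with no workflow_id means "create"
    body: Optional[dict] = None,
) -> urllib.request.Request:
    """Build (but do not send) a request for one of the workflow endpoints."""
    if workflow_id is None:
        if action is not None:
            raise ValueError("an action requires a workflow_id")
        url = BASE_URL  # create a workflow
    else:
        if action not in ACTIONS:
            raise ValueError(f"unknown action {action!r}")
        url = f"{BASE_URL}/{urllib.parse.quote(workflow_id)}/{action}"

    headers = {}
    if action == "status":
        # The status call passes the tenant as a query parameter
        url += "?" + urllib.parse.urlencode({"tenant_id": tenant_id})
        method = "GET"
    else:
        headers["X-Tenant-ID"] = tenant_id
        method = "POST"

    data = None
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(url, data=data, headers=headers, method=method)


req = build_request(
    "acme-corp", "wf-uuid-123", "execute",
    {"question": "Generate Q3 2025 executive report"},
)
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))
```

Note that urllib.request normalizes header names with str.capitalize(), so the tenant header is stored as "X-tenant-id"; HTTP header names are case-insensitive, so this does not change what the server receives.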
```bash
# Create a workflow
curl -X POST http://localhost:8000/api/v1/workflows \
  -H "Content-Type: application/json" \
  -H "X-Tenant-ID: acme-corp" \
  -d '{
    "name": "quarterly_report",
    "steps": [
      {"name": "gather_data", "agent_type": "sql", "timeout_seconds": 120},
      {"name": "analyze", "agent_type": "analysis", "depends_on": ["gather_data"]},
      {"name": "visualize", "agent_type": "visualization", "depends_on": ["gather_data"]}
    ]
  }'

# Execute a workflow
curl -X POST http://localhost:8000/api/v1/workflows/{workflow_id}/execute \
  -H "Content-Type: application/json" \
  -H "X-Tenant-ID: acme-corp" \
  -d '{"question": "Generate Q3 2025 executive report"}'

# Check workflow status (quote the URL so the shell does not interpret "?")
curl "http://localhost:8000/api/v1/workflows/{workflow_id}/status?tenant_id=acme-corp"

# Resume a failed workflow
curl -X POST http://localhost:8000/api/v1/workflows/{workflow_id}/resume \
  -H "X-Tenant-ID: acme-corp"
```

Workflow Status Response
```json
{
  "workflow_id": "wf-uuid-123",
  "status": "in_progress",
  "current_step": "analyze",
  "completed_steps": ["gather_data"],
  "pending_steps": ["analyze", "visualize"],
  "started_at": "2025-01-15T10:30:00Z",
  "elapsed_ms": 5420
}
```
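A client typically polls the status endpoint until the workflow leaves the in_progress state. The following is a minimal sketch assuming the response shape shown above; treating completed, failed, and timeout as the terminal values is an assumption borrowed from the WorkflowResult statuses in the executor example, not a documented contract of this endpoint. poll_until_done, progress, and the injected fetch_status callable are hypothetical; fetch_status can wrap any HTTP call to the status endpoint.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"completed", "failed", "timeout"}  # assumed terminal values


def progress(status: dict) -> float:
    """Fraction of steps finished, computed from completed_steps and pending_steps."""
    done = len(status.get("completed_steps", []))
    total = done + len(status.get("pending_steps", []))
    return done / total if total else 1.0


def poll_until_done(
    fetch_status: Callable[[], dict],
    interval_s: float = 2.0,
    max_wait_s: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch_status until a terminal status appears or max_wait_s elapses."""
    deadline = time.monotonic() + max_wait_s
    while True:
        status = fetch_status()
        if status.get("status") in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"workflow {status.get('workflow_id')} still {status.get('status')}"
            )
        sleep(interval_s)
```

For the sample response, progress returns 1/3: one completed step out of three, since pending_steps in this payload also lists the step currently running.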