Temporal Workflows
Temporal provides durable workflow orchestration for long-running data pipeline processes in MATIH. Unlike Airflow (which excels at scheduled DAG execution) and Flink (which handles continuous streaming), Temporal manages workflows that require durable state, human-in-the-loop approvals, long-running operations with complex retry semantics, and cross-service orchestration.
Temporal Architecture

```
+-------------------+                       +-------------------+
| Pipeline Service  |    Workflow Start     |  Temporal Server  |
|   (API Routes)    | --------------------> | (History + Worker |
+-------------------+                       |     Service)      |
                                            +--------+----------+
                                                     |
                                            +--------v----------+
                                            | Temporal Workers  |
                                            | (MATIH Activities)|
                                            +--------+----------+
                                                     |
                       +---------------------+---------------------+
                       |                     |                     |
               +-------v-----+       +-------v-----+       +-------v-----+
               |  Spark Job  |       | Airflow DAG |       | Notification|
               |  Activity   |       |  Activity   |       |  Activity   |
               +-------------+       +-------------+       +-------------+
```

Workflow Types
MATIH uses Temporal for several categories of workflows:
Data Pipeline Orchestration
Complex multi-engine pipelines that combine Spark, Airflow, and Flink:
```python
import asyncio
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class DataPipelineWorkflow:
    """Orchestrates a multi-engine data pipeline."""

    def __init__(self) -> None:
        self.approval_received = False

    @workflow.signal
    def approve(self) -> None:
        """Marks a flagged quality issue as approved by an operator."""
        self.approval_received = True

    @workflow.run
    async def run(self, pipeline_config: PipelineConfig) -> PipelineResult:
        # Step 1: Extract from source (Spark)
        extraction = await workflow.execute_activity(
            extract_data,
            pipeline_config.source,
            start_to_close_timeout=timedelta(hours=1),
        )

        # Step 2: Quality validation
        quality = await workflow.execute_activity(
            validate_quality,
            extraction.output_path,
            start_to_close_timeout=timedelta(minutes=30),
        )
        if quality.score < pipeline_config.min_quality_score:
            # Step 2a: Notify and wait for an approval signal
            await workflow.execute_activity(
                notify_quality_issue,
                quality,
                start_to_close_timeout=timedelta(minutes=5),
            )
            try:
                # wait_condition returns None and raises on timeout,
                # so the timeout (not a boolean) signals rejection
                await workflow.wait_condition(
                    lambda: self.approval_received,
                    timeout=timedelta(hours=4),
                )
            except asyncio.TimeoutError:
                return PipelineResult(
                    status="CANCELLED", reason="Quality below threshold"
                )

        # Step 3: Transform (Airflow DAG)
        transform = await workflow.execute_activity(
            trigger_airflow_dag,
            pipeline_config.transform_dag,
            start_to_close_timeout=timedelta(hours=2),
        )

        # Step 4: Load to destination (multiple arguments go via `args`)
        load = await workflow.execute_activity(
            load_to_iceberg,
            args=[transform.output_path, pipeline_config.destination],
            start_to_close_timeout=timedelta(hours=1),
        )
        return PipelineResult(
            status="COMPLETED",
            rows_processed=load.rows,
            duration=workflow.now() - workflow.info().start_time,
        )
```

Data Backfill Workflow
Manages large-scale historical data backfills:
```python
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy


@workflow.defn
class DataBackfillWorkflow:
    """Manages partitioned data backfill with progress tracking."""

    def __init__(self) -> None:
        self.progress = 0.0

    @workflow.query
    def get_progress(self) -> float:
        """Fraction of partitions processed so far (0.0-1.0)."""
        return self.progress

    @workflow.run
    async def run(self, backfill_config: BackfillConfig) -> BackfillResult:
        partitions = generate_partitions(
            backfill_config.start_date,
            backfill_config.end_date,
            backfill_config.partition_size,
        )
        results = []
        for partition in partitions:
            result = await workflow.execute_activity(
                process_partition,
                partition,
                retry_policy=RetryPolicy(
                    maximum_attempts=3,
                    initial_interval=timedelta(seconds=30),
                    maximum_interval=timedelta(minutes=10),
                    backoff_coefficient=2.0,
                ),
                start_to_close_timeout=timedelta(hours=1),
            )
            results.append(result)
            # Report progress (exposed to callers via the query handler above)
            self.progress = len(results) / len(partitions)
        return BackfillResult(
            partitions_processed=len(results),
            total_rows=sum(r.rows for r in results),
        )
```

Schema Migration Workflow
Coordinates schema changes across multiple systems:
```python
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class SchemaMigrationWorkflow:
    """Coordinates schema migration with rollback capability."""

    @workflow.run
    async def run(self, migration: MigrationConfig) -> MigrationResult:
        # The SDK requires a timeout per activity; this value is illustrative
        timeout = timedelta(hours=1)
        # 1. Validate migration (pre-flight checks)
        await workflow.execute_activity(
            validate_migration, migration, start_to_close_timeout=timeout
        )
        # 2. Create backup before any destructive change
        backup = await workflow.execute_activity(
            create_backup, migration.table, start_to_close_timeout=timeout
        )
        # 3. Apply migration
        try:
            await workflow.execute_activity(
                apply_migration, migration, start_to_close_timeout=timeout
            )
        except Exception:
            # 4. Roll back on failure, then re-raise to fail the workflow
            await workflow.execute_activity(
                restore_backup, backup, start_to_close_timeout=timeout
            )
            raise
        # 5. Validate post-migration state
        post_validation = await workflow.execute_activity(
            validate_post_migration, migration, start_to_close_timeout=timeout
        )
        if not post_validation.passed:
            await workflow.execute_activity(
                restore_backup, backup, start_to_close_timeout=timeout
            )
            return MigrationResult(status="ROLLED_BACK")
        return MigrationResult(status="COMPLETED")
```

Activity Definitions
Activities are the atomic units of work in Temporal workflows:
Pipeline Activities
| Activity | Description | Timeout |
|---|---|---|
| extract_data | Extract data from source via Spark or JDBC | 1 hour |
| validate_quality | Run quality checks on extracted data | 30 minutes |
| trigger_airflow_dag | Trigger and wait for Airflow DAG completion | 2 hours |
| load_to_iceberg | Write data to Iceberg table | 1 hour |
| process_partition | Process a single partition during backfill | 1 hour |
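The `validate_quality` activity feeds the quality gate in the pipeline workflow, which compares `quality.score` against `min_quality_score`. A minimal sketch of the scoring logic such an activity might delegate to (the check names and weighting are illustrative assumptions, not MATIH's actual rules):

```python
from dataclasses import dataclass


@dataclass
class QualityCheck:
    name: str
    passed: bool
    weight: float = 1.0


def compute_quality_score(checks: list[QualityCheck]) -> float:
    """Weighted fraction of passing checks, in [0.0, 1.0]."""
    total = sum(c.weight for c in checks)
    if total == 0:
        return 0.0
    return sum(c.weight for c in checks if c.passed) / total


checks = [
    QualityCheck("row_count_nonzero", True),
    QualityCheck("null_rate_below_5pct", True),
    QualityCheck("schema_matches_contract", False, weight=2.0),
]
print(compute_quality_score(checks))  # 2 of 4 weight units pass -> 0.5
```

Keeping the scoring pure makes it unit-testable outside the activity, while the `@activity.defn` wrapper handles I/O and heartbeating.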
Notification Activities
```python
# data-plane/pipeline-service/src/matih_pipeline/activities/notification_activity.py
from temporalio import activity


@activity.defn
async def notify_quality_issue(quality_result: QualityResult) -> None:
    """Send quality issue notification to stakeholders."""
    # Send to Slack, email, or PagerDuty based on severity


@activity.defn
async def notify_pipeline_completion(result: PipelineResult) -> None:
    """Send pipeline completion notification."""


@activity.defn
async def notify_failure(error: str, pipeline_id: str) -> None:
    """Send failure notification with error details."""
```

Infrastructure Activities
| Activity | Description |
|---|---|
| create_backup | Create data backup before destructive operations |
| restore_backup | Restore from backup on failure |
| validate_migration | Pre-flight checks for schema migration |
| apply_migration | Execute schema migration DDL |
| compact_iceberg_table | Trigger Iceberg file compaction |
Retry Policies
Temporal provides configurable retry policies per activity:
```python
from datetime import timedelta

from temporalio.common import RetryPolicy

default_retry_policy = RetryPolicy(
    initial_interval=timedelta(seconds=10),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(minutes=5),
    maximum_attempts=5,
    non_retryable_error_types=[
        "ValidationError",
        "PermissionDeniedError",
        "SchemaConflictError",
    ],
)
```

Retry Strategies by Activity Type
| Activity Type | Max Attempts | Initial Interval | Max Interval | Backoff |
|---|---|---|---|---|
| Data extraction | 3 | 30s | 10min | 2.0x |
| Quality validation | 2 | 10s | 5min | 2.0x |
| Airflow DAG trigger | 3 | 60s | 15min | 2.0x |
| Data loading | 3 | 30s | 10min | 2.0x |
| Notifications | 5 | 5s | 1min | 1.5x |
| Backup/Restore | 2 | 60s | 5min | 2.0x |
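Temporal waits `initial_interval * backoff_coefficient^(n-1)` before retry `n`, capped at `maximum_interval`. A quick sketch (the `retry_intervals` helper is illustrative, not SDK API) that reproduces the wait sequences implied by the table above:

```python
def retry_intervals(
    initial_s: float, coefficient: float, max_s: float, max_attempts: int
) -> list[float]:
    """Seconds Temporal waits before each retry (attempts 2..max_attempts)."""
    return [
        min(initial_s * coefficient**n, max_s)
        for n in range(max_attempts - 1)
    ]


# Data extraction: 30s initial, 2.0x backoff, 10min cap, 3 attempts -> 2 waits
print(retry_intervals(30, 2.0, 600, 3))  # [30.0, 60.0]
# Default policy: 10s initial, 2.0x backoff, 5min cap, 5 attempts
print(retry_intervals(10, 2.0, 300, 5))  # [10.0, 20.0, 40.0, 80.0]
```

The cap matters for long schedules: with the default policy extended to 7 attempts, the sixth wait would be capped at 300s rather than growing to 320s.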
Workflow Monitoring
Workflow Status API
GET /v1/pipelines/workflows/{workflowId}/status
Response:
```json
{
  "workflowId": "wf-daily-sales-etl-2026-02-12",
  "runId": "run-456",
  "status": "RUNNING",
  "startedAt": "2026-02-12T06:00:00Z",
  "currentActivity": "trigger_airflow_dag",
  "progress": 0.6,
  "activities": [
    {"name": "extract_data", "status": "COMPLETED", "duration": "5m 23s"},
    {"name": "validate_quality", "status": "COMPLETED", "duration": "1m 12s"},
    {"name": "trigger_airflow_dag", "status": "RUNNING", "startedAt": "2026-02-12T06:07:00Z"}
  ],
  "retries": 0,
  "pendingSignals": []
}
```

Metrics
| Metric | Type | Description |
|---|---|---|
| temporal.workflow.started | Counter | Workflows started |
| temporal.workflow.completed | Counter | Workflows completed successfully |
| temporal.workflow.failed | Counter | Workflows failed |
| temporal.workflow.duration | Histogram | Workflow execution duration |
| temporal.activity.execution | Timer | Per-activity execution time |
| temporal.activity.retries | Counter | Activity retry count |
Temporal Server Configuration
| Property | Value |
|---|---|
| Namespace | matih-pipelines |
| Persistence | PostgreSQL |
| Visibility | Elasticsearch |
| Frontend port | 7233 |
| Worker concurrency | 10 (per worker pod) |
| Workflow timeout | 24 hours (default) |
| Activity timeout | Configurable per activity |
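On the worker side, these settings map onto the Python SDK's `Client.connect` and `Worker` arguments. A sketch of worker startup under the configuration above, assuming the workflow and activity definitions from earlier sections are importable (the frontend host name is an assumption):

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker


async def main() -> None:
    # Namespace and frontend port from the server configuration table
    client = await Client.connect(
        "temporal-frontend:7233",  # host name is an assumption
        namespace="matih-pipelines",
    )
    worker = Worker(
        client,
        task_queue="pipeline-workers",
        workflows=[
            DataPipelineWorkflow,
            DataBackfillWorkflow,
            SchemaMigrationWorkflow,
        ],
        activities=[
            extract_data,
            validate_quality,
            trigger_airflow_dag,
            load_to_iceberg,
            process_partition,
        ],
        max_concurrent_activities=10,  # "worker concurrency" per pod
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```

Each worker pod runs one such process; scaling throughput is then a matter of adding pods on the same task queue.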
Integration with Pipeline Service
The Pipeline Service exposes Temporal workflows through its API:
```python
# Pipeline API routes for Temporal workflows
from datetime import datetime, timezone


@router.post("/pipelines/{pipeline_id}/execute")
async def execute_pipeline(pipeline_id: str, params: ExecutionParams):
    # `pipeline` and `pipeline_config` are resolved from the service's
    # pipeline repository (lookup elided here)
    # Determine engine: if workflow orchestration is needed -> Temporal
    if pipeline.requires_orchestration:
        handle = await temporal_client.start_workflow(
            DataPipelineWorkflow.run,
            pipeline_config,
            id=f"pipeline-{pipeline_id}-{datetime.now(timezone.utc).isoformat()}",
            task_queue="pipeline-workers",
        )
        return {"workflowId": handle.id, "status": "STARTED"}


@router.post("/pipelines/workflows/{workflow_id}/signal")
async def signal_workflow(workflow_id: str, signal: WorkflowSignal):
    # Send approval or cancellation signals to running workflows,
    # e.g. the approval that unblocks a paused DataPipelineWorkflow
    handle = temporal_client.get_workflow_handle(workflow_id)
    await handle.signal(signal.name, signal.data)
```

Related Sections
- Pipeline Service -- Service architecture and API routes
- Airflow -- DAG execution triggered by Temporal activities
- Spark -- Spark jobs executed as Temporal activities
- API Reference -- Workflow management endpoints