MATIH Platform is in active MVP development. Documentation reflects current implementation status.
11. Pipelines & Data Engineering
Temporal

Temporal Workflows

Temporal provides durable workflow orchestration for long-running data pipeline processes in MATIH. Unlike Airflow (which excels at scheduled DAG execution) and Flink (which handles continuous streaming), Temporal manages workflows that require durable state, human-in-the-loop approvals, long-running operations with complex retry semantics, and cross-service orchestration.


Temporal Architecture

+-------------------+    Workflow Start    +-------------------+
| Pipeline Service  | ------------------> | Temporal Server   |
| (API Routes)      |                     | (History + Worker |
+-------------------+                     |  Service)         |
                                          +--------+----------+
                                                   |
                                          +--------v----------+
                                          | Temporal Workers  |
                                          | (MATIH Activities)|
                                          +--------+----------+
                                                   |
                              +--------------------+--------------------+
                              |                    |                    |
                    +---------v---+      +---------v---+     +---------v---+
                    | Spark Job   |      | Airflow DAG |     | Notification|
                    | Activity    |      | Activity    |     | Activity    |
                    +-------------+      +-------------+     +-------------+

Workflow Types

MATIH uses Temporal for several categories of workflows:

Data Pipeline Orchestration

Complex multi-engine pipelines that combine Spark, Airflow, and Flink:

import asyncio
from datetime import timedelta

from temporalio import workflow

@workflow.defn
class DataPipelineWorkflow:
    """Orchestrates a multi-engine data pipeline."""

    def __init__(self) -> None:
        self.approval_received = False

    @workflow.signal
    def approve(self) -> None:
        """Operators send this signal to approve a low-quality run."""
        self.approval_received = True

    @workflow.run
    async def run(self, pipeline_config: PipelineConfig) -> PipelineResult:
        # Step 1: Extract from source (Spark)
        extraction = await workflow.execute_activity(
            extract_data,
            pipeline_config.source,
            start_to_close_timeout=timedelta(hours=1),
        )

        # Step 2: Quality validation
        quality = await workflow.execute_activity(
            validate_quality,
            extraction.output_path,
            start_to_close_timeout=timedelta(minutes=30),
        )

        if quality.score < pipeline_config.min_quality_score:
            # Step 2a: Notify and wait for approval
            await workflow.execute_activity(
                notify_quality_issue,
                quality,
                start_to_close_timeout=timedelta(minutes=5),
            )
            # wait_condition raises asyncio.TimeoutError if no approval
            # signal arrives within the window
            try:
                await workflow.wait_condition(
                    lambda: self.approval_received,
                    timeout=timedelta(hours=4),
                )
            except asyncio.TimeoutError:
                return PipelineResult(status="CANCELLED", reason="Quality below threshold")

        # Step 3: Transform (Airflow DAG)
        transform = await workflow.execute_activity(
            trigger_airflow_dag,
            pipeline_config.transform_dag,
            start_to_close_timeout=timedelta(hours=2),
        )

        # Step 4: Load to destination (multiple arguments go via `args=`)
        load = await workflow.execute_activity(
            load_to_iceberg,
            args=[transform.output_path, pipeline_config.destination],
            start_to_close_timeout=timedelta(hours=1),
        )

        return PipelineResult(
            status="COMPLETED",
            rows_processed=load.rows,
            # Wall-clock duration since workflow start (replay-safe)
            duration=workflow.now() - workflow.info().start_time,
        )

Data Backfill Workflow

Manages large-scale historical data backfills:

from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

@workflow.defn
class DataBackfillWorkflow:
    """Manages partitioned data backfill with progress tracking."""

    def __init__(self) -> None:
        self.progress = 0.0

    @workflow.query
    def get_progress(self) -> float:
        """Expose backfill progress (0.0-1.0) to callers."""
        return self.progress

    @workflow.run
    async def run(self, backfill_config: BackfillConfig) -> BackfillResult:
        partitions = generate_partitions(
            backfill_config.start_date,
            backfill_config.end_date,
            backfill_config.partition_size,
        )

        results = []
        for partition in partitions:
            result = await workflow.execute_activity(
                process_partition,
                partition,
                retry_policy=RetryPolicy(
                    maximum_attempts=3,
                    initial_interval=timedelta(seconds=30),
                    maximum_interval=timedelta(minutes=10),
                    backoff_coefficient=2.0,
                ),
                start_to_close_timeout=timedelta(hours=1),
            )
            results.append(result)

            # Report progress, readable via the query handler above
            self.progress = len(results) / len(partitions)

        return BackfillResult(
            partitions_processed=len(results),
            total_rows=sum(r.rows for r in results),
        )
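
The `generate_partitions` helper is not defined in this section. A minimal sketch, assuming partitions are contiguous date ranges (the `Partition` shape and helper signature below are illustrative, not MATIH's actual types):

```python
from dataclasses import dataclass
from datetime import date, timedelta


@dataclass(frozen=True)
class Partition:
    start: date
    end: date  # exclusive


def generate_partitions(start_date: date, end_date: date,
                        partition_size: timedelta) -> list[Partition]:
    """Split [start_date, end_date) into contiguous chunks of partition_size.

    The final partition is truncated if the range does not divide evenly.
    """
    partitions = []
    cursor = start_date
    while cursor < end_date:
        chunk_end = min(cursor + partition_size, end_date)
        partitions.append(Partition(start=cursor, end=chunk_end))
        cursor = chunk_end
    return partitions
```

Because the whole partition list is computed up front, workflow progress can be reported as a simple completed/total ratio, as the loop above does.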

Schema Migration Workflow

Coordinates schema changes across multiple systems:

from datetime import timedelta

from temporalio import workflow
from temporalio.exceptions import ActivityError

@workflow.defn
class SchemaMigrationWorkflow:
    """Coordinates schema migration with rollback capability."""

    @workflow.run
    async def run(self, migration: MigrationConfig) -> MigrationResult:
        # 1. Validate migration (Temporal requires a timeout on every activity)
        validation = await workflow.execute_activity(
            validate_migration,
            migration,
            start_to_close_timeout=timedelta(minutes=10),
        )
        if not validation.passed:
            return MigrationResult(status="REJECTED")

        # 2. Create backup
        backup = await workflow.execute_activity(
            create_backup,
            migration.table,
            start_to_close_timeout=timedelta(hours=1),
        )

        # 3. Apply migration
        try:
            await workflow.execute_activity(
                apply_migration,
                migration,
                start_to_close_timeout=timedelta(hours=1),
            )
        except ActivityError:
            # 4. Rollback on failure
            await workflow.execute_activity(
                restore_backup,
                backup,
                start_to_close_timeout=timedelta(hours=1),
            )
            raise

        # 5. Validate post-migration
        post_validation = await workflow.execute_activity(
            validate_post_migration,
            migration,
            start_to_close_timeout=timedelta(minutes=10),
        )

        if not post_validation.passed:
            await workflow.execute_activity(
                restore_backup,
                backup,
                start_to_close_timeout=timedelta(hours=1),
            )
            return MigrationResult(status="ROLLED_BACK")

        return MigrationResult(status="COMPLETED")
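
The backup-then-restore sequence above is an instance of the saga pattern: each completed step registers a compensation, and compensations run in reverse order when a later step fails. An engine-agnostic sketch of the idea (the `Saga` class and `migrate` names are illustrative, not part of MATIH):

```python
from typing import Callable, Optional


class Saga:
    """Run steps in order; on failure, run registered compensations in reverse."""

    def __init__(self) -> None:
        self._compensations: list[Callable[[], None]] = []

    def step(self, action: Callable[[], object],
             compensation: Optional[Callable[[], None]] = None) -> object:
        result = action()
        if compensation is not None:
            self._compensations.append(compensation)
        return result

    def rollback(self) -> None:
        for undo in reversed(self._compensations):
            undo()


def migrate(apply_ok: bool) -> list[str]:
    """Mirrors the migration workflow: back up, apply, restore on failure."""
    log: list[str] = []
    saga = Saga()

    def apply_migration() -> None:
        if not apply_ok:
            raise RuntimeError("DDL failed")
        log.append("apply")

    try:
        saga.step(lambda: log.append("backup"),
                  compensation=lambda: log.append("restore"))
        saga.step(apply_migration)
    except RuntimeError:
        saga.rollback()
    return log
```

In the Temporal version, durable workflow state replaces the in-memory compensation list, so the rollback still runs even if the worker process crashes mid-migration.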

Activity Definitions

Activities are the atomic units of work in Temporal workflows:

Pipeline Activities

| Activity            | Description                                 | Timeout    |
|---------------------|---------------------------------------------|------------|
| extract_data        | Extract data from source via Spark or JDBC  | 1 hour     |
| validate_quality    | Run quality checks on extracted data        | 30 minutes |
| trigger_airflow_dag | Trigger and wait for Airflow DAG completion | 2 hours    |
| load_to_iceberg     | Write data to Iceberg table                 | 1 hour     |
| process_partition   | Process a single partition during backfill  | 1 hour     |

Notification Activities

# data-plane/pipeline-service/src/matih_pipeline/activities/notification_activity.py
from temporalio import activity

@activity.defn
async def notify_quality_issue(quality_result: QualityResult) -> None:
    """Send quality issue notification to stakeholders."""
    # Send to Slack, email, or PagerDuty based on severity
 
@activity.defn
async def notify_pipeline_completion(result: PipelineResult) -> None:
    """Send pipeline completion notification."""
 
@activity.defn
async def notify_failure(error: str, pipeline_id: str) -> None:
    """Send failure notification with error details."""

Infrastructure Activities

| Activity              | Description                                     |
|-----------------------|-------------------------------------------------|
| create_backup         | Create data backup before destructive operations |
| restore_backup        | Restore from backup on failure                  |
| validate_migration    | Pre-flight checks for schema migration          |
| apply_migration       | Execute schema migration DDL                    |
| compact_iceberg_table | Trigger Iceberg file compaction                 |

Retry Policies

Temporal provides configurable retry policies per activity:

from datetime import timedelta

from temporalio.common import RetryPolicy

default_retry_policy = RetryPolicy(
    initial_interval=timedelta(seconds=10),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(minutes=5),
    maximum_attempts=5,
    non_retryable_error_types=[
        "ValidationError",
        "PermissionDeniedError",
        "SchemaConflictError",
    ],
)
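
Under this policy, the delay before retry n is min(initial_interval x backoff_coefficient^(n-1), maximum_interval), and maximum_attempts counts the first attempt. A sketch that makes the schedule concrete (the helper name is illustrative, and any server-side adjustments such as jitter are ignored):

```python
from datetime import timedelta


def retry_delays(initial: timedelta, coefficient: float,
                 maximum: timedelta, max_attempts: int) -> list[timedelta]:
    """Compute the wait before each retry under an exponential-backoff policy.

    max_attempts counts the first try, so there are max_attempts - 1 delays.
    """
    delays = []
    for n in range(1, max_attempts):
        delay = initial * (coefficient ** (n - 1))
        delays.append(min(delay, maximum))
    return delays
```

For the default policy above (10s initial, 2.0x backoff, 5min cap, 5 attempts) the retry waits are 10s, 20s, 40s, and 80s; the 5-minute cap only kicks in if more attempts are allowed.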

Retry Strategies by Activity Type

| Activity Type       | Max Attempts | Initial Interval | Max Interval | Backoff |
|---------------------|--------------|------------------|--------------|---------|
| Data extraction     | 3            | 30s              | 10min        | 2.0x    |
| Quality validation  | 2            | 10s              | 5min         | 2.0x    |
| Airflow DAG trigger | 3            | 60s              | 15min        | 2.0x    |
| Data loading        | 3            | 30s              | 10min        | 2.0x    |
| Notifications       | 5            | 5s               | 1min         | 1.5x    |
| Backup/Restore      | 2            | 60s              | 5min         | 2.0x    |

Workflow Monitoring

Workflow Status API

GET /v1/pipelines/workflows/{workflowId}/status

Response:
{
  "workflowId": "wf-daily-sales-etl-2026-02-12",
  "runId": "run-456",
  "status": "RUNNING",
  "startedAt": "2026-02-12T06:00:00Z",
  "currentActivity": "trigger_airflow_dag",
  "progress": 0.6,
  "activities": [
    {"name": "extract_data", "status": "COMPLETED", "duration": "5m 23s"},
    {"name": "validate_quality", "status": "COMPLETED", "duration": "1m 12s"},
    {"name": "trigger_airflow_dag", "status": "RUNNING", "startedAt": "2026-02-12T06:07:00Z"}
  ],
  "retries": 0,
  "pendingSignals": []
}
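
The currentActivity and progress fields in the payload above can be derived from the activity list, assuming progress is the completed fraction of planned activities (the helper name is illustrative; field names follow the example response):

```python
def summarize_activities(activities: list[dict], planned_total: int) -> dict:
    """Derive currentActivity and progress from a workflow's activity list."""
    completed = sum(1 for a in activities if a["status"] == "COMPLETED")
    running = [a["name"] for a in activities if a["status"] == "RUNNING"]
    return {
        "currentActivity": running[0] if running else None,
        "progress": completed / planned_total if planned_total else 0.0,
    }
```

Keeping this derivation on the API side keeps the workflow itself free of presentation logic; the workflow only records activity state.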

Metrics

| Metric                      | Type      | Description                      |
|-----------------------------|-----------|----------------------------------|
| temporal.workflow.started   | Counter   | Workflows started                |
| temporal.workflow.completed | Counter   | Workflows completed successfully |
| temporal.workflow.failed    | Counter   | Workflows failed                 |
| temporal.workflow.duration  | Histogram | Workflow execution duration      |
| temporal.activity.execution | Timer     | Per-activity execution time      |
| temporal.activity.retries   | Counter   | Activity retry count             |

Temporal Server Configuration

| Property           | Value                       |
|--------------------|-----------------------------|
| Namespace          | matih-pipelines             |
| Persistence        | PostgreSQL                  |
| Visibility         | Elasticsearch               |
| Frontend port      | 7233                        |
| Worker concurrency | 10 (per worker pod)         |
| Workflow timeout   | 24 hours (default)          |
| Activity timeout   | Configurable per activity   |

Integration with Pipeline Service

The Pipeline Service exposes Temporal workflows through its API:

# Pipeline API routes for Temporal workflows

@router.post("/pipelines/{pipeline_id}/execute")
async def execute_pipeline(pipeline_id: str, params: ExecutionParams):
    # Look up pipeline metadata (repository access shown for illustration)
    pipeline = await pipeline_store.get(pipeline_id)
    # Determine engine: if workflow orchestration needed -> Temporal
    if pipeline.requires_orchestration:
        # start_workflow returns a WorkflowHandle, not a bare ID
        handle = await temporal_client.start_workflow(
            DataPipelineWorkflow.run,
            pipeline.config,
            id=f"pipeline-{pipeline_id}-{datetime.utcnow().isoformat()}",
            task_queue="pipeline-workers",
        )
        return {"workflowId": handle.id, "status": "STARTED"}

@router.post("/pipelines/workflows/{workflow_id}/signal")
async def signal_workflow(workflow_id: str, signal: WorkflowSignal):
    # Send approval or cancellation signals to running workflows
    handle = temporal_client.get_workflow_handle(workflow_id)
    await handle.signal(signal.name, signal.data)

Related Sections

  • Pipeline Service -- Service architecture and API routes
  • Airflow -- DAG execution triggered by Temporal activities
  • Spark -- Spark jobs executed as Temporal activities
  • API Reference -- Workflow management endpoints