MATIH Platform is in active MVP development. Documentation reflects current implementation status.

Airflow Integration

Apache Airflow is the primary batch orchestration engine in MATIH. The Pipeline Service generates Airflow DAGs from pipeline definitions, manages the DAG lifecycle, and provides custom operators for MATIH workloads. This section covers the integration architecture, Airflow deployment, the custom operator library, DAG generation, the Airflow API service, monitoring, and connection synchronization.


Integration Architecture

+-------------------+    DAG sync    +-------------------+
| Pipeline Service  | -------------> | Airflow Scheduler |
| (DagSyncService)  |                | (DAG Directory)   |
+-------------------+                +--------+----------+
                                              |
                                     +--------v----------+
                                     | Airflow Workers   |
                                     | (Execute tasks)   |
                                     +--------+----------+
                                              |
                              +---------------+---------------+
                              |               |               |
                    +---------v--+   +--------v---+   +-------v----+
                    | DB Extract |   | SQL Xform  |   | Delta Load |
                    | Operator   |   | Operator   |   | Operator   |
                    +------------+   +------------+   +------------+

Airflow Deployment

Airflow is deployed within the matih-data-plane namespace using the official Apache Airflow Helm chart:

Component        Configuration
---------------  ------------------------------------------------------------------
Executor         KubernetesExecutor
DAG persistence  Shared PVC mounted to scheduler and workers
Database         PostgreSQL (shared with data plane)
Log storage      Object storage (S3/GCS/Azure Blob)
Authentication   MATIH JWT integration via custom Flask-AppBuilder security manager
Connections      Synced from Pipeline Service connection manager

Custom Operators

The Pipeline Service provides eight custom Airflow operators:

DatabaseExtractOperator

Extracts data from relational databases via JDBC:

# data-plane/pipeline-service/src/matih_pipeline/operators/database_extract.py

from typing import Optional

from airflow.models import BaseOperator


class DatabaseExtractOperator(BaseOperator):
    """Extract data from a relational database."""

    def __init__(self, connection_id: str, sql: str, output_path: str,
                 format: str = "parquet", partition_column: Optional[str] = None,
                 chunk_size: int = 100000, **kwargs):
        super().__init__(**kwargs)
        self.connection_id = connection_id
        self.sql = sql
        self.output_path = output_path
        self.format = format
        self.partition_column = partition_column
        self.chunk_size = chunk_size

    def execute(self, context):
        # 1. Resolve connection credentials from K8s Secret
        # 2. Open JDBC connection
        # 3. Execute query in chunks
        # 4. Write each chunk to output path in specified format
        # 5. Emit OpenLineage event
        # 6. Return metadata (row count, file paths)
        ...  # Implementation omitted in this overview
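
Steps 3, 4, and 6 of `execute` (chunked reads, one output file per chunk, returned metadata) can be illustrated without a database server. The following is a minimal sketch, not the operator's actual implementation: it assumes a DB-API cursor, uses the standard-library `sqlite3` module in place of JDBC and CSV in place of Parquet, and the helper names `iter_chunks` and `extract_to_files` are hypothetical.

```python
import csv
import sqlite3
from pathlib import Path
from typing import Iterator, List, Tuple


def iter_chunks(cursor, chunk_size: int) -> Iterator[List[Tuple]]:
    """Yield lists of at most chunk_size rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            return
        yield rows


def extract_to_files(conn, sql: str, output_dir: Path,
                     chunk_size: int = 100000) -> dict:
    """Run sql, write each chunk to its own CSV file, and return run metadata."""
    output_dir.mkdir(parents=True, exist_ok=True)
    cursor = conn.cursor()
    cursor.execute(sql)
    header = [column[0] for column in cursor.description]
    row_count, file_paths = 0, []
    for index, rows in enumerate(iter_chunks(cursor, chunk_size)):
        path = output_dir / f"part-{index:05d}.csv"
        with path.open("w", newline="") as handle:
            writer = csv.writer(handle)
            writer.writerow(header)
            writer.writerows(rows)
        row_count += len(rows)
        file_paths.append(str(path))
    # Mirrors step 6: metadata a downstream task could read
    return {"row_count": row_count, "file_paths": file_paths}
```

Reading with `fetchmany` keeps memory bounded by `chunk_size` rows regardless of the total result size, which is presumably the reason the operator exposes that parameter.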

ApiExtractOperator

Extracts data from REST APIs with pagination support:

class ApiExtractOperator(BaseOperator):
    """Extract data from REST APIs with pagination."""

    def __init__(self, endpoint: str, method: str = "GET",
                 pagination: dict = None, auth: dict = None,
                 output_path: str = None, **kwargs):
        # Supports offset, cursor, and link-based pagination
        # Supports API key, OAuth2, and Bearer token authentication
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.method = method
        self.pagination = pagination
        self.auth = auth
        self.output_path = output_path
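
Offset and cursor pagination differ mainly in how the next request is derived from the previous response. The sketch below illustrates both loops under stated assumptions: the page-fetching callables, the `items` and `next_cursor` response keys, and the function names are hypothetical and are not taken from the operator's implementation.

```python
from typing import Callable, Dict, Iterator, List, Optional


def paginate_offset(fetch_page: Callable[[int, int], List[dict]],
                    page_size: int = 100) -> Iterator[dict]:
    """Offset pagination: request pages until a short or empty page arrives."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        yield from page
        if len(page) < page_size:
            return
        offset += page_size


def paginate_cursor(fetch_page: Callable[[Optional[str]], Dict]) -> Iterator[dict]:
    """Cursor pagination: follow next_cursor until the response omits it."""
    cursor: Optional[str] = None
    while True:
        body = fetch_page(cursor)
        yield from body["items"]
        cursor = body.get("next_cursor")
        if not cursor:
            return


if __name__ == "__main__":
    records = [{"id": i} for i in range(5)]
    # Stand-in for an HTTP call: slice an in-memory list
    fake_fetch = lambda offset, size: records[offset:offset + size]
    print(list(paginate_offset(fake_fetch, page_size=2)))
```

Passing the fetch function in as a callable keeps the loop independent of any particular HTTP client or authentication scheme.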

CloudStorageExtractOperator

Extracts files from object storage:

class CloudStorageExtractOperator(BaseOperator):
    """Extract files from S3/GCS/Azure Blob Storage."""

    def __init__(self, bucket: str, prefix: str, file_pattern: str = "*",
                 output_path: str = None, format: str = "auto", **kwargs):
        # Supports recursive listing, pattern matching, format detection
        # Handles Parquet, CSV, JSON, Avro, ORC formats
        super().__init__(**kwargs)
        self.bucket = bucket
        self.prefix = prefix
        self.file_pattern = file_pattern
        self.output_path = output_path
        self.format = format
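
Pattern matching and format detection can be shown on a plain list of object keys. This is a minimal sketch that assumes glob-style patterns matched against the file name and detection by file extension when `format` is `"auto"`; the helper names and the extension table are illustrative, and the real operator may detect formats differently.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath
from typing import Iterable, List

# Assumed mapping from file extension to the formats listed for the operator
_EXTENSION_FORMATS = {
    ".parquet": "parquet",
    ".csv": "csv",
    ".json": "json",
    ".avro": "avro",
    ".orc": "orc",
}


def select_keys(keys: Iterable[str], prefix: str, file_pattern: str = "*") -> List[str]:
    """Keep keys under prefix whose file name matches the glob pattern."""
    return [
        key for key in keys
        if key.startswith(prefix)
        and fnmatchcase(PurePosixPath(key).name, file_pattern)
    ]


def detect_format(key: str, declared: str = "auto") -> str:
    """Return the declared format, or infer it from the extension when 'auto'."""
    if declared != "auto":
        return declared
    suffix = PurePosixPath(key).suffix.lower()
    if suffix not in _EXTENSION_FORMATS:
        raise ValueError(f"Cannot detect format for {key!r}")
    return _EXTENSION_FORMATS[suffix]
```

Raising on an unknown extension, rather than guessing, makes a misconfigured `file_pattern` fail early instead of producing unreadable output.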

KafkaExtractOperator

Consumes messages from Kafka topics:

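class KafkaExtractOperator(BaseOperator):
    """Consume messages from a Kafka topic."""

    def __init__(self, topic: str, bootstrap_servers: str,
                 group_id: str, max_messages: int = 100000,
                 output_path: str = None, **kwargs):
        # Batch consumption with configurable limits
        # Schema Registry integration for Avro deserialization
        super().__init__(**kwargs)
        self.topic = topic
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.max_messages = max_messages
        self.output_path = output_path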
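
Because a Kafka topic has no natural end, a batch task needs explicit stopping conditions. The sketch below shows one possible loop, assuming a consumer object with a `poll(timeout)` method that returns a message or `None`; the names `PollingConsumer`, `consume_batch`, `poll_timeout`, and `max_empty_polls` are hypothetical and are not part of the documented operator signature.

```python
from typing import Any, List, Optional, Protocol


class PollingConsumer(Protocol):
    """Minimal interface assumed for the consumer (hypothetical)."""

    def poll(self, timeout: float) -> Optional[Any]:
        ...


def consume_batch(consumer: PollingConsumer, max_messages: int = 100000,
                  poll_timeout: float = 1.0, max_empty_polls: int = 3) -> List[Any]:
    """Collect messages until max_messages is reached or the topic goes quiet."""
    batch: List[Any] = []
    empty_polls = 0
    while len(batch) < max_messages and empty_polls < max_empty_polls:
        message = consumer.poll(poll_timeout)
        if message is None:
            # No message within the timeout; count it toward the idle limit
            empty_polls += 1
            continue
        empty_polls = 0
        batch.append(message)
    return batch
```

The idle limit keeps the task from blocking indefinitely when fewer than `max_messages` messages are available.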

SqlTransformOperator

Executes SQL transformations:

class SqlTransformOperator(BaseOperator):
    """Execute SQL transformations."""

    def __init__(self, sql: str, connection_id: str = None,
                 engine: str = "trino", output_table: str = None, **kwargs):
        # Routes SQL to Trino, ClickHouse, or direct database connection
        # Supports Jinja templating for dynamic SQL
        super().__init__(**kwargs)
        self.sql = sql
        self.connection_id = connection_id
        self.engine = engine
        self.output_table = output_table

DbtTransformOperator

Executes dbt models:

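class DbtTransformOperator(BaseOperator):
    """Execute dbt models within a pipeline."""

    def __init__(self, project_dir: str, models: list = None,
                 select: str = None, exclude: str = None,
                 full_refresh: bool = False, **kwargs):
        # Runs dbt run, dbt test, dbt build
        # Captures dbt manifest for lineage
        super().__init__(**kwargs)
        self.project_dir = project_dir
        self.models = models
        self.select = select
        self.exclude = exclude
        self.full_refresh = full_refresh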

ClickHouseLoadOperator and DeltaLoadOperator

Load data into analytics stores:

class ClickHouseLoadOperator(BaseOperator):
    """Bulk load data into ClickHouse."""
 
class DeltaLoadOperator(BaseOperator):
    """Write data to Delta Lake / Iceberg tables."""

DAG Generation

The PipelineDagGenerator converts pipeline YAML into Airflow DAG Python code.

Generation Process

Pipeline YAML
    |
    v
[Parse YAML] -> Pipeline model
    |
    v
[Resolve connections] -> Connection credentials
    |
    v
[Map to operators] -> Operator instances
    |
    v
[Build dependency graph] -> Task ordering
    |
    v
[Generate Python DAG] -> dag_file.py
    |
    v
[Write to DAG directory] -> Airflow picks up
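
The "Build dependency graph" step amounts to a topological sort of the pipeline's tasks. As a minimal sketch, assuming the parsed pipeline is available as a mapping from each task ID to its upstream task IDs (a hypothetical shape, not the actual Pipeline model), the standard-library `graphlib` module yields a valid ordering and rejects cycles:

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def task_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return task IDs so that every task appears after its upstream tasks."""
    return list(TopologicalSorter(dependencies).static_order())


def render_dependencies(dependencies: Dict[str, List[str]]) -> List[str]:
    """Render one 'upstream >> downstream' line per edge for the generated DAG."""
    lines = []
    for task in task_order(dependencies):
        for upstream in sorted(dependencies.get(task, [])):
            lines.append(f"{upstream} >> {task}")
    return lines


if __name__ == "__main__":
    deps = {"transform": ["extract"], "load": ["transform"]}
    print(task_order(deps))
    print("\n".join(render_dependencies(deps)))
    try:
        task_order({"a": ["b"], "b": ["a"]})
    except CycleError as exc:
        print("Rejected cyclic pipeline:", exc.args[0])
```

Detecting cycles at generation time means an invalid pipeline definition never reaches the DAG directory.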

Generated DAG Structure

A typical generated DAG:

# Auto-generated by MATIH Pipeline Service
# Pipeline: daily-sales-etl v1.0.0
# Generated: 2026-02-12T10:30:00Z
 
from airflow import DAG
from datetime import datetime, timedelta
from matih_pipeline.operators.database_extract import DatabaseExtractOperator
from matih_pipeline.operators.sql_transform import SqlTransformOperator
from matih_pipeline.operators.delta_load import DeltaLoadOperator
 
default_args = {
    "owner": "data-engineering-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-eng@acme.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}
 
with DAG(
    dag_id="daily_sales_etl",
    default_args=default_args,
    schedule_interval="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["sales", "daily", "etl"],
) as dag:
 
    extract = DatabaseExtractOperator(
        task_id="extract_transactions",
        connection_id="sales-db",
        sql="SELECT * FROM raw_transactions WHERE updated_at > '{{ ds }}'",
        output_path="s3://pipeline-staging/sales/{{ ds }}/",
    )
 
    transform = SqlTransformOperator(
        task_id="clean_transactions",
        sql="SELECT transaction_id, COALESCE(amount, 0) AS amount ... FROM {{ params.input }}",
        engine="trino",
    )
 
    load = DeltaLoadOperator(
        task_id="load_to_lake",
        table="analytics.sales.daily_transactions",
        mode="append",
        partition_by=["date"],
    )
 
    extract >> transform >> load

Airflow API Service

The AirflowApiService provides a typed client for the Airflow REST API:

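from typing import List, Optional


class AirflowApiService:
    """Client for Airflow REST API."""

    # DagRun, TaskInstance, and DagStatus are response models defined elsewhere.
    async def trigger_dag(self, dag_id: str, conf: Optional[dict] = None) -> DagRun: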
        """Trigger a DAG run with optional configuration."""
 
    async def get_dag_runs(self, dag_id: str, limit: int = 25) -> List[DagRun]:
        """Get recent DAG runs."""
 
    async def get_task_instances(self, dag_id: str, run_id: str) -> List[TaskInstance]:
        """Get task instances for a DAG run."""
 
    async def get_dag_status(self, dag_id: str) -> DagStatus:
        """Get current status of a DAG (active, paused, etc.)."""
 
    async def pause_dag(self, dag_id: str) -> None:
        """Pause a DAG."""
 
    async def unpause_dag(self, dag_id: str) -> None:
        """Unpause a DAG."""
 
    async def get_task_logs(self, dag_id: str, run_id: str, task_id: str) -> str:
        """Get logs for a specific task instance."""

DAG Monitoring

Airflow Monitoring Routes

GET /v1/airflow/dags
GET /v1/airflow/dags/{dagId}/runs
GET /v1/airflow/dags/{dagId}/runs/{runId}/tasks
GET /v1/airflow/dags/{dagId}/runs/{runId}/tasks/{taskId}/logs
POST /v1/airflow/dags/{dagId}/trigger
POST /v1/airflow/dags/{dagId}/pause
POST /v1/airflow/dags/{dagId}/unpause
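
Behind routes such as trigger and pause, the AirflowApiService has to translate each call into an Airflow REST request. The sketch below shows how that translation might look, assuming the Airflow 2.x stable REST API under `/api/v1`; the `ApiRequest` type and the builder functions are hypothetical, and no HTTP client is involved.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import quote


@dataclass(frozen=True)
class ApiRequest:
    """Description of an HTTP request to Airflow (illustrative type)."""
    method: str
    path: str
    json: Optional[dict] = None


def _segment(value: str) -> str:
    # Percent-encode path segments; run IDs often contain ':' and '+'
    return quote(value, safe="")


def trigger_dag_request(dag_id: str, conf: Optional[dict] = None) -> ApiRequest:
    """Build the request that creates a new DAG run."""
    return ApiRequest("POST", f"/api/v1/dags/{_segment(dag_id)}/dagRuns",
                      {"conf": conf or {}})


def set_paused_request(dag_id: str, paused: bool) -> ApiRequest:
    """Build the request that pauses or unpauses a DAG."""
    return ApiRequest("PATCH", f"/api/v1/dags/{_segment(dag_id)}",
                      {"is_paused": paused})


def task_instances_request(dag_id: str, run_id: str) -> ApiRequest:
    """Build the request that lists task instances of one DAG run."""
    path = f"/api/v1/dags/{_segment(dag_id)}/dagRuns/{_segment(run_id)}/taskInstances"
    return ApiRequest("GET", path)
```

Separating request construction from transport makes the mapping easy to unit test and leaves the choice of async HTTP client open.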

Metrics

Metric                           Type       Description
-------------------------------  ---------  -----------------------------------------
airflow.dag_runs.total           Counter    Total DAG runs
airflow.dag_runs.duration        Histogram  DAG run duration
airflow.dag_runs.status          Counter    Runs by status (success, failed, running)
airflow.task_instances.duration  Histogram  Task execution duration
airflow.task_instances.retries   Counter    Task retry count

Connection Synchronization

Pipeline Service connections are synchronized to Airflow connections:

import json


class ConnectionManagerService:
 
    async def sync_to_airflow(self, connection: Connection):
        """Sync a Pipeline Service connection to an Airflow connection."""
        airflow_conn = {
            "connection_id": connection.id,
            "conn_type": map_conn_type(connection.type),
            "host": connection.host,
            "port": connection.port,
            "schema": connection.database,
            "login": None,  # Injected from K8s Secret at runtime
            "password": None,  # Injected from K8s Secret at runtime
            "extra": json.dumps(connection.properties)
        }
        await self.airflow_api.create_connection(airflow_conn)

Credentials are never stored in Airflow's connection database. They are injected at runtime from Kubernetes Secrets.
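
One way runtime injection could work is for the Kubernetes Secret to be exposed to the task pod as environment variables that the operator reads just before connecting. The sketch below assumes that approach; the `MATIH_CONN_<ID>_<FIELD>` naming convention and both function names are hypothetical, and the platform may instead mount secrets as files or use a different convention.

```python
import os
from typing import Dict, Mapping, Optional


def secret_env_name(connection_id: str, field: str) -> str:
    """Build the assumed variable name, e.g. sales-db/login -> MATIH_CONN_SALES_DB_LOGIN."""
    normalized = "".join(ch if ch.isalnum() else "_" for ch in connection_id)
    return f"MATIH_CONN_{normalized}_{field}".upper()


def resolve_credentials(connection_id: str,
                        environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Read login and password for a connection from the environment."""
    env = os.environ if environ is None else environ
    credentials, missing = {}, []
    for field in ("login", "password"):
        name = secret_env_name(connection_id, field)
        if name in env:
            credentials[field] = env[name]
        else:
            missing.append(name)
    if missing:
        # Fail loudly rather than connecting with partial credentials
        raise KeyError(f"Missing secret variables: {', '.join(missing)}")
    return credentials
```

Under this scheme the Airflow connection record carries only host, port, and schema, which is consistent with `login` and `password` being synced as `None`.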

