Airflow Integration
Apache Airflow serves as the primary batch orchestration engine in MATIH. The Pipeline Service generates Airflow DAGs from pipeline definitions, manages DAG lifecycle, and provides custom operators optimized for MATIH workloads. This section covers the Airflow integration architecture, custom operator library, DAG synchronization, the Airflow API service, and monitoring.
Integration Architecture
+-------------------+    DAG sync    +-------------------+
| Pipeline Service  | -------------> | Airflow Scheduler |
| (DagSyncService)  |                |  (DAG Directory)  |
+-------------------+                +---------+---------+
                                               |
                                     +---------v---------+
                                     |  Airflow Workers  |
                                     |  (Execute tasks)  |
                                     +---------+---------+
                                               |
                    +--------------------------+--------------------------+
                    |                          |                          |
             +------v-----+             +------v-----+             +------v-----+
             | DB Extract |             | SQL Xform  |             | Delta Load |
             |  Operator  |             |  Operator  |             |  Operator  |
             +------------+             +------------+             +------------+

Airflow Deployment
Airflow is deployed within the matih-data-plane namespace using the official Apache Airflow Helm chart:
| Component | Configuration |
|---|---|
| Executor | KubernetesExecutor |
| DAG persistence | Shared PVC mounted to scheduler and workers |
| Database | PostgreSQL (shared with data plane) |
| Log storage | Object storage (S3/GCS/Azure Blob) |
| Authentication | MATIH JWT integration via custom Flask-AppBuilder security manager |
| Connections | Synced from Pipeline Service connection manager |
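For orientation, the configuration in the table might translate into Helm values along these lines. This is a sketch only: the key names follow the official apache-airflow chart, but should be verified against the chart version actually deployed, and the bucket name is illustrative.

```yaml
# Sketch of values.yaml for the official apache-airflow Helm chart
executor: "KubernetesExecutor"
dags:
  persistence:
    enabled: true            # shared PVC mounted to scheduler and workers
data:
  metadataConnection:
    protocol: postgresql     # shared data-plane PostgreSQL
config:
  logging:
    remote_logging: "True"
    remote_base_log_folder: "s3://matih-airflow-logs"   # illustrative bucket
```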
Custom Operators
The Pipeline Service provides eight custom Airflow operators:
DatabaseExtractOperator
Extracts data from relational databases via JDBC:
# data-plane/pipeline-service/src/matih_pipeline/operators/database_extract.py
from airflow.models import BaseOperator


class DatabaseExtractOperator(BaseOperator):
    """Extract data from a relational database."""

    def __init__(self, connection_id: str, sql: str, output_path: str,
                 format: str = "parquet", partition_column: str | None = None,
                 chunk_size: int = 100000, **kwargs):
        super().__init__(**kwargs)
        self.connection_id = connection_id
        self.sql = sql
        self.output_path = output_path
        self.format = format
        self.partition_column = partition_column
        self.chunk_size = chunk_size

    def execute(self, context):
        # 1. Resolve connection credentials from K8s Secret
        # 2. Open JDBC connection
        # 3. Execute query in chunks
        # 4. Write each chunk to output path in specified format
        # 5. Emit OpenLineage event
        # 6. Return metadata (row count, file paths)
        ...

ApiExtractOperator
Extracts data from REST APIs with pagination support:
class ApiExtractOperator(BaseOperator):
    """Extract data from REST APIs with pagination."""

    def __init__(self, endpoint: str, method: str = "GET",
                 pagination: dict | None = None, auth: dict | None = None,
                 output_path: str | None = None, **kwargs):
        # Supports offset, cursor, and link-based pagination
        # Supports API key, OAuth2, and Bearer token authentication
        ...

CloudStorageExtractOperator
Extracts files from object storage:
class CloudStorageExtractOperator(BaseOperator):
    """Extract files from S3/GCS/Azure Blob Storage."""

    def __init__(self, bucket: str, prefix: str, file_pattern: str = "*",
                 output_path: str | None = None, format: str = "auto", **kwargs):
        # Supports recursive listing, pattern matching, format detection
        # Handles Parquet, CSV, JSON, Avro, and ORC formats
        ...

KafkaExtractOperator
Consumes messages from Kafka topics:
class KafkaExtractOperator(BaseOperator):
    """Consume messages from a Kafka topic."""

    def __init__(self, topic: str, bootstrap_servers: str,
                 group_id: str, max_messages: int = 100000,
                 output_path: str | None = None, **kwargs):
        # Batch consumption with configurable limits
        # Schema Registry integration for Avro deserialization
        ...

SqlTransformOperator
Executes SQL transformations:
class SqlTransformOperator(BaseOperator):
    """Execute SQL transformations."""

    def __init__(self, sql: str, connection_id: str | None = None,
                 engine: str = "trino", output_table: str | None = None, **kwargs):
        # Routes SQL to Trino, ClickHouse, or a direct database connection
        # Supports Jinja templating for dynamic SQL
        ...

DbtTransformOperator
Executes dbt models:
class DbtTransformOperator(BaseOperator):
    """Execute dbt models within a pipeline."""

    def __init__(self, project_dir: str, models: list | None = None,
                 select: str | None = None, exclude: str | None = None,
                 full_refresh: bool = False, **kwargs):
        # Runs dbt run, dbt test, and dbt build
        # Captures the dbt manifest for lineage
        ...

ClickHouseLoadOperator and DeltaLoadOperator
Load data into analytics stores:
class ClickHouseLoadOperator(BaseOperator):
    """Bulk load data into ClickHouse."""


class DeltaLoadOperator(BaseOperator):
    """Write data to Delta Lake / Iceberg tables."""

DAG Generation
The PipelineDagGenerator converts pipeline YAML into Airflow DAG Python code:
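As an illustration of the final rendering step, a much-simplified generator might emit operator tasks from parsed pipeline steps. This is a hypothetical sketch, not the actual PipelineDagGenerator: the step-to-operator mapping and record shape are assumptions, and connection resolution and dependency-graph handling are omitted.

```python
# Hypothetical, much-simplified sketch of DAG code generation.
OPERATOR_FOR_STEP = {
    "database_extract": "DatabaseExtractOperator",
    "sql_transform": "SqlTransformOperator",
    "delta_load": "DeltaLoadOperator",
}

def render_dag(pipeline_id: str, schedule: str, steps: list[dict]) -> str:
    """Render pipeline steps into Airflow DAG source code."""
    lines = [
        "from airflow import DAG",
        "from datetime import datetime",
        "",
        f'with DAG(dag_id="{pipeline_id}", schedule_interval="{schedule}",',
        "         start_date=datetime(2026, 1, 1), catchup=False) as dag:",
    ]
    for step in steps:
        sid = step["id"]
        op = OPERATOR_FOR_STEP[step["type"]]
        lines.append(f'    {sid} = {op}(task_id="{sid}")')
    # Chain tasks in declaration order: t1 >> t2 >> t3
    if len(steps) > 1:
        lines.append("    " + " >> ".join(s["id"] for s in steps))
    return "\n".join(lines)
```

The real generator builds a dependency graph rather than chaining tasks linearly, but the shape of the output is the same.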
Generation Process
Pipeline YAML
      |
      v
[Parse YAML] -> Pipeline model
      |
      v
[Resolve connections] -> Connection credentials
      |
      v
[Map to operators] -> Operator instances
      |
      v
[Build dependency graph] -> Task ordering
      |
      v
[Generate Python DAG] -> dag_file.py
      |
      v
[Write to DAG directory] -> Airflow picks up

Generated DAG Structure
A typical generated DAG:
# Auto-generated by MATIH Pipeline Service
# Pipeline: daily-sales-etl v1.0.0
# Generated: 2026-02-12T10:30:00Z
from airflow import DAG
from datetime import datetime, timedelta

from matih_pipeline.operators.database_extract import DatabaseExtractOperator
from matih_pipeline.operators.sql_transform import SqlTransformOperator
from matih_pipeline.operators.delta_load import DeltaLoadOperator

default_args = {
    "owner": "data-engineering-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-eng@acme.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="daily_sales_etl",
    default_args=default_args,
    schedule_interval="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["sales", "daily", "etl"],
) as dag:
    extract = DatabaseExtractOperator(
        task_id="extract_transactions",
        connection_id="sales-db",
        sql="SELECT * FROM raw_transactions WHERE updated_at > '{{ ds }}'",
        output_path="s3://pipeline-staging/sales/{{ ds }}/",
    )
    transform = SqlTransformOperator(
        task_id="clean_transactions",
        sql="SELECT transaction_id, COALESCE(amount, 0) AS amount ... FROM {{ params.input }}",
        engine="trino",
    )
    load = DeltaLoadOperator(
        task_id="load_to_lake",
        table="analytics.sales.daily_transactions",
        mode="append",
        partition_by=["date"],
    )

    extract >> transform >> load

Airflow API Service
The AirflowApiService provides a typed client for the Airflow REST API:
class AirflowApiService:
    """Client for the Airflow REST API."""

    async def trigger_dag(self, dag_id: str, conf: dict | None = None) -> DagRun:
        """Trigger a DAG run with optional configuration."""

    async def get_dag_runs(self, dag_id: str, limit: int = 25) -> List[DagRun]:
        """Get recent DAG runs."""

    async def get_task_instances(self, dag_id: str, run_id: str) -> List[TaskInstance]:
        """Get task instances for a DAG run."""

    async def get_dag_status(self, dag_id: str) -> DagStatus:
        """Get the current status of a DAG (active, paused, etc.)."""

    async def pause_dag(self, dag_id: str) -> None:
        """Pause a DAG."""

    async def unpause_dag(self, dag_id: str) -> None:
        """Unpause a DAG."""

    async def get_task_logs(self, dag_id: str, run_id: str, task_id: str) -> str:
        """Get logs for a specific task instance."""

DAG Monitoring
Airflow Monitoring Routes
GET  /v1/airflow/dags
GET  /v1/airflow/dags/{dagId}/runs
GET  /v1/airflow/dags/{dagId}/runs/{runId}/tasks
GET  /v1/airflow/dags/{dagId}/runs/{runId}/tasks/{taskId}/logs
POST /v1/airflow/dags/{dagId}/trigger
POST /v1/airflow/dags/{dagId}/pause
POST /v1/airflow/dags/{dagId}/unpause

Metrics
| Metric | Type | Description |
|---|---|---|
| airflow.dag_runs.total | Counter | Total DAG runs |
| airflow.dag_runs.duration | Histogram | DAG run duration |
| airflow.dag_runs.status | Counter | Runs by status (success, failed, running) |
| airflow.task_instances.duration | Histogram | Task execution duration |
| airflow.task_instances.retries | Counter | Task retry count |
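For illustration, the DAG-run counters in the table could be derived from finished-run records like this. The record shape (state, duration_s) is an assumption for the sketch, not the actual collector's schema.

```python
# Illustrative aggregation of DAG-run records into the metrics above.
from collections import Counter

def aggregate_dag_run_metrics(runs: list[dict]) -> dict:
    """Compute total runs, runs by status, and duration samples."""
    status_counts = Counter(r["state"] for r in runs)
    # Only completed runs carry a duration; running ones are skipped
    durations = [r["duration_s"] for r in runs if r.get("duration_s") is not None]
    return {
        "airflow.dag_runs.total": len(runs),
        "airflow.dag_runs.status": dict(status_counts),
        "airflow.dag_runs.duration": durations,  # samples to feed a histogram
    }
```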
Connection Synchronization
Pipeline Service connections are synchronized to Airflow connections:
class ConnectionManagerService:
    async def sync_to_airflow(self, connection: Connection):
        """Sync a Pipeline Service connection to an Airflow connection."""
        airflow_conn = {
            "connection_id": connection.id,
            "conn_type": map_conn_type(connection.type),
            "host": connection.host,
            "port": connection.port,
            "schema": connection.database,
            "login": None,     # Injected from K8s Secret at runtime
            "password": None,  # Injected from K8s Secret at runtime
            "extra": json.dumps(connection.properties),
        }
        await self.airflow_api.create_connection(airflow_conn)

Credentials are never stored in Airflow's connection database. They are injected at runtime from Kubernetes Secrets.
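A helper like map_conn_type translates MATIH connection types into Airflow's conn_type identifiers. The sketch below is hypothetical: the mapping entries are illustrative, not the service's actual table, though the Airflow-side values ("postgres", "aws", "google_cloud_platform") are standard provider conn_types.

```python
# Hypothetical mapping of MATIH connection types to Airflow conn_type values.
MATIH_TO_AIRFLOW_CONN_TYPE = {
    "postgresql": "postgres",
    "mysql": "mysql",
    "mssql": "mssql",
    "s3": "aws",
    "gcs": "google_cloud_platform",
    "kafka": "kafka",
}

def map_conn_type(matih_type: str) -> str:
    """Translate a MATIH connection type to an Airflow conn_type."""
    try:
        return MATIH_TO_AIRFLOW_CONN_TYPE[matih_type]
    except KeyError:
        # Fail loudly so a sync of an unmapped type is caught early
        raise ValueError(f"No Airflow conn_type mapping for '{matih_type}'")
```

Failing on unknown types keeps a mistyped connection from being synced with a conn_type Airflow cannot resolve.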
Related Sections
- Pipeline Service -- Service architecture and DAG generation
- Spark -- Spark jobs triggered from Airflow DAGs
- Templates -- Template-based DAG generation
- API Reference -- Airflow management endpoints