MATIH Platform is in active MVP development. Documentation reflects current implementation status.
11. Pipelines & Data Engineering
API Reference

API Reference

This section provides the complete REST API documentation for the Pipeline Service. All endpoints require a valid JWT token in the Authorization header and the X-Tenant-ID header for tenant context.


Base URL

http://pipeline-service.matih-data-plane.svc.cluster.local:8000/v1

Authentication

All endpoints require:

HeaderRequiredDescription
AuthorizationYesBearer JWT token
X-Tenant-IDYesUUID of the tenant

Pipeline Management

Create Pipeline

POST /v1/pipelines

Request Body:

{
  "name": "daily-sales-etl",
  "version": "1.0.0",
  "description": "Daily sales data extraction and loading",
  "definition": {
    "sources": {...},
    "transformations": [...],
    "sinks": {...},
    "quality_checks": [...],
    "orchestration": {...}
  },
  "tags": ["sales", "daily", "etl"]
}

Response (201 Created):

{
  "pipelineId": "pipeline-456",
  "name": "daily-sales-etl",
  "version": "1.0.0",
  "status": "CREATED",
  "createdAt": "2026-02-12T10:30:00Z"
}

List Pipelines

GET /v1/pipelines?tags=sales&status=ACTIVE&page=0&size=20

Get Pipeline

GET /v1/pipelines/{pipelineId}

Update Pipeline

PUT /v1/pipelines/{pipelineId}

Delete Pipeline

DELETE /v1/pipelines/{pipelineId}

Pipeline Execution

Execute Pipeline

POST /v1/pipelines/{pipelineId}/execute

Request Body:

{
  "parameters": {
    "start_date": "2026-02-01",
    "end_date": "2026-02-12"
  },
  "engine_override": null,
  "priority": "NORMAL"
}

Response (202 Accepted):

{
  "executionId": "exec-789",
  "pipelineId": "pipeline-456",
  "status": "QUEUED",
  "engine": "airflow",
  "estimatedDurationMs": 300000,
  "submittedAt": "2026-02-12T10:30:00Z"
}

Get Execution Status

GET /v1/pipelines/executions/{executionId}

Response (200 OK):

{
  "executionId": "exec-789",
  "pipelineId": "pipeline-456",
  "status": "RUNNING",
  "engine": "airflow",
  "progress": {
    "totalTasks": 5,
    "completedTasks": 3,
    "currentTask": "enrich_with_products",
    "percentage": 60
  },
  "metrics": {
    "rowsProcessed": 1234567,
    "bytesProcessed": 536870912,
    "elapsedMs": 120000
  },
  "startedAt": "2026-02-12T10:30:05Z"
}

List Executions

GET /v1/pipelines/executions?pipelineId={id}&status=RUNNING&page=0&size=20

Cancel Execution

POST /v1/pipelines/executions/{executionId}/cancel

Get Execution Logs

GET /v1/pipelines/executions/{executionId}/logs?task={taskName}&lines=100

Scheduling

Create Schedule

POST /v1/pipelines/schedules

Request Body:

{
  "pipelineId": "pipeline-456",
  "cronExpression": "0 6 * * *",
  "timezone": "America/New_York",
  "parameters": {
    "lookback_days": 1
  },
  "enabled": true
}

List Schedules

GET /v1/pipelines/schedules?pipelineId={id}

Update Schedule

PUT /v1/pipelines/schedules/{scheduleId}

Delete Schedule

DELETE /v1/pipelines/schedules/{scheduleId}

Trigger Schedule

POST /v1/pipelines/schedules/{scheduleId}/trigger

Connection Management

Create Connection

POST /v1/pipelines/connections

Request Body:

{
  "name": "sales-database",
  "type": "jdbc",
  "host": "sales-db.internal",
  "port": 5432,
  "database": "sales",
  "credentialSecretRef": "sales-db-credentials",
  "properties": {
    "sslMode": "require",
    "connectionTimeout": 10000
  }
}

List Connections

GET /v1/pipelines/connections

Test Connection

POST /v1/pipelines/connections/{connectionId}/test

Response (200 OK):

{
  "connected": true,
  "latencyMs": 15,
  "version": "PostgreSQL 16.2",
  "details": {
    "schemas": ["public", "staging"],
    "tables": 45
  }
}

Delete Connection

DELETE /v1/pipelines/connections/{connectionId}

Airflow Management

List DAGs

GET /v1/airflow/dags

Get DAG Runs

GET /v1/airflow/dags/{dagId}/runs?limit=25

Get Task Instances

GET /v1/airflow/dags/{dagId}/runs/{runId}/tasks

Get Task Logs

GET /v1/airflow/dags/{dagId}/runs/{runId}/tasks/{taskId}/logs

Trigger DAG

POST /v1/airflow/dags/{dagId}/trigger

Request Body:

{
  "conf": {
    "start_date": "2026-02-01",
    "end_date": "2026-02-12"
  }
}

Pause/Unpause DAG

POST /v1/airflow/dags/{dagId}/pause
POST /v1/airflow/dags/{dagId}/unpause

Template Management

List Templates

GET /v1/pipelines/templates?industry=fintech

Response (200 OK):

{
  "templates": [
    {
      "id": "fintech/payment_processing",
      "name": "Payment Processing Pipeline",
      "industry": "fintech",
      "version": "1.0.0",
      "description": "Real-time payment processing with fraud detection",
      "compliance": ["PCI-DSS", "SOX"],
      "parameters": [
        {"name": "SCHEMA_REGISTRY_URL", "required": true}
      ]
    }
  ]
}

Get Template

GET /v1/pipelines/templates/{templateId}

Create Pipeline from Template

POST /v1/pipelines/templates/{templateId}/create

Request Body:

{
  "pipelineName": "acme-payment-processing",
  "parameters": {
    "SCHEMA_REGISTRY_URL": "http://schema-registry:8081"
  },
  "customizations": {
    "sources.payment_gateway.partitions": 128
  }
}

Monitoring

Get Pipeline Metrics

GET /v1/pipelines/monitoring/metrics?pipelineId={id}&period=7d

Response (200 OK):

{
  "pipelineId": "pipeline-456",
  "period": "7d",
  "metrics": {
    "executionCount": 7,
    "successRate": 1.0,
    "avgDurationMs": 285000,
    "totalRowsProcessed": 8765432,
    "totalBytesProcessed": 12345678900,
    "totalCostUsd": 3.45
  },
  "history": [
    {"date": "2026-02-11", "status": "COMPLETED", "durationMs": 290000, "rows": 1250000},
    {"date": "2026-02-12", "status": "COMPLETED", "durationMs": 275000, "rows": 1234567}
  ]
}

Get Alerts

GET /v1/pipelines/monitoring/alerts?severity=CRITICAL&status=OPEN

Acknowledge Alert

POST /v1/pipelines/monitoring/alerts/{alertId}/acknowledge

Workflow Management (Temporal)

Get Workflow Status

GET /v1/pipelines/workflows/{workflowId}/status

Signal Workflow

POST /v1/pipelines/workflows/{workflowId}/signal

Request Body:

{
  "signalName": "approval",
  "data": {
    "approved": true,
    "approvedBy": "data-steward@acme.com",
    "reason": "Quality check review completed"
  }
}

Cancel Workflow

POST /v1/pipelines/workflows/{workflowId}/cancel

Lineage

Get Pipeline Lineage

GET /v1/pipelines/{pipelineId}/lineage

Response (200 OK):

{
  "pipelineId": "pipeline-456",
  "sources": [
    {"type": "TABLE", "fqn": "sales_db.public.raw_transactions"}
  ],
  "sinks": [
    {"type": "TABLE", "fqn": "analytics.sales.daily_transactions"}
  ],
  "transformations": [
    {"name": "clean_transactions", "type": "sql_transform"},
    {"name": "enrich_with_products", "type": "lookup_join"}
  ],
  "lastExecuted": "2026-02-12T06:00:00Z"
}

Cost Tracking

Get Execution Cost

GET /v1/pipelines/executions/{executionId}/cost

Get Pipeline Cost Summary

GET /v1/pipelines/{pipelineId}/cost?period=30d

Response (200 OK):

{
  "pipelineId": "pipeline-456",
  "period": "30d",
  "totalCostUsd": 14.50,
  "breakdown": {
    "compute": 10.20,
    "storage": 3.10,
    "network": 1.20
  },
  "costPerExecution": 0.48,
  "trend": "stable"
}

Common Response Codes

StatusDescription
200Success
201Created
202Accepted (async execution started)
400Bad request (validation error)
401Unauthorized (invalid JWT)
403Forbidden (insufficient permissions)
404Pipeline or execution not found
409Conflict (pipeline already running)
429Rate limited
500Internal server error

Related Sections