API Reference
This section provides the complete REST API documentation for the Pipeline Service. All endpoints require a valid JWT in the Authorization header and an X-Tenant-ID header that supplies the tenant context.
Base URL
http://pipeline-service.matih-data-plane.svc.cluster.local:8000/v1
Authentication
All endpoints require:
| Header | Required | Description |
|---|---|---|
| Authorization | Yes | Bearer JWT token |
| X-Tenant-ID | Yes | UUID of the tenant |
Pipeline Management
Create Pipeline
POST /v1/pipelines
Request Body:
{
"name": "daily-sales-etl",
"version": "1.0.0",
"description": "Daily sales data extraction and loading",
"definition": {
"sources": {...},
"transformations": [...],
"sinks": {...},
"quality_checks": [...],
"orchestration": {...}
},
"tags": ["sales", "daily", "etl"]
}
Response (201 Created):
{
"pipelineId": "pipeline-456",
"name": "daily-sales-etl",
"version": "1.0.0",
"status": "CREATED",
"createdAt": "2026-02-12T10:30:00Z"
}
List Pipelines
GET /v1/pipelines?tags=sales&status=ACTIVE&page=0&size=20
Get Pipeline
GET /v1/pipelines/{pipelineId}
Update Pipeline
PUT /v1/pipelines/{pipelineId}
Delete Pipeline
DELETE /v1/pipelines/{pipelineId}
Pipeline Execution
Execute Pipeline
POST /v1/pipelines/{pipelineId}/execute
Request Body:
{
"parameters": {
"start_date": "2026-02-01",
"end_date": "2026-02-12"
},
"engine_override": null,
"priority": "NORMAL"
}
Response (202 Accepted):
{
"executionId": "exec-789",
"pipelineId": "pipeline-456",
"status": "QUEUED",
"engine": "airflow",
"estimatedDurationMs": 300000,
"submittedAt": "2026-02-12T10:30:00Z"
}
Get Execution Status
GET /v1/pipelines/executions/{executionId}
Response (200 OK):
{
"executionId": "exec-789",
"pipelineId": "pipeline-456",
"status": "RUNNING",
"engine": "airflow",
"progress": {
"totalTasks": 5,
"completedTasks": 3,
"currentTask": "enrich_with_products",
"percentage": 60
},
"metrics": {
"rowsProcessed": 1234567,
"bytesProcessed": 536870912,
"elapsedMs": 120000
},
"startedAt": "2026-02-12T10:30:05Z"
}
List Executions
GET /v1/pipelines/executions?pipelineId={id}&status=RUNNING&page=0&size=20
Cancel Execution
POST /v1/pipelines/executions/{executionId}/cancel
Get Execution Logs
GET /v1/pipelines/executions/{executionId}/logs?task={taskName}&lines=100
Scheduling
Create Schedule
POST /v1/pipelines/schedules
Request Body:
{
"pipelineId": "pipeline-456",
"cronExpression": "0 6 * * *",
"timezone": "America/New_York",
"parameters": {
"lookback_days": 1
},
"enabled": true
}
List Schedules
GET /v1/pipelines/schedules?pipelineId={id}
Update Schedule
PUT /v1/pipelines/schedules/{scheduleId}
Delete Schedule
DELETE /v1/pipelines/schedules/{scheduleId}
Trigger Schedule
POST /v1/pipelines/schedules/{scheduleId}/trigger
Connection Management
Create Connection
POST /v1/pipelines/connections
Request Body:
{
"name": "sales-database",
"type": "jdbc",
"host": "sales-db.internal",
"port": 5432,
"database": "sales",
"credentialSecretRef": "sales-db-credentials",
"properties": {
"sslMode": "require",
"connectionTimeout": 10000
}
}
List Connections
GET /v1/pipelines/connections
Test Connection
POST /v1/pipelines/connections/{connectionId}/test
Response (200 OK):
{
"connected": true,
"latencyMs": 15,
"version": "PostgreSQL 16.2",
"details": {
"schemas": ["public", "staging"],
"tables": 45
}
}
Delete Connection
DELETE /v1/pipelines/connections/{connectionId}
Airflow Management
List DAGs
GET /v1/airflow/dags
Get DAG Runs
GET /v1/airflow/dags/{dagId}/runs?limit=25
Get Task Instances
GET /v1/airflow/dags/{dagId}/runs/{runId}/tasks
Get Task Logs
GET /v1/airflow/dags/{dagId}/runs/{runId}/tasks/{taskId}/logs
Trigger DAG
POST /v1/airflow/dags/{dagId}/trigger
Request Body:
{
"conf": {
"start_date": "2026-02-01",
"end_date": "2026-02-12"
}
}
Pause/Unpause DAG
POST /v1/airflow/dags/{dagId}/pause
POST /v1/airflow/dags/{dagId}/unpause
Template Management
List Templates
GET /v1/pipelines/templates?industry=fintech
Response (200 OK):
{
"templates": [
{
"id": "fintech/payment_processing",
"name": "Payment Processing Pipeline",
"industry": "fintech",
"version": "1.0.0",
"description": "Real-time payment processing with fraud detection",
"compliance": ["PCI-DSS", "SOX"],
"parameters": [
{"name": "SCHEMA_REGISTRY_URL", "required": true}
]
}
]
}
Get Template
GET /v1/pipelines/templates/{templateId}
Create Pipeline from Template
POST /v1/pipelines/templates/{templateId}/create
Request Body:
{
"pipelineName": "acme-payment-processing",
"parameters": {
"SCHEMA_REGISTRY_URL": "http://schema-registry:8081"
},
"customizations": {
"sources.payment_gateway.partitions": 128
}
}
Monitoring
Get Pipeline Metrics
GET /v1/pipelines/monitoring/metrics?pipelineId={id}&period=7d
Response (200 OK):
{
"pipelineId": "pipeline-456",
"period": "7d",
"metrics": {
"executionCount": 7,
"successRate": 1.0,
"avgDurationMs": 285000,
"totalRowsProcessed": 8765432,
"totalBytesProcessed": 12345678900,
"totalCostUsd": 3.45
},
"history": [
{"date": "2026-02-11", "status": "COMPLETED", "durationMs": 290000, "rows": 1250000},
{"date": "2026-02-12", "status": "COMPLETED", "durationMs": 275000, "rows": 1234567}
]
}
Get Alerts
GET /v1/pipelines/monitoring/alerts?severity=CRITICAL&status=OPEN
Acknowledge Alert
POST /v1/pipelines/monitoring/alerts/{alertId}/acknowledge
Workflow Management (Temporal)
Get Workflow Status
GET /v1/pipelines/workflows/{workflowId}/status
Signal Workflow
POST /v1/pipelines/workflows/{workflowId}/signal
Request Body:
{
"signalName": "approval",
"data": {
"approved": true,
"approvedBy": "data-steward@acme.com",
"reason": "Quality check review completed"
}
}
Cancel Workflow
POST /v1/pipelines/workflows/{workflowId}/cancel
Lineage
Get Pipeline Lineage
GET /v1/pipelines/{pipelineId}/lineage
Response (200 OK):
{
"pipelineId": "pipeline-456",
"sources": [
{"type": "TABLE", "fqn": "sales_db.public.raw_transactions"}
],
"sinks": [
{"type": "TABLE", "fqn": "analytics.sales.daily_transactions"}
],
"transformations": [
{"name": "clean_transactions", "type": "sql_transform"},
{"name": "enrich_with_products", "type": "lookup_join"}
],
"lastExecuted": "2026-02-12T06:00:00Z"
}
Cost Tracking
Get Execution Cost
GET /v1/pipelines/executions/{executionId}/cost
Get Pipeline Cost Summary
GET /v1/pipelines/{pipelineId}/cost?period=30d
Response (200 OK):
{
"pipelineId": "pipeline-456",
"period": "30d",
"totalCostUsd": 14.50,
"breakdown": {
"compute": 10.20,
"storage": 3.10,
"network": 1.20
},
"costPerExecution": 0.48,
"trend": "stable"
}
Common Response Codes
| Status | Description |
|---|---|
| 200 | Success |
| 201 | Created |
| 202 | Accepted (async execution started) |
| 400 | Bad request (validation error) |
| 401 | Unauthorized (invalid JWT) |
| 403 | Forbidden (insufficient permissions) |
| 404 | Resource not found (e.g., pipeline or execution) |
| 409 | Conflict (pipeline already running) |
| 429 | Rate limited |
| 500 | Internal server error |