# Pipeline Templates
MATIH provides a library of pre-built pipeline templates organized by industry vertical. Templates accelerate pipeline development by providing production-ready configurations for common data patterns including source definitions, transformation chains, quality checks, monitoring alerts, and orchestration settings.
## Template Architecture

```text
+-------------------+    Template API     +-------------------+
| Pipeline Service  | ------------------> | Template Library  |
| (TemplateService) |                     | (YAML definitions)|
+-------------------+                     +--------+----------+
                                                   |
                                          +--------v----------+
                                          | Template Renderer |
                                          | (Jinja2 + Params) |
                                          +--------+----------+
                                                   |
                                          +--------v----------+
                                          |     Pipeline      |
                                          |    Definition     |
                                          +-------------------+
```

## Template Library
Templates are stored in the `templates/data/` directory, organized by industry vertical:
| Industry | Template | File |
|---|---|---|
| AdTech | Event Stream Processing | `templates/data/adtech/event_stream.yaml` |
| Biotech | Genomics Pipeline | `templates/data/biotech/genomics_pipeline.yaml` |
| Cybersecurity | Log Aggregation | `templates/data/cybersecurity/log_aggregation.yaml` |
| EdTech | Learning Analytics | `templates/data/edtech/learning_analytics.yaml` |
| Energy | Smart Meter Processing | `templates/data/energy/smart_meter.yaml` |
| FinTech | Payment Processing | `templates/data/fintech/payment_processing.yaml` |
| Hyperscalers | Multi-Region Sync | `templates/data/hyperscalers/multi_region_sync.yaml` |
| Robotics | Sensor Pipeline | `templates/data/robotics/sensor_pipeline.yaml` |
| Semiconductors | Fab Data Processing | `templates/data/semiconductors/fab_data.yaml` |
| Social Networks | Activity Stream | `templates/data/social_networks/activity_stream.yaml` |
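Because the library follows a strict `<industry>/<template>.yaml` layout, a template catalog can be derived directly from the directory tree. As a minimal sketch (the `list_templates_by_industry` helper is illustrative, not part of the MATIH API):

```python
from collections import defaultdict
from pathlib import Path


def list_templates_by_industry(root: str) -> dict:
    """Group template YAML files under a templates/data/-style tree
    by their industry subdirectory."""
    catalog = defaultdict(list)
    # One level of industry directories, each containing template YAMLs
    for path in sorted(Path(root).glob("*/*.yaml")):
        catalog[path.parent.name].append(path.stem)
    return dict(catalog)
```

Run against `templates/data/`, this would yield keys such as `fintech` and `adtech`, each mapping to its template names.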
Additionally, Spark job templates are available in `templates/spark/`:

| Template | File | Description |
|---|---|---|
| Spark Iceberg Job | `templates/spark/spark-iceberg-job.yaml` | SparkApplication CRD template for Iceberg ETL |
## Template Structure
Each template follows a standardized YAML structure with six sections:
### Metadata Section

```yaml
metadata:
  name: payment-processing-pipeline
  version: "1.0.0"
  industry: fintech
  description: Real-time payment processing pipeline with fraud scoring and settlement
  volume:
    daily_transactions: 50M
    peak_tps: 10000
    latency_sla_ms: 100
  compliance:
    - PCI-DSS
    - SOX
    - AML/KYC
```

| Field | Description |
|---|---|
| `name` | Template identifier |
| `version` | Semantic version of the template |
| `industry` | Industry vertical |
| `description` | Human-readable description |
| `volume` | Expected data volume and throughput |
| `compliance` | Applicable compliance frameworks |
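As a sketch of how these fields might be checked when a template is loaded (the `validate_metadata` helper is illustrative, not the service's actual validation logic):

```python
REQUIRED_METADATA_FIELDS = ("name", "version", "industry", "description")


def validate_metadata(metadata: dict) -> list:
    """Return a list of problems found in a template's metadata section."""
    problems = [
        f"missing field: {field}"
        for field in REQUIRED_METADATA_FIELDS
        if field not in metadata
    ]
    version = metadata.get("version", "")
    # Expect a semantic version such as "1.0.0"
    if version and len(version.split(".")) != 3:
        problems.append(f"version is not semver: {version!r}")
    return problems
```

An empty result means the metadata section passes these basic checks.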
### Sources Section

Defines all input data sources with connection and extraction configuration:

```yaml
sources:
  payment_gateway:
    type: kafka
    format: avro
    topic: payments.gateway.events
    schema_registry: ${SCHEMA_REGISTRY_URL}
    consumer_group: payment-processor
    partitions: 64
    properties:
      auto.offset.reset: earliest
      enable.auto.commit: false
  merchant_master:
    type: jdbc
    format: delta
    connection: ${MERCHANT_DB_URL}
    table: merchants
    mode: cdc
    watermark_column: updated_at
```

### Transformations Section
Ordered list of transformation steps:

```yaml
transformations:
  - name: parse_transactions
    type: schema_transform
    input: payment_gateway
    operations:
      - extract_json_fields:
          source_field: payload
          fields: [transaction_id, card_token, amount, currency]
      - mask_pii:
          fields:
            - card_token: "****-****-****-${last4}"
            - cvv: "***"
  - name: enrich_merchant
    type: lookup_join
    input: parsed_transactions
    lookup: merchant_master
    join_key: merchant_id
    cache:
      type: redis
      ttl_seconds: 3600
  - name: calculate_risk_score
    type: ml_inference
    input: enriched_transactions
    model:
      name: fraud_detection_v3
      endpoint: ${ML_SERVING_URL}/predict
      timeout_ms: 50
      fallback_score: 0.5
    features:
      - amount
      - merchant_category_code
      - velocity_24h
      - geo_distance_from_home
  - name: apply_business_rules
    type: rule_engine
    input: scored_transactions
    rules:
      - name: high_value_review
        condition: "amount > 10000 AND fraud_score > 0.3"
        action: flag_for_review
      - name: velocity_block
        condition: "velocity_24h > 20 AND fraud_score > 0.7"
        action: decline
```

#### Transformation Types
| Type | Description | Use Case |
|---|---|---|
| `schema_transform` | Field extraction, type casting, PII masking | Parsing raw events |
| `lookup_join` | Enrich with dimension data via key lookup | Adding merchant details |
| `ml_inference` | Call an ML model for scoring | Fraud detection |
| `rule_engine` | Apply business rules | Payment routing decisions |
| `window_aggregate` | Time-windowed aggregations | Settlement calculation |
| `api_enrichment` | Enrich with external API data | Currency conversion |
| `sql_transform` | SQL-based transformation | Complex data manipulation |
| `deduplication` | Remove duplicate records | Event dedup |
| `filter` | Row filtering | Data partitioning |
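For types not exercised by the fintech example, a `window_aggregate` and a `deduplication` step might look like the following. This is a hedged sketch: the step and field names are illustrative, patterned on the steps shown above rather than taken from a shipped template:

```yaml
transformations:
  - name: settlement_totals
    type: window_aggregate
    input: approved_transactions
    window:
      type: tumbling
      size: 1 hour
    group_by: [merchant_id]
    aggregations:
      - total_amount: "SUM(amount)"
      - transaction_count: "COUNT(*)"
  - name: dedup_events
    type: deduplication
    input: payment_gateway
    keys: [transaction_id]
    keep: first
```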
### Sinks Section

Output destinations:

```yaml
sinks:
  approved_transactions:
    type: kafka
    topic: payments.approved
    format: avro
    partitioning:
      key: merchant_id
      partitions: 32
  transaction_lake:
    type: delta
    path: s3://data-lake/fintech/transactions/
    partition_by: [year, month, day]
    mode: append
    schema_evolution: true
    optimize:
      z_order_by: [merchant_id, customer_id]
      vacuum_hours: 168
  settlement_files:
    type: sftp
    host: ${SETTLEMENT_SFTP_HOST}
    path: /outgoing/settlements/
    format: csv
    schedule: "0 2 * * *"
  metrics_sink:
    type: prometheus
    endpoint: ${PROMETHEUS_PUSHGATEWAY}
    metrics:
      - name: transactions_processed_total
        type: counter
        labels: [status, merchant_category]
```

### Quality Checks Section
Built-in quality validations:

```yaml
quality_checks:
  - name: amount_positive
    type: column_check
    column: amount
    check: "amount > 0"
    severity: critical
  - name: valid_currency
    type: allowed_values
    column: currency
    values: ["USD", "EUR", "GBP", "CAD", "AUD", "JPY"]
    severity: warning
  - name: transaction_freshness
    type: freshness
    column: timestamp
    max_delay: 5 minutes
    severity: critical
  - name: duplicate_check
    type: uniqueness
    columns: [transaction_id]
    window: 24 hours
    severity: critical
  - name: settlement_reconciliation
    type: aggregate_match
    source_aggregate:
      table: approved_transactions
      aggregation: "SUM(amount)"
      group_by: [merchant_id, date]
    target_aggregate:
      table: settlements
      aggregation: "SUM(total_amount)"
      group_by: [merchant_id, date]
    tolerance: 0.01
    severity: critical
```

### Orchestration Section
Execution engine configuration:

```yaml
orchestration:
  engine: temporal
  workflow: payment_processing_workflow
  schedules:
    realtime_processing:
      type: continuous
      parallelism: 16
    batch_settlements:
      schedule: "0 2 * * *"
      timeout: 2 hours
    merchant_refresh:
      schedule: "0 * * * *"
      timeout: 30 minutes
  retry_policy:
    max_attempts: 3
    initial_interval: 1s
    backoff_coefficient: 2
    max_interval: 60s
  dead_letter:
    topic: payments.dlq
    retention_days: 30
```

## Template Service
The `TemplateService` manages the template library:

```python
class TemplateService:
    """Manages pipeline templates."""

    async def list_templates(self, industry: Optional[str] = None) -> List[TemplateInfo]:
        """List available templates, optionally filtered by industry."""

    async def get_template(self, template_id: str) -> TemplateDefinition:
        """Get a template definition."""

    async def render_template(self, template_id: str, parameters: dict) -> PipelineDefinition:
        """Render a template with user-provided parameters."""

    async def create_pipeline_from_template(self, template_id: str,
                                            parameters: dict) -> Pipeline:
        """Create a new pipeline from a template."""
```

## Template Rendering
Templates use Jinja2-style parameter substitution:

```python
# User provides parameters
parameters = {
    "SCHEMA_REGISTRY_URL": "http://schema-registry:8081",
    "MERCHANT_DB_URL": "jdbc:postgresql://merchant-db:5432/merchants",
    "ML_SERVING_URL": "http://ml-service:8000",
    "SETTLEMENT_SFTP_HOST": "sftp.settlement-provider.com",
    "PROMETHEUS_PUSHGATEWAY": "http://prometheus-pushgateway:9091",
}

# Template service renders the template
pipeline = await template_service.render_template(
    "fintech/payment_processing",
    parameters,
)
```

## Template API
### List Templates

```
GET /v1/pipelines/templates?industry=fintech
```

Response:

```json
{
  "templates": [
    {
      "id": "fintech/payment_processing",
      "name": "Payment Processing Pipeline",
      "industry": "fintech",
      "version": "1.0.0",
      "description": "Real-time payment processing with fraud detection",
      "compliance": ["PCI-DSS", "SOX"],
      "parameters": [
        {"name": "SCHEMA_REGISTRY_URL", "required": true, "description": "Schema Registry endpoint"},
        {"name": "MERCHANT_DB_URL", "required": true, "description": "Merchant database JDBC URL"},
        {"name": "ML_SERVING_URL", "required": true, "description": "ML serving endpoint"}
      ]
    }
  ]
}
```

### Get Template Details
```
GET /v1/pipelines/templates/{templateId}
```

### Create Pipeline from Template
```
POST /v1/pipelines/templates/{templateId}/create
```

Request:

```json
{
  "pipelineName": "acme-payment-processing",
  "parameters": {
    "SCHEMA_REGISTRY_URL": "http://schema-registry:8081",
    "MERCHANT_DB_URL": "jdbc:postgresql://merchant-db:5432/merchants"
  },
  "customizations": {
    "sources.payment_gateway.partitions": 128,
    "quality_checks[0].severity": "warning"
  }
}
```

## Template Customization
Users can customize templates at creation time:
| Customization | Example | Scope |
|---|---|---|
| Parameter override | Change partition count | Source-level |
| Quality rule severity | Downgrade from critical to warning | Check-level |
| Schedule change | Change cron expression | Orchestration-level |
| Sink modification | Change output format | Sink-level |
| Add transformation | Insert additional transformation step | Pipeline-level |
| Remove step | Skip optional transformation | Pipeline-level |
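The dotted paths in the `customizations` object of the create request suggest a simple override mechanism. A minimal sketch of how such paths could be applied to a rendered pipeline definition (the `apply_customizations` helper is hypothetical; it handles `a.b.c` paths and `list[0]` indices as they appear in the API example):

```python
import re


def apply_customizations(definition: dict, customizations: dict) -> dict:
    """Apply dotted-path overrides such as "sources.payment_gateway.partitions"
    or "quality_checks[0].severity" to a rendered pipeline definition."""
    for path, value in customizations.items():
        # Split "a.b[0].c" into tokens: "a", "b", 0, "c"
        tokens = []
        for part in path.split("."):
            match = re.match(r"^(\w+)\[(\d+)\]$", part)
            if match:
                tokens.extend([match.group(1), int(match.group(2))])
            else:
                tokens.append(part)
        # Walk to the parent node, then set the final key or index
        node = definition
        for token in tokens[:-1]:
            node = node[token]
        node[tokens[-1]] = value
    return definition
```

Mutating the definition in place keeps untouched template sections exactly as rendered.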
## Spark Job Templates
The `templates/spark/` directory contains Spark-specific templates:
### Spark Iceberg Job Template
The `spark-iceberg-job.yaml` template provides a base for Spark applications that interact with Iceberg tables via Polaris:
| Feature | Configuration |
|---|---|
| Catalog type | REST (Polaris) |
| Credential vending | Enabled via `X-Iceberg-Access-Delegation` header |
| Authentication | OAuth2 via Kubernetes Secret |
| Dynamic allocation | Min 1, max 10 executors |
| Event logging | S3-based event log storage |
| Restart policy | OnFailure with 3 retries |
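Expressed as Spark configuration, the catalog settings in the table above typically look like the following. This is a sketch using standard Iceberg REST catalog options; the `polaris` catalog name, URI, and credential placeholders are assumptions, not values copied from the shipped template:

```yaml
spec:
  sparkConf:
    spark.sql.catalog.polaris: org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.polaris.type: rest
    spark.sql.catalog.polaris.uri: http://polaris:8181/api/catalog
    # Ask the catalog to vend short-lived storage credentials per table
    spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation: vended-credentials
    spark.sql.catalog.polaris.credential: ${POLARIS_CLIENT_ID}:${POLARIS_CLIENT_SECRET}
```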
Both Scala/Java and Python templates are provided:

```yaml
# Scala/Java template
spec:
  type: Scala
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar"
```

```yaml
# Python template
spec:
  type: Python
  mainApplicationFile: "local:///opt/spark/work-dir/etl_job.py"
  pythonVersion: "3"
```

## Template SDK
The Pipeline Service includes a Python SDK for programmatic template interaction:

```python
from matih_pipeline.sdk.models import PipelineTemplate, Source, Transformation, Sink

# Create a custom template programmatically
template = PipelineTemplate(
    name="custom-etl",
    version="1.0.0",
    sources=[
        Source(name="my_db", type="jdbc", connection="${DB_URL}"),
    ],
    transformations=[
        Transformation(name="clean", type="sql_transform", sql="SELECT ..."),
    ],
    sinks=[
        Sink(name="lake", type="iceberg", table="analytics.my_table"),
    ],
)

# Register the template
await template_service.register_template(template)
```

## Related Sections
- Pipeline Service -- Template rendering in the pipeline architecture
- Airflow -- DAGs generated from templates
- Spark -- Spark job templates
- API Reference -- Template management endpoints