MATIH Platform is in active MVP development. Documentation reflects current implementation status.
11. Pipelines & Data Engineering
Templates

Pipeline Templates

MATIH provides a library of pre-built pipeline templates organized by industry vertical. Templates accelerate pipeline development by providing production-ready configurations for common data patterns, including source definitions, transformation chains, quality checks, monitoring alerts, and orchestration settings.


Template Architecture

+-------------------+     Template API     +-------------------+
| Pipeline Service  | ------------------> | Template Library  |
| (TemplateService) |                     | (YAML definitions)|
+-------------------+                     +--------+----------+
                                                   |
                                          +--------v----------+
                                          | Template Renderer |
                                          | (Jinja2 + Params) |
                                          +--------+----------+
                                                   |
                                          +--------v----------+
                                          | Pipeline          |
                                          | Definition        |
                                          +-------------------+

Template Library

Templates are stored in the templates/data/ directory, organized by industry vertical:

| Industry | Template | File |
|----------|----------|------|
| AdTech | Event Stream Processing | templates/data/adtech/event_stream.yaml |
| Biotech | Genomics Pipeline | templates/data/biotech/genomics_pipeline.yaml |
| Cybersecurity | Log Aggregation | templates/data/cybersecurity/log_aggregation.yaml |
| EdTech | Learning Analytics | templates/data/edtech/learning_analytics.yaml |
| Energy | Smart Meter Processing | templates/data/energy/smart_meter.yaml |
| FinTech | Payment Processing | templates/data/fintech/payment_processing.yaml |
| Hyperscalers | Multi-Region Sync | templates/data/hyperscalers/multi_region_sync.yaml |
| Robotics | Sensor Pipeline | templates/data/robotics/sensor_pipeline.yaml |
| Semiconductors | Fab Data Processing | templates/data/semiconductors/fab_data.yaml |
| Social Networks | Activity Stream | templates/data/social_networks/activity_stream.yaml |

Additionally, Spark job templates are available in templates/spark/:

| Template | File | Description |
|----------|------|-------------|
| Spark Iceberg Job | templates/spark/spark-iceberg-job.yaml | SparkApplication CRD template for Iceberg ETL |

Template Structure

Each template follows a standardized YAML structure with six sections:

Metadata Section

metadata:
  name: payment-processing-pipeline
  version: "1.0.0"
  industry: fintech
  description: Real-time payment processing pipeline with fraud scoring and settlement
  volume:
    daily_transactions: 50M
    peak_tps: 10000
    latency_sla_ms: 100
  compliance:
    - PCI-DSS
    - SOX
    - AML/KYC
| Field | Description |
|-------|-------------|
| name | Template identifier |
| version | Semantic version of the template |
| industry | Industry vertical |
| description | Human-readable description |
| volume | Expected data volume and throughput |
| compliance | Applicable compliance frameworks |

Sources Section

Defines all input data sources with connection and extraction configuration:

sources:
  payment_gateway:
    type: kafka
    format: avro
    topic: payments.gateway.events
    schema_registry: ${SCHEMA_REGISTRY_URL}
    consumer_group: payment-processor
    partitions: 64
    properties:
      auto.offset.reset: earliest
      enable.auto.commit: false
 
  merchant_master:
    type: jdbc
    format: delta
    connection: ${MERCHANT_DB_URL}
    table: merchants
    mode: cdc
    watermark_column: updated_at
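A rendered sources section can be sanity-checked before a pipeline runs. A minimal sketch, assuming per-type required keys; the actual schema enforced by the Pipeline Service may differ:

```python
# Hypothetical per-type required keys; the real Pipeline Service
# schema may require more (or different) fields.
REQUIRED_KEYS = {
    "kafka": {"topic", "format"},
    "jdbc": {"connection", "table"},
}

def validate_source(name: str, config: dict) -> list:
    """Return a list of validation errors for one sources entry."""
    source_type = config.get("type")
    if source_type not in REQUIRED_KEYS:
        return [f"{name}: unknown source type {source_type!r}"]
    missing = REQUIRED_KEYS[source_type] - config.keys()
    return [f"{name}: missing required key {key!r}" for key in sorted(missing)]

sources = {
    "payment_gateway": {"type": "kafka", "format": "avro",
                        "topic": "payments.gateway.events"},
    "merchant_master": {"type": "jdbc", "connection": "${MERCHANT_DB_URL}"},
}
errors = [e for n, c in sources.items() for e in validate_source(n, c)]
```

Here `merchant_master` fails validation because its `table` key is absent.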

Transformations Section

Ordered list of transformation steps:

transformations:
  - name: parse_transactions
    type: schema_transform
    input: payment_gateway
    operations:
      - extract_json_fields:
          source_field: payload
          fields: [transaction_id, card_token, amount, currency]
      - mask_pii:
          fields:
            - card_token: "****-****-****-${last4}"
            - cvv: "***"
 
  - name: enrich_merchant
    type: lookup_join
    input: parse_transactions
    lookup: merchant_master
    join_key: merchant_id
    cache:
      type: redis
      ttl_seconds: 3600
 
  - name: calculate_risk_score
    type: ml_inference
    input: enrich_merchant
    model:
      name: fraud_detection_v3
      endpoint: ${ML_SERVING_URL}/predict
      timeout_ms: 50
      fallback_score: 0.5
    features:
      - amount
      - merchant_category_code
      - velocity_24h
      - geo_distance_from_home
 
  - name: apply_business_rules
    type: rule_engine
    input: calculate_risk_score
    rules:
      - name: high_value_review
        condition: "amount > 10000 AND fraud_score > 0.3"
        action: flag_for_review
      - name: velocity_block
        condition: "velocity_24h > 20 AND fraud_score > 0.7"
        action: decline
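The rule_engine step's SQL-style conditions can be illustrated with a small evaluator. This is a hedged sketch, not the production rule engine: it rewrites `AND`/`OR` into Python operators and evaluates the condition against the record's fields (`eval` is used for brevity only):

```python
from typing import Optional

def evaluate_rules(record: dict, rules: list) -> Optional[str]:
    """Return the action of the first matching rule, or None."""
    for rule in rules:
        # Translate the SQL-style boolean operators shown in the template.
        expr = rule["condition"].replace(" AND ", " and ").replace(" OR ", " or ")
        # Evaluate against the record's fields only; no builtins exposed.
        if eval(expr, {"__builtins__": {}}, dict(record)):
            return rule["action"]
    return None

rules = [
    {"name": "high_value_review",
     "condition": "amount > 10000 AND fraud_score > 0.3",
     "action": "flag_for_review"},
    {"name": "velocity_block",
     "condition": "velocity_24h > 20 AND fraud_score > 0.7",
     "action": "decline"},
]
```

Rules are evaluated in order, so the first matching rule's action wins.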

Transformation Types

| Type | Description | Use Case |
|------|-------------|----------|
| schema_transform | Field extraction, type casting, PII masking | Parsing raw events |
| lookup_join | Enrich with dimension data via key lookup | Adding merchant details |
| ml_inference | Call ML model for scoring | Fraud detection |
| rule_engine | Apply business rules | Payment routing decisions |
| window_aggregate | Time-windowed aggregations | Settlement calculation |
| api_enrichment | Enrich with external API data | Currency conversion |
| sql_transform | SQL-based transformation | Complex data manipulation |
| deduplication | Remove duplicate records | Event dedup |
| filter | Row filtering | Data partitioning |
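An executor can map each type above to a handler function. A hypothetical sketch of such a registry, shown for the two simplest types (filter and deduplication); the handler names and signatures are assumptions, not Pipeline Service APIs:

```python
from typing import Callable

# Hypothetical registry mapping a transformation `type` to its handler.
TRANSFORM_HANDLERS: dict = {}

def register(transform_type: str) -> Callable:
    """Decorator registering a handler for one transformation type."""
    def wrap(fn):
        TRANSFORM_HANDLERS[transform_type] = fn
        return fn
    return wrap

@register("filter")
def apply_filter(rows, config):
    # Keep rows where the configured column equals the configured value.
    return [r for r in rows if r.get(config["column"]) == config["value"]]

@register("deduplication")
def dedupe(rows, config):
    # Drop later rows that repeat an earlier row's key columns.
    seen, out = set(), []
    for r in rows:
        key = tuple(r[c] for c in config["columns"])
        if key not in seen:
            seen.add(key)
            out.append(r)
    return out

def run_step(step: dict, rows: list) -> list:
    """Dispatch one transformation step to its registered handler."""
    return TRANSFORM_HANDLERS[step["type"]](rows, step)
```

A dispatch table keeps the step list declarative: adding a transformation type means registering one handler, with no changes to the executor loop.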

Sinks Section

Output destinations:

sinks:
  approved_transactions:
    type: kafka
    topic: payments.approved
    format: avro
    partitioning:
      key: merchant_id
      partitions: 32
 
  transaction_lake:
    type: delta
    path: s3://data-lake/fintech/transactions/
    partition_by: [year, month, day]
    mode: append
    schema_evolution: true
    optimize:
      z_order_by: [merchant_id, customer_id]
      vacuum_hours: 168
 
  settlement_files:
    type: sftp
    host: ${SETTLEMENT_SFTP_HOST}
    path: /outgoing/settlements/
    format: csv
    schedule: "0 2 * * *"
 
  metrics_sink:
    type: prometheus
    endpoint: ${PROMETHEUS_PUSHGATEWAY}
    metrics:
      - name: transactions_processed_total
        type: counter
        labels: [status, merchant_category]

Quality Checks Section

Built-in quality validations:

quality_checks:
  - name: amount_positive
    type: column_check
    column: amount
    check: "amount > 0"
    severity: critical
 
  - name: valid_currency
    type: allowed_values
    column: currency
    values: ["USD", "EUR", "GBP", "CAD", "AUD", "JPY"]
    severity: warning
 
  - name: transaction_freshness
    type: freshness
    column: timestamp
    max_delay: 5 minutes
    severity: critical
 
  - name: duplicate_check
    type: uniqueness
    columns: [transaction_id]
    window: 24 hours
    severity: critical
 
  - name: settlement_reconciliation
    type: aggregate_match
    source_aggregate:
      table: approved_transactions
      aggregation: "SUM(amount)"
      group_by: [merchant_id, date]
    target_aggregate:
      table: settlements
      aggregation: "SUM(total_amount)"
      group_by: [merchant_id, date]
    tolerance: 0.01
    severity: critical
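The aggregate_match check compares grouped aggregates within a tolerance. Its comparison logic can be sketched as follows, with both aggregates precomputed as dicts keyed by (merchant_id, date); the real check would compute these aggregates in the warehouse:

```python
def aggregate_match(source_totals: dict, target_totals: dict,
                    tolerance: float) -> list:
    """Return group keys whose source and target aggregates differ
    by more than `tolerance`. Missing groups count as 0.0."""
    mismatches = []
    for key in sorted(source_totals.keys() | target_totals.keys()):
        source = source_totals.get(key, 0.0)
        target = target_totals.get(key, 0.0)
        if abs(source - target) > tolerance:
            mismatches.append(key)
    return mismatches
```

With the template's `tolerance: 0.01`, a 0.50 gap for a merchant/date pair fails the check while sub-cent rounding differences pass.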

Orchestration Section

Execution engine configuration:

orchestration:
  engine: temporal
  workflow: payment_processing_workflow
  schedules:
    realtime_processing:
      type: continuous
      parallelism: 16
    batch_settlements:
      schedule: "0 2 * * *"
      timeout: 2 hours
    merchant_refresh:
      schedule: "0 * * * *"
      timeout: 30 minutes
 
  retry_policy:
    max_attempts: 3
    initial_interval: 1s
    backoff_coefficient: 2
    max_interval: 60s
 
  dead_letter:
    topic: payments.dlq
    retention_days: 30
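The retry_policy above describes capped exponential backoff. The sequence of delays it produces can be computed as:

```python
def retry_intervals(max_attempts: int, initial: float,
                    coefficient: float, max_interval: float) -> list:
    """Delays (seconds) before each retry: exponential backoff starting
    at `initial`, multiplied by `coefficient` per attempt, capped at
    `max_interval`. No delay precedes the first attempt."""
    delays, delay = [], initial
    for _ in range(max_attempts - 1):
        delays.append(min(delay, max_interval))
        delay *= coefficient
    return delays
```

With `max_attempts: 3`, a 1s initial interval, and coefficient 2, a failing task is retried after 1s and then 2s; with more attempts allowed, the delay doubles until it hits the 60s cap.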

Template Service

The TemplateService manages the template library:

class TemplateService:
    """Manages pipeline templates."""
 
    async def list_templates(self, industry: Optional[str] = None) -> List[TemplateInfo]:
        """List available templates, optionally filtered by industry."""
 
    async def get_template(self, template_id: str) -> TemplateDefinition:
        """Get a template definition."""
 
    async def render_template(self, template_id: str, parameters: dict) -> PipelineDefinition:
        """Render a template with user-provided parameters."""
 
    async def create_pipeline_from_template(self, template_id: str,
                                             parameters: dict) -> Pipeline:
        """Create a new pipeline from a template."""

Template Rendering

The Template Renderer substitutes user-provided parameters into ${PARAM} placeholders:

# User provides parameters
parameters = {
    "SCHEMA_REGISTRY_URL": "http://schema-registry:8081",
    "MERCHANT_DB_URL": "jdbc:postgresql://merchant-db:5432/merchants",
    "ML_SERVING_URL": "http://ml-service:8000",
    "SETTLEMENT_SFTP_HOST": "sftp.settlement-provider.com",
    "PROMETHEUS_PUSHGATEWAY": "http://prometheus-pushgateway:9091",
}
 
# Template service renders the template
pipeline = await template_service.render_template(
    "fintech/payment_processing",
    parameters
)
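Because the placeholders use the `${PARAM}` form, the substitution step behaves like Python's `string.Template`. A stand-in sketch (the actual renderer lives in the Pipeline Service):

```python
from string import Template

def render(template_text: str, parameters: dict) -> str:
    """Replace ${PARAM} placeholders with parameter values.
    Raises KeyError if a required parameter is missing."""
    return Template(template_text).substitute(parameters)

snippet = "schema_registry: ${SCHEMA_REGISTRY_URL}"
rendered = render(snippet, {"SCHEMA_REGISTRY_URL": "http://schema-registry:8081"})
```

Using `substitute` (rather than `safe_substitute`) makes a missing required parameter fail loudly at render time instead of leaving a placeholder in the pipeline definition.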

Template API

List Templates

GET /v1/pipelines/templates?industry=fintech

Response:
{
  "templates": [
    {
      "id": "fintech/payment_processing",
      "name": "Payment Processing Pipeline",
      "industry": "fintech",
      "version": "1.0.0",
      "description": "Real-time payment processing with fraud detection",
      "compliance": ["PCI-DSS", "SOX"],
      "parameters": [
        {"name": "SCHEMA_REGISTRY_URL", "required": true, "description": "Schema Registry endpoint"},
        {"name": "MERCHANT_DB_URL", "required": true, "description": "Merchant database JDBC URL"},
        {"name": "ML_SERVING_URL", "required": true, "description": "ML serving endpoint"}
      ]
    }
  ]
}
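A client can use the `parameters` array in this response to validate its inputs before calling the create endpoint. A sketch of that check; the helper name is hypothetical:

```python
def missing_parameters(template_info: dict, provided: dict) -> list:
    """Names of required template parameters absent from `provided`,
    mirroring the validation the Template API performs on create.
    Field names follow the list-templates response shape."""
    return [
        p["name"]
        for p in template_info.get("parameters", [])
        if p.get("required") and p["name"] not in provided
    ]

template_info = {
    "id": "fintech/payment_processing",
    "parameters": [
        {"name": "SCHEMA_REGISTRY_URL", "required": True},
        {"name": "MERCHANT_DB_URL", "required": True},
        {"name": "ML_SERVING_URL", "required": True},
    ],
}
```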

Get Template Details

GET /v1/pipelines/templates/{templateId}

Create Pipeline from Template

POST /v1/pipelines/templates/{templateId}/create

Request:
{
  "pipelineName": "acme-payment-processing",
  "parameters": {
    "SCHEMA_REGISTRY_URL": "http://schema-registry:8081",
    "MERCHANT_DB_URL": "jdbc:postgresql://merchant-db:5432/merchants"
  },
  "customizations": {
    "sources.payment_gateway.partitions": 128,
    "quality_checks[0].severity": "warning"
  }
}
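The `customizations` keys are dot paths with optional `[n]` list indexes. One way such a path can be resolved against a rendered pipeline definition (the helper name is an assumption):

```python
import re

def apply_customization(config: dict, path: str, value) -> None:
    """Set `value` at a dot path such as
    'sources.payment_gateway.partitions' or 'quality_checks[0].severity'."""
    # Split the path into dict keys and [n] list indexes.
    parts = []
    for segment in path.split("."):
        match = re.fullmatch(r"(\w+)\[(\d+)\]", segment)
        if match:
            parts.extend([match.group(1), int(match.group(2))])
        else:
            parts.append(segment)
    # Walk to the parent node, then assign the final key or index.
    node = config
    for part in parts[:-1]:
        node = node[part]
    node[parts[-1]] = value
```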

Template Customization

Users can customize templates at creation time:

| Customization | Example | Scope |
|---------------|---------|-------|
| Parameter override | Change partition count | Source-level |
| Quality rule severity | Downgrade from critical to warning | Check-level |
| Schedule change | Change cron expression | Orchestration-level |
| Sink modification | Change output format | Sink-level |
| Add transformation | Insert additional transformation step | Pipeline-level |
| Remove step | Skip optional transformation | Pipeline-level |

Spark Job Templates

The templates/spark/ directory contains Spark-specific templates:

Spark Iceberg Job Template

The spark-iceberg-job.yaml provides a base template for Spark applications that interact with Iceberg tables via Polaris:

| Feature | Configuration |
|---------|---------------|
| Catalog type | REST (Polaris) |
| Credential vending | Enabled via X-Iceberg-Access-Delegation header |
| Authentication | OAuth2 via Kubernetes Secret |
| Dynamic allocation | Min 1, max 10 executors |
| Event logging | S3-based event log storage |
| Restart policy | OnFailure with 3 retries |

Both Scala/Java and Python templates are provided:

# Scala/Java template
spec:
  type: Scala
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar"
 
# Python template
spec:
  type: Python
  mainApplicationFile: "local:///opt/spark/work-dir/etl_job.py"
  pythonVersion: "3"

Template SDK

The Pipeline Service includes a Python SDK for programmatic template interaction:

from matih_pipeline.sdk.models import PipelineTemplate, Source, Transformation, Sink
 
# Create a custom template programmatically
template = PipelineTemplate(
    name="custom-etl",
    version="1.0.0",
    sources=[
        Source(name="my_db", type="jdbc", connection="${DB_URL}"),
    ],
    transformations=[
        Transformation(name="clean", type="sql_transform", sql="SELECT ..."),
    ],
    sinks=[
        Sink(name="lake", type="iceberg", table="analytics.my_table"),
    ],
)
 
# Register template
await template_service.register_template(template)

Related Sections