MATIH Platform is in active MVP development. Documentation reflects current implementation status.
1. Introduction
Data Engineering

Status: Production. Services: pipeline-service, query-engine, catalog-service. Scope: Pipeline orchestration, Trino federation, CDC, quality monitoring.

The Data Engineering pillar of the MATIH Platform provides the foundational data infrastructure that powers every other capability. From federated SQL queries across diverse data sources to automated pipeline orchestration with built-in quality monitoring, the data engineering layer ensures that data flows reliably, accurately, and efficiently through the platform.


1.1 Federated Query Engine (Trino)

MATIH uses Trino as its federated SQL query engine, enabling queries that span multiple data sources in a single SQL statement.

Architecture

Trino Coordinator (1 instance per cluster)
  |
  +-- Worker Nodes (auto-scaled based on query load)
  |     |
  |     +-- Connector: PostgreSQL (tenant operational databases)
  |     +-- Connector: Hive/S3 (data lake -- Parquet, ORC, Iceberg)
  |     +-- Connector: Elasticsearch (log analytics, full-text search)
  |     +-- Connector: ClickHouse (OLAP analytics)
  |     +-- Connector: Custom (tenant-specific data sources)
  |
  +-- Resource Groups (per-tenant CPU and memory limits)
  +-- Query Queue (fair-share scheduling across tenants)
  +-- Result Cache (Redis-backed, configurable TTL)
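
The fair-share query queue in the diagram above can be sketched as round-robin dispatch over per-tenant queues. This is a minimal Python illustration of the scheduling idea, not Trino's actual scheduler; all class and method names are hypothetical:

```python
from collections import deque

class FairShareQueue:
    """Round-robin fair-share scheduling across tenant queues (illustrative sketch)."""

    def __init__(self):
        self.queues = {}      # tenant_id -> deque of pending queries
        self.order = deque()  # round-robin order of tenants with pending work

    def submit(self, tenant_id, query):
        if tenant_id not in self.queues:
            self.queues[tenant_id] = deque()
            self.order.append(tenant_id)
        self.queues[tenant_id].append(query)

    def next_query(self):
        """Pop the next query, cycling tenants so no single tenant starves the rest."""
        if not self.order:
            return None
        tenant = self.order.popleft()
        queue = self.queues[tenant]
        query = queue.popleft()
        if queue:                       # tenant still has work: go to back of the line
            self.order.append(tenant)
        else:
            del self.queues[tenant]
        return tenant, query

q = FairShareQueue()
q.submit("tenant_a", "SELECT 1")
q.submit("tenant_a", "SELECT 2")
q.submit("tenant_b", "SELECT 3")
# Dispatch alternates tenants even though tenant_a submitted first:
print([q.next_query() for _ in range(3)])
```

The key property is that a tenant flooding the queue only delays its own later queries, not other tenants' work.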

Multi-Tenant Isolation in Trino

| Isolation Mechanism | Description |
|---|---|
| Per-tenant catalogs | Each tenant has dedicated Trino catalog configurations with separate credentials |
| Resource groups | Per-tenant CPU, memory, and concurrency limits enforced by Trino's resource group manager |
| Query timeout | Configurable per tenant (default: 5 minutes for interactive, 30 minutes for batch) |
| Result caching | Redis-backed cache with tenant-scoped keys and configurable TTL |
| Audit logging | Every query execution logged with tenant_id, user_id, SQL, duration, and rows returned |
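
To make the tenant-scoped caching concrete, here is a small sketch of how cache keys and TTLs could work. It uses an in-memory dict as a stand-in for Redis; the key format and class names are assumptions for illustration, not the platform's actual implementation:

```python
import hashlib
import time

def cache_key(tenant_id: str, sql: str) -> str:
    """Tenant-scoped cache key: the same SQL from different tenants never collides."""
    digest = hashlib.sha256(sql.encode("utf-8")).hexdigest()[:16]
    return f"trino:result:{tenant_id}:{digest}"

class ResultCache:
    """In-memory stand-in for the Redis-backed result cache, with per-entry TTL."""

    def __init__(self, default_ttl_seconds: float = 300.0):
        self.default_ttl = default_ttl_seconds
        self._store = {}  # key -> (expires_at, rows)

    def put(self, tenant_id, sql, rows, ttl=None):
        expires = time.monotonic() + (ttl or self.default_ttl)
        self._store[cache_key(tenant_id, sql)] = (expires, rows)

    def get(self, tenant_id, sql):
        entry = self._store.get(cache_key(tenant_id, sql))
        if entry is None:
            return None
        expires, rows = entry
        if time.monotonic() > expires:  # expired: evict lazily on read
            del self._store[cache_key(tenant_id, sql)]
            return None
        return rows

cache = ResultCache(default_ttl_seconds=300)
cache.put("tenant_a", "SELECT 1", [[1]])
print(cache.get("tenant_a", "SELECT 1"))  # [[1]]
print(cache.get("tenant_b", "SELECT 1"))  # None -- keys are tenant-scoped
```

Because the tenant_id is part of the key, cache hits can never leak result rows across tenants.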

Federated Query Example

A single query can join data across a PostgreSQL operational database and a data lake in S3:

-- Join customer data from PostgreSQL with purchase history from S3/Parquet
SELECT
    c.customer_name,
    c.segment,
    SUM(p.amount) AS total_purchases,
    COUNT(DISTINCT p.order_id) AS order_count
FROM postgresql_catalog.crm.customers c
JOIN datalake_catalog.sales.purchases p
    ON c.customer_id = p.customer_id
WHERE p.purchase_date >= DATE '2025-01-01'
GROUP BY c.customer_name, c.segment
ORDER BY total_purchases DESC
LIMIT 20

This query is transparent to the user -- they do not need to know that the customers table lives in PostgreSQL while the purchases table lives in S3/Parquet. The AI's SQL Agent generates federated queries automatically based on the schema context retrieved from Qdrant.


1.2 Pipeline Orchestration

The pipeline-service (Java, port 8092) provides orchestration capabilities for data workflows, integrating with multiple execution engines:

| Engine | Use Case | Integration Pattern |
|---|---|---|
| Apache Airflow | Batch pipeline scheduling and dependency management | DAG management via pipeline-service API; Kubernetes executor for isolated task execution |
| Apache Flink | Real-time stream processing | Flink SQL jobs managed via pipeline-service; Kafka source/sink connectors |
| Apache Spark | Large-scale batch transformations | Spark-on-Kubernetes via spark-submit; pipeline-service tracks job lifecycle |
| Temporal | Long-running, stateful workflows | Workflow definitions for complex multi-step processes with retry and compensation |

Pipeline Lifecycle

Pipeline Definition (YAML or conversational)
  |
  v
Validation (schema checks, dependency resolution)
  |
  v
Scheduling (cron-based or event-triggered)
  |
  v
Execution (Airflow DAG, Flink job, Spark job, or Temporal workflow)
  |
  +-- Data Quality Checks (pre-load and post-load)
  |
  +-- Lineage Tracking (catalog-service records source-to-target mappings)
  |
  +-- Monitoring (Prometheus metrics, Grafana dashboards)
  |
  v
Completion (success notification or failure alert with retry)
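
The lifecycle above can be sketched as a single function that gates execution on pre- and post-load quality checks, records lineage, and emits events. This is a hypothetical illustration of the control flow, not the pipeline-service API; all names are invented for the example:

```python
def run_pipeline(name, rows, pre_check, post_check, lineage_log, event_log):
    """Walk one pipeline run through validation, quality gates, lineage, and events."""
    event_log.append((name, "validated"))
    if not pre_check(rows):                    # pre-load data quality gate
        event_log.append((name, "failed_pre_check"))
        return False
    loaded = list(rows)                        # stand-in for the actual load step
    if not post_check(loaded):                 # post-load data quality gate
        event_log.append((name, "failed_post_check"))
        return False
    # catalog-service would record the source-to-target mapping here
    lineage_log.append({"pipeline": name, "source": "staging", "target": "analytics"})
    event_log.append((name, "completed"))
    return True

events, lineage = [], []
ok = run_pipeline(
    "orders_daily",
    rows=[{"order_id": 1, "amount": 42.0}],
    pre_check=lambda rs: all(r["amount"] is not None for r in rs),
    post_check=lambda rs: len(rs) > 0,
    lineage_log=lineage,
    event_log=events,
)
print(ok, events[-1])  # True ('orders_daily', 'completed')
```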

Pipeline Templates

MATIH includes pre-built templates for common data engineering patterns:

| Template | Description | Engine |
|---|---|---|
| Full refresh | Drop and recreate target table from source | Spark or Trino CTAS |
| Incremental load | Load only new/changed records since last run | Spark with watermark tracking |
| SCD Type 2 | Slowly changing dimension with history tracking | Spark with merge logic |
| CDC stream | Real-time change capture from source database | Flink with Kafka CDC connector |
| Data quality check | Standalone quality validation pipeline | data-quality-service rules engine |
| Aggregation rollup | Pre-compute aggregations for dashboard performance | Flink windowed aggregation or Spark batch |
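
The watermark tracking behind the incremental-load template reduces to a simple idea: remember the highest timestamp loaded so far, and only pick up rows newer than it. A minimal sketch, with illustrative field names (the real template runs on Spark):

```python
def incremental_load(source_rows, target, watermark, ts_field="updated_at"):
    """Load only rows newer than the last watermark, then advance the watermark.

    ISO-8601 timestamp strings compare correctly lexicographically, so plain
    string comparison suffices here.
    """
    new_rows = [r for r in source_rows if r[ts_field] > watermark]
    target.extend(new_rows)
    # Advance the watermark to the newest loaded row (unchanged if nothing was new)
    return max((r[ts_field] for r in new_rows), default=watermark)

target = []
source = [
    {"id": 1, "updated_at": "2025-01-01T00:00:00"},
    {"id": 2, "updated_at": "2025-01-02T00:00:00"},
]
wm = incremental_load(source, target, watermark="2025-01-01T00:00:00")
print(len(target), wm)  # 1 2025-01-02T00:00:00
```

Re-running with the returned watermark loads nothing, which is what makes the job safely re-runnable.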

1.3 Data Ingestion Patterns

MATIH supports three primary ingestion patterns:

Batch Ingestion

Source (Database, API, File System)
  -> Connector (JDBC, REST, S3)
    -> Staging Area (raw zone in data lake)
      -> Transformation (Spark SQL, Trino CTAS)
        -> Target (analytics zone, served via Trino)

Batch ingestion is orchestrated by Airflow DAGs managed through the pipeline-service. Each ingestion job:

  • Extracts data from the source using configurable connectors
  • Validates schema compatibility with the target
  • Applies data quality checks before and after loading
  • Records lineage in the catalog-service
  • Emits pipeline events to Kafka for downstream consumers
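
The schema-compatibility step in the list above amounts to comparing the source's column/type map against the target's. A hedged sketch (the real connectors work against live catalogs; the function and type names here are illustrative):

```python
def check_schema_compatibility(source_schema, target_schema):
    """Report columns the target expects that the source is missing or mistypes.

    Schemas are plain {column_name: type_string} maps for this sketch.
    """
    issues = []
    for col, col_type in target_schema.items():
        if col not in source_schema:
            issues.append(f"missing column: {col}")
        elif source_schema[col] != col_type:
            issues.append(f"type mismatch on {col}: {source_schema[col]} vs {col_type}")
    return issues

source = {"order_id": "bigint", "amount": "double", "order_date": "date"}
target = {"order_id": "bigint", "amount": "decimal(18,2)", "customer_id": "bigint"}
print(check_schema_compatibility(source, target))
# ['type mismatch on amount: double vs decimal(18,2)', 'missing column: customer_id']
```

An empty issue list means the job can proceed to the staging load; otherwise the run fails validation before any data moves.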

Stream Ingestion

Source (Kafka topic, CDC stream)
  -> Flink Job (parsing, transformation, enrichment)
    -> Target (Kafka topic, data lake, OLAP engine)

Stream ingestion is powered by Apache Flink with four pre-built streaming jobs:

| Flink Job | Source | Target | Purpose |
|---|---|---|---|
| agent-performance-agg | Agent events (Kafka) | Analytics tables | Aggregate agent response times and success rates |
| llm-operations-agg | LLM events (Kafka) | Analytics tables | Track token usage and latency by model and tenant |
| session-analytics | Session events (Kafka) | Analytics tables | Compute session duration, message count, engagement |
| state-transition-cdc | PostgreSQL CDC (Kafka) | Analytics tables | Capture state changes for event sourcing |
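
The aggregations these jobs compute are windowed: events are grouped into fixed time buckets and summarized per key. Here is a pure-Python sketch of the kind of tumbling-window aggregation agent-performance-agg performs; the real job runs as Flink SQL over Kafka, and the event shape below is an assumption:

```python
from collections import defaultdict

def tumbling_window_agg(events, window_seconds=60):
    """Group events into fixed windows per agent; compute count and mean response time."""
    windows = defaultdict(list)
    for e in events:
        # Floor the event timestamp to its window start
        window_start = (e["ts"] // window_seconds) * window_seconds
        windows[(window_start, e["agent"])].append(e["response_ms"])
    return {
        key: {"count": len(v), "avg_response_ms": sum(v) / len(v)}
        for key, v in windows.items()
    }

events = [
    {"ts": 5,  "agent": "sql-agent", "response_ms": 120},
    {"ts": 30, "agent": "sql-agent", "response_ms": 80},
    {"ts": 70, "agent": "sql-agent", "response_ms": 200},
]
print(tumbling_window_agg(events))
# {(0, 'sql-agent'): {'count': 2, 'avg_response_ms': 100.0},
#  (60, 'sql-agent'): {'count': 1, 'avg_response_ms': 200.0}}
```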

Hybrid Ingestion

The most common pattern combines batch and stream:

  1. Initial load: Batch ingestion to backfill historical data
  2. Ongoing updates: CDC stream for real-time incremental changes
  3. Reconciliation: Periodic batch job to verify stream consistency
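
The reconciliation step (3) boils down to diffing a batch snapshot of the source against the state the stream has built up. A minimal sketch of that comparison, with illustrative record shapes:

```python
def reconcile(batch_snapshot, stream_state, key="order_id"):
    """Compare batch-snapshot rows with stream-applied rows; return discrepancies."""
    batch = {r[key]: r for r in batch_snapshot}
    stream = {r[key]: r for r in stream_state}
    return {
        "missing": sorted(batch.keys() - stream.keys()),   # stream dropped these
        "extra": sorted(stream.keys() - batch.keys()),     # stream has rows batch lacks
        "mismatched": sorted(                              # same key, different values
            k for k in batch.keys() & stream.keys() if batch[k] != stream[k]
        ),
    }

batch = [{"order_id": 1, "amount": 10}, {"order_id": 2, "amount": 20}]
stream = [
    {"order_id": 1, "amount": 10},
    {"order_id": 2, "amount": 25},
    {"order_id": 3, "amount": 5},
]
print(reconcile(batch, stream))
# {'missing': [], 'extra': [3], 'mismatched': [2]}
```

In practice "extra" rows are often just stream events that arrived after the snapshot was taken, so reconciliation jobs usually exclude a recent grace window before alerting.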

1.4 Data Quality Monitoring

The data-quality-service (Python, port 8000) provides continuous quality monitoring:

| Feature | Description |
|---|---|
| Rule-based checks | Define expectations (e.g., "column X is never null", "row count increases daily", "values in range [0, 100]") |
| Statistical profiling | Automatic distribution analysis, outlier detection, and trend monitoring for every column |
| Quality scores | Per-table and per-column quality scores (0-100) based on completeness, accuracy, timeliness, consistency, and validity |
| Schema drift detection | Automatic detection of new columns, type changes, and dropped columns in source data |
| Alerting | Automatic notifications when quality scores drop below tenant-configurable thresholds |
| Lineage-aware impact | When a quality issue is detected, the system identifies all downstream dashboards, models, and reports affected |
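
Rule-based checks like the examples in the table can be modeled as small predicate factories over rows. This is a sketch of the idea only; the data-quality-service's actual rule schema may differ, and the rule names below are invented:

```python
def not_null(column, threshold=1.0):
    """Rule: at least `threshold` fraction of values in `column` are non-null."""
    def check(rows):
        if not rows:
            return True
        non_null = sum(1 for r in rows if r.get(column) is not None)
        return non_null / len(rows) >= threshold
    return check

def in_range(column, lo, hi):
    """Rule: every non-null value of `column` falls within [lo, hi]."""
    def check(rows):
        return all(lo <= r[column] <= hi for r in rows if r.get(column) is not None)
    return check

rules = {
    "amount_not_null": not_null("amount", threshold=0.99),
    "score_in_range": in_range("score", 0, 100),
}
rows = [{"amount": 10.0, "score": 95}, {"amount": None, "score": 120}]
print({name: rule(rows) for name, rule in rules.items()})
# {'amount_not_null': False, 'score_in_range': False}
```

Thresholded rules ("99% non-null") matter in practice because a hard 100% requirement would page on every single bad source record.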

Quality Score Computation

The quality score for a table is computed as a weighted average of five dimensions:

| Dimension | Weight | Checks |
|---|---|---|
| Completeness | 25% | Null rate, missing value patterns, required field coverage |
| Accuracy | 25% | Value range validation, referential integrity, business rule compliance |
| Timeliness | 20% | Data freshness (time since last update), SLA compliance |
| Consistency | 15% | Cross-source consistency, duplicate detection, format validation |
| Validity | 15% | Data type compliance, enum validation, pattern matching |
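
With the weights above, the table-level score is a straightforward weighted average of the five dimension scores, each on a 0-100 scale. A worked example (the dimension scores used here are made up for illustration):

```python
# Weights from the dimension table above; they sum to 1.0
WEIGHTS = {
    "completeness": 0.25,
    "accuracy": 0.25,
    "timeliness": 0.20,
    "consistency": 0.15,
    "validity": 0.15,
}

def quality_score(dimension_scores: dict) -> float:
    """Weighted average of the five dimension scores (each 0-100)."""
    assert abs(sum(WEIGHTS.values()) - 1.0) < 1e-9
    return sum(WEIGHTS[d] * dimension_scores[d] for d in WEIGHTS)

scores = {
    "completeness": 98.0,
    "accuracy": 92.0,
    "timeliness": 80.0,
    "consistency": 100.0,
    "validity": 95.0,
}
print(round(quality_score(scores), 2))  # 92.75
```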

Quality scores are surfaced in multiple places:

  • Data catalog -- Each table shows its current quality score with trend
  • Conversational analytics -- The Analysis Agent includes quality warnings when answering questions about low-quality data
  • Dashboards -- Widgets display a quality indicator when their source data quality drops below threshold
  • ML training -- The ML Workbench warns when training data quality is below the configured threshold

1.5 Data Catalog Integration

The catalog-service (Java, port 8086) provides the metadata backbone for all data engineering activities:

| Capability | Description |
|---|---|
| Automated discovery | Crawl connected Trino catalogs to discover tables, columns, and schemas automatically |
| Business glossary | Define business terms and map them to technical schema elements |
| Data lineage | Track data flow from source through transformations to consumption points |
| Search | Full-text search via Elasticsearch with faceted filtering by schema, type, quality, and owner |
| Classification | Automatic and manual tagging of sensitive data (PII, PHI, financial) for governance |
| Impact analysis | "What breaks if I change this?" queries via Context Graph traversal |
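
The impact-analysis capability is, at its core, a graph traversal over lineage edges: starting from the asset you want to change, follow source-to-target edges and collect everything reachable. A sketch of the idea (not the Context Graph API; the asset names are invented):

```python
from collections import deque

def downstream_impact(lineage, start):
    """Breadth-first traversal of source->target lineage edges.

    Answers "what breaks if I change `start`?" by collecting every
    transitively reachable downstream asset.
    """
    affected = set()
    frontier = deque([start])
    while frontier:
        node = frontier.popleft()
        for target in lineage.get(node, []):
            if target not in affected:   # guard against cycles and duplicates
                affected.add(target)
                frontier.append(target)
    return affected

# asset -> list of assets that consume it (illustrative lineage graph)
lineage = {
    "crm.customers": ["analytics.customer_360"],
    "analytics.customer_360": ["dashboard.retention", "ml.churn_model"],
}
print(sorted(downstream_impact(lineage, "crm.customers")))
# ['analytics.customer_360', 'dashboard.retention', 'ml.churn_model']
```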

1.6 Conversational Data Engineering

One of the unique capabilities of MATIH is the ability to create and manage data pipelines through conversation:

User: "Create a daily pipeline that loads order data from the Postgres
       orders table into the analytics schema, with deduplication on
       order_id and a data quality check for null values in amount."

AI Response:
  - Generated pipeline template:
    Source: postgresql_catalog.operations.orders
    Target: analytics.orders_daily
    Schedule: Daily at 02:00 UTC
    Deduplication: order_id (keep latest)
    Quality Checks:
      - orders.amount IS NOT NULL (threshold: 99%)
      - orders.order_date IS NOT NULL (threshold: 100%)
      - Row count >= previous day (threshold: 95%)
  - Review and deploy? [Deploy] [Edit] [Cancel]

The AI generates a pipeline template that the data engineer can review, modify, and deploy. This conversational interface reduces the time to create a new pipeline from hours (writing Airflow DAGs manually) to minutes.
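
Once the engineer clicks Deploy, the reviewed spec has to become a structured pipeline definition that the pipeline-service can schedule. The sketch below shows one plausible shape for that translation; the field names and definition schema are assumptions for illustration, not the service's real contract:

```python
import json

def build_pipeline_definition(spec: dict) -> dict:
    """Turn a reviewed conversational spec into a deployable pipeline definition."""
    return {
        "name": spec["target"].replace(".", "_"),
        "source": spec["source"],
        "target": spec["target"],
        "schedule": spec.get("schedule", "0 2 * * *"),  # cron: daily at 02:00 UTC
        "dedup": {"key": spec["dedup_key"], "strategy": "keep_latest"},
        "quality_checks": spec.get("quality_checks", []),
    }

spec = {
    "source": "postgresql_catalog.operations.orders",
    "target": "analytics.orders_daily",
    "dedup_key": "order_id",
    "quality_checks": [
        {"rule": "amount IS NOT NULL", "threshold": 0.99},
        {"rule": "order_date IS NOT NULL", "threshold": 1.0},
    ],
}
print(json.dumps(build_pipeline_definition(spec), indent=2))
```

The point of the review step in the conversation is exactly this boundary: the AI proposes the spec, but only the structured, validated definition is ever deployed.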


Deep Dive References

  • Query Engine -- Trino configuration, query execution, caching, and optimization
  • Pipeline Service -- Pipeline orchestration architecture and API reference
  • Data Quality -- Quality rule engine, scoring, and alerting
  • Data Catalog -- Catalog service, lineage tracking, and discovery
  • Flink Jobs -- Streaming job definitions and deployment