Data Engineering
The Data Engineering pillar of the MATIH Platform provides the foundational data infrastructure that powers every other capability. From federated SQL queries across diverse data sources to automated pipeline orchestration with built-in quality monitoring, the data engineering layer ensures that data flows reliably, accurately, and efficiently through the platform.
1.1 Federated Query Engine (Trino)
MATIH uses Trino as its federated SQL query engine, enabling queries that span multiple data sources in a single SQL statement.
Architecture
Trino Coordinator (1 instance per cluster)
|
+-- Worker Nodes (auto-scaled based on query load)
| |
| +-- Connector: PostgreSQL (tenant operational databases)
| +-- Connector: Hive/S3 (data lake -- Parquet, ORC, Iceberg)
| +-- Connector: Elasticsearch (log analytics, full-text search)
| +-- Connector: ClickHouse (OLAP analytics)
| +-- Connector: Custom (tenant-specific data sources)
|
+-- Resource Groups (per-tenant CPU and memory limits)
+-- Query Queue (fair-share scheduling across tenants)
+-- Result Cache (Redis-backed, configurable TTL)
Multi-Tenant Isolation in Trino
| Isolation Mechanism | Description |
|---|---|
| Per-tenant catalogs | Each tenant has dedicated Trino catalog configurations with separate credentials |
| Resource groups | Per-tenant CPU, memory, and concurrency limits enforced by Trino's resource group manager |
| Query timeout | Configurable per tenant (default: 5 minutes for interactive, 30 minutes for batch) |
| Result caching | Redis-backed cache with tenant-scoped keys and configurable TTL |
| Audit logging | Every query execution logged with tenant_id, user_id, SQL, duration, and rows returned |
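The tenant-scoped result cache row above can be illustrated with a minimal in-memory sketch. The key scheme and the `ResultCache` class are assumptions for illustration, not the platform's actual Redis implementation:

```python
import hashlib
import time

def cache_key(tenant_id: str, sql: str) -> str:
    # Normalize whitespace and case so trivial formatting differences
    # do not fragment the cache, then scope the key by tenant so tenants
    # can never read each other's cached results.
    normalized = " ".join(sql.split()).lower()
    digest = hashlib.sha256(normalized.encode()).hexdigest()
    return f"trino:result:{tenant_id}:{digest}"

class ResultCache:
    """In-memory stand-in for the Redis-backed cache, with per-entry TTL."""

    def __init__(self, ttl_seconds: int = 300):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (expires_at, rows)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None or entry[0] < time.time():
            return None  # miss or expired
        return entry[1]

    def put(self, key, rows):
        self._store[key] = (time.time() + self.ttl, rows)
```

Note that two tenants issuing the identical SQL statement produce distinct keys, which is what enforces the tenant scoping described in the table.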
Federated Query Example
A single query can join data across a PostgreSQL operational database and a data lake in S3:
```sql
-- Join customer data from PostgreSQL with purchase history from S3/Parquet
SELECT
    c.customer_name,
    c.segment,
    SUM(p.amount) AS total_purchases,
    COUNT(DISTINCT p.order_id) AS order_count
FROM postgresql_catalog.crm.customers c
JOIN datalake_catalog.sales.purchases p
    ON c.customer_id = p.customer_id
WHERE p.purchase_date >= DATE '2025-01-01'
GROUP BY c.customer_name, c.segment
ORDER BY total_purchases DESC
LIMIT 20
```
This query is transparent to the user -- they do not need to know that `customers` lives in PostgreSQL and `purchases` lives in S3/Parquet. The AI's SQL Agent generates federated queries automatically based on the schema context retrieved from Qdrant.
1.2 Pipeline Orchestration
The pipeline-service (Java, port 8092) provides orchestration capabilities for data workflows, integrating with multiple execution engines:
| Engine | Use Case | Integration Pattern |
|---|---|---|
| Apache Airflow | Batch pipeline scheduling and dependency management | DAG management via pipeline-service API; Kubernetes executor for isolated task execution |
| Apache Flink | Real-time stream processing | Flink SQL jobs managed via pipeline-service; Kafka source/sink connectors |
| Apache Spark | Large-scale batch transformations | Spark-on-Kubernetes via spark-submit; pipeline-service tracks job lifecycle |
| Temporal | Long-running, stateful workflows | Workflow definitions for complex multi-step processes with retry and compensation |
Pipeline Lifecycle
Pipeline Definition (YAML or conversational)
|
v
Validation (schema checks, dependency resolution)
|
v
Scheduling (cron-based or event-triggered)
|
v
Execution (Airflow DAG, Flink job, Spark job, or Temporal workflow)
|
+-- Data Quality Checks (pre-load and post-load)
|
+-- Lineage Tracking (catalog-service records source-to-target mappings)
|
+-- Monitoring (Prometheus metrics, Grafana dashboards)
|
v
Completion (success notification or failure alert with retry)
Pipeline Templates
MATIH includes pre-built templates for common data engineering patterns:
| Template | Description | Engine |
|---|---|---|
| Full refresh | Drop and recreate target table from source | Spark or Trino CTAS |
| Incremental load | Load only new/changed records since last run | Spark with watermark tracking |
| SCD Type 2 | Slowly changing dimension with history tracking | Spark with merge logic |
| CDC stream | Real-time change capture from source database | Flink with Kafka CDC connector |
| Data quality check | Standalone quality validation pipeline | data-quality-service rules engine |
| Aggregation rollup | Pre-compute aggregations for dashboard performance | Flink windowed aggregation or Spark batch |
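The incremental-load template's watermark tracking can be sketched in a few lines. This is an illustrative simplification (the function name `load_increment` and the `updated_at` field are assumptions, not the Spark job's actual interface):

```python
def load_increment(source_rows, watermark):
    """Select only rows newer than the stored watermark and advance it.

    source_rows: iterable of dicts carrying an 'updated_at' timestamp
    (illustrative schema). Returns (new_rows, new_watermark); the caller
    persists new_watermark so the next run resumes where this one stopped.
    """
    new_rows = [r for r in source_rows if r["updated_at"] > watermark]
    # If nothing changed, keep the old watermark rather than regressing it.
    new_watermark = max((r["updated_at"] for r in new_rows), default=watermark)
    return new_rows, new_watermark
```

Because the watermark only ever advances, re-running the job after a failure reprocesses at most the rows from the failed increment, which keeps the load idempotent when the target applies upserts.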
1.3 Data Ingestion Patterns
MATIH supports three primary ingestion patterns:
Batch Ingestion
Source (Database, API, File System)
-> Connector (JDBC, REST, S3)
-> Staging Area (raw zone in data lake)
-> Transformation (Spark SQL, Trino CTAS)
-> Target (analytics zone, served via Trino)
Batch ingestion is orchestrated by Airflow DAGs managed through the pipeline-service. Each ingestion job:
- Extracts data from the source using configurable connectors
- Validates schema compatibility with the target
- Applies data quality checks before and after loading
- Records lineage in the catalog-service
- Emits pipeline events to Kafka for downstream consumers
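The step sequence above can be sketched as a plain function. This is a minimal illustration under assumed interfaces (the callables, the `{column: type}` schema dicts, and the compatibility rule are all simplifications; lineage recording and Kafka events are omitted):

```python
def schema_compatible(source_schema: dict, target_schema: dict) -> bool:
    # Every target column must exist in the source with the same type;
    # extra source columns are tolerated (dropped at load time).
    return all(source_schema.get(col) == typ for col, typ in target_schema.items())

def run_batch_ingestion(extract, source_schema, target_schema, quality_check, load):
    rows = extract()                                   # 1. extract via connector
    if not schema_compatible(source_schema, target_schema):
        raise ValueError("schema incompatible with target")  # 2. validate
    if not quality_check(rows):                        # 3. pre-load quality check
        raise ValueError("pre-load quality check failed")
    load(rows)                                         # 4. load into target
    if not quality_check(rows):                        # 5. post-load quality check
        raise ValueError("post-load quality check failed")
    return len(rows)
```

In the real pipeline the pre-load and post-load checks come from the data-quality-service rule engine, and a failure at any step triggers the retry and alerting path shown in the lifecycle diagram.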
Stream Ingestion
Source (Kafka topic, CDC stream)
-> Flink Job (parsing, transformation, enrichment)
-> Target (Kafka topic, data lake, OLAP engine)
Stream ingestion is powered by Apache Flink with four pre-built streaming jobs:
| Flink Job | Source | Target | Purpose |
|---|---|---|---|
| agent-performance-agg | Agent events (Kafka) | Analytics tables | Aggregate agent response times and success rates |
| llm-operations-agg | LLM events (Kafka) | Analytics tables | Track token usage and latency by model and tenant |
| session-analytics | Session events (Kafka) | Analytics tables | Compute session duration, message count, engagement |
| state-transition-cdc | PostgreSQL CDC (Kafka) | Analytics tables | Capture state changes for event sourcing |
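The aggregation jobs above follow a common shape: group events into fixed time windows and compute a per-window statistic. A pure-Python stand-in for a Flink tumbling window with an AVG aggregate (the function and event shape are illustrative, not the deployed Flink SQL):

```python
from collections import defaultdict

def tumbling_window_avg(events, window_seconds):
    """Group events into fixed, non-overlapping windows and average a metric.

    events: iterable of (timestamp_seconds, value) pairs, e.g. agent
    response times. Returns {window_start: average_value}, mirroring what
    a Flink tumbling window with an AVG aggregate would emit per window.
    """
    sums = defaultdict(lambda: [0.0, 0])  # window_start -> [sum, count]
    for ts, value in events:
        window_start = int(ts // window_seconds) * window_seconds
        acc = sums[window_start]
        acc[0] += value
        acc[1] += 1
    return {w: total / count for w, (total, count) in sums.items()}
```

The real jobs additionally key the window by tenant and agent, and rely on Flink watermarks to close windows in the presence of late events.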
Hybrid Ingestion
The most common pattern combines batch and stream:
- Initial load: Batch ingestion to backfill historical data
- Ongoing updates: CDC stream for real-time incremental changes
- Reconciliation: Periodic batch job to verify stream consistency
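The reconciliation step can be sketched as a diff between the batch snapshot and the stream-applied state. A minimal illustration (the `reconcile` function and its key-to-row mapping are assumptions, not the platform's actual reconciliation job):

```python
def reconcile(batch_snapshot: dict, stream_state: dict):
    """Compare a periodic batch snapshot against stream-applied state.

    Both arguments map primary key -> row. Returns the keys that are
    missing from the stream state or whose values diverge, so a repair
    job can re-emit just those records instead of a full reload.
    """
    missing = [k for k in batch_snapshot if k not in stream_state]
    divergent = [k for k in batch_snapshot
                 if k in stream_state and stream_state[k] != batch_snapshot[k]]
    return {"missing": missing, "divergent": divergent}
```

An empty result means the CDC stream kept the target consistent since the last reconciliation run.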
1.4 Data Quality Monitoring
The data-quality-service (Python, port 8000) provides continuous quality monitoring:
| Feature | Description |
|---|---|
| Rule-based checks | Define expectations (e.g., "column X is never null", "row count increases daily", "values in range [0, 100]") |
| Statistical profiling | Automatic distribution analysis, outlier detection, and trend monitoring for every column |
| Quality scores | Per-table and per-column quality scores (0-100) based on completeness, accuracy, timeliness, consistency, and validity |
| Schema drift detection | Automatic detection of new columns, type changes, and dropped columns in source data |
| Alerting | Automatic notifications when quality scores drop below tenant-configurable thresholds |
| Lineage-aware impact | When a quality issue is detected, the system identifies all downstream dashboards, models, and reports affected |
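Schema drift detection reduces to diffing two column-to-type mappings. A minimal sketch (the function name and the `{column: type}` representation are illustrative, not the service's internal model):

```python
def detect_schema_drift(previous: dict, current: dict):
    """Diff two {column: type} schemas, flagging added, dropped,
    and retyped columns -- the three drift cases monitored above."""
    return {
        "added":   sorted(set(current) - set(previous)),
        "dropped": sorted(set(previous) - set(current)),
        "retyped": sorted(c for c in set(previous) & set(current)
                          if previous[c] != current[c]),
    }
```

Any non-empty bucket would trigger the alerting path, and the lineage-aware impact feature then enumerates the downstream consumers of the drifted table.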
Quality Score Computation
The quality score for a table is computed as a weighted average of five dimensions:
| Dimension | Weight | Checks |
|---|---|---|
| Completeness | 25% | Null rate, missing value patterns, required field coverage |
| Accuracy | 25% | Value range validation, referential integrity, business rule compliance |
| Timeliness | 20% | Data freshness (time since last update), SLA compliance |
| Consistency | 15% | Cross-source consistency, duplicate detection, format validation |
| Validity | 15% | Data type compliance, enum validation, pattern matching |
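The weighted average described above is straightforward to express directly; the weights come from the table, and each dimension score is assumed to be on the same 0-100 scale as the overall score:

```python
# Weights from the dimension table above; they sum to 1.0.
WEIGHTS = {
    "completeness": 0.25,
    "accuracy":     0.25,
    "timeliness":   0.20,
    "consistency":  0.15,
    "validity":     0.15,
}

def quality_score(dimension_scores: dict) -> float:
    """Weighted average of the five quality dimensions (each 0-100),
    rounded to one decimal place."""
    return round(sum(WEIGHTS[d] * dimension_scores[d] for d in WEIGHTS), 1)
```

For example, a table that is perfect on four dimensions but 80/100 on completeness scores 95.0, since completeness carries a 25% weight.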
Quality scores are surfaced in multiple places:
- Data catalog -- Each table shows its current quality score with trend
- Conversational analytics -- The Analysis Agent includes quality warnings when answering questions about low-quality data
- Dashboards -- Widgets display a quality indicator when their source data quality drops below threshold
- ML training -- The ML Workbench warns when training data quality is below the configured threshold
1.5 Data Catalog Integration
The catalog-service (Java, port 8086) provides the metadata backbone for all data engineering activities:
| Capability | Description |
|---|---|
| Automated discovery | Crawl connected Trino catalogs to discover tables, columns, and schemas automatically |
| Business glossary | Define business terms and map them to technical schema elements |
| Data lineage | Track data flow from source through transformations to consumption points |
| Search | Full-text search via Elasticsearch with faceted filtering by schema, type, quality, and owner |
| Classification | Automatic and manual tagging of sensitive data (PII, PHI, financial) for governance |
| Impact analysis | "What breaks if I change this?" queries via Context Graph traversal |
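The impact-analysis traversal amounts to a breadth-first walk over lineage edges. A minimal sketch, assuming the lineage graph is available as an adjacency mapping (the Context Graph's actual storage and query API differ):

```python
from collections import deque

def downstream_impact(lineage: dict, asset: str):
    """Answer "what breaks if I change this?" by collecting every
    transitively downstream asset. lineage maps asset -> list of direct
    downstream assets (dashboards, models, reports)."""
    seen, queue = set(), deque([asset])
    while queue:
        node = queue.popleft()
        for child in lineage.get(node, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return sorted(seen)
```

The `seen` set makes the walk safe even if the lineage graph contains cycles introduced by bidirectional mappings.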
1.6 Conversational Data Engineering
One of the unique capabilities of MATIH is the ability to create and manage data pipelines through conversation:
User: "Create a daily pipeline that loads order data from the Postgres
orders table into the analytics schema, with deduplication on
order_id and a data quality check for null values in amount."
AI Response:
- Generated pipeline template:
Source: postgresql_catalog.operations.orders
Target: analytics.orders_daily
Schedule: Daily at 02:00 UTC
Deduplication: order_id (keep latest)
Quality Checks:
- orders.amount IS NOT NULL (threshold: 99%)
- orders.order_date IS NOT NULL (threshold: 100%)
- Row count >= previous day (threshold: 95%)
- Review and deploy? [Deploy] [Edit] [Cancel]
The AI generates a pipeline template that the data engineer can review, modify, and deploy. This conversational interface reduces the time to create a new pipeline from hours (writing Airflow DAGs manually) to minutes.
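Before a generated template reaches the [Deploy] step, it must pass validation. A sketch of what such a pre-deployment check might look like (the required keys, the dict representation, and the `validate_template` function are assumptions for illustration, not the pipeline-service's actual schema):

```python
REQUIRED_KEYS = {"source", "target", "schedule", "quality_checks"}

def validate_template(template: dict):
    """Lightweight validation of an AI-generated pipeline template:
    all required keys are present and every quality-check threshold is
    a percentage in (0, 100]. Returns a list of error strings; an empty
    list means the template is ready for review and deployment."""
    errors = [f"missing key: {k}" for k in sorted(REQUIRED_KEYS - set(template))]
    for check in template.get("quality_checks", []):
        t = check.get("threshold")
        if not isinstance(t, (int, float)) or not 0 < t <= 100:
            errors.append(f"bad threshold in check: {check.get('rule')}")
    return errors
```

Surfacing errors as a list (rather than failing on the first problem) lets the conversational interface show the engineer everything to fix in one round trip.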
Deep Dive References
- Query Engine -- Trino configuration, query execution, caching, and optimization
- Pipeline Service -- Pipeline orchestration architecture and API reference
- Data Quality -- Quality rule engine, scoring, and alerting
- Data Catalog -- Catalog service, lineage tracking, and discovery
- Flink Jobs -- Streaming job definitions and deployment