Data Lineage
Data lineage tracks the origin, movement, and transformation of data across the platform. MATIH integrates with the OpenLineage standard to capture lineage events from pipelines, queries, and transformations, and provides column-level lineage, impact analysis, and interactive lineage visualization.
Lineage Architecture
+------------------+     OpenLineage Events      +-------------------+
| Pipeline Service | --------------------------> | Catalog Service   |
| (Airflow, Spark) |                             | (LineageController|
+------------------+                             |  LineageRepository|
                                                 |  ImpactService)   |
+------------------+        Query Lineage        |                   |
| Query Engine     | --------------------------> |                   |
| (SQL parsed)     |                             +--------+----------+
+------------------+                                      |
                                                 +--------v----------+
+------------------+         dbt Lineage         | OpenMetadata      |
| Pipeline Service | --------------------------> | (Lineage Store)   |
| (dbt integration)|                             +-------------------+
+------------------+
Lineage Sources
| Source | Mechanism | Granularity |
|---|---|---|
| Apache Airflow | OpenLineage integration in DAG tasks | Table-level |
| Apache Spark | OpenLineage Spark listener | Column-level |
| dbt | dbt manifest parsing | Column-level |
| SQL queries | Query parsing in Query Engine | Table-level |
| Flink CDC | Event metadata from CDC jobs | Table-level |
| Manual definition | API-based lineage submission | Table or column |
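For the manual, API-based source, a submission could look like the sketch below. It assumes that manual lineage is posted as an OpenLineage event to the `POST /v1/lineage/events` endpoint documented under Lineage Event Processing; the base URL and the class name `ManualLineageSubmission` are hypothetical, and authentication is left out because this page does not describe it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class ManualLineageSubmission {
    // Hypothetical base URL; replace with the Catalog Service address of your deployment.
    static final String CATALOG_BASE_URL = "http://catalog-service.example.internal";

    /** Builds (but does not send) a POST request carrying an OpenLineage event as JSON. */
    static HttpRequest buildRequest(String eventJson) {
        return HttpRequest.newBuilder(URI.create(CATALOG_BASE_URL + "/v1/lineage/events"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(eventJson))
                .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding())` from `java.net.http`.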
OpenLineage Integration
The Catalog Service consumes OpenLineage events and stores them in both the local lineage graph and OpenMetadata.
OpenLineage Event Format
{
  "eventTime": "2026-02-12T10:30:00Z",
  "eventType": "COMPLETE",
  "producer": "https://airflow.matih-data-plane/",
  "run": {
    "runId": "run-123",
    "facets": {
      "processing_engine": {"name": "spark", "version": "4.1.1"}
    }
  },
  "job": {
    "namespace": "matih-tenant-acme",
    "name": "daily_sales_etl",
    "facets": {
      "sql": {"query": "INSERT INTO analytics.daily_sales SELECT ..."}
    }
  },
  "inputs": [
    {
      "namespace": "matih-tenant-acme",
      "name": "raw.orders",
      "facets": {
        "schema": {
          "fields": [
            {"name": "order_id", "type": "BIGINT"},
            {"name": "amount", "type": "DECIMAL"}
          ]
        }
      }
    }
  ],
  "outputs": [
    {
      "namespace": "matih-tenant-acme",
      "name": "analytics.daily_sales",
      "facets": {
        "schema": {
          "fields": [
            {"name": "date", "type": "DATE"},
            {"name": "total_amount", "type": "DECIMAL"}
          ]
        },
        "columnLineage": {
          "fields": {
            "total_amount": {
              "inputFields": [
                {"namespace": "matih-tenant-acme", "name": "raw.orders", "field": "amount"}
              ],
              "transformationDescription": "SUM(amount)",
              "transformationType": "AGGREGATE"
            }
          }
        }
      }
    }
  ]
}
Lineage Event Processing
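As a hedged illustration of how the `columnLineage` facet in the sample event could be turned into column-level edges during ingestion, the sketch below flattens the facet into one edge per input field. The record and class names (`InputField`, `FieldLineage`, `ColumnEdge`, `ColumnLineageExtractor`) are hypothetical simplifications, not the Catalog Service's actual types, and JSON parsing is assumed to have happened already.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of one entry in the columnLineage facet.
record InputField(String namespace, String name, String field) {}

record FieldLineage(List<InputField> inputFields,
                    String transformationDescription,
                    String transformationType) {}

// One column-level edge: source column -> target column.
record ColumnEdge(String sourceTable, String sourceColumn,
                  String targetTable, String targetColumn,
                  String transformationType, String transformationDescription) {}

final class ColumnLineageExtractor {
    /** Emits one edge per (input field, output column) pair found in the facet. */
    static List<ColumnEdge> extract(String outputTable, Map<String, FieldLineage> facetFields) {
        List<ColumnEdge> edges = new ArrayList<>();
        for (Map.Entry<String, FieldLineage> entry : facetFields.entrySet()) {
            FieldLineage lineage = entry.getValue();
            for (InputField in : lineage.inputFields()) {
                edges.add(new ColumnEdge(in.name(), in.field(), outputTable, entry.getKey(),
                        lineage.transformationType(), lineage.transformationDescription()));
            }
        }
        return edges;
    }
}
```

Applied to the sample event, this would yield a single edge from `raw.orders.amount` to `analytics.daily_sales.total_amount` with type `AGGREGATE`.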
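import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/v1/lineage")
public class LineageController {
    @PostMapping("/events")
    public ResponseEntity<Void> ingestLineageEvent(@RequestBody OpenLineageEvent event) {
        // 1. Validate event format
        // 2. Extract table and column references
        // 3. Store in local lineage graph (PostgreSQL)
        // 4. Sync to OpenMetadata
        // 5. Update column-level lineage if present
        // 6. Publish lineage event to Kafka
        // Outline only: the steps above are not implemented here, and the
        // 202 status is an assumption (this page does not specify one).
        return ResponseEntity.accepted().build();
    }
}
Column-Level Lineage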
The ColumnLineageController returns the upstream and downstream columns of a given table column, along with the transformation applied on each edge:
GET /v1/lineage/columns?table=analytics.daily_sales&column=total_amount
Response:
{
  "table": "analytics.daily_sales",
  "column": "total_amount",
  "upstream": [
    {
      "table": "raw.orders",
      "column": "amount",
      "transformation": "SUM(amount)",
      "transformationType": "AGGREGATE",
      "pipeline": "daily_sales_etl",
      "lastUpdated": "2026-02-12T02:00:00Z"
    }
  ],
  "downstream": [
    {
      "table": "reports.revenue_dashboard",
      "column": "daily_revenue",
      "transformation": "direct_copy",
      "transformationType": "IDENTITY",
      "pipeline": "dashboard_refresh",
      "lastUpdated": "2026-02-12T06:00:00Z"
    }
  ]
}
Transformation Types
| Type | Description | Example |
|---|---|---|
| IDENTITY | Direct copy without transformation | SELECT amount FROM orders |
| AGGREGATE | Aggregation function applied | SUM(amount) |
| FILTER | Row filtering applied | WHERE status = 'active' |
| JOIN | Column produced by join operation | orders.customer_id JOIN customers.id |
| EXPRESSION | Calculated from expression | amount * quantity |
| CONDITIONAL | Conditional transformation | CASE WHEN ... THEN ... END |
| EXTERNAL | Data from external system | API import |
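The column endpoint's upstream and downstream lists can be thought of as two filters over the stored column edges. The following is a minimal sketch under that assumption; `ColumnLink` and `ColumnLineageLookup` are hypothetical names, and an in-memory list stands in for whatever query the service actually runs.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory form of a stored column-level edge.
record ColumnLink(String sourceTable, String sourceColumn,
                  String targetTable, String targetColumn,
                  String transformationType) {}

final class ColumnLineageLookup {
    /** Edges that feed the given column (one hop upstream). */
    static List<ColumnLink> upstream(List<ColumnLink> edges, String table, String column) {
        return edges.stream()
                .filter(e -> e.targetTable().equals(table) && e.targetColumn().equals(column))
                .collect(Collectors.toList());
    }

    /** Edges that read from the given column (one hop downstream). */
    static List<ColumnLink> downstream(List<ColumnLink> edges, String table, String column) {
        return edges.stream()
                .filter(e -> e.sourceTable().equals(table) && e.sourceColumn().equals(column))
                .collect(Collectors.toList());
    }
}
```

With the two edges from the example response, `upstream(edges, "analytics.daily_sales", "total_amount")` returns the `raw.orders.amount` edge and `downstream(...)` returns the edge into `reports.revenue_dashboard.daily_revenue`.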
Impact Analysis
The LineageImpactService provides downstream impact analysis for proposed changes:
import java.util.UUID;
import org.springframework.stereotype.Service;

@Service
public class LineageImpactService {
    public ImpactAnalysisResult analyzeImpact(UUID tenantId, String tableFqn,
                                              ChangeType changeType, String columnName) {
        // 1. Traverse lineage graph downstream from the affected asset
        // 2. Identify all dependent tables, views, and dashboards
        // 3. Classify impact severity for each dependent
        // 4. Return structured impact report
        throw new UnsupportedOperationException("Outline only; traversal and classification are not shown");
    }
}
Impact Analysis Request
POST /v1/lineage/impact-analysis
Request:
{
  "tableFqn": "raw.orders",
  "changeType": "COLUMN_REMOVED",
  "columnName": "discount_code",
  "includeIndirect": true
}
Impact Analysis Response
{
  "sourceTable": "raw.orders",
  "changeType": "COLUMN_REMOVED",
  "column": "discount_code",
  "directImpacts": [
    {
      "asset": "analytics.order_details",
      "assetType": "TABLE",
      "impactedColumn": "discount_applied",
      "transformation": "CASE WHEN discount_code IS NOT NULL ...",
      "severity": "BREAKING",
      "pipeline": "order_enrichment_etl"
    }
  ],
  "indirectImpacts": [
    {
      "asset": "reports.discount_analysis",
      "assetType": "VIEW",
      "impactedColumn": "discount_rate",
      "severity": "BREAKING",
      "dependencyChain": ["raw.orders", "analytics.order_details", "reports.discount_analysis"]
    },
    {
      "asset": "dashboard:sales-overview",
      "assetType": "DASHBOARD",
      "impactedWidget": "Discount Usage Chart",
      "severity": "DEGRADED"
    }
  ],
  "totalImpactedAssets": 5,
  "breakingChanges": 2,
  "degradedAssets": 3,
  "recommendation": "Update analytics.order_details pipeline to handle missing discount_code column before removing"
}
Impact Severity Levels
| Severity | Description | Action Required |
|---|---|---|
| BREAKING | Downstream asset will fail | Must update downstream before applying change |
| DEGRADED | Downstream asset will produce partial results | Should update downstream |
| COSMETIC | Downstream asset displays differently | Optional update |
| NONE | No impact detected | Safe to proceed |
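The downstream traversal behind impact analysis can be sketched as a breadth-first walk that records the dependency chain for each reached asset. This is a minimal illustration, not the service's implementation: the adjacency map, the `Impact` record, and `DownstreamImpactTraversal` are hypothetical, and severity classification is deliberately left out because this page does not state the rules used to assign it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical result row: an affected asset and the path that leads to it.
record Impact(String asset, boolean direct, List<String> dependencyChain) {}

final class DownstreamImpactTraversal {
    /**
     * Breadth-first walk over an adjacency map (asset -> assets that read from it).
     * Assets one hop away are direct impacts; anything further is indirect.
     */
    static List<Impact> findImpacts(Map<String, List<String>> downstream,
                                    String sourceTable, boolean includeIndirect) {
        List<Impact> impacts = new ArrayList<>();
        Set<String> visited = new HashSet<>(Set.of(sourceTable));
        Deque<List<String>> queue = new ArrayDeque<>();
        queue.add(List.of(sourceTable));
        while (!queue.isEmpty()) {
            List<String> chain = queue.poll();
            String current = chain.get(chain.size() - 1);
            for (String next : downstream.getOrDefault(current, List.of())) {
                if (!visited.add(next)) {
                    continue; // already reached through an earlier (shorter or equal) chain
                }
                List<String> nextChain = new ArrayList<>(chain);
                nextChain.add(next);
                impacts.add(new Impact(next, nextChain.size() == 2, List.copyOf(nextChain)));
                if (includeIndirect) {
                    queue.add(nextChain); // keep walking only when indirect impacts are wanted
                }
            }
        }
        return impacts;
    }
}
```

With `includeIndirect` set to `false`, only the first hop is reported, which mirrors the `includeIndirect` flag in the request shown earlier.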
Lineage Visualization
The LineageVisualizationService and VisualizationImpactAnalysisService generate graph representations for the frontend:
GET /v1/lineage/graph?table=analytics.daily_sales&depth=3&direction=BOTH
Response:
{
  "nodes": [
    {"id": "raw.orders", "type": "TABLE", "label": "orders", "schema": "raw"},
    {"id": "raw.customers", "type": "TABLE", "label": "customers", "schema": "raw"},
    {"id": "analytics.daily_sales", "type": "TABLE", "label": "daily_sales", "schema": "analytics"},
    {"id": "reports.revenue_dashboard", "type": "DASHBOARD", "label": "Revenue Dashboard"},
    {"id": "pipeline:daily_sales_etl", "type": "PIPELINE", "label": "daily_sales_etl"}
  ],
  "edges": [
    {"source": "raw.orders", "target": "pipeline:daily_sales_etl", "type": "INPUT"},
    {"source": "raw.customers", "target": "pipeline:daily_sales_etl", "type": "INPUT"},
    {"source": "pipeline:daily_sales_etl", "target": "analytics.daily_sales", "type": "OUTPUT"},
    {"source": "analytics.daily_sales", "target": "reports.revenue_dashboard", "type": "SOURCE"}
  ],
  "metadata": {
    "depth": 3,
    "direction": "BOTH",
    "totalNodes": 5,
    "totalEdges": 4
  }
}
Graph Parameters
| Parameter | Description | Default |
|---|---|---|
| table | Starting table for the graph | Required |
| depth | Maximum hops from the starting table | 3 |
| direction | UPSTREAM, DOWNSTREAM, or BOTH | BOTH |
| includeColumns | Include column-level lineage nodes | false |
| includePipelines | Show pipeline nodes as intermediaries | true |
| includeDashboards | Show dashboard consumers | true |
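To make the `depth` and `direction` parameters concrete, the sketch below collects the nodes and edges reachable within a hop limit. It is an assumption-laden illustration: `Direction`, `GraphEdge`, `LineageGraph`, and `LineageGraphBuilder` are hypothetical names, every node (including pipeline nodes) is counted as one hop, and the `include*` flags are not modeled.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

enum Direction { UPSTREAM, DOWNSTREAM, BOTH }

record GraphEdge(String source, String target) {}

record LineageGraph(Set<String> nodes, Set<GraphEdge> edges) {}

final class LineageGraphBuilder {
    /** Collects every node and edge within `depth` hops of `start` in the requested direction. */
    static LineageGraph build(List<GraphEdge> allEdges, String start, int depth, Direction direction) {
        Set<String> nodes = new LinkedHashSet<>(List.of(start));
        Set<GraphEdge> edges = new LinkedHashSet<>();
        if (direction != Direction.UPSTREAM) {
            walk(allEdges, start, depth, true, nodes, edges);   // follow source -> target
        }
        if (direction != Direction.DOWNSTREAM) {
            walk(allEdges, start, depth, false, nodes, edges);  // follow target -> source
        }
        return new LineageGraph(nodes, edges);
    }

    private static void walk(List<GraphEdge> allEdges, String start, int depth, boolean forward,
                             Set<String> nodes, Set<GraphEdge> edges) {
        Set<String> frontier = Set.of(start);
        Set<String> seen = new HashSet<>(frontier);
        for (int hop = 0; hop < depth && !frontier.isEmpty(); hop++) {
            Set<String> next = new HashSet<>();
            for (GraphEdge e : allEdges) {
                String from = forward ? e.source() : e.target();
                String to = forward ? e.target() : e.source();
                if (frontier.contains(from)) {
                    edges.add(e);
                    nodes.add(to);
                    if (seen.add(to)) {
                        next.add(to); // expand each node only once per walk
                    }
                }
            }
            frontier = next;
        }
    }
}
```

Run against the four edges in the example response with `start = "analytics.daily_sales"`, `depth = 3`, and `Direction.BOTH`, this yields five nodes and four edges, matching the `metadata` totals shown.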
Lineage Storage
Lineage data is stored in two locations:
| Store | Purpose | Query Pattern |
|---|---|---|
| PostgreSQL (CatalogLineageRepository) | Relational lineage edges for API queries | Impact analysis, dependency lookups |
| OpenMetadata | Canonical lineage store with UI visualization | Browsing, governance |
The CatalogLineage entity:
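import jakarta.persistence.Entity; // assumes Jakarta Persistence; older stacks use javax.persistence
import jakarta.persistence.Id;
import java.time.Instant;
import java.util.UUID;

@Entity
public class CatalogLineage {
    @Id // JPA requires an identifier; further mapping details are not shown
    private UUID id;
    private UUID tenantId;
    private String sourceTableFqn;
    private String sourceColumn;              // NULL for table-level lineage
    private String targetTableFqn;
    private String targetColumn;              // NULL for table-level lineage
    private String transformationType;
    private String transformationDescription;
    private String pipelineName;
    private Instant lastObserved;
    private Instant createdAt;
}
Related Sections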
- Catalog Service -- Lineage event handling in the catalog
- Governance Service -- Lineage-informed governance policies
- Pipeline Service -- Lineage emission from pipelines
- API Reference -- Lineage API endpoints