MATIH Platform is in active MVP development. Documentation reflects current implementation status.

Data Lineage

Data lineage tracks the origin, movement, and transformation of data across the platform. MATIH integrates with the OpenLineage standard to capture lineage events from pipelines, queries, and transformations, and provides column-level lineage, impact analysis, and interactive lineage visualization.


Lineage Architecture

+------------------+     OpenLineage Events     +-------------------+
| Pipeline Service | --------------------------> | Catalog Service   |
| (Airflow, Spark) |                             | (LineageController|
+------------------+                             |  LineageRepository|
                                                 |  ImpactService)   |
+------------------+     Query Lineage           |                   |
| Query Engine     | --------------------------> |                   |
| (SQL parsed)     |                             +--------+----------+
+------------------+                                      |
                                                 +--------v----------+
+------------------+     dbt Lineage             |   OpenMetadata    |
| Pipeline Service | --------------------------> |   (Lineage Store) |
| (dbt integration)|                             +-------------------+
+------------------+

Lineage Sources

| Source | Mechanism | Granularity |
|---|---|---|
| Apache Airflow | OpenLineage integration in DAG tasks | Table-level |
| Apache Spark | OpenLineage Spark listener | Column-level |
| dbt | dbt manifest parsing | Column-level |
| SQL queries | Query parsing in Query Engine | Table-level |
| Flink CDC | Event metadata from CDC jobs | Table-level |
| Manual definition | API-based lineage submission | Table or column |

OpenLineage Integration

The Catalog Service consumes OpenLineage events and stores them in both the local lineage graph and OpenMetadata.

OpenLineage Event Format

{
  "eventTime": "2026-02-12T10:30:00Z",
  "eventType": "COMPLETE",
  "producer": "https://airflow.matih-data-plane/",
  "run": {
    "runId": "run-123",
    "facets": {
      "processing_engine": {"name": "spark", "version": "4.1.1"}
    }
  },
  "job": {
    "namespace": "matih-tenant-acme",
    "name": "daily_sales_etl",
    "facets": {
      "sql": {"query": "INSERT INTO analytics.daily_sales SELECT ..."}
    }
  },
  "inputs": [
    {
      "namespace": "matih-tenant-acme",
      "name": "raw.orders",
      "facets": {
        "schema": {
          "fields": [
            {"name": "order_id", "type": "BIGINT"},
            {"name": "amount", "type": "DECIMAL"}
          ]
        }
      }
    }
  ],
  "outputs": [
    {
      "namespace": "matih-tenant-acme",
      "name": "analytics.daily_sales",
      "facets": {
        "schema": {
          "fields": [
            {"name": "date", "type": "DATE"},
            {"name": "total_amount", "type": "DECIMAL"}
          ]
        },
        "columnLineage": {
          "fields": {
            "total_amount": {
              "inputFields": [
                {"namespace": "matih-tenant-acme", "name": "raw.orders", "field": "amount"}
              ],
              "transformationDescription": "SUM(amount)",
              "transformationType": "AGGREGATE"
            }
          }
        }
      }
    }
  ]
}
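The columnLineage facet above maps each output column to the input fields it was derived from. The following is a minimal sketch that assumes the event has already been parsed into Java objects. It flattens the facet into one edge per input-column/output-column pair, which is roughly the shape of a column-level CatalogLineage row. ColumnLineageFlattener and its records are hypothetical names that model only the fields used here, and dataset namespaces are dropped for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, minimal model of the parts of an OpenLineage event used here.
final class ColumnLineageFlattener {

    // One entry of "inputFields" inside the columnLineage facet.
    record InputField(String namespace, String name, String field) {}

    // The value stored under each output column in columnLineage.fields.
    record ColumnLineageField(List<InputField> inputFields,
                              String transformationDescription,
                              String transformationType) {}

    // An output dataset; columnLineage is null when the producer sent no column facet.
    record OutputDataset(String namespace, String name,
                         Map<String, ColumnLineageField> columnLineage) {}

    // A flattened column-level edge (namespaces are dropped in this sketch).
    record ColumnEdge(String sourceTableFqn, String sourceColumn,
                      String targetTableFqn, String targetColumn,
                      String transformationType, String transformationDescription) {}

    static List<ColumnEdge> flatten(List<OutputDataset> outputs) {
        List<ColumnEdge> edges = new ArrayList<>();
        for (OutputDataset out : outputs) {
            if (out.columnLineage() == null) {
                continue; // table-level lineage only; nothing to flatten
            }
            for (Map.Entry<String, ColumnLineageField> entry : out.columnLineage().entrySet()) {
                ColumnLineageField lineage = entry.getValue();
                // Every listed input field contributes to this output column.
                for (InputField in : lineage.inputFields()) {
                    edges.add(new ColumnEdge(in.name(), in.field(), out.name(), entry.getKey(),
                            lineage.transformationType(), lineage.transformationDescription()));
                }
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        // Mirrors the total_amount entry of the example event.
        ColumnLineageField totalAmount = new ColumnLineageField(
                List.of(new InputField("matih-tenant-acme", "raw.orders", "amount")),
                "SUM(amount)", "AGGREGATE");
        OutputDataset dailySales = new OutputDataset(
                "matih-tenant-acme", "analytics.daily_sales", Map.of("total_amount", totalAmount));
        flatten(List.of(dailySales)).forEach(System.out::println);
    }
}
```

For the example event, this produces a single edge from raw.orders.amount to analytics.daily_sales.total_amount, labelled AGGREGATE / SUM(amount).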

Lineage Event Processing

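import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/v1/lineage")
public class LineageController {

    @PostMapping("/events")
    public ResponseEntity<Void> ingestLineageEvent(@RequestBody OpenLineageEvent event) {
        // 1. Validate event format
        // 2. Extract table and column references
        // 3. Store in local lineage graph (PostgreSQL)
        // 4. Sync to OpenMetadata
        // 5. Update column-level lineage if present
        // 6. Publish lineage event to Kafka
        // Response status assumed (202 Accepted); the outline does not specify one.
        return ResponseEntity.accepted().build();
    }
}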
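Steps 1 to 3 of the outline can be sketched for table-level lineage as follows. This assumes a common OpenLineage convention, namely that every input dataset of a run feeds every output dataset, because a table-level event carries no finer mapping. TableLineageExtractor, DatasetRef, and TableEdge are hypothetical names, and the validation shown is a single illustrative check rather than full schema validation.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for steps 1-3 of the outline: validate, extract, build edges.
final class TableLineageExtractor {

    record DatasetRef(String namespace, String name) {}

    record TableEdge(String sourceTableFqn, String targetTableFqn, String pipelineName) {}

    static Set<TableEdge> extract(String jobName, List<DatasetRef> inputs, List<DatasetRef> outputs) {
        // Step 1 (simplified): an event must name its job so edges can be attributed.
        if (jobName == null || jobName.isBlank()) {
            throw new IllegalArgumentException("OpenLineage event has no job name");
        }
        // Steps 2-3: assume every input feeds every output at table level.
        Set<TableEdge> edges = new LinkedHashSet<>(); // deduplicates repeated datasets
        for (DatasetRef out : outputs) {
            for (DatasetRef in : inputs) {
                if (in.equals(out)) {
                    continue; // skip self-edges (e.g. an in-place MERGE) in this sketch
                }
                edges.add(new TableEdge(in.name(), out.name(), jobName));
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        Set<TableEdge> edges = extract("daily_sales_etl",
                List.of(new DatasetRef("matih-tenant-acme", "raw.orders")),
                List.of(new DatasetRef("matih-tenant-acme", "analytics.daily_sales")));
        edges.forEach(System.out::println);
    }
}
```

Skipping self-edges is a design choice made for this sketch. A real implementation may want to keep them to record in-place rewrites.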

Column-Level Lineage

The ColumnLineageController provides fine-grained lineage tracking at the column level:

GET /v1/lineage/columns?table=analytics.daily_sales&column=total_amount

Response:
{
  "table": "analytics.daily_sales",
  "column": "total_amount",
  "upstream": [
    {
      "table": "raw.orders",
      "column": "amount",
      "transformation": "SUM(amount)",
      "transformationType": "AGGREGATE",
      "pipeline": "daily_sales_etl",
      "lastUpdated": "2026-02-12T02:00:00Z"
    }
  ],
  "downstream": [
    {
      "table": "reports.revenue_dashboard",
      "column": "daily_revenue",
      "transformation": "direct_copy",
      "transformationType": "IDENTITY",
      "pipeline": "dashboard_refresh",
      "lastUpdated": "2026-02-12T06:00:00Z"
    }
  ]
}
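The upstream and downstream arrays in this response are the two one-hop neighbourhoods of the requested column. Below is a minimal in-memory sketch of that lookup, assuming column edges are already loaded. ColumnLineageIndex and its records are hypothetical, and a production service would query the lineage table rather than scan a list.

```java
import java.util.List;

// Hypothetical in-memory index answering the two halves of the column-lineage response.
final class ColumnLineageIndex {

    record ColumnRef(String table, String column) {}

    record ColumnEdge(ColumnRef source, ColumnRef target, String transformation,
                      String transformationType, String pipeline) {}

    private final List<ColumnEdge> edges;

    ColumnLineageIndex(List<ColumnEdge> edges) {
        this.edges = List.copyOf(edges);
    }

    // "upstream": edges whose target is the requested column (one hop).
    List<ColumnEdge> upstream(ColumnRef column) {
        return edges.stream().filter(e -> e.target().equals(column)).toList();
    }

    // "downstream": edges whose source is the requested column (one hop).
    List<ColumnEdge> downstream(ColumnRef column) {
        return edges.stream().filter(e -> e.source().equals(column)).toList();
    }

    public static void main(String[] args) {
        ColumnRef amount = new ColumnRef("raw.orders", "amount");
        ColumnRef total = new ColumnRef("analytics.daily_sales", "total_amount");
        ColumnRef revenue = new ColumnRef("reports.revenue_dashboard", "daily_revenue");
        ColumnLineageIndex index = new ColumnLineageIndex(List.of(
                new ColumnEdge(amount, total, "SUM(amount)", "AGGREGATE", "daily_sales_etl"),
                new ColumnEdge(total, revenue, "direct_copy", "IDENTITY", "dashboard_refresh")));
        System.out.println("upstream:   " + index.upstream(total));
        System.out.println("downstream: " + index.downstream(total));
    }
}
```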

Transformation Types

| Type | Description | Example |
|---|---|---|
| IDENTITY | Direct copy without transformation | SELECT amount FROM orders |
| AGGREGATE | Aggregation function applied | SUM(amount) |
| FILTER | Row filtering applied | WHERE status = 'active' |
| JOIN | Column produced by a join operation | orders JOIN customers ON orders.customer_id = customers.id |
| EXPRESSION | Calculated from an expression | amount * quantity |
| CONDITIONAL | Conditional transformation | CASE WHEN ... THEN ... END |
| EXTERNAL | Data from an external system | API import |
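A parser has to map each projection expression to one of these types. The regex-based sketch below is a deliberately crude heuristic for illustration only. It is not how the Query Engine parses SQL, and a real implementation would inspect a parsed syntax tree. The classifier only ever returns IDENTITY, AGGREGATE, CONDITIONAL, or EXPRESSION, because FILTER, JOIN, and EXTERNAL depend on the WHERE or JOIN clause or on the source system, not on the projected expression. The enum and method names are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical heuristic: classify a single projection expression by its outer form.
enum TransformationType {
    IDENTITY, AGGREGATE, FILTER, JOIN, EXPRESSION, CONDITIONAL, EXTERNAL;

    // CASE ... END wrapping the whole expression (case-insensitive, spanning newlines).
    private static final Pattern CASE_EXPR = Pattern.compile("(?is)\\s*CASE\\b.*\\bEND\\s*");
    // A common aggregate function call wrapping the whole expression.
    private static final Pattern AGGREGATE_CALL =
            Pattern.compile("(?is)\\s*(SUM|COUNT|AVG|MIN|MAX)\\s*\\(.*\\)\\s*");
    // A bare column reference, optionally qualified as table.column.
    private static final Pattern COLUMN_REF =
            Pattern.compile("\\s*[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)?\\s*");

    static TransformationType classifyProjection(String expr) {
        if (CASE_EXPR.matcher(expr).matches()) {
            return CONDITIONAL;
        }
        if (AGGREGATE_CALL.matcher(expr).matches()) {
            return AGGREGATE;
        }
        if (COLUMN_REF.matcher(expr).matches()) {
            return IDENTITY;
        }
        return EXPRESSION; // anything else is treated as a computed expression
    }

    public static void main(String[] args) {
        for (String expr : new String[] {"amount", "SUM(amount)", "amount * quantity",
                "CASE WHEN discount_code IS NOT NULL THEN 1 ELSE 0 END"}) {
            System.out.println(expr + " -> " + classifyProjection(expr));
        }
    }
}
```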

Impact Analysis

The LineageImpactService provides downstream impact analysis for proposed changes:

import java.util.UUID;
import org.springframework.stereotype.Service;

@Service
public class LineageImpactService {

    public ImpactAnalysisResult analyzeImpact(UUID tenantId, String tableFqn,
                                               ChangeType changeType, String columnName) {
        // 1. Traverse lineage graph downstream from the affected asset
        // 2. Identify all dependent tables, views, and dashboards
        // 3. Classify impact severity for each dependent
        // 4. Return structured impact report
    }
}
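Steps 1 and 2 amount to a breadth-first traversal of the downstream graph that records the dependency chain used to reach each asset. The following sketch makes some assumptions. The graph is an in-memory adjacency map from an asset to its direct consumers. Assets one hop away count as directImpacts and anything further counts as indirectImpacts. The includeIndirect flag limits the walk to the first hop. DownstreamImpact and Impact are hypothetical names, and severity classification (step 3) is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DownstreamImpact {

    // hops == 1 is a direct impact; hops > 1 is indirect.
    record Impact(String asset, int hops, List<String> dependencyChain) {}

    static List<Impact> traverse(Map<String, List<String>> downstream, String start,
                                 boolean includeIndirect) {
        List<Impact> impacts = new ArrayList<>();
        Map<String, List<String>> chainTo = new HashMap<>(); // also serves as the visited set
        Deque<String> queue = new ArrayDeque<>();
        chainTo.put(start, List.of(start));
        queue.add(start);
        while (!queue.isEmpty()) {
            String current = queue.poll();
            List<String> chain = chainTo.get(current);
            if (!includeIndirect && chain.size() > 1) {
                continue; // only expand the changed asset itself
            }
            for (String next : downstream.getOrDefault(current, List.of())) {
                if (chainTo.containsKey(next)) {
                    continue; // BFS already reached it by a path at least as short; also stops cycles
                }
                List<String> nextChain = new ArrayList<>(chain);
                nextChain.add(next);
                chainTo.put(next, List.copyOf(nextChain));
                impacts.add(new Impact(next, nextChain.size() - 1, List.copyOf(nextChain)));
                queue.add(next);
            }
        }
        return impacts;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = Map.of(
                "raw.orders", List.of("analytics.order_details"),
                "analytics.order_details", List.of("reports.discount_analysis", "dashboard:sales-overview"));
        traverse(graph, "raw.orders", true).forEach(System.out::println);
    }
}
```

Breadth-first order guarantees that each asset's recorded chain is a shortest path, which keeps dependencyChain values short and readable.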

Impact Analysis Request

POST /v1/lineage/impact-analysis

Request:
{
  "tableFqn": "raw.orders",
  "changeType": "COLUMN_REMOVED",
  "columnName": "discount_code",
  "includeIndirect": true
}

Impact Analysis Response

{
  "sourceTable": "raw.orders",
  "changeType": "COLUMN_REMOVED",
  "column": "discount_code",
  "directImpacts": [
    {
      "asset": "analytics.order_details",
      "assetType": "TABLE",
      "impactedColumn": "discount_applied",
      "transformation": "CASE WHEN discount_code IS NOT NULL ...",
      "severity": "BREAKING",
      "pipeline": "order_enrichment_etl"
    }
  ],
  "indirectImpacts": [
    {
      "asset": "reports.discount_analysis",
      "assetType": "VIEW",
      "impactedColumn": "discount_rate",
      "severity": "BREAKING",
      "dependencyChain": ["raw.orders", "analytics.order_details", "reports.discount_analysis"]
    },
    {
      "asset": "dashboard:sales-overview",
      "assetType": "DASHBOARD",
      "impactedWidget": "Discount Usage Chart",
      "severity": "DEGRADED"
    }
  ],
  "totalImpactedAssets": 3,
  "breakingChanges": 2,
  "degradedAssets": 1,
  "recommendation": "Update analytics.order_details pipeline to handle missing discount_code column before removing"
}

Impact Severity Levels

| Severity | Description | Action Required |
|---|---|---|
| BREAKING | Downstream asset will fail | Must update downstream before applying change |
| DEGRADED | Downstream asset will produce partial results | Should update downstream |
| COSMETIC | Downstream asset displays differently | Optional update |
| NONE | No impact detected | Safe to proceed |
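Because these levels are strictly ordered, they map naturally onto a Java enum whose declaration order encodes severity. This lets a report take the worst level across dependents and count assets per level, which is where fields such as breakingChanges and degradedAssets come from. ImpactSeverity and its helper methods are a hypothetical sketch, not the platform's actual type.

```java
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical enum; constants are declared from least to most severe so compareTo ranks them.
enum ImpactSeverity {
    NONE("Safe to proceed"),
    COSMETIC("Optional update"),
    DEGRADED("Should update downstream"),
    BREAKING("Must update downstream before applying change");

    private final String actionRequired;

    ImpactSeverity(String actionRequired) {
        this.actionRequired = actionRequired;
    }

    String actionRequired() {
        return actionRequired;
    }

    // The most severe level in the collection, or NONE if it is empty.
    static ImpactSeverity worst(Collection<ImpactSeverity> severities) {
        ImpactSeverity worst = NONE;
        for (ImpactSeverity s : severities) {
            if (s.compareTo(worst) > 0) {
                worst = s;
            }
        }
        return worst;
    }

    // Number of impacted assets at each level.
    static Map<ImpactSeverity, Integer> countBySeverity(Collection<ImpactSeverity> severities) {
        Map<ImpactSeverity, Integer> counts = new EnumMap<>(ImpactSeverity.class);
        for (ImpactSeverity s : severities) {
            counts.merge(s, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<ImpactSeverity> found = List.of(BREAKING, BREAKING, DEGRADED);
        System.out.println(countBySeverity(found)); // {DEGRADED=1, BREAKING=2}
        System.out.println(worst(found) + ": " + worst(found).actionRequired());
    }
}
```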

Lineage Visualization

The LineageVisualizationService and VisualizationImpactAnalysisService generate graph representations for the frontend:

GET /v1/lineage/graph?table=analytics.daily_sales&depth=3&direction=BOTH

Response:
{
  "nodes": [
    {"id": "raw.orders", "type": "TABLE", "label": "orders", "schema": "raw"},
    {"id": "raw.customers", "type": "TABLE", "label": "customers", "schema": "raw"},
    {"id": "analytics.daily_sales", "type": "TABLE", "label": "daily_sales", "schema": "analytics"},
    {"id": "reports.revenue_dashboard", "type": "DASHBOARD", "label": "Revenue Dashboard"},
    {"id": "pipeline:daily_sales_etl", "type": "PIPELINE", "label": "daily_sales_etl"}
  ],
  "edges": [
    {"source": "raw.orders", "target": "pipeline:daily_sales_etl", "type": "INPUT"},
    {"source": "raw.customers", "target": "pipeline:daily_sales_etl", "type": "INPUT"},
    {"source": "pipeline:daily_sales_etl", "target": "analytics.daily_sales", "type": "OUTPUT"},
    {"source": "analytics.daily_sales", "target": "reports.revenue_dashboard", "type": "SOURCE"}
  ],
  "metadata": {
    "depth": 3,
    "direction": "BOTH",
    "totalNodes": 5,
    "totalEdges": 4
  }
}

Graph Parameters

| Parameter | Description | Default |
|---|---|---|
| table | Starting table for the graph | Required |
| depth | Maximum hops from the starting table | 3 |
| direction | UPSTREAM, DOWNSTREAM, or BOTH | BOTH |
| includeColumns | Include column-level lineage nodes | false |
| includePipelines | Show pipeline nodes as intermediaries | true |
| includeDashboards | Show dashboard consumers | true |
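The depth and direction parameters describe a bounded graph walk. In the sketch below, BOTH is treated as the union of a pure upstream walk and a pure downstream walk. This is an assumption, chosen so that walking back down from an upstream table does not pull in unrelated siblings. Each edge, including table-to-pipeline edges, counts as one hop. LineageGraphBuilder and its types are hypothetical, and the includeColumns, includePipelines, and includeDashboards filters are not modelled.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class LineageGraphBuilder {

    enum Direction { UPSTREAM, DOWNSTREAM, BOTH }

    record Edge(String source, String target, String type) {}

    record Graph(Set<String> nodes, Set<Edge> edges) {}

    static Graph build(Collection<Edge> allEdges, String start, int depth, Direction direction) {
        Set<String> nodes = new LinkedHashSet<>(List.of(start));
        Set<Edge> edges = new LinkedHashSet<>();
        // BOTH = union of two one-directional walks from the starting node.
        if (direction != Direction.DOWNSTREAM) {
            walk(allEdges, start, depth, false, nodes, edges);
        }
        if (direction != Direction.UPSTREAM) {
            walk(allEdges, start, depth, true, nodes, edges);
        }
        return new Graph(nodes, edges);
    }

    // Level-by-level BFS that stops after `depth` hops.
    private static void walk(Collection<Edge> allEdges, String start, int depth, boolean downstream,
                             Set<String> nodes, Set<Edge> edges) {
        Set<String> visited = new HashSet<>(Set.of(start));
        Deque<String> frontier = new ArrayDeque<>(List.of(start));
        for (int hop = 0; hop < depth && !frontier.isEmpty(); hop++) {
            Deque<String> next = new ArrayDeque<>();
            for (String node : frontier) {
                for (Edge e : allEdges) {
                    String from = downstream ? e.source() : e.target();
                    String to = downstream ? e.target() : e.source();
                    if (!from.equals(node)) {
                        continue;
                    }
                    edges.add(e);
                    nodes.add(to);
                    if (visited.add(to)) {
                        next.add(to);
                    }
                }
            }
            frontier = next;
        }
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(
                new Edge("raw.orders", "pipeline:daily_sales_etl", "INPUT"),
                new Edge("raw.customers", "pipeline:daily_sales_etl", "INPUT"),
                new Edge("pipeline:daily_sales_etl", "analytics.daily_sales", "OUTPUT"),
                new Edge("analytics.daily_sales", "reports.revenue_dashboard", "SOURCE"));
        Graph g = build(edges, "analytics.daily_sales", 3, Direction.BOTH);
        System.out.println(g.nodes().size() + " nodes, " + g.edges().size() + " edges");
    }
}
```

With the four edges from the example response and depth=3, direction=BOTH, this yields the same 5 nodes and 4 edges reported in its metadata block.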

Lineage Storage

Lineage data is stored in two locations:

| Store | Purpose | Query Pattern |
|---|---|---|
| PostgreSQL (CatalogLineageRepository) | Relational lineage edges for API queries | Impact analysis, dependency lookups |
| OpenMetadata | Canonical lineage store with UI visualization | Browsing, governance |

The CatalogLineage entity:

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import java.time.Instant;
import java.util.UUID;

@Entity
public class CatalogLineage {
    @Id
    private UUID id;
    private UUID tenantId;
    private String sourceTableFqn;
    private String sourceColumn;          // NULL for table-level lineage
    private String targetTableFqn;
    private String targetColumn;          // NULL for table-level lineage
    private String transformationType;
    private String transformationDescription;
    private String pipelineName;
    private Instant lastObserved;
    private Instant createdAt;
}
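Because the same edge is reported every time a pipeline runs, writes to this table are naturally upserts. A row is created on first observation and its lastObserved timestamp is refreshed afterwards. The sketch below illustrates that behaviour with an in-memory map. It assumes a natural key of (tenantId, sourceTableFqn, sourceColumn, targetTableFqn, targetColumn), with null columns for table-level edges. It also assumes that an out-of-order, older event must not move lastObserved backwards. LineageEdgeStore, EdgeKey, and EdgeRow are hypothetical stand-ins for the repository, and createdAt is simply set to the first observation time here.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for upserts through CatalogLineageRepository.
final class LineageEdgeStore {

    // Assumed natural key; sourceColumn/targetColumn are null for table-level lineage.
    record EdgeKey(UUID tenantId, String sourceTableFqn, String sourceColumn,
                   String targetTableFqn, String targetColumn) {}

    record EdgeRow(UUID id, EdgeKey key, String transformationType, String pipelineName,
                   Instant createdAt, Instant lastObserved) {}

    private final Map<EdgeKey, EdgeRow> rows = new HashMap<>();

    // Insert a new edge, or refresh an existing one when a newer observation arrives.
    EdgeRow upsert(EdgeKey key, String transformationType, String pipelineName, Instant observedAt) {
        EdgeRow candidate = new EdgeRow(UUID.randomUUID(), key, transformationType, pipelineName,
                observedAt, observedAt);
        return rows.merge(key, candidate, (existing, incoming) ->
                incoming.lastObserved().isAfter(existing.lastObserved())
                        // Keep the original id and createdAt; take the newer details.
                        ? new EdgeRow(existing.id(), key, incoming.transformationType(),
                                incoming.pipelineName(), existing.createdAt(), incoming.lastObserved())
                        // Stale (out-of-order) event: leave the row unchanged.
                        : existing);
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        LineageEdgeStore store = new LineageEdgeStore();
        EdgeKey key = new EdgeKey(UUID.randomUUID(), "raw.orders", "amount",
                "analytics.daily_sales", "total_amount");
        store.upsert(key, "AGGREGATE", "daily_sales_etl", Instant.parse("2026-02-11T02:00:00Z"));
        EdgeRow row = store.upsert(key, "AGGREGATE", "daily_sales_etl",
                Instant.parse("2026-02-12T02:00:00Z"));
        System.out.println(store.size() + " row(s), lastObserved=" + row.lastObserved());
    }
}
```

In PostgreSQL, the same effect would typically come from a unique constraint on the key columns combined with an INSERT ... ON CONFLICT statement.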

Related Sections