MATIH Platform is in active MVP development. Documentation reflects current implementation status.

Data Lineage

The Data Lineage integration tracks the provenance of every SQL query generated by the AI Service, recording which tables and columns were accessed, how data was transformed, and which agent pipeline produced the query. This information feeds into the platform-wide lineage graph managed by the OpenMetadata integration.


Lineage Architecture

Lineage tracking is implemented through hooks in the SQL generation pipeline and the agent orchestrator:

| Component | Role | Location |
|---|---|---|
| Lineage Bridge | Captures query metadata during SQL generation | `src/context_graph/integration/lineage_bridge.py` |
| Kafka Producer | Publishes lineage events to Kafka | `src/context_graph/integration/kafka_producer.py` |
| Schema Integration | Resolves table and column references | `src/context_graph/integration/schema_integration.py` |
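The hook-based design can be pictured as a small object that the orchestrator notifies as each agent runs and that the SQL step notifies once a query exists. This is a minimal sketch, not the contents of `lineage_bridge.py`: the `LineageBridge` class, its methods, and the `publish` callable standing in for the Kafka producer are all hypothetical.

```python
# Hypothetical sketch of lineage hooks in the orchestrator and SQL step.
# Class and method names are illustrative, not the real lineage_bridge.py API.
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class LineageBridge:
    # Stand-in for the Kafka producer (assumption): receives one record per query.
    publish: Callable[[Dict[str, Any]], None]
    enabled: bool = True
    _pipeline: List[str] = field(default_factory=list, init=False)

    def on_agent_start(self, agent_name: str) -> None:
        """Orchestrator hook: remember which agents ran for this request."""
        self._pipeline.append(agent_name)

    def on_sql_generated(self, tenant_id: str, session_id: str,
                         question: str, sql: str, dialect: str) -> None:
        """SQL-generation hook: hand the captured metadata to the producer."""
        if not self.enabled:
            return
        self.publish({
            "tenant_id": tenant_id,
            "session_id": session_id,
            "user_question": question,
            "query": {"sql": sql, "dialect": dialect},
            # Copy so a later reset() cannot mutate an already-published record.
            "agent_pipeline": list(self._pipeline),
        })

    def reset(self) -> None:
        """Clear per-request state before the next question."""
        self._pipeline.clear()
```

Passing `publish` in as a plain callable keeps the bridge testable without a broker; in the service it would presumably wrap the Kafka producer.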

Lineage Event Schema

Each generated SQL query produces one lineage event, published to the query-lineage-events Kafka topic (the default value of LINEAGE_KAFKA_TOPIC):

{
  "event_id": "lin-abc123",
  "event_type": "query.lineage",
  "tenant_id": "acme-corp",
  "session_id": "sess-xyz789",
  "timestamp": "2025-03-15T10:00:00Z",
  "query": {
    "sql": "SELECT region, SUM(revenue) AS sum_revenue FROM sales GROUP BY region",
    "dialect": "trino"
  },
  "sources": [
    {
      "table": "public.sales",
      "columns_read": ["region", "revenue"],
      "filter_columns": []
    }
  ],
  "transformations": [
    {
      "type": "aggregation",
      "function": "SUM",
      "input_column": "revenue",
      "output_alias": "sum_revenue"
    },
    {
      "type": "group_by",
      "columns": ["region"]
    }
  ],
  "agent_pipeline": ["router_agent", "sql_agent"],
  "user_question": "What is revenue by region?"
}
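A minimal sketch of assembling an event with the fields above and turning it into a Kafka record. The helper names, the `lin-` id scheme, and keying messages by `tenant_id` are assumptions for illustration; the producer client itself is left out so the example needs only the standard library.

```python
# Sketch: build a query.lineage event and serialize it for Kafka.
# Helper names, id scheme, and message key are assumptions.
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

TOPIC = "query-lineage-events"  # default of LINEAGE_KAFKA_TOPIC


def build_lineage_event(tenant_id: str, session_id: str, sql: str,
                        sources: List[Dict[str, Any]],
                        transformations: List[Dict[str, Any]],
                        agent_pipeline: List[str], user_question: str,
                        dialect: str = "trino",
                        now: Optional[datetime] = None) -> Dict[str, Any]:
    """Assemble one lineage event; field names follow the schema above."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    return {
        "event_id": f"lin-{uuid.uuid4().hex[:12]}",  # hypothetical id scheme
        "event_type": "query.lineage",
        "tenant_id": tenant_id,
        "session_id": session_id,
        "timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "query": {"sql": sql, "dialect": dialect},
        "sources": sources,
        "transformations": transformations,
        "agent_pipeline": agent_pipeline,
        "user_question": user_question,
    }


def to_kafka_record(event: Dict[str, Any],
                    topic: str = TOPIC) -> Tuple[str, bytes, bytes]:
    """Return (topic, key, value). Keying by tenant_id (an assumption) keeps
    each tenant's events ordered within a single partition."""
    key = event["tenant_id"].encode("utf-8")
    value = json.dumps(event, separators=(",", ":")).encode("utf-8")
    return topic, key, value
```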

Column-Level Lineage

The lineage system derives column-level provenance by parsing the generated SQL into an abstract syntax tree (AST) with sqlglot and classifying each column reference:

| Lineage Type | Description | Example |
|---|---|---|
| Direct read | Column read without transformation | `SELECT name FROM customers` |
| Aggregation | Column used in an aggregate function | `SUM(revenue)` |
| Filter | Column used in a WHERE clause | `WHERE region = 'US'` |
| Join key | Column used in a JOIN condition | `ON orders.customer_id = customers.id` |
| Derived | Column created by an expression | `price * quantity AS total` |
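Once each column reference has been located in the AST, the rules in the table reduce to a precedence check. This sketch assumes that walk (done with sqlglot in the service) has already produced a hypothetical `ColumnUse` record per reference; it shows only the classification step, not the parsing.

```python
# Sketch of the lineage-type rules. ColumnUse is a hypothetical, pre-digested
# view of one column reference; the sqlglot AST walk is not shown.
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class LineageType(Enum):
    DIRECT_READ = "direct_read"
    AGGREGATION = "aggregation"
    FILTER = "filter"
    JOIN_KEY = "join_key"
    DERIVED = "derived"


@dataclass(frozen=True)
class ColumnUse:
    column: str
    clause: str                      # "select", "where", "join_on", ...
    aggregate: Optional[str] = None  # e.g. "SUM" when wrapped in an aggregate
    in_expression: bool = False      # operand of a computed expression like a * b


def classify(use: ColumnUse) -> LineageType:
    # Clause position wins: a column in WHERE is a filter even inside an expression.
    if use.clause == "where":
        return LineageType.FILTER
    if use.clause == "join_on":
        return LineageType.JOIN_KEY
    if use.aggregate is not None:
        return LineageType.AGGREGATION
    if use.in_expression:
        return LineageType.DERIVED
    return LineageType.DIRECT_READ
```

Checking the clause first means `WHERE price * 2 > 10` counts as a filter rather than a derived column; whether the service applies the same precedence is not stated here.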

Integration with OpenMetadata

Lineage events are consumed by the platform lineage service, which persists them to OpenMetadata:

AI Service --> Kafka (query-lineage-events) --> Lineage Consumer --> OpenMetadata API
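The consumer side of this flow can be sketched as a per-message handler that validates the event, skips redeliveries, and forwards it on. The `LineageConsumer` class and its `sink` callable (standing in for the OpenMetadata client) are hypothetical; the OpenMetadata lineage request format is deliberately not reproduced.

```python
# Hypothetical per-message handler for the Lineage Consumer.
# `sink` stands in for the OpenMetadata client.
import json
from typing import Any, Callable, Dict, Set

REQUIRED = ("event_id", "event_type", "tenant_id", "query", "sources")


class LineageConsumer:
    def __init__(self, sink: Callable[[Dict[str, Any]], None]):
        self._sink = sink
        self._seen: Set[str] = set()  # in-memory only; a real service would persist this

    def handle(self, raw: bytes) -> bool:
        """Process one Kafka message value; return True if it was forwarded."""
        event = json.loads(raw)
        missing = [f for f in REQUIRED if f not in event]
        if missing:
            raise ValueError(f"lineage event missing fields: {missing}")
        if event["event_type"] != "query.lineage":
            return False  # ignore unrelated event types
        if event["event_id"] in self._seen:
            return False  # at-least-once delivery can redeliver a message
        self._sink(event)
        # Mark as seen only after the sink succeeds, so failures can be retried.
        self._seen.add(event["event_id"])
        return True
```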

This enables end-to-end lineage visualization, from the original natural-language question through SQL generation to the underlying data assets.

Lineage Query API

The AI Service exposes a read-only lineage query endpoint:

| Property | Value |
|---|---|
| Method | `GET` |
| Path | `/api/v1/lineage/query/:query_id` |
| Auth | JWT required |
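A sketch of calling the endpoint from Python with only the standard library. The base URL is hypothetical, and sending the JWT in an `Authorization: Bearer` header is an assumption about how the service expects it.

```python
# Sketch of a client for the lineage query endpoint.
# BASE_URL and the Bearer scheme are assumptions.
import json
import urllib.parse
import urllib.request

BASE_URL = "http://ai-service:8000"  # hypothetical host and port


def build_lineage_request(query_id: str, jwt: str,
                          base_url: str = BASE_URL) -> urllib.request.Request:
    # Percent-encode the id so it stays a single path segment.
    path = "/api/v1/lineage/query/" + urllib.parse.quote(query_id, safe="")
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        method="GET",
        headers={"Authorization": f"Bearer {jwt}", "Accept": "application/json"},
    )


def fetch_query_lineage(query_id: str, jwt: str, base_url: str = BASE_URL) -> dict:
    """Perform the request and decode the JSON body."""
    req = build_lineage_request(query_id, jwt, base_url)
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)
```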

Response

{
  "query_id": "q-abc123",
  "upstream_tables": ["public.sales", "public.customers"],
  "downstream_consumers": [],
  "column_lineage": [
    {
      "output": "sum_revenue",
      "inputs": [{"table": "public.sales", "column": "revenue"}],
      "transformation": "SUM"
    }
  ]
}
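Clients often want the column_lineage array keyed by output column. A small sketch, using a hypothetical helper name, that flattens a response of this shape into a mapping from each output column to its upstream "table.column" references:

```python
# Sketch: index column_lineage by output column. Helper name is hypothetical.
import json
from typing import Dict, List


def upstream_columns(response_body: str) -> Dict[str, List[str]]:
    body = json.loads(response_body)
    result: Dict[str, List[str]] = {}
    for entry in body.get("column_lineage", []):
        refs = [f'{i["table"]}.{i["column"]}' for i in entry["inputs"]]
        # setdefault/extend merges entries that share the same output column.
        result.setdefault(entry["output"], []).extend(refs)
    return result
```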

Configuration

| Environment Variable | Default | Description |
|---|---|---|
| `LINEAGE_ENABLED` | `true` | Enable lineage event production |
| `LINEAGE_KAFKA_TOPIC` | `query-lineage-events` | Kafka topic for lineage events |
| `LINEAGE_PARSE_SQL` | `true` | Enable SQL AST parsing for column-level lineage |
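A sketch of loading these settings at startup. The `LineageConfig` type and the accepted boolean spellings (true/false, 1/0, yes/no, on/off) are assumptions; only the variable names and defaults come from the table.

```python
# Sketch: read lineage settings from the environment with the documented defaults.
# LineageConfig and the accepted boolean spellings are assumptions.
import os
from dataclasses import dataclass
from typing import Mapping, Optional

_TRUE = {"1", "true", "yes", "on"}
_FALSE = {"0", "false", "no", "off"}


def _env_bool(env: Mapping[str, str], name: str, default: bool) -> bool:
    raw = env.get(name)
    if raw is None or raw.strip() == "":
        return default
    value = raw.strip().lower()
    if value in _TRUE:
        return True
    if value in _FALSE:
        return False
    raise ValueError(f"{name} must be a boolean, got {raw!r}")


@dataclass(frozen=True)
class LineageConfig:
    enabled: bool = True
    kafka_topic: str = "query-lineage-events"
    parse_sql: bool = True

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "LineageConfig":
        env = os.environ if env is None else env
        return cls(
            enabled=_env_bool(env, "LINEAGE_ENABLED", True),
            kafka_topic=env.get("LINEAGE_KAFKA_TOPIC") or "query-lineage-events",
            parse_sql=_env_bool(env, "LINEAGE_PARSE_SQL", True),
        )
```

Accepting an explicit mapping instead of always reading `os.environ` makes the loader easy to test, and rejecting unrecognized values means a typo such as `ture` fails loudly instead of quietly turning lineage off.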