Data Lineage
The Data Lineage integration tracks the provenance of every SQL query generated by the AI Service, recording which tables and columns were accessed, how data was transformed, and which agent pipeline produced the query. This information feeds into the platform-wide lineage graph managed by the OpenMetadata integration.
Lineage Architecture
Lineage tracking is implemented through hooks in the SQL generation pipeline and the agent orchestrator:
| Component | Role | Location |
|---|---|---|
| Lineage Bridge | Captures query metadata during SQL generation | src/context_graph/integration/lineage_bridge.py |
| Kafka Producer | Publishes lineage events to Kafka | src/context_graph/integration/kafka_producer.py |
| Schema Integration | Resolves table and column references | src/context_graph/integration/schema_integration.py |
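As a rough illustration of how these components might fit together, the sketch below builds an event in the shape described under Lineage Event Schema and hands it to a publisher. The names `EventPublisher`, `InMemoryPublisher`, and `capture_lineage` are hypothetical and are not taken from the modules listed above. The in-memory publisher stands in for the Kafka producer so the example runs without a broker, and the `event_id` format is an assumption.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Protocol


class EventPublisher(Protocol):
    """Hypothetical interface; the real Kafka producer may look different."""

    def publish(self, topic: str, payload: bytes) -> None:
        ...


class InMemoryPublisher:
    """Stand-in for the Kafka producer that records what would be sent."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, bytes]] = []

    def publish(self, topic: str, payload: bytes) -> None:
        self.sent.append((topic, payload))


def capture_lineage(
    publisher: EventPublisher,
    *,
    tenant_id: str,
    session_id: str,
    sql: str,
    dialect: str,
    sources: list[dict],
    transformations: list[dict],
    agent_pipeline: list[str],
    user_question: str,
    topic: str = "query-lineage-events",
) -> dict:
    """Assemble a lineage event and publish it as UTF-8 encoded JSON."""
    event = {
        # Assumed ID format, modelled on the "lin-abc123" example.
        "event_id": f"lin-{uuid.uuid4().hex[:6]}",
        "event_type": "query.lineage",
        "tenant_id": tenant_id,
        "session_id": session_id,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "query": {"sql": sql, "dialect": dialect},
        # Sources are expected to be resolved to qualified names already.
        "sources": sources,
        "transformations": transformations,
        "agent_pipeline": agent_pipeline,
        "user_question": user_question,
    }
    publisher.publish(topic, json.dumps(event).encode("utf-8"))
    return event
```

Passing the publisher in as a parameter keeps the capture logic testable without Kafka; a real deployment would supply an object backed by the Kafka producer instead.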
Lineage Event Schema
Each SQL query produces a lineage event published to the query-lineage-events Kafka topic:
{
  "event_id": "lin-abc123",
  "event_type": "query.lineage",
  "tenant_id": "acme-corp",
  "session_id": "sess-xyz789",
  "timestamp": "2025-03-15T10:00:00Z",
  "query": {
    "sql": "SELECT region, SUM(revenue) FROM sales GROUP BY region",
    "dialect": "trino"
  },
  "sources": [
    {
      "table": "public.sales",
      "columns_read": ["region", "revenue"],
      "filter_columns": []
    }
  ],
  "transformations": [
    {
      "type": "aggregation",
      "function": "SUM",
      "input_column": "revenue",
      "output_alias": "sum_revenue"
    },
    {
      "type": "group_by",
      "columns": ["region"]
    }
  ],
  "agent_pipeline": ["router_agent", "sql_agent"],
  "user_question": "What is revenue by region?"
}
Column-Level Lineage
The lineage system tracks column-level provenance by parsing the generated SQL into an AST with sqlglot and classifying how each column is used:
| Lineage Type | Description | Example |
|---|---|---|
| Direct read | Column read without transformation | SELECT name FROM customers |
| Aggregation | Column used in aggregate function | SUM(revenue) |
| Filter | Column used in WHERE clause | WHERE region = 'US' |
| Join key | Column used in JOIN condition | ON orders.customer_id = customers.id |
| Derived | Column created by expression | price * quantity AS total |
Integration with OpenMetadata
Lineage events are consumed by the platform lineage service, which persists them into OpenMetadata:
AI Service --> Kafka (query-lineage-events) --> Lineage Consumer --> OpenMetadata API
This enables end-to-end lineage visualization from the original natural language question through SQL generation to the underlying data assets.
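To make the consumer step more concrete, the sketch below flattens one lineage event into table-level edges that a consumer could then persist. The function name `event_to_edges`, the edge dictionary shape, and the `table:` / `query:` node prefixes are hypothetical. No OpenMetadata API call is shown, since the source does not describe how the consumer talks to it.

```python
def event_to_edges(event: dict) -> list[dict]:
    """Turn one lineage event into edges from each source table to the query."""
    query_node = f"query:{event['event_id']}"
    edges = []
    for source in event.get("sources", []):
        # Combine read and filter columns so the edge lists every column touched.
        columns = set(source.get("columns_read", [])) | set(
            source.get("filter_columns", [])
        )
        edges.append(
            {
                "from": f"table:{source['table']}",
                "to": query_node,
                "columns": sorted(columns),
            }
        )
    return edges
```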
Lineage Query API
The AI Service exposes a read-only lineage query endpoint:
| Property | Value |
|---|---|
| Method | GET |
| Path | /api/v1/lineage/query/:query_id |
| Auth | JWT required |
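A client call to this endpoint might look like the sketch below, which uses only the Python standard library. The base URL is a placeholder, the function names are hypothetical, and sending the JWT as a `Bearer` token in the `Authorization` header is an assumption; the table above states only that a JWT is required.

```python
import json
import urllib.request
from urllib.parse import quote


def build_lineage_request(
    base_url: str, query_id: str, jwt_token: str
) -> urllib.request.Request:
    """Build the GET request for /api/v1/lineage/query/:query_id."""
    url = f"{base_url.rstrip('/')}/api/v1/lineage/query/{quote(query_id, safe='')}"
    return urllib.request.Request(
        url,
        method="GET",
        headers={
            # Assumed scheme: JWT passed as a Bearer token.
            "Authorization": f"Bearer {jwt_token}",
            "Accept": "application/json",
        },
    )


def fetch_lineage(
    base_url: str, query_id: str, jwt_token: str, timeout: float = 10.0
) -> dict:
    """Send the request and decode the JSON response body."""
    request = build_lineage_request(base_url, query_id, jwt_token)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

Splitting request construction from the network call keeps the URL and header logic checkable without a running service.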
Response
{
  "query_id": "q-abc123",
  "upstream_tables": ["public.sales", "public.customers"],
  "downstream_consumers": [],
  "column_lineage": [
    {
      "output": "sum_revenue",
      "inputs": [{"table": "sales", "column": "revenue"}],
      "transformation": "SUM"
    }
  ]
}
Configuration
| Environment Variable | Default | Description |
|---|---|---|
| LINEAGE_ENABLED | true | Enable lineage event production |
| LINEAGE_KAFKA_TOPIC | query-lineage-events | Kafka topic for lineage events |
| LINEAGE_PARSE_SQL | true | Enable SQL AST parsing for column lineage |
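The settings above could be loaded roughly as follows. The `LineageConfig` class and the set of strings accepted as true are assumptions made for illustration; the service's actual parsing rules are not documented here.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

# Assumed truthy spellings; the real service may accept a different set.
_TRUE_VALUES = {"1", "true", "yes", "on"}


def _env_bool(env: Mapping[str, str], name: str, default: bool) -> bool:
    """Read a boolean variable, falling back to the default when unset."""
    raw = env.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUE_VALUES


@dataclass(frozen=True)
class LineageConfig:
    enabled: bool = True
    kafka_topic: str = "query-lineage-events"
    parse_sql: bool = True

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "LineageConfig":
        """Build the config from os.environ or from a supplied mapping."""
        env = os.environ if env is None else env
        return cls(
            enabled=_env_bool(env, "LINEAGE_ENABLED", True),
            kafka_topic=env.get("LINEAGE_KAFKA_TOPIC", "query-lineage-events"),
            parse_sql=_env_bool(env, "LINEAGE_PARSE_SQL", True),
        )
```

For example, `LineageConfig.from_env({"LINEAGE_PARSE_SQL": "false"})` keeps event production enabled while turning off column-level parsing.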