Knowledge Graph Construction
The Context Graph builds its knowledge graph from multiple sources: agent reasoning traces, entity extraction from conversations, schema metadata synchronization, and explicit ontology definitions. This section covers the graph construction pipeline, entity extraction, relationship discovery, incremental synchronization, and knowledge graph maintenance.
Construction Pipeline
```
Sources
  |
  +-- Agent Thinking Traces ------+
  |                               |
  +-- Conversation History -------+
  |                               |
  +-- Schema Metadata (Catalog) --+----> Entity Extraction
  |                               |              |
  +-- Ontology Definitions -------+              v
                                      Relationship Discovery
                                                 |
                                                 v
                                        Graph Mutation (Dgraph)
                                                 |
                                                 v
                                        Embedding Generation
                                                 |
                                                 v
                                    Vector Store (Qdrant/Pinecone)
```
Entity Extraction
The ConceptExtractor in context_graph/services/concept_extractor.py extracts entities from agent interactions:
```python
class ConceptExtractor:
    """Extracts entities and concepts from text."""

    async def extract(
        self,
        text: str,
        tenant_id: str,
        context: dict[str, Any] | None = None,
    ) -> list[ExtractedEntity]:
        """Extract entities from text."""
        # Use LLM for entity extraction
        response = await self._llm.chat([
            {"role": "system", "content": EXTRACTION_PROMPT},
            {"role": "user", "content": text},
        ])
        entities = self._parse_entities(response["content"])

        # Resolve against existing ontology
        resolved = await self._resolve_entities(entities, tenant_id)
        return resolved
```
Extracted Entity Types
| Entity Type | Source | Example |
|---|---|---|
| Table | Schema metadata, SQL queries | analytics.sales.orders |
| Column | Schema metadata, SQL queries | orders.amount |
| Metric | Conversation, semantic layer | "total revenue" |
| Dimension | Conversation, semantic layer | "region", "product category" |
| Business concept | Conversation | "customer churn", "retention rate" |
| Time period | Conversation | "Q4 2025", "last month" |
| Filter value | Conversation, SQL | "status = completed" |
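The shape of the extractor's return value is not shown above. A minimal sketch of what `ExtractedEntity` might look like, with a parser for a hypothetical line-oriented LLM output format (the field names and the `type|name|confidence` format are assumptions, not the actual contract):

```python
from dataclasses import dataclass


@dataclass
class ExtractedEntity:
    """One entity pulled from text (field names are illustrative)."""
    name: str
    entity_type: str  # e.g. "table", "metric", "time_period"
    confidence: float = 1.0


def parse_entities(raw: str) -> list[ExtractedEntity]:
    """Parse 'type|name|confidence' lines, one entity per line.

    Hypothetical format: the real EXTRACTION_PROMPT presumably asks the
    LLM for structured output (e.g. JSON) and _parse_entities decodes it.
    """
    entities = []
    for line in raw.strip().splitlines():
        parts = [p.strip() for p in line.split("|")]
        if len(parts) >= 2:
            conf = float(parts[2]) if len(parts) > 2 else 1.0
            entities.append(ExtractedEntity(parts[1], parts[0], conf))
    return entities
```

In practice a JSON-mode or function-calling response is easier to validate than delimited lines; the sketch only illustrates the raw-text-to-typed-entity step.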
Knowledge Graph Synchronization
The IncrementalSyncService keeps the knowledge graph synchronized with upstream sources:
```python
class IncrementalSyncService:
    """Incrementally synchronizes external data into the knowledge graph."""

    async def sync_schema(
        self,
        tenant_id: str,
    ) -> SyncResult:
        """Synchronize data catalog schema into knowledge graph."""
        # Get current catalog metadata
        catalog = await self._catalog_client.get_tables(tenant_id)

        # Get existing graph entities
        existing = await self._store.get_schema_entities(tenant_id)

        # Compute diff
        to_add, to_update, to_remove = self._compute_diff(catalog, existing)

        # Apply mutations
        for entity in to_add:
            await self._store.create_entity(entity)
        for entity in to_update:
            await self._store.update_entity(entity)
        for entity in to_remove:
            await self._store.mark_deprecated(entity)

        return SyncResult(
            added=len(to_add),
            updated=len(to_update),
            deprecated=len(to_remove),
        )

    async def sync_ontology(
        self,
        tenant_id: str,
    ) -> SyncResult:
        """Synchronize ontology definitions into knowledge graph."""
        # Pull ontology from Ontology Service
        ontology = await self._ontology_client.get_types(tenant_id)
        for entity_type in ontology:
            await self._store.upsert_ontology_entity(entity_type)
        return SyncResult(added=len(ontology))
```
Sync Schedule
| Source | Sync Frequency | Method |
|---|---|---|
| Data Catalog | Every 15 minutes | Incremental (change detection) |
| Ontology Service | On change (webhook) | Full sync per entity |
| Agent traces | Real-time | Stream processing |
| Conversations | Per message | Async extraction |
Relationship Discovery
The system discovers relationships between entities automatically:
Schema-Based Relationships
```python
class SchemaRelationshipDiscoverer:
    """Discovers relationships from schema metadata."""

    async def discover(
        self,
        tenant_id: str,
    ) -> list[DiscoveredRelationship]:
        """Discover relationships from foreign keys and naming conventions."""
        tables = await self._catalog_client.get_tables(tenant_id)
        relationships = []
        for table in tables:
            # Explicit foreign keys
            for fk in table.foreign_keys:
                relationships.append(DiscoveredRelationship(
                    source=table.fqn,
                    target=fk.referenced_table,
                    type="FOREIGN_KEY",
                    confidence=1.0,
                    columns=(fk.column, fk.referenced_column),
                ))
            # Name-based inference (e.g., customer_id -> customers.id)
            for column in table.columns:
                if column.name.endswith("_id"):
                    entity = column.name[:-3] + "s"  # customer_id -> customers
                    inferred = self._find_table(tables, entity)
                    if inferred:
                        relationships.append(DiscoveredRelationship(
                            source=table.fqn,
                            target=inferred.fqn,
                            type="INFERRED_FK",
                            confidence=0.8,
                        ))
        return relationships
```
Usage-Based Relationships
```python
class UsageRelationshipDiscoverer:
    """Discovers relationships from query patterns."""

    async def discover(
        self,
        tenant_id: str,
        window_days: int = 30,
    ) -> list[DiscoveredRelationship]:
        """Discover relationships from co-occurrence in queries."""
        # Analyze historical queries
        queries = await self._get_queries(tenant_id, window_days)

        # Count table co-occurrences in JOINs
        join_pairs = self._extract_join_pairs(queries)

        # Tables frequently JOINed together imply a relationship
        relationships = []
        for (table_a, table_b), count in join_pairs.items():
            if count >= self._min_occurrence:
                relationships.append(DiscoveredRelationship(
                    source=table_a,
                    target=table_b,
                    type="USAGE_BASED",
                    confidence=min(count / 100, 0.9),
                    evidence=f"Co-occurred in {count} queries",
                ))
        return relationships
```
Graph Structure
The knowledge graph stores entities as nodes and relationships as edges in Dgraph:
Node Types
| Node Type | URN Pattern | Properties |
|---|---|---|
| Table | urn:matih:table:{tenant}:{catalog}.{schema}.{table} | Name, description, columns, stats |
| Column | urn:matih:column:{tenant}:{table}.{column} | Name, type, description, tags |
| Metric | urn:matih:metric:{tenant}:{metric_name} | Expression, type, dimensions |
| Agent | urn:matih:agent:{tenant}:{agent_id} | Name, type, capabilities |
| Trace | urn:matih:trace:{tenant}:{trace_id} | Goal, outcome, duration |
| User | urn:matih:user:{tenant}:{user_id} | Name, role, preferences |
| Pattern | urn:matih:pattern:{tenant}:{pattern_id} | Description, frequency |
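The URN patterns above share a common shape, so a small helper can generate them (a sketch; the real service presumably centralizes URN construction, and the helper name is an assumption):

```python
URN_PATTERNS = {
    "table":   "urn:matih:table:{tenant}:{ref}",
    "column":  "urn:matih:column:{tenant}:{ref}",
    "metric":  "urn:matih:metric:{tenant}:{ref}",
    "agent":   "urn:matih:agent:{tenant}:{ref}",
    "trace":   "urn:matih:trace:{tenant}:{ref}",
    "user":    "urn:matih:user:{tenant}:{ref}",
    "pattern": "urn:matih:pattern:{tenant}:{ref}",
}


def build_urn(node_type: str, tenant: str, ref: str) -> str:
    """Build a node URN; `ref` is the type-specific tail
    (e.g. catalog.schema.table for tables, table.column for columns)."""
    return URN_PATTERNS[node_type].format(tenant=tenant, ref=ref)
```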
Edge Types
| Edge Type | Source | Target | Properties |
|---|---|---|---|
| HAS_COLUMN | Table | Column | Position, primary key |
| REFERENCES | Column | Column | Foreign key |
| USES_TABLE | Trace | Table | Access count |
| PRODUCES_METRIC | Agent | Metric | Frequency |
| ASKED_BY | Trace | User | Timestamp |
| SIMILAR_TO | Trace | Trace | Similarity score |
| FOLLOWS | Pattern | Pattern | Sequence order |
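Nodes and edges map onto Dgraph's JSON mutation format, where edge properties are expressed as facets. A sketch of how a HAS_COLUMN edge might be written (the predicate names and use of `urn` as an external key are assumptions about the store's schema, not its actual definition):

```python
def has_column_mutation(table_urn: str, column_urn: str, position: int) -> dict:
    """Build a set-mutation dict linking a Table node to a Column node.

    Dgraph's JSON mutations attach facets with the `edge|facet` key syntax;
    the store would submit this dict in a transaction via the Dgraph client.
    """
    return {
        "urn": table_urn,
        "dgraph.type": "Table",
        "HAS_COLUMN": [{
            "urn": column_urn,
            "dgraph.type": "Column",
            "HAS_COLUMN|position": position,  # facet on the edge itself
        }],
    }
```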
Memory Consolidation
The MemoryConsolidationService compacts the knowledge graph by merging redundant nodes:
```python
class MemoryConsolidationService:
    """Consolidates and compacts the knowledge graph."""

    async def consolidate(
        self,
        tenant_id: str,
    ) -> ConsolidationResult:
        """Consolidate redundant graph entries."""
        # Find duplicate entities
        duplicates = await self._find_duplicates(tenant_id)

        # Merge each duplicate into the canonical entity
        merged = 0
        for group in duplicates:
            canonical = self._select_canonical(group)
            for duplicate in group:
                if duplicate.urn != canonical.urn:
                    await self._merge(canonical, duplicate)
                    merged += 1

        # Prune stale edges
        pruned = await self._prune_stale_edges(tenant_id, max_age_days=90)

        return ConsolidationResult(
            duplicates_merged=merged,
            edges_pruned=pruned,
        )
```
Entity Backfill
The EntityBackfillJob populates the graph for existing data:
```python
class EntityBackfillJob:
    """Backfills knowledge graph from existing data sources."""

    async def run(
        self,
        tenant_id: str,
        sources: list[str] | None = None,
    ) -> BackfillResult:
        """Run backfill job for a tenant."""
        # Avoid a mutable default argument; None means all sources
        sources = sources or ["catalog", "ontology", "history"]
        result = BackfillResult()
        if "catalog" in sources:
            result.catalog = await self._backfill_catalog(tenant_id)
        if "ontology" in sources:
            result.ontology = await self._backfill_ontology(tenant_id)
        if "history" in sources:
            result.history = await self._backfill_history(tenant_id)
        return result
```