MATIH Platform is in active MVP development. Documentation reflects current implementation status.
14. Context Graph & Ontology
Knowledge Graph

Knowledge Graph Construction

The Context Graph builds its knowledge graph from multiple sources: agent reasoning traces, entity extraction from conversations, schema metadata synchronization, and explicit ontology definitions. This section covers the graph construction pipeline, entity extraction, relationship discovery, incremental synchronization, and knowledge graph maintenance.


Construction Pipeline

```
Sources
    |
    +-- Agent Thinking Traces ------+
    |                                |
    +-- Conversation History --------+
    |                                |
    +-- Schema Metadata (Catalog) ---+----> Entity Extraction
    |                                |          |
    +-- Ontology Definitions --------+          v
                                         Relationship Discovery
                                               |
                                               v
                                         Graph Mutation (Dgraph)
                                               |
                                               v
                                         Embedding Generation
                                               |
                                               v
                                         Vector Store (Qdrant/Pinecone)
```
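The stages above can be sketched end-to-end as a minimal in-process pipeline. Everything in this sketch is a hypothetical stand-in (simple token matching instead of LLM extraction, schema-prefix matching instead of real relationship discovery); it only illustrates how the stages compose before the Dgraph mutation and embedding steps.

```python
from dataclasses import dataclass


@dataclass
class Entity:
    name: str
    type: str


@dataclass
class Relationship:
    source: str
    target: str
    type: str


def extract_entities(text: str) -> list[Entity]:
    # Stand-in for LLM-based extraction: treat dotted tokens as table names.
    return [Entity(tok, "table") for tok in text.split() if "." in tok]


def discover_relationships(entities: list[Entity]) -> list[Relationship]:
    # Stand-in for discovery: relate entities that share a schema prefix.
    rels = []
    for a in entities:
        for b in entities:
            if a is not b and a.name.split(".")[0] == b.name.split(".")[0]:
                rels.append(Relationship(a.name, b.name, "CO_LOCATED"))
    return rels


def run_pipeline(text: str) -> tuple[list[Entity], list[Relationship]]:
    entities = extract_entities(text)
    relationships = discover_relationships(entities)
    # Graph mutation and embedding generation would follow here.
    return entities, relationships
```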

Entity Extraction

The ConceptExtractor in context_graph/services/concept_extractor.py extracts entities from agent interactions:

```python
class ConceptExtractor:
    """Extracts entities and concepts from text."""

    async def extract(
        self,
        text: str,
        tenant_id: str,
        context: dict[str, Any] | None = None,
    ) -> list[ExtractedEntity]:
        """Extract entities from text."""
        # Use LLM for entity extraction
        response = await self._llm.chat([
            {"role": "system", "content": EXTRACTION_PROMPT},
            {"role": "user", "content": text},
        ])

        entities = self._parse_entities(response["content"])

        # Resolve against existing ontology
        resolved = await self._resolve_entities(entities, tenant_id)

        return resolved
```
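The shape of `_parse_entities` depends on the extraction prompt, which isn't shown here. Assuming the prompt asks the LLM for a JSON array of `{name, type, confidence}` objects, a tolerant parser might look like the following sketch (the `ExtractedEntity` fields here are illustrative, not the platform's actual model):

```python
import json
from dataclasses import dataclass


@dataclass
class ExtractedEntity:
    name: str
    type: str
    confidence: float = 1.0


def parse_entities(content: str) -> list[ExtractedEntity]:
    """Parse a JSON array of entities from the LLM response body."""
    stripped = content.strip()
    # Tolerate responses the model wrapped in a Markdown code fence.
    if stripped.startswith("```"):
        stripped = stripped.strip("`").removeprefix("json").strip()
    items = json.loads(stripped)
    return [
        ExtractedEntity(i["name"], i["type"], i.get("confidence", 1.0))
        for i in items
    ]
```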

Extracted Entity Types

| Entity Type | Source | Example |
|-------------|--------|---------|
| Table | Schema metadata, SQL queries | analytics.sales.orders |
| Column | Schema metadata, SQL queries | orders.amount |
| Metric | Conversation, semantic layer | "total revenue" |
| Dimension | Conversation, semantic layer | "region", "product category" |
| Business concept | Conversation | "customer churn", "retention rate" |
| Time period | Conversation | "Q4 2025", "last month" |
| Filter value | Conversation, SQL | "status = completed" |

Knowledge Graph Synchronization

The IncrementalSyncService keeps the knowledge graph synchronized with upstream sources:

```python
class IncrementalSyncService:
    """Incrementally synchronizes external data into the knowledge graph."""

    async def sync_schema(
        self,
        tenant_id: str,
    ) -> SyncResult:
        """Synchronize data catalog schema into knowledge graph."""
        # Get current catalog metadata
        catalog = await self._catalog_client.get_tables(tenant_id)

        # Get existing graph entities
        existing = await self._store.get_schema_entities(tenant_id)

        # Compute diff
        to_add, to_update, to_remove = self._compute_diff(
            catalog, existing
        )

        # Apply mutations
        for entity in to_add:
            await self._store.create_entity(entity)
        for entity in to_update:
            await self._store.update_entity(entity)
        for entity in to_remove:
            await self._store.mark_deprecated(entity)

        return SyncResult(
            added=len(to_add),
            updated=len(to_update),
            deprecated=len(to_remove),
        )

    async def sync_ontology(
        self,
        tenant_id: str,
    ) -> SyncResult:
        """Synchronize ontology definitions into knowledge graph."""
        # Pull ontology from Ontology Service
        ontology = await self._ontology_client.get_types(tenant_id)

        for entity_type in ontology:
            await self._store.upsert_ontology_entity(entity_type)

        return SyncResult(added=len(ontology))
```
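The `_compute_diff` step isn't shown above. A minimal sketch, assuming schema entities are keyed by fully qualified name and carry a metadata fingerprint for change detection (both assumptions, not confirmed by the source):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SchemaEntity:
    fqn: str
    fingerprint: str  # hash of the entity's metadata


def compute_diff(
    catalog: list[SchemaEntity],
    existing: list[SchemaEntity],
) -> tuple[list[SchemaEntity], list[SchemaEntity], list[SchemaEntity]]:
    """Return (to_add, to_update, to_remove) between catalog and graph."""
    catalog_by_fqn = {e.fqn: e for e in catalog}
    existing_by_fqn = {e.fqn: e for e in existing}

    # In the catalog but not yet in the graph.
    to_add = [e for fqn, e in catalog_by_fqn.items() if fqn not in existing_by_fqn]
    # Present in both, but the metadata fingerprint changed.
    to_update = [
        e for fqn, e in catalog_by_fqn.items()
        if fqn in existing_by_fqn and e.fingerprint != existing_by_fqn[fqn].fingerprint
    ]
    # In the graph but gone from the catalog; mark deprecated, don't delete.
    to_remove = [e for fqn, e in existing_by_fqn.items() if fqn not in catalog_by_fqn]
    return to_add, to_update, to_remove
```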

Sync Schedule

| Source | Sync Frequency | Method |
|--------|----------------|--------|
| Data Catalog | Every 15 minutes | Incremental (change detection) |
| Ontology Service | On change (webhook) | Full sync per entity |
| Agent traces | Real-time | Stream processing |
| Conversations | Per message | Async extraction |

Relationship Discovery

The system discovers relationships between entities automatically:

Schema-Based Relationships

```python
class SchemaRelationshipDiscoverer:
    """Discovers relationships from schema metadata."""

    async def discover(
        self,
        tenant_id: str,
    ) -> list[DiscoveredRelationship]:
        """Discover relationships from foreign keys and naming conventions."""
        tables = await self._catalog_client.get_tables(tenant_id)
        relationships = []

        for table in tables:
            # Explicit foreign keys
            for fk in table.foreign_keys:
                relationships.append(DiscoveredRelationship(
                    source=table.fqn,
                    target=fk.referenced_table,
                    type="FOREIGN_KEY",
                    confidence=1.0,
                    columns=(fk.column, fk.referenced_column),
                ))

            # Name-based inference (e.g., customer_id -> customers.id)
            for column in table.columns:
                if column.name.endswith("_id"):
                    entity = column.name[:-3] + "s"  # customer_id -> customers
                    inferred = self._find_table(tables, entity)
                    if inferred:
                        relationships.append(DiscoveredRelationship(
                            source=table.fqn,
                            target=inferred.fqn,
                            type="INFERRED_FK",
                            confidence=0.8,
                        ))

        return relationships
```

Usage-Based Relationships

```python
class UsageRelationshipDiscoverer:
    """Discovers relationships from query patterns."""

    async def discover(
        self,
        tenant_id: str,
        window_days: int = 30,
    ) -> list[DiscoveredRelationship]:
        """Discover relationships from co-occurrence in queries."""
        # Analyze historical queries
        queries = await self._get_queries(tenant_id, window_days)

        # Count table co-occurrences in JOINs
        join_pairs = self._extract_join_pairs(queries)

        # Tables frequently JOINed together imply a relationship
        relationships = []
        for (table_a, table_b), count in join_pairs.items():
            if count >= self._min_occurrence:
                relationships.append(DiscoveredRelationship(
                    source=table_a,
                    target=table_b,
                    type="USAGE_BASED",
                    confidence=min(count / 100, 0.9),
                    evidence=f"Co-occurred in {count} queries",
                ))

        return relationships
```
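`_extract_join_pairs` isn't shown either. A regex-based sketch of the co-occurrence counting follows; a production implementation would use a real SQL parser rather than a FROM/JOIN regex, which misses subqueries, CTEs, and quoted identifiers:

```python
import re
from collections import Counter
from itertools import combinations

# Hypothetical pattern: capture the identifier after FROM or JOIN.
TABLE_RE = re.compile(r"(?:FROM|JOIN)\s+([\w.]+)", re.IGNORECASE)


def extract_join_pairs(queries: list[str]) -> Counter:
    """Count unordered table pairs that appear in the same query."""
    pairs: Counter = Counter()
    for sql in queries:
        # Sort so (a, b) and (b, a) collapse into one key.
        tables = sorted(set(TABLE_RE.findall(sql)))
        for a, b in combinations(tables, 2):
            pairs[(a, b)] += 1
    return pairs
```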

Graph Structure

The knowledge graph stores entities as nodes and relationships as edges in Dgraph:

Node Types

| Node Type | URN Pattern | Properties |
|-----------|-------------|------------|
| Table | urn:matih:table:{tenant}:{catalog}.{schema}.{table} | Name, description, columns, stats |
| Column | urn:matih:column:{tenant}:{table}.{column} | Name, type, description, tags |
| Metric | urn:matih:metric:{tenant}:{metric_name} | Expression, type, dimensions |
| Agent | urn:matih:agent:{tenant}:{agent_id} | Name, type, capabilities |
| Trace | urn:matih:trace:{tenant}:{trace_id} | Goal, outcome, duration |
| User | urn:matih:user:{tenant}:{user_id} | Name, role, preferences |
| Pattern | urn:matih:pattern:{tenant}:{pattern_id} | Description, frequency |
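The URN patterns above can be handled with a small build/parse helper pair. These function names are illustrative, not part of the platform API:

```python
def build_urn(kind: str, tenant: str, identifier: str) -> str:
    """Build a node URN following the urn:matih:{kind}:{tenant}:{id} pattern."""
    return f"urn:matih:{kind}:{tenant}:{identifier}"


def parse_urn(urn: str) -> tuple[str, str, str]:
    """Split a node URN into (kind, tenant, identifier)."""
    # maxsplit=4 keeps any remaining colons inside the identifier segment.
    prefix, namespace, kind, tenant, identifier = urn.split(":", 4)
    if (prefix, namespace) != ("urn", "matih"):
        raise ValueError(f"not a MATIH URN: {urn}")
    return kind, tenant, identifier
```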

Edge Types

| Edge Type | Source | Target | Properties |
|-----------|--------|--------|------------|
| HAS_COLUMN | Table | Column | Position, primary key |
| REFERENCES | Column | Column | Foreign key |
| USES_TABLE | Trace | Table | Access count |
| PRODUCES_METRIC | Agent | Metric | Frequency |
| ASKED_BY | Trace | User | Timestamp |
| SIMILAR_TO | Trace | Trace | Similarity score |
| FOLLOWS | Pattern | Pattern | Sequence order |
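The typed-edge model in the table can be illustrated with an in-memory stand-in. This is not the Dgraph API, only a sketch of the semantics: edges carry a type, and traversal filters on it.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    source: str
    target: str
    type: str


class TypedEdgeIndex:
    """Minimal in-memory stand-in for the graph's typed-edge store."""

    def __init__(self) -> None:
        self._out: defaultdict[str, list[Edge]] = defaultdict(list)

    def add(self, source: str, target: str, edge_type: str) -> None:
        self._out[source].append(Edge(source, target, edge_type))

    def neighbors(self, source: str, edge_type: str) -> list[str]:
        """Traverse outgoing edges of one type from a node."""
        return [e.target for e in self._out[source] if e.type == edge_type]
```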

Memory Consolidation

The MemoryConsolidationService compacts the knowledge graph by merging redundant nodes:

```python
class MemoryConsolidationService:
    """Consolidates and compacts the knowledge graph."""

    async def consolidate(
        self,
        tenant_id: str,
    ) -> ConsolidationResult:
        """Consolidate redundant graph entries."""
        # Find duplicate entities
        duplicates = await self._find_duplicates(tenant_id)

        # Merge duplicates
        merged = 0
        for group in duplicates:
            canonical = self._select_canonical(group)
            for duplicate in group:
                if duplicate.urn != canonical.urn:
                    await self._merge(canonical, duplicate)
                    merged += 1

        # Prune stale edges
        pruned = await self._prune_stale_edges(
            tenant_id, max_age_days=90
        )

        return ConsolidationResult(
            duplicates_merged=merged,
            edges_pruned=pruned,
        )
```
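`_find_duplicates` and `_select_canonical` aren't shown. One plausible heuristic, sketched with hypothetical names and a simplified entity shape: group nodes by normalized name, and keep the most-connected node in each group as the canonical entity.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class GraphEntity:
    urn: str
    name: str
    edge_count: int


def group_duplicates(entities: list[GraphEntity]) -> list[list[GraphEntity]]:
    """Group entities whose normalized names collide."""
    buckets: defaultdict[str, list[GraphEntity]] = defaultdict(list)
    for e in entities:
        # Normalize case and separators so "Customer Churn" == "customer_churn".
        key = e.name.lower().replace("_", " ").strip()
        buckets[key].append(e)
    return [group for group in buckets.values() if len(group) > 1]


def select_canonical(group: list[GraphEntity]) -> GraphEntity:
    """Keep the most-connected node as the canonical entity."""
    return max(group, key=lambda e: e.edge_count)
```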

Entity Backfill

The EntityBackfillJob populates the graph from existing data sources:

```python
class EntityBackfillJob:
    """Backfills knowledge graph from existing data sources."""

    async def run(
        self,
        tenant_id: str,
        sources: list[str] | None = None,
    ) -> BackfillResult:
        """Run backfill job for a tenant."""
        # Avoid a mutable default argument; None means all sources.
        if sources is None:
            sources = ["catalog", "ontology", "history"]

        result = BackfillResult()

        if "catalog" in sources:
            result.catalog = await self._backfill_catalog(tenant_id)

        if "ontology" in sources:
            result.ontology = await self._backfill_ontology(tenant_id)

        if "history" in sources:
            result.history = await self._backfill_history(tenant_id)

        return result
```