MATIH Platform is in active MVP development. Documentation reflects current implementation status.
14. Context Graph & Ontology
Context Graph

Context Graph Architecture

The Context Graph is a knowledge graph subsystem embedded within the AI Service that captures agent reasoning traces, entity relationships, and semantic patterns. It uses Dgraph as its graph database and integrates with vector stores (Qdrant/Pinecone) for embedding-based similarity search. This section covers the Dgraph storage layer, data models, tenant isolation, and the graph API surface.


Architecture

Context Graph Module
    |
    +-- Models Layer
    |   +-- AgentThinkingTrace
    |   +-- AgentThinkingStep
    |   +-- APICallRecord
    |   +-- TokenUsageRecord
    |
    +-- Storage Layer
    |   +-- DgraphContextStore ------> Dgraph (GraphQL mutations/queries)
    |   +-- EmbeddingStore ----------> Qdrant / Pinecone
    |
    +-- Service Layer
    |   +-- AgentThinkingCaptureService
    |   +-- AgentTraceService
    |   +-- PatternDetectorService
    |   +-- CommunityDetection
    |   +-- SemanticSearchService
    |
    +-- Integration Layer
    |   +-- OrchestratorHooks -------> BI + General orchestrators
    |   +-- MetricsBridge ------------> Observability spans
    |   +-- KafkaProducer/Consumer --> Event streaming
    |
    +-- Security Layer
    |   +-- Authorization
    |   +-- Permissions
    |
    +-- Config Layer
        +-- TenantConfig ------------> Per-tenant feature flags

DgraphContextStore

The DgraphContextStore in context_graph/storage/dgraph_context_store.py provides CRUD operations for thinking traces using the commons DgraphClient:

class DgraphContextStore:
    """Dgraph storage operations for context graph thinking data.
 
    Provides:
    - CRUD for AgentThinkingTrace
    - Batch mutations for thinking steps
    - Tenant-isolated queries
    - Upsert pattern for trace updates
    """
 
    def __init__(
        self,
        client: DgraphClient | None = None,
        config: DgraphConfig | None = None,
    ) -> None:
        if client is not None:
            self._client = client
        else:
            self._client = DgraphClient(config=config)
        self._initialized = False
 
    async def initialize(self) -> None:
        """Initialize the store and deploy schema if needed."""
        healthy = await self._client.health()
        if not healthy:
            logger.warning(
                "Dgraph not reachable, context store in degraded mode"
            )
            return
 
        await self._client.deploy_schema(schema_path=_SCHEMA_PATH)
        self._initialized = True

Schema

The Dgraph schema defines the graph structure:

type AgentThinkingTrace {
    trace_id: String! @id
    tenant_id: String! @search(by: [exact])
    session_id: String! @search(by: [exact])
    actor_urn: String! @search(by: [exact])
    goal: String @search(by: [fulltext])
    outcome: String @search(by: [exact])
    started_at: DateTime @search
    completed_at: DateTime @search
    duration_ms: Float
    steps: [AgentThinkingStep]
    api_calls: [APICallRecord]
    total_input_tokens: Int
    total_output_tokens: Int
    total_cost_usd: Float
    parent_trace_id: String @search(by: [exact])
    input_embedding_id: String
    output_embedding_id: String
    thinking_embedding_id: String
}
 
type AgentThinkingStep {
    step_id: String! @id
    step_type: String! @search(by: [exact])
    reasoning: String @search(by: [fulltext])
    confidence: Float
    duration_ms: Float
    timestamp: DateTime
    metadata: String
}
 
type APICallRecord {
    call_id: String! @id
    api_type: String! @search(by: [exact])
    endpoint: String @search(by: [exact])
    method: String
    status_code: Int
    latency_ms: Float
    request_tokens: Int
    response_tokens: Int
    cost_usd: Float
    error: String
}

Trace CRUD Operations

    async def store_thinking_trace(
        self, trace: AgentThinkingTrace
    ) -> str | None:
        """Persist a new thinking trace to Dgraph."""
        mutation = {
            "set": {
                "dgraph.type": "AgentThinkingTrace",
                "trace_id": trace.trace_id,
                "tenant_id": trace.tenant_id,
                "session_id": trace.session_id,
                "actor_urn": trace.actor_urn,
                "goal": trace.goal,
                "started_at": trace.started_at.isoformat(),
                "steps": [
                    self._step_to_dict(step) for step in trace.steps
                ],
            }
        }
        uid = await self._client.mutate(mutation)
        return uid
 
    async def get_thinking_trace(
        self, trace_id: str, tenant_id: str
    ) -> AgentThinkingTrace | None:
        """Retrieve a thinking trace by ID with tenant isolation."""
        query = """
        query GetTrace($trace_id: string, $tenant_id: string) {
            traces(func: eq(trace_id, $trace_id))
                @filter(eq(tenant_id, $tenant_id)) {
                trace_id
                tenant_id
                session_id
                actor_urn
                goal
                outcome
                started_at
                completed_at
                duration_ms
                steps {
                    step_id
                    step_type
                    reasoning
                    confidence
                    duration_ms
                }
                api_calls {
                    call_id
                    api_type
                    endpoint
                    latency_ms
                    cost_usd
                }
            }
        }
        """
        result = await self._client.query(
            query, variables={"$trace_id": trace_id, "$tenant_id": tenant_id}
        )
        ...

Tenant Isolation

All Context Graph operations are tenant-isolated:

LayerIsolation Mechanism
Dgraph queries@filter(eq(tenant_id, $tenant_id)) on every query
Mutationstenant_id required on every mutation
EmbeddingsMetadata filter {"tenant_id": tenant_id} in vector store
Kafka eventsEvents include tenant_id for partitioning
Feature flagsPer-tenant enable/disable via TenantConfig

Per-Tenant Configuration

# context_graph/config/tenant_config.py
class TenantContextGraphConfig:
    """Per-tenant Context Graph configuration."""
    tenant_id: str
    enabled: bool = True
    thinking_capture_enabled: bool = True
    embedding_enabled: bool = True
    max_traces_per_session: int = 100
    max_steps_per_trace: int = 100
    retention_days: int = 90

Feature Flags

The Context Graph uses semantic feature flags to control feature availability:

# context_graph/services/semantic_feature_flags.py
class SemanticFeature(str, Enum):
    CONTEXT_GRAPH_THINKING = "context_graph_thinking"
    CONTEXT_GRAPH_EMBEDDING = "context_graph_embedding"
    CONTEXT_GRAPH_ANALYTICS = "context_graph_analytics"
    CONTEXT_GRAPH_KAFKA = "context_graph_kafka"
 
class SemanticFeatureFlags:
    """Controls feature availability per tenant."""
 
    def is_enabled(
        self,
        feature: SemanticFeature,
        tenant_id: str | None = None,
    ) -> bool:
        """Check if a feature is enabled."""
        ...

Graceful Degradation

The Context Graph is designed to degrade gracefully when dependencies are unavailable:

# DgraphContextStore initialization
async def initialize(self) -> None:
    healthy = await self._client.health()
    if not healthy:
        logger.warning(
            "Dgraph not reachable, context store in degraded mode"
        )
        return  # Store operates in degraded mode
 
# Orchestrator hooks check enablement
@property
def enabled(self) -> bool:
    return self.service is not None
 
# Tenant-level check
def _is_enabled_for_tenant(self, tenant_id: str) -> bool:
    config = get_tenant_context_graph_config(tenant_id)
    if not config.enabled or not config.thinking_capture_enabled:
        return False
    return True

Graph Services

Pattern Detection

The PatternDetectorService identifies recurring patterns in agent behavior:

class PatternDetectorService:
    """Detects patterns in agent thinking traces."""
 
    async def detect_patterns(
        self,
        tenant_id: str,
        session_ids: list[str],
    ) -> list[Pattern]:
        """Analyze traces for recurring patterns."""
        traces = await self._store.list_traces(
            tenant_id=tenant_id,
            session_ids=session_ids,
        )
 
        # Extract step sequences
        sequences = [
            [step.step_type for step in trace.steps]
            for trace in traces
        ]
 
        # Find frequent subsequences
        patterns = self._find_patterns(sequences)
        return patterns

Community Detection

Groups related entities in the graph:

class CommunityDetectionService:
    """Detects communities in the knowledge graph."""
 
    async def detect_communities(
        self,
        tenant_id: str,
        algorithm: str = "louvain",
    ) -> list[Community]:
        """Detect communities using graph algorithms."""
        ...

Semantic Search

Searches the graph using natural language:

class SemanticSearchService:
    """Semantic search across the context graph."""
 
    async def search(
        self,
        tenant_id: str,
        query: str,
        top_k: int = 10,
    ) -> list[SearchResult]:
        """Search context graph using semantic similarity."""
        # Embed query
        query_embedding = await self._embed(query)
 
        # Search in vector store
        results = await self._embedding_store.search(
            query_vector=query_embedding,
            tenant_id=tenant_id,
            top_k=top_k,
        )
 
        return results

Kafka Integration

The Context Graph streams events to Kafka for downstream processing:

TopicEventsConsumers
context-graph-tracesTrace created, completedAnalytics, learning
context-graph-stepsIndividual thinking stepsReal-time monitoring
context-graph-patternsDetected patternsPattern analytics
context-graph-embeddingsNew embeddings generatedSimilarity index
# context_graph/integration/kafka_producer.py
class ContextGraphKafkaProducer:
    """Publishes context graph events to Kafka."""
 
    async def publish_trace_event(
        self,
        trace: AgentThinkingTrace,
        event_type: str,
    ) -> None:
        await self._producer.send(
            topic="context-graph-traces",
            key=trace.tenant_id.encode(),
            value=json.dumps({
                "event_type": event_type,
                "trace_id": trace.trace_id,
                "tenant_id": trace.tenant_id,
                "timestamp": datetime.utcnow().isoformat(),
            }).encode(),
        )