MATIH Platform is in active MVP development. Documentation reflects current implementation status.
14. Context Graph & Ontology
Storage Backends
Dgraph Context Store

Dgraph Context Store

The DgraphContextStore provides CRUD operations for AgentThinkingTrace, AgentThinkingStep, and APICallRecord entities using the Dgraph GraphQL API. All queries are tenant-isolated and support batch mutations for high-throughput ingestion.


Overview

Dgraph serves as the primary graph database for the Context Graph, storing agent thinking traces as first-class graph entities with rich relationships. The store uses the commons DgraphClient and follows the repository pattern from the ontology service.

Source: data-plane/ai-service/src/context_graph/storage/dgraph_context_store.py


Schema

The Dgraph schema is deployed automatically on initialization from a GraphQL schema file. Key types include:

TypeDescriptionKey Fields
AgentThinkingTraceTop-level trace for an agent executiontraceId, tenantId, sessionId, goal, status, outcome
AgentThinkingStepIndividual reasoning step within a tracestepId, stepType, reasoning, confidence, durationMs
APICallRecordExternal API call made during a tracecallId, apiType, endpoint, latencyMs, costUsd

Initialization

The store follows the singleton pattern with automatic schema deployment:

from context_graph.storage.dgraph_context_store import (
    create_dgraph_context_store,
    get_dgraph_context_store,
)
 
# Create and initialize
store = create_dgraph_context_store(config=dgraph_config)
await store.initialize()
 
# Retrieve singleton
store = get_dgraph_context_store()

During initialization, the store checks Dgraph connectivity and deploys the GraphQL schema. If Dgraph is unreachable, the store operates in degraded mode with a warning logged.


CRUD Operations

Store a Thinking Trace

trace_id = await store.store_thinking_trace(trace)

Uses a GraphQL upsert mutation. Returns the Dgraph-assigned ID or None on failure.

Update a Thinking Trace

success = await store.update_thinking_trace(trace)

Patches fields including status, outcome, token counts, cost, and embedding IDs.

Get a Thinking Trace

trace_data = await store.get_thinking_trace(
    trace_id="trace-123",
    tenant_id="acme",
)

Returns the full trace with nested steps and API calls, scoped to the specified tenant.

Query Thinking Traces

traces = await store.query_thinking_traces(
    tenant_id="acme",
    session_id="sess-456",
    status="completed",
    limit=50,
    offset=0,
)

Supports filtering by session_id, actor_urn, and status. Results are ordered by startedAt descending.


Batch Operations

Store Thinking Steps

count = await store.store_thinking_steps(
    trace_id="trace-123",
    steps=[step1, step2, step3],
)

Batch-stores multiple steps in a single mutation. Each step includes its reasoning_hash for deduplication.

Store API Call Records

call_id = await store.store_api_call(
    trace_id="trace-123",
    call=api_call_record,
)

Analytics Queries

Model Performance Statistics

stats = await store.get_model_performance_stats(
    tenant_id="acme",
    model_id="gpt-4",  # optional filter
)

Returns per-model metrics including duration, confidence, token counts, and estimated cost.

Path Analysis

paths = await store.get_path_analysis(
    tenant_id="acme",
    limit=20,
)

Returns completed traces with their pathTaken, duration, cost, and outcome for frequency analysis.


Health Check

health = await store.health_check()
# {"backend": "dgraph", "status": "healthy", "initialized": true}

Configuration

ParameterSourceDescription
DGRAPH_GRPC_URLEnvironment variableDgraph gRPC endpoint
DGRAPH_HTTP_URLEnvironment variableDgraph HTTP/GraphQL endpoint
Schema pathstorage/dgraph/schema.graphqlGraphQL schema file