Dgraph Context Store
The DgraphContextStore provides CRUD operations for AgentThinkingTrace, AgentThinkingStep, and APICallRecord entities using the Dgraph GraphQL API. All queries are tenant-isolated, and the store supports batch mutations for high-throughput ingestion.
Overview
Dgraph serves as the primary graph database for the Context Graph, storing agent thinking traces as first-class graph entities with rich relationships. The store uses the commons DgraphClient and follows the repository pattern from the ontology service.
Source: data-plane/ai-service/src/context_graph/storage/dgraph_context_store.py
Schema
The Dgraph schema is deployed automatically on initialization from a GraphQL schema file. Key types include:
| Type | Description | Key Fields |
|---|---|---|
| AgentThinkingTrace | Top-level trace for an agent execution | traceId, tenantId, sessionId, goal, status, outcome |
| AgentThinkingStep | Individual reasoning step within a trace | stepId, stepType, reasoning, confidence, durationMs |
| APICallRecord | External API call made during a trace | callId, apiType, endpoint, latencyMs, costUsd |
Initialization
The store follows the singleton pattern with automatic schema deployment:
```python
from context_graph.storage.dgraph_context_store import (
    create_dgraph_context_store,
    get_dgraph_context_store,
)

# Create and initialize
store = create_dgraph_context_store(config=dgraph_config)
await store.initialize()

# Retrieve singleton
store = get_dgraph_context_store()
```

During initialization, the store checks Dgraph connectivity and deploys the GraphQL schema. If Dgraph is unreachable, the store logs a warning and operates in degraded mode.
CRUD Operations
Store a Thinking Trace
```python
trace_id = await store.store_thinking_trace(trace)
```

Uses a GraphQL upsert mutation. Returns the Dgraph-assigned ID, or None on failure.
Update a Thinking Trace
```python
success = await store.update_thinking_trace(trace)
```

Patches fields including status, outcome, token counts, cost, and embedding IDs.
Get a Thinking Trace
```python
trace_data = await store.get_thinking_trace(
    trace_id="trace-123",
    tenant_id="acme",
)
```

Returns the full trace with nested steps and API calls, scoped to the specified tenant.
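One way tenant scoping might be expressed is by sending both identifiers as GraphQL variables and filtering on both. The sketch below is an assumption, not the store's actual query: it presumes Dgraph's auto-generated `queryAgentThinkingTrace` query, searchable `traceId` and `tenantId` fields, and nested edges named `steps` and `apiCalls`, none of which are confirmed by the schema file. `build_get_trace_payload` is a hypothetical helper.

```python
import json

# Assumed query text; the deployed schema may name fields and edges differently.
GET_TRACE_QUERY = """
query GetTrace($traceId: String!, $tenantId: String!) {
  queryAgentThinkingTrace(
    filter: { traceId: { eq: $traceId }, and: { tenantId: { eq: $tenantId } } }
  ) {
    traceId
    tenantId
    status
    steps { stepId stepType }
    apiCalls { callId endpoint }
  }
}
"""


def build_get_trace_payload(trace_id: str, tenant_id: str) -> dict:
    """Build the JSON body for a tenant-scoped trace lookup (hypothetical helper)."""
    if not tenant_id:
        # Refuse to build an unscoped query: the tenant filter is mandatory.
        raise ValueError("tenant_id is required for tenant isolation")
    return {
        "query": GET_TRACE_QUERY,
        "variables": {"traceId": trace_id, "tenantId": tenant_id},
    }


payload = build_get_trace_payload("trace-123", "acme")
body = json.dumps(payload)  # string that would be POSTed to the GraphQL endpoint
```

Passing the tenant as a variable rather than interpolating it into the query string avoids injection through identifier values.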
Query Thinking Traces
```python
traces = await store.query_thinking_traces(
    tenant_id="acme",
    session_id="sess-456",
    status="completed",
    limit=50,
    offset=0,
)
```

Supports filtering by session_id, actor_urn, and status. Results are ordered by startedAt descending.
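The filtering, ordering, and pagination semantics described above can be modelled in memory as follows. This is only a sketch of the expected behaviour, not how the store talks to Dgraph; `query_traces_sketch` is hypothetical, the `actorUrn` field name is assumed, and timestamps are assumed to be ISO-8601 strings in a single format so that they sort lexicographically.

```python
def query_traces_sketch(traces, tenant_id, session_id=None, actor_urn=None,
                        status=None, limit=50, offset=0):
    """In-memory model of tenant-scoped filtering with pagination."""
    wanted = {
        "tenantId": tenant_id,   # always applied
        "sessionId": session_id,
        "actorUrn": actor_urn,   # assumed field name
        "status": status,
    }
    # Only filters that were actually supplied take part in matching.
    active = {key: value for key, value in wanted.items() if value is not None}
    matches = [t for t in traces
               if all(t.get(key) == value for key, value in active.items())]
    # Newest first, then apply offset and limit.
    matches.sort(key=lambda t: t["startedAt"], reverse=True)
    return matches[offset:offset + limit]


sample = [
    {"traceId": "t1", "tenantId": "acme", "sessionId": "sess-456",
     "status": "completed", "startedAt": "2024-01-01T10:00:00Z"},
    {"traceId": "t2", "tenantId": "acme", "sessionId": "sess-456",
     "status": "completed", "startedAt": "2024-01-02T10:00:00Z"},
    {"traceId": "t3", "tenantId": "other", "sessionId": "sess-456",
     "status": "completed", "startedAt": "2024-01-03T10:00:00Z"},
    {"traceId": "t4", "tenantId": "acme", "sessionId": "sess-456",
     "status": "failed", "startedAt": "2024-01-04T10:00:00Z"},
]
page = query_traces_sketch(sample, "acme", session_id="sess-456", status="completed")
```

Note that the tenant filter is never optional in this model, which mirrors the tenant isolation the store promises.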
Batch Operations
Store Thinking Steps
```python
count = await store.store_thinking_steps(
    trace_id="trace-123",
    steps=[step1, step2, step3],
)
```

Batch-stores multiple steps in a single mutation. Each step includes its reasoning_hash for deduplication.
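How a reasoning_hash might support deduplication is sketched below. The source does not say how the hash is computed, so the use of SHA-256 over the whitespace-trimmed reasoning text is an assumption, and `reasoning_hash` and `dedupe_steps` are hypothetical helpers operating on plain dictionaries.

```python
import hashlib


def reasoning_hash(reasoning: str) -> str:
    """Assumed scheme: SHA-256 hex digest of the trimmed reasoning text."""
    return hashlib.sha256(reasoning.strip().encode("utf-8")).hexdigest()


def dedupe_steps(steps):
    """Drop steps whose reasoning hash was already seen, keeping first occurrences."""
    seen = set()
    unique = []
    for step in steps:
        # Reuse a precomputed hash if present, otherwise derive one.
        digest = step.get("reasoning_hash") or reasoning_hash(step["reasoning"])
        if digest in seen:
            continue
        seen.add(digest)
        unique.append({**step, "reasoning_hash": digest})
    return unique


steps = [
    {"stepId": "s1", "reasoning": "Check inventory"},
    {"stepId": "s2", "reasoning": "Check inventory "},  # same text, trailing space
    {"stepId": "s3", "reasoning": "Place order"},
]
batch = dedupe_steps(steps)
```

Deduplicating before building the mutation keeps the batch payload small; the same hash could also be used server-side to detect repeats across batches.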
Store API Call Records
```python
call_id = await store.store_api_call(
    trace_id="trace-123",
    call=api_call_record,
)
```

Analytics Queries
Model Performance Statistics
```python
stats = await store.get_model_performance_stats(
    tenant_id="acme",
    model_id="gpt-4",  # optional filter
)
```

Returns per-model metrics including duration, confidence, token counts, and estimated cost.
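The kind of per-model aggregation described above can be sketched in plain Python. The record shape (`modelId`, `durationMs`, `confidence`, `tokens`, `costUsd`) and the output keys are assumptions made for illustration; the actual result structure of get_model_performance_stats is not documented here.

```python
from collections import defaultdict
from statistics import mean


def model_stats_sketch(records, model_id=None):
    """Group records by model and compute simple aggregates (hypothetical shape)."""
    groups = defaultdict(list)
    for record in records:
        # The model filter is optional; None means "all models".
        if model_id is None or record["modelId"] == model_id:
            groups[record["modelId"]].append(record)
    return {
        model: {
            "calls": len(rows),
            "avgDurationMs": mean(r["durationMs"] for r in rows),
            "avgConfidence": mean(r["confidence"] for r in rows),
            "totalTokens": sum(r["tokens"] for r in rows),
            "totalCostUsd": sum(r["costUsd"] for r in rows),
        }
        for model, rows in groups.items()
    }


records = [
    {"modelId": "gpt-4", "durationMs": 100, "confidence": 0.5, "tokens": 200, "costUsd": 0.25},
    {"modelId": "gpt-4", "durationMs": 300, "confidence": 1.0, "tokens": 400, "costUsd": 0.5},
    {"modelId": "other", "durationMs": 50, "confidence": 0.75, "tokens": 100, "costUsd": 0.125},
]
stats = model_stats_sketch(records, model_id="gpt-4")
```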
Path Analysis
```python
paths = await store.get_path_analysis(
    tenant_id="acme",
    limit=20,
)
```

Returns completed traces with their pathTaken, duration, cost, and outcome for frequency analysis.
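A frequency analysis over the returned traces could look like the sketch below. It assumes each trace is a dictionary whose `pathTaken` value is a list of step names; the real representation of pathTaken may differ, and `path_frequencies` is a hypothetical helper.

```python
from collections import Counter


def path_frequencies(traces):
    """Count how often each path occurs, most frequent first."""
    # Lists are unhashable, so each path is converted to a tuple before counting.
    counts = Counter(tuple(trace["pathTaken"]) for trace in traces)
    return counts.most_common()


completed = [
    {"pathTaken": ["plan", "search", "answer"], "outcome": "success"},
    {"pathTaken": ["plan", "answer"], "outcome": "success"},
    {"pathTaken": ["plan", "search", "answer"], "outcome": "failure"},
]
ranking = path_frequencies(completed)
```

The same grouping key could be extended with the outcome to compare success rates between paths.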
Health Check
```python
health = await store.health_check()
# {"backend": "dgraph", "status": "healthy", "initialized": true}
```

Configuration
| Parameter | Source | Description |
|---|---|---|
| DGRAPH_GRPC_URL | Environment variable | Dgraph gRPC endpoint |
| DGRAPH_HTTP_URL | Environment variable | Dgraph HTTP/GraphQL endpoint |
| Schema path | storage/dgraph/schema.graphql | GraphQL schema file |
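Reading the parameters in the table above might look like the following sketch. `DgraphSettings` and `load_dgraph_settings` are hypothetical names, and the fallback values (Dgraph Alpha's customary local ports 9080 for gRPC and 8080 for HTTP) are assumptions rather than documented defaults of this service.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class DgraphSettings:
    grpc_url: str
    http_url: str
    schema_path: str


def load_dgraph_settings(env=None) -> DgraphSettings:
    """Load connection settings from environment variables (hypothetical helper)."""
    env = os.environ if env is None else env
    return DgraphSettings(
        # Fallbacks are assumed local-development values, not service defaults.
        grpc_url=env.get("DGRAPH_GRPC_URL", "localhost:9080"),
        http_url=env.get("DGRAPH_HTTP_URL", "http://localhost:8080"),
        schema_path="storage/dgraph/schema.graphql",
    )


settings = load_dgraph_settings({"DGRAPH_HTTP_URL": "http://dgraph:8080"})
```

Accepting an explicit mapping instead of always reading `os.environ` keeps the loader easy to exercise in tests.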