Context Graph Architecture
The Context Graph is a knowledge graph subsystem embedded within the AI Service that captures agent reasoning traces, entity relationships, and semantic patterns. It uses Dgraph as its graph database and integrates with vector stores (Qdrant/Pinecone) for embedding-based similarity search. This section covers the Dgraph storage layer, data models, tenant isolation, and the graph API surface.
Architecture
Context Graph Module
|
+-- Models Layer
| +-- AgentThinkingTrace
| +-- AgentThinkingStep
| +-- APICallRecord
| +-- TokenUsageRecord
|
+-- Storage Layer
| +-- DgraphContextStore ------> Dgraph (GraphQL mutations/queries)
| +-- EmbeddingStore ----------> Qdrant / Pinecone
|
+-- Service Layer
| +-- AgentThinkingCaptureService
| +-- AgentTraceService
| +-- PatternDetectorService
| +-- CommunityDetection
| +-- SemanticSearchService
|
+-- Integration Layer
| +-- OrchestratorHooks -------> BI + General orchestrators
| +-- MetricsBridge ------------> Observability spans
| +-- KafkaProducer/Consumer --> Event streaming
|
+-- Security Layer
| +-- Authorization
| +-- Permissions
|
+-- Config Layer
+-- TenantConfig ------------> Per-tenant feature flags
DgraphContextStore
The DgraphContextStore in context_graph/storage/dgraph_context_store.py provides CRUD operations for thinking traces using the commons DgraphClient:
class DgraphContextStore:
"""Dgraph storage operations for context graph thinking data.
Provides:
- CRUD for AgentThinkingTrace
- Batch mutations for thinking steps
- Tenant-isolated queries
- Upsert pattern for trace updates
"""
def __init__(
self,
client: DgraphClient | None = None,
config: DgraphConfig | None = None,
) -> None:
if client is not None:
self._client = client
else:
self._client = DgraphClient(config=config)
self._initialized = False
async def initialize(self) -> None:
"""Initialize the store and deploy schema if needed."""
healthy = await self._client.health()
if not healthy:
logger.warning(
"Dgraph not reachable, context store in degraded mode"
)
return
await self._client.deploy_schema(schema_path=_SCHEMA_PATH)
self._initialized = True
Schema
The Dgraph schema defines the graph structure:
type AgentThinkingTrace {
trace_id: String! @id
tenant_id: String! @search(by: [exact])
session_id: String! @search(by: [exact])
actor_urn: String! @search(by: [exact])
goal: String @search(by: [fulltext])
outcome: String @search(by: [exact])
started_at: DateTime @search
completed_at: DateTime @search
duration_ms: Float
steps: [AgentThinkingStep]
api_calls: [APICallRecord]
total_input_tokens: Int
total_output_tokens: Int
total_cost_usd: Float
parent_trace_id: String @search(by: [exact])
input_embedding_id: String
output_embedding_id: String
thinking_embedding_id: String
}
type AgentThinkingStep {
step_id: String! @id
step_type: String! @search(by: [exact])
reasoning: String @search(by: [fulltext])
confidence: Float
duration_ms: Float
timestamp: DateTime
metadata: String
}
type APICallRecord {
call_id: String! @id
api_type: String! @search(by: [exact])
endpoint: String @search(by: [exact])
method: String
status_code: Int
latency_ms: Float
request_tokens: Int
response_tokens: Int
cost_usd: Float
error: String
}
Trace CRUD Operations
async def store_thinking_trace(
self, trace: AgentThinkingTrace
) -> str | None:
"""Persist a new thinking trace to Dgraph."""
mutation = {
"set": {
"dgraph.type": "AgentThinkingTrace",
"trace_id": trace.trace_id,
"tenant_id": trace.tenant_id,
"session_id": trace.session_id,
"actor_urn": trace.actor_urn,
"goal": trace.goal,
"started_at": trace.started_at.isoformat(),
"steps": [
self._step_to_dict(step) for step in trace.steps
],
}
}
uid = await self._client.mutate(mutation)
return uid
async def get_thinking_trace(
self, trace_id: str, tenant_id: str
) -> AgentThinkingTrace | None:
"""Retrieve a thinking trace by ID with tenant isolation."""
query = """
query GetTrace($trace_id: string, $tenant_id: string) {
traces(func: eq(trace_id, $trace_id))
@filter(eq(tenant_id, $tenant_id)) {
trace_id
tenant_id
session_id
actor_urn
goal
outcome
started_at
completed_at
duration_ms
steps {
step_id
step_type
reasoning
confidence
duration_ms
}
api_calls {
call_id
api_type
endpoint
latency_ms
cost_usd
}
}
}
"""
result = await self._client.query(
query, variables={"$trace_id": trace_id, "$tenant_id": tenant_id}
)
...
Tenant Isolation
All Context Graph operations are tenant-isolated:
| Layer | Isolation Mechanism |
|---|---|
| Dgraph queries | @filter(eq(tenant_id, $tenant_id)) on every query |
| Mutations | tenant_id required on every mutation |
| Embeddings | Metadata filter {"tenant_id": tenant_id} in vector store |
| Kafka events | Events include tenant_id for partitioning |
| Feature flags | Per-tenant enable/disable via TenantConfig |
Per-Tenant Configuration
# context_graph/config/tenant_config.py
class TenantContextGraphConfig:
"""Per-tenant Context Graph configuration."""
tenant_id: str
enabled: bool = True
thinking_capture_enabled: bool = True
embedding_enabled: bool = True
max_traces_per_session: int = 100
max_steps_per_trace: int = 100
retention_days: int = 90
Feature Flags
The Context Graph uses semantic feature flags to control feature availability:
# context_graph/services/semantic_feature_flags.py
class SemanticFeature(str, Enum):
CONTEXT_GRAPH_THINKING = "context_graph_thinking"
CONTEXT_GRAPH_EMBEDDING = "context_graph_embedding"
CONTEXT_GRAPH_ANALYTICS = "context_graph_analytics"
CONTEXT_GRAPH_KAFKA = "context_graph_kafka"
class SemanticFeatureFlags:
"""Controls feature availability per tenant."""
def is_enabled(
self,
feature: SemanticFeature,
tenant_id: str | None = None,
) -> bool:
"""Check if a feature is enabled."""
...
Graceful Degradation
The Context Graph is designed to degrade gracefully when dependencies are unavailable:
# DgraphContextStore initialization
async def initialize(self) -> None:
healthy = await self._client.health()
if not healthy:
logger.warning(
"Dgraph not reachable, context store in degraded mode"
)
return # Store operates in degraded mode
# Orchestrator hooks check enablement
@property
def enabled(self) -> bool:
return self.service is not None
# Tenant-level check
def _is_enabled_for_tenant(self, tenant_id: str) -> bool:
config = get_tenant_context_graph_config(tenant_id)
if not config.enabled or not config.thinking_capture_enabled:
return False
return True
Graph Services
Pattern Detection
The PatternDetectorService identifies recurring patterns in agent behavior:
class PatternDetectorService:
"""Detects patterns in agent thinking traces."""
async def detect_patterns(
self,
tenant_id: str,
session_ids: list[str],
) -> list[Pattern]:
"""Analyze traces for recurring patterns."""
traces = await self._store.list_traces(
tenant_id=tenant_id,
session_ids=session_ids,
)
# Extract step sequences
sequences = [
[step.step_type for step in trace.steps]
for trace in traces
]
# Find frequent subsequences
patterns = self._find_patterns(sequences)
return patterns
Community Detection
Groups related entities in the graph:
class CommunityDetectionService:
"""Detects communities in the knowledge graph."""
async def detect_communities(
self,
tenant_id: str,
algorithm: str = "louvain",
) -> list[Community]:
"""Detect communities using graph algorithms."""
...
Semantic Search
Searches the graph using natural language:
class SemanticSearchService:
"""Semantic search across the context graph."""
async def search(
self,
tenant_id: str,
query: str,
top_k: int = 10,
) -> list[SearchResult]:
"""Search context graph using semantic similarity."""
# Embed query
query_embedding = await self._embed(query)
# Search in vector store
results = await self._embedding_store.search(
query_vector=query_embedding,
tenant_id=tenant_id,
top_k=top_k,
)
return results
Kafka Integration
The Context Graph streams events to Kafka for downstream processing:
| Topic | Events | Consumers |
|---|---|---|
context-graph-traces | Trace created, completed | Analytics, learning |
context-graph-steps | Individual thinking steps | Real-time monitoring |
context-graph-patterns | Detected patterns | Pattern analytics |
context-graph-embeddings | New embeddings generated | Similarity index |
# context_graph/integration/kafka_producer.py
class ContextGraphKafkaProducer:
"""Publishes context graph events to Kafka."""
async def publish_trace_event(
self,
trace: AgentThinkingTrace,
event_type: str,
) -> None:
await self._producer.send(
topic="context-graph-traces",
key=trace.tenant_id.encode(),
value=json.dumps({
"event_type": event_type,
"trace_id": trace.trace_id,
"tenant_id": trace.tenant_id,
"timestamp": datetime.utcnow().isoformat(),
}).encode(),
)