Embedding Generation
The Context Graph generates multiple types of embeddings to enable semantic search, similarity matching, and pattern analysis across the knowledge graph. The embedding subsystem includes structural embeddings via Node2Vec, trajectory embeddings for agent paths, thinking embeddings for reasoning traces, and general similarity embeddings. All embeddings are stored in the vector store (Qdrant/Pinecone) with tenant-scoped metadata for isolated retrieval.
Embedding Architecture
```
Knowledge Graph (Dgraph)
    |
    +-- Node2Vec Walker ------> Structural Embeddings
    |                                   |
    +-- Trajectory Encoder ----> Trajectory Embeddings
    |                                   |
    +-- Thinking Encoder ------> Thinking Embeddings
    |                                   |
    +-- Model Providers -------> General Embeddings
                                        |
                                        v
                        Vector Store (Qdrant/Pinecone)
                                        |
                                        v
                          Similarity Search API
```

Embedding Types
| Type | Source Data | Purpose | Dimensions |
|---|---|---|---|
| Structural | Graph topology | Capture node roles and neighborhoods | 128 |
| Trajectory | Agent execution paths | Match similar reasoning paths | 128 |
| Thinking (Input) | User query text | Find similar questions | 768 |
| Thinking (Output) | Agent response text | Find similar answers | 768 |
| Thinking (Reasoning) | Concatenated reasoning steps | Match reasoning patterns | 768 |
| Entity Semantic | Entity name + description | Semantic entity search | 768 |
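Whatever the type, embeddings are compared with cosine similarity (the distance metric used by the vector store), which means vectors are only comparable within a single type and dimensionality: a 128-dimensional structural vector cannot be scored against a 768-dimensional semantic one. A minimal sketch of the measure:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length vectors, in [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


# Parallel vectors score 1.0; orthogonal vectors score 0.0.
print(cosine_similarity([1.0, 0.0], [2.0, 0.0]))  # 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 3.0]))  # 0.0
```

Because cosine similarity ignores vector magnitude, it depends only on direction, which is why embeddings from the same model are comparable regardless of text length.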
Structural Embeddings (Node2Vec)
The StructuralEmbedder in context_graph/embeddings/structural.py generates embeddings that capture graph topology:
Node2Vec Configuration
```python
class Node2VecConfig(BaseModel):
    """Configuration for Node2Vec embeddings."""

    # Walk parameters
    num_walks: int = 10      # Number of walks per node
    walk_length: int = 80    # Length of each random walk
    p: float = 1.0           # Return parameter (controls revisiting)
    q: float = 0.5           # In-out parameter (BFS vs DFS)

    # Skip-gram parameters
    embedding_dim: int = 128     # Embedding dimension
    window_size: int = 5         # Skip-gram window size
    min_count: int = 1           # Minimum word count
    epochs: int = 5              # Training epochs
    learning_rate: float = 0.025  # Initial learning rate

    # Graph options
    directed: bool = True    # Treat graph as directed
    weighted: bool = True    # Use edge weights
    seed: int = 42           # Random seed
```

How Node2Vec Works
Node2Vec generates embeddings by:
- Random walks: Perform biased random walks from each node
- Walk bias: The parameters p and q control walk behavior:
  - Low p (return parameter): the walker tends to revisit nodes, capturing local structure
  - Low q (in-out parameter): the walker explores outward, capturing global structure
- Skip-gram training: Treat walk sequences like sentences, train Word2Vec-style
```python
import random


class StructuralEmbedder:
    """Generates structural embeddings using Node2Vec."""

    def generate_walks(
        self,
        graph: dict[str, list[GraphEdge]],
    ) -> list[list[str]]:
        """Generate biased random walks."""
        walks = []
        nodes = list(graph.keys())
        for _ in range(self._config.num_walks):
            random.shuffle(nodes)
            for node in nodes:
                walk = self._biased_walk(graph, node)
                walks.append(walk)
        return walks

    def _biased_walk(
        self,
        graph: dict[str, list[GraphEdge]],
        start: str,
    ) -> list[str]:
        """Perform a single biased random walk."""
        walk = [start]
        prev = None
        for _ in range(self._config.walk_length - 1):
            current = walk[-1]
            neighbors = graph.get(current, [])
            if not neighbors:
                break
            # Calculate transition probabilities
            probs = []
            for edge in neighbors:
                if edge.target == prev:
                    # Return to previous node
                    probs.append(edge.weight / self._config.p)
                elif prev and edge.target in self._get_neighbors(graph, prev):
                    # BFS-like: neighbor of previous node
                    probs.append(edge.weight)
                else:
                    # DFS-like: explore further
                    probs.append(edge.weight / self._config.q)
            # Normalize and sample
            total = sum(probs)
            probs = [p / total for p in probs]
            next_node = random.choices(
                [e.target for e in neighbors],
                weights=probs,
            )[0]
            walk.append(next_node)
            prev = current
        return walk
```

When to Use
| p, q Values | Behavior | Best For |
|---|---|---|
| p=1, q=1 | Unbiased (classic random walk) | General purpose |
| p=1, q=0.5 | DFS-like: explores outward | Community detection / homophily |
| p=0.5, q=1 | Frequent backtracking, stays near source | Local neighborhood structure |
| p=0.25, q=4 | Strong BFS bias | Structural roles / equivalence |
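To make the bias concrete, here is a small worked example (not from the codebase) applying the same transition weights as _biased_walk. With unit edge weights, the three candidate targets (the previous node, a shared neighbor of the previous node, and a distant node) get unnormalized weights 1/p, 1, and 1/q:

```python
def transition_probs(p: float, q: float) -> list[float]:
    """Normalized node2vec transition probabilities for three candidates:
    [back to prev, neighbor of prev, distant node], with unit edge weights."""
    raw = [1.0 / p, 1.0, 1.0 / q]
    total = sum(raw)
    return [w / total for w in raw]


# Strong BFS bias (p=0.25, q=4): backtracking and staying local dominate.
print(transition_probs(0.25, 4.0))  # ≈ [0.762, 0.190, 0.048]

# DFS bias (p=1, q=0.5): exploring outward dominates.
print(transition_probs(1.0, 0.5))   # [0.25, 0.25, 0.5]
```

With p=0.25 and q=4, roughly 95% of the probability mass stays within the previous node's neighborhood, which is exactly the "strong BFS bias" behavior described in the table above.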
Trajectory Embeddings
Trajectory embeddings capture the execution paths of agents:
```python
# context_graph/embeddings/trajectory.py
import json
from typing import Any


class TrajectoryEmbedder:
    """Generates embeddings for agent execution trajectories."""

    async def embed_trajectory(
        self,
        steps: list[str],
        metadata: dict[str, Any] | None = None,
    ) -> list[float]:
        """Embed a sequence of agent steps."""
        # Concatenate steps into a trajectory string
        trajectory_text = " -> ".join(steps)
        # Add metadata context
        if metadata:
            context = json.dumps(metadata)
            trajectory_text = f"{trajectory_text} | Context: {context}"
        # Generate embedding
        embedding = await self._model.embed(trajectory_text)
        return embedding
```

Trajectory Representation
An agent trajectory is a sequence of step types:
```
INTENT_ANALYSIS -> SQL_GENERATION -> QUERY_EXECUTION ->
DATA_ANALYSIS -> VISUALIZATION_SELECTION -> RESPONSE_FORMATTING
```

Embedding these trajectories allows finding sessions with similar reasoning patterns, even if the specific queries differ.
Thinking Embeddings
The ThinkingEmbeddingService in context_graph/embeddings/thinking_embeddings.py generates three types of embeddings per thinking trace:
```python
class ThinkingEmbeddingService:
    """Generates embeddings for agent thinking traces.

    Three embedding types:
    - Input embedding: User query -> vector (similar questions)
    - Output embedding: Agent response -> vector (result similarity)
    - Thinking embedding: Concatenated reasoning -> vector (path analysis)
    """

    async def embed_thinking_trace(
        self,
        trace: AgentThinkingTrace,
    ) -> ThinkingEmbeddings:
        """Generate all three embeddings for a thinking trace."""
        result = ThinkingEmbeddings(trace_id=trace.trace_id)

        # Input embedding: user's goal/query
        if trace.goal:
            input_vector = await self._model.embed(trace.goal)
            input_id = await self.store.upsert_embedding(
                entity_urn=f"{trace.get_urn()}:input",
                embedding_type=EmbeddingType.ENTITY_SEMANTIC,
                values=input_vector,
                tenant_id=trace.tenant_id,
                metadata={
                    "trace_id": trace.trace_id,
                    "embedding_kind": "thinking_input",
                    "goal": trace.goal[:200],
                },
            )
            result.input_embedding_id = input_id

        # Output embedding: agent's response
        if trace.outcome:
            output_vector = await self._model.embed(trace.outcome)
            output_id = await self.store.upsert_embedding(
                entity_urn=f"{trace.get_urn()}:output",
                embedding_type=EmbeddingType.ENTITY_SEMANTIC,
                values=output_vector,
                tenant_id=trace.tenant_id,
                metadata={
                    "trace_id": trace.trace_id,
                    "embedding_kind": "thinking_output",
                },
            )
            result.output_embedding_id = output_id

        # Thinking embedding: concatenated reasoning steps
        reasoning_text = " ".join(
            step.reasoning for step in trace.steps
            if step.reasoning
        )
        if reasoning_text:
            thinking_vector = await self._model.embed(reasoning_text)
            thinking_id = await self.store.upsert_embedding(
                entity_urn=f"{trace.get_urn()}:thinking",
                embedding_type=EmbeddingType.ENTITY_SEMANTIC,
                values=thinking_vector,
                tenant_id=trace.tenant_id,
                metadata={
                    "trace_id": trace.trace_id,
                    "embedding_kind": "thinking_reasoning",
                    "step_count": len(trace.steps),
                },
            )
            result.thinking_embedding_id = thinking_id

        return result
```

Similarity Search
```python
class SimilarTrace(BaseModel):
    """A similar thinking trace from search."""

    trace_id: str
    similarity_score: float
    goal: str = ""
    path_taken: list[str] = Field(default_factory=list)
    outcome: str | None = None


# Defined on ThinkingEmbeddingService (note the `self` parameter).
async def find_similar_traces(
    self,
    query: str,
    tenant_id: str,
    top_k: int = 5,
    embedding_kind: str = "thinking_input",
) -> list[SimilarTrace]:
    """Find traces similar to a query."""
    query_vector = await self._model.embed(query)
    results = await self.store.search(
        query_vector=query_vector,
        tenant_id=tenant_id,
        filter_metadata={"embedding_kind": embedding_kind},
        top_k=top_k,
    )
    return [
        SimilarTrace(
            trace_id=r.metadata["trace_id"],
            similarity_score=r.score,
            goal=r.metadata.get("goal", ""),
        )
        for r in results
    ]
```

Model Providers
The model_providers.py module abstracts embedding model selection:
```python
class EmbeddingModelProvider:
    """Abstraction for embedding model providers."""

    SUPPORTED_MODELS = {
        "sentence-transformers/all-MiniLM-L6-v2": {
            "dimensions": 384,
            "type": "local",
        },
        "sentence-transformers/all-mpnet-base-v2": {
            "dimensions": 768,
            "type": "local",
        },
        "text-embedding-ada-002": {
            "dimensions": 1536,
            "type": "openai",
        },
        "text-embedding-3-small": {
            "dimensions": 1536,
            "type": "openai",
        },
    }

    async def embed(self, text: str) -> list[float]:
        """Generate embedding for text."""
        ...

    async def embed_batch(
        self, texts: list[str]
    ) -> list[list[float]]:
        """Generate embeddings for multiple texts."""
        ...
```

Embedding Quality Metrics
The EmbeddingQualityMetrics service monitors embedding quality:
```python
class EmbeddingQualityMetrics:
    """Monitors embedding quality and coverage."""

    async def compute_metrics(
        self,
        tenant_id: str,
    ) -> dict[str, Any]:
        return {
            "total_embeddings": await self._count(tenant_id),
            "coverage": await self._coverage(tenant_id),
            "avg_similarity": await self._avg_similarity(tenant_id),
            "embedding_freshness": await self._freshness(tenant_id),
        }
```

| Metric | Description | Target |
|---|---|---|
| Total embeddings | Number of stored embeddings | Growing |
| Coverage | Percentage of entities with embeddings | > 90% |
| Average similarity | Mean pairwise similarity (diversity) | 0.3-0.7 |
| Freshness | Percentage updated in last 24h | > 80% |
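The average-similarity target (0.3-0.7) flags degenerate embedding spaces: a mean near 1.0 means the vectors have collapsed into one direction, while a mean near 0 means no shared structure at all. A hedged sketch of how a helper like _avg_similarity might compute the statistic (the real service would sample vectors from the store rather than take a list):

```python
import math
from itertools import combinations


def avg_pairwise_similarity(vectors: list[list[float]]) -> float:
    """Mean cosine similarity over all distinct vector pairs."""
    def cos(a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(y * y for y in b))
        return dot / (na * nb)

    pairs = list(combinations(vectors, 2))
    return sum(cos(a, b) for a, b in pairs) / len(pairs)


# Three mutually orthogonal unit vectors: mean similarity 0.0 (maximal diversity).
print(avg_pairwise_similarity([[1, 0, 0], [0, 1, 0], [0, 0, 1]]))  # 0.0
```

Note the quadratic pair count: in production this is typically computed over a random sample of embeddings rather than the full collection.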
Vector Store Configuration
| Parameter | Value |
|---|---|
| Store | Qdrant (primary) or Pinecone |
| Collection | context_graph_embeddings |
| Distance metric | Cosine similarity |
| Dimensions | 128 (structural) / 768 (semantic) |
| Metadata fields | tenant_id, embedding_kind, trace_id, entity_urn |
| Index type | HNSW |
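The tenant-scoped metadata above is what keeps retrieval isolated: every search filters on tenant_id before scoring, so one tenant's vectors are never candidates for another tenant's query. An in-memory sketch of that behavior (hypothetical names, standing in for the Qdrant/Pinecone payload filter):

```python
import math
from dataclasses import dataclass, field


@dataclass
class StoredEmbedding:
    values: list[float]
    metadata: dict = field(default_factory=dict)


def search(
    store: list[StoredEmbedding],
    query: list[float],
    tenant_id: str,
    top_k: int = 5,
) -> list[StoredEmbedding]:
    """Cosine-score only this tenant's vectors, best match first."""
    def cos(a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(y * y for y in b))
        return dot / (na * nb)

    # Tenant filter runs before any similarity scoring.
    candidates = [e for e in store if e.metadata.get("tenant_id") == tenant_id]
    return sorted(candidates, key=lambda e: cos(query, e.values), reverse=True)[:top_k]


store = [
    StoredEmbedding([1.0, 0.0], {"tenant_id": "t1", "trace_id": "a"}),
    StoredEmbedding([0.9, 0.1], {"tenant_id": "t2", "trace_id": "b"}),
]
# Tenant t2's vector is excluded no matter how similar it is.
print([e.metadata["trace_id"] for e in search(store, [1.0, 0.0], "t1")])  # ['a']
```

In the real store this filter is expressed as vector-database metadata conditions evaluated alongside the HNSW index, not as a Python list comprehension, but the isolation guarantee is the same.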