RAG System
The AI Service implements an advanced Retrieval-Augmented Generation (RAG) pipeline that augments LLM responses with relevant context from vector stores, knowledge bases, and schema metadata. The implementation in rag/advanced_rag.py supports hybrid search, query expansion, multi-hop reasoning, self-reflection, and source attribution. This section covers the architecture, retrieval strategies, and generation approaches used in the MATIH RAG system.
Architecture Overview
The RAG pipeline follows a retrieve-then-generate pattern with multiple enhancement stages:
```
User Query
    |
    v
+--------------------+
|  Query Processor   | <-- Expansion, rewriting, HyDE
+--------------------+
    |
    v  (multiple query variations)
+--------------------+
|  Retrieval Engine  | <-- Dense + sparse hybrid search
+--------------------+
    |
    v  (raw results)
+--------------------+
|      Reranker      | <-- Cross-encoder reranking
+--------------------+
    |
    v  (ranked results)
+--------------------+
| Context Assembler  | <-- Token budget, deduplication
+--------------------+
    |
    v  (assembled context)
+--------------------+
|     Generator      | <-- LLM with chain-of-thought
+--------------------+
    |
    v  (draft response)
+--------------------+
|   Self-Reflector   | <-- Verify, correct, cite
+--------------------+
    |
    v
Final RAG Response
```

Configuration
The RAG pipeline is configured through the RAGConfig dataclass:
```python
@dataclass
class RAGConfig:
    """Configuration for RAG pipeline."""

    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.HYBRID
    generation_strategy: GenerationStrategy = GenerationStrategy.CHAIN_OF_THOUGHT
    top_k: int = 5
    min_relevance_score: float = 0.3
    max_context_tokens: int = 4000
    enable_reranking: bool = True
    enable_query_expansion: bool = True
    enable_self_reflection: bool = True
    enable_citation: bool = True
    max_reasoning_steps: int = 5
    temperature: float = 0.1
    model: str = "gpt-4"
```

Retrieval Strategies
The system supports five retrieval strategies, each suited to different query types:
Dense Retrieval
Uses vector similarity search with sentence-transformer embeddings:
```python
class RetrievalStrategy(Enum):
    DENSE = "dense"              # Vector similarity search
    SPARSE = "sparse"            # BM25/TF-IDF keyword search
    HYBRID = "hybrid"            # Combined dense + sparse
    MULTI_QUERY = "multi_query"  # Multiple query variations
    HyDE = "hyde"                # Hypothetical document embeddings
```

| Strategy | Best For | Mechanism |
|---|---|---|
| DENSE | Semantic similarity | Encode the query to a vector, score by cosine similarity against document vectors |
| SPARSE | Exact keyword matching | BM25 scoring on tokenized documents |
| HYBRID | General purpose (default) | Weighted combination of dense and sparse scores |
| MULTI_QUERY | Ambiguous queries | Generate multiple query variations, union the results |
| HyDE | Abstract questions | Generate a hypothetical answer, use it as the retrieval query |
Hybrid Search Implementation
The hybrid strategy combines dense (vector) and sparse (keyword) retrieval with configurable weights:
```python
async def hybrid_search(
    self,
    query: str,
    top_k: int = 5,
    dense_weight: float = 0.7,
    sparse_weight: float = 0.3,
) -> list[RetrievedContext]:
    """Execute hybrid search combining dense and sparse retrieval."""
    # Dense retrieval via Qdrant
    dense_results = await self._dense_search(query, top_k=top_k * 2)

    # Sparse retrieval via BM25
    sparse_results = await self._sparse_search(query, top_k=top_k * 2)

    # Reciprocal Rank Fusion (RRF)
    combined = self._reciprocal_rank_fusion(
        dense_results=dense_results,
        sparse_results=sparse_results,
        dense_weight=dense_weight,
        sparse_weight=sparse_weight,
    )
    return combined[:top_k]
```

Reciprocal Rank Fusion
RRF combines results from multiple retrieval methods:
```python
def _reciprocal_rank_fusion(
    self,
    dense_results: list[RetrievedContext],
    sparse_results: list[RetrievedContext],
    dense_weight: float = 0.7,
    sparse_weight: float = 0.3,
    k: int = 60,
) -> list[RetrievedContext]:
    """Combine results using Reciprocal Rank Fusion."""
    scores: dict[str, float] = {}
    for rank, result in enumerate(dense_results):
        scores[result.source_id] = scores.get(result.source_id, 0) + (
            dense_weight / (k + rank + 1)
        )
    for rank, result in enumerate(sparse_results):
        scores[result.source_id] = scores.get(result.source_id, 0) + (
            sparse_weight / (k + rank + 1)
        )
    # Sort by fused score
    sorted_ids = sorted(scores, key=scores.get, reverse=True)
    ...
```

Query Processing
The QueryProcessor enhances user queries before retrieval:
Query Expansion
Generates multiple variations of the query to improve recall:
```python
class QueryProcessor:
    """Processes and expands queries for better retrieval."""

    async def expand_query(self, query: str) -> list[str]:
        """Expand query into multiple variations."""
        prompt = f"""Generate 3 alternative phrasings of this question.
Keep the same meaning but use different words and structure.

Question: {query}

Alternatives:"""
        response = await self.llm_client.chat([
            {"role": "user", "content": prompt}
        ])
        alternatives = self._parse_alternatives(response["content"])
        return [query] + alternatives  # Original + alternatives
```

Hypothetical Document Embeddings (HyDE)
For abstract or conceptual questions, HyDE generates a hypothetical answer and uses its embedding for retrieval:
```python
async def hyde_transform(self, query: str) -> str:
    """Generate hypothetical document for HyDE retrieval."""
    prompt = f"""Write a short, factual passage that would answer
this question. Include specific details and numbers.

Question: {query}

Passage:"""
    response = await self.llm_client.chat([
        {"role": "user", "content": prompt}
    ])
    return response["content"]
```

This technique is particularly effective when the user's question uses different vocabulary than the stored documents.
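End to end, HyDE embeds the generated passage instead of the raw query. A minimal sketch of that flow, with stand-in `llm`, `embed`, and `vector_search` callables (hypothetical, not the service's actual API):

```python
import asyncio
from typing import Awaitable, Callable

async def hyde_retrieve(
    query: str,
    llm: Callable[[str], Awaitable[str]],
    embed: Callable[[str], list[float]],
    vector_search: Callable[[list[float], int], list[str]],
    top_k: int = 5,
) -> list[str]:
    # 1. Draft a hypothetical passage; its wording resembles stored documents.
    passage = await llm(f"Write a short passage answering: {query}")
    # 2. Embed the passage rather than the raw query.
    passage_vector = embed(passage)
    # 3. Run ordinary dense retrieval with the passage embedding.
    return vector_search(passage_vector, top_k)
```

The only difference from dense retrieval is step 2: the query never reaches the embedder directly.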
Reranking
After retrieval, a cross-encoder reranker scores each result against the query for more precise relevance ranking:
```python
async def rerank(
    self,
    query: str,
    results: list[RetrievedContext],
    top_k: int = 5,
) -> list[RetrievedContext]:
    """Rerank results using cross-encoder model."""
    pairs = [(query, result.content) for result in results]
    scores = self._reranker_model.predict(pairs)
    for result, score in zip(results, scores):
        result.relevance_score = float(score)
    ranked = sorted(results, key=lambda r: r.relevance_score, reverse=True)
    return ranked[:top_k]
```

The reranking step is more computationally expensive than initial retrieval, but it significantly improves precision by evaluating query-document pairs jointly rather than independently.
Context Assembly
The context assembler builds the final context window for the LLM, respecting token budgets:
```python
async def assemble_context(
    self,
    results: list[RetrievedContext],
    max_tokens: int = 4000,
) -> str:
    """Assemble retrieved results into LLM context."""
    context_parts = []
    current_tokens = 0
    for result in results:
        chunk_tokens = self._count_tokens(result.content)
        if current_tokens + chunk_tokens > max_tokens:
            break
        context_parts.append(
            f"[Source: {result.source_title}]\n{result.content}"
        )
        current_tokens += chunk_tokens
    return "\n\n---\n\n".join(context_parts)
```

Deduplication
Before assembly, the pipeline deduplicates overlapping chunks:
| Method | Description |
|---|---|
| Exact dedup | Remove identical chunks |
| Semantic dedup | Remove chunks with cosine similarity > 0.95 |
| Source dedup | Keep only the highest-scored chunk per source document |
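The three methods can be combined in a single pass over score-ranked chunks. A minimal sketch, assuming each chunk carries a precomputed embedding (the `Chunk` shape here is illustrative, not the service's actual type):

```python
import math
from dataclasses import dataclass

@dataclass
class Chunk:
    source_id: str
    content: str
    embedding: list[float]
    score: float

def _cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

def deduplicate(chunks: list[Chunk], sim_threshold: float = 0.95) -> list[Chunk]:
    # Highest-scored chunks first, so the best duplicate survives.
    ranked = sorted(chunks, key=lambda c: c.score, reverse=True)
    kept: list[Chunk] = []
    seen_sources: set[str] = set()
    seen_text: set[str] = set()
    for c in ranked:
        if c.content in seen_text:       # exact dedup
            continue
        if c.source_id in seen_sources:  # source dedup
            continue
        if any(_cosine(c.embedding, k.embedding) > sim_threshold for k in kept):
            continue                     # semantic dedup
        kept.append(c)
        seen_sources.add(c.source_id)
        seen_text.add(c.content)
    return kept
```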
Generation Strategies
The RAG pipeline supports four generation strategies:
Simple Generation
Single-shot generation with context:
```python
class GenerationStrategy(Enum):
    SIMPLE = "simple"
    CHAIN_OF_THOUGHT = "cot"
    TREE_OF_THOUGHT = "tot"
    SELF_REFLECTION = "reflect"
```

Chain-of-Thought (Default)
Step-by-step reasoning with the retrieved context:
```python
async def generate_cot(
    self,
    query: str,
    context: str,
) -> RAGResponse:
    """Generate response using chain-of-thought reasoning."""
    prompt = f"""Answer the following question using ONLY the provided context.
Think step by step.

Context:
{context}

Question: {query}

Step-by-step reasoning:
1."""
    response = await self._llm.chat([
        {"role": "system", "content": "You are a precise data analyst."},
        {"role": "user", "content": prompt},
    ], temperature=0.1)
    return RAGResponse(
        answer=response["content"],
        reasoning_steps=self._extract_steps(response["content"]),
        ...
    )
```

Self-Reflection
Iterative generation with self-correction:
```python
async def generate_with_reflection(
    self,
    query: str,
    context: str,
    max_steps: int = 3,
) -> RAGResponse:
    """Generate with self-reflection and correction."""
    for step in range(max_steps):
        # Generate initial response
        draft = await self._generate_draft(query, context)

        # Self-reflect: check for hallucinations
        reflection = await self._reflect(query, context, draft)
        if reflection["is_faithful"]:
            return RAGResponse(answer=draft, ...)

        # Correct based on reflection
        context = self._add_reflection_to_context(
            context, reflection["issues"]
        )
    return RAGResponse(answer=draft, ...)
```

Source Attribution
When citation is enabled, the pipeline tracks which context chunks were used in the response:
```python
async def generate_with_citations(
    self,
    query: str,
    contexts: list[RetrievedContext],
) -> RAGResponse:
    """Generate response with inline citations."""
    # Number each context chunk
    numbered_context = "\n\n".join(
        f"[{i + 1}] {ctx.content}" for i, ctx in enumerate(contexts)
    )
    prompt = f"""Answer using the provided sources. Cite sources as [1], [2], etc.

Sources:
{numbered_context}

Question: {query}"""
    response = await self._llm.chat([
        {"role": "user", "content": prompt}
    ])
    citations = self._extract_citations(response["content"], contexts)
    return RAGResponse(
        answer=response["content"],
        citations=citations,
        contexts=contexts,
    )
```

Citation Format
```json
{
  "citations": [
    {
      "reference": "[1]",
      "source_id": "doc-123",
      "source_title": "Q4 2025 Revenue Report",
      "excerpt": "Total revenue increased by 15%...",
      "relevance_score": 0.94
    }
  ]
}
```

RAG Response Structure
```python
@dataclass
class RAGResponse:
    """Response from RAG pipeline."""

    answer: str                                   # Generated answer
    contexts: list[RetrievedContext]              # Retrieved contexts used
    reasoning_steps: list[str] = field(...)       # Chain-of-thought steps
    citations: list[dict[str, Any]] = field(...)  # Source citations
    confidence: float = 0.0                       # Overall confidence
    tokens_used: int = 0                          # Total tokens consumed
    latency_ms: int = 0                           # End-to-end latency
    metadata: dict[str, Any] = field(...)         # Additional metadata
```

Vector Store Integration
The RAG system uses Qdrant as its primary vector store:
Qdrant Configuration
| Parameter | Value |
|---|---|
| Vector dimensions | 768 (sentence-transformers default) |
| Distance metric | Cosine similarity |
| Index type | HNSW |
| Payload filtering | tenant_id, knowledge_base_id, doc_type |
| Collection per tenant | No (shared collection with payload filter) |
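Because the collection is shared across tenants, every query must carry a payload filter. The effect can be sketched in plain Python (a stand-in for Qdrant's filtered search, not the actual client API):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b)))

def filtered_search(points, query_vec, tenant_id, kb_id, top_k=5):
    # Payload filter first, so one tenant's query never sees another tenant's vectors.
    candidates = [
        p for p in points
        if p["payload"]["tenant_id"] == tenant_id
        and p["payload"]["knowledge_base_id"] == kb_id
    ]
    # Then rank survivors by cosine similarity to the query vector.
    candidates.sort(key=lambda p: cosine(p["vector"], query_vec), reverse=True)
    return candidates[:top_k]
```

In the real deployment the filtering happens inside Qdrant during index traversal rather than as a Python list comprehension, but the isolation guarantee is the same.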
Embedding Model
The system uses sentence-transformers for generating embeddings:
```python
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(texts, normalize_embeddings=True)
```

| Model | Dimensions | Speed | Quality |
|---|---|---|---|
| all-MiniLM-L6-v2 | 384 | Fast | Good |
| all-mpnet-base-v2 | 768 | Medium | Better |
| instructor-large | 768 | Slow | Best |
Multi-Hop Reasoning
For complex questions requiring information from multiple sources, the RAG pipeline supports multi-hop reasoning:
```python
async def multi_hop_retrieve(
    self,
    query: str,
    max_hops: int = 3,
) -> list[RetrievedContext]:
    """Retrieve context through multiple reasoning hops."""
    all_contexts = []
    current_query = query
    for hop in range(max_hops):
        # Retrieve for current query
        results = await self._retrieve(current_query)
        all_contexts.extend(results)

        # Generate follow-up query based on what we found
        follow_up = await self._generate_follow_up(
            original_query=query,
            current_query=current_query,
            found_contexts=results,
        )
        if not follow_up or follow_up == current_query:
            break  # No new information needed
        current_query = follow_up
    return self._deduplicate(all_contexts)
```

This approach is useful for questions like "What was the revenue impact of the product launched by the team that won the innovation award?", which requires chaining information from multiple sources.
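The hop loop can be exercised with stand-in retrieval and follow-up functions (all data below is hypothetical) to show how queries chain for exactly that kind of question:

```python
import asyncio

# Canned corpus standing in for the real retriever.
CORPUS = {
    "innovation award winner": ["The platform team won the 2025 innovation award."],
    "platform team product launch": ["The platform team launched DataBridge in Q3."],
    "DataBridge revenue impact": ["DataBridge added $2M in Q4 revenue."],
}

# Scripted follow-up queries standing in for the LLM call.
FOLLOW_UPS = {
    "innovation award winner": "platform team product launch",
    "platform team product launch": "DataBridge revenue impact",
    "DataBridge revenue impact": None,
}

async def multi_hop_demo(query: str, max_hops: int = 3) -> list[str]:
    contexts: list[str] = []
    current = query
    for _ in range(max_hops):
        contexts.extend(CORPUS.get(current, []))   # retrieve for current query
        follow_up = FOLLOW_UPS.get(current)        # decide what is still missing
        if not follow_up or follow_up == current:
            break                                  # no new information needed
        current = follow_up
    return contexts
```

Each hop retrieves one fact and derives the next query from it: award winner, then that team's product, then that product's revenue.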
Performance Optimization
| Optimization | Implementation |
|---|---|
| Embedding cache | Cache query embeddings for repeated questions |
| Result cache | Cache retrieval results with TTL |
| Batch embedding | Batch multiple queries for efficient GPU utilization |
| Async retrieval | Concurrent dense + sparse retrieval |
| Token counting | Fast tokenizer for accurate budget management |
| Pre-filtering | Qdrant payload filters before vector search |