MATIH Platform is in active MVP development. Documentation reflects current implementation status.
12. AI Service

RAG System

The AI Service implements an advanced Retrieval-Augmented Generation (RAG) pipeline that augments LLM responses with relevant context from vector stores, knowledge bases, and schema metadata. The implementation in rag/advanced_rag.py supports hybrid search, query expansion, multi-hop reasoning, self-reflection, and source attribution. This section covers the architecture, retrieval strategies, and generation approaches used in the MATIH RAG system.


Architecture Overview

The RAG pipeline follows a retrieve-then-generate pattern with multiple enhancement stages:

User Query
    |
    v
+--------------------+
| Query Processor    |  <-- Expansion, rewriting, HyDE
+--------------------+
    |
    v (multiple query variations)
+--------------------+
| Retrieval Engine   |  <-- Dense + Sparse hybrid search
+--------------------+
    |
    v (raw results)
+--------------------+
| Reranker           |  <-- Cross-encoder reranking
+--------------------+
    |
    v (ranked results)
+--------------------+
| Context Assembler  |  <-- Token budget, deduplication
+--------------------+
    |
    v (assembled context)
+--------------------+
| Generator          |  <-- LLM with chain-of-thought
+--------------------+
    |
    v (draft response)
+--------------------+
| Self-Reflector     |  <-- Verify, correct, cite
+--------------------+
    |
    v
Final RAG Response
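
The flow above can be sketched as a minimal async pipeline. The stage functions here are simplified stand-ins for the real components in rag/advanced_rag.py, wired together only to show the order of operations:

```python
import asyncio

# Stand-in stage functions; real implementations live in rag/advanced_rag.py.
async def process_query(query: str) -> list[str]:
    return [query, f"rephrased: {query}"]        # expansion / rewriting / HyDE

async def retrieve(queries: list[str]) -> list[str]:
    return [f"doc for '{q}'" for q in queries]   # dense + sparse hybrid search

async def rerank(query: str, docs: list[str]) -> list[str]:
    return sorted(docs)                          # cross-encoder reranking

async def assemble(docs: list[str], max_docs: int = 3) -> str:
    return "\n\n---\n\n".join(docs[:max_docs])   # token budget, deduplication

async def generate(query: str, context: str) -> str:
    return f"answer to '{query}' using {len(context)} chars of context"

async def reflect(draft: str) -> str:
    return draft                                 # verify, correct, cite

async def run_pipeline(query: str) -> str:
    queries = await process_query(query)
    docs = await retrieve(queries)
    ranked = await rerank(query, docs)
    context = await assemble(ranked)
    draft = await generate(query, context)
    return await reflect(draft)

answer = asyncio.run(run_pipeline("What was Q4 revenue?"))
```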

Configuration

The RAG pipeline is configured through the RAGConfig dataclass:

@dataclass
class RAGConfig:
    """Configuration for RAG pipeline."""
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.HYBRID
    generation_strategy: GenerationStrategy = GenerationStrategy.CHAIN_OF_THOUGHT
    top_k: int = 5
    min_relevance_score: float = 0.3
    max_context_tokens: int = 4000
    enable_reranking: bool = True
    enable_query_expansion: bool = True
    enable_self_reflection: bool = True
    enable_citation: bool = True
    max_reasoning_steps: int = 5
    temperature: float = 0.1
    model: str = "gpt-4"

Retrieval Strategies

The system supports five retrieval strategies, each suited to different query types:

Strategy Enum

The available strategies are defined in the RetrievalStrategy enum; dense retrieval uses vector similarity search over sentence-transformer embeddings:

class RetrievalStrategy(Enum):
    DENSE = "dense"           # Vector similarity search
    SPARSE = "sparse"         # BM25/TF-IDF keyword search
    HYBRID = "hybrid"         # Combined dense + sparse
    MULTI_QUERY = "multi_query"  # Multiple query variations
    HyDE = "hyde"             # Hypothetical document embeddings

| Strategy | Best For | Mechanism |
|---|---|---|
| DENSE | Semantic similarity | Encode query to vector, cosine similarity against document vectors |
| SPARSE | Exact keyword matching | BM25 scoring on tokenized documents |
| HYBRID | General purpose (default) | Weighted combination of dense and sparse scores |
| MULTI_QUERY | Ambiguous queries | Generate multiple query variations, union results |
| HyDE | Abstract questions | Generate hypothetical answer, use it as retrieval query |

Hybrid Search Implementation

The hybrid strategy combines dense (vector) and sparse (keyword) retrieval with configurable weights:

async def hybrid_search(
    self,
    query: str,
    top_k: int = 5,
    dense_weight: float = 0.7,
    sparse_weight: float = 0.3,
) -> list[RetrievedContext]:
    """Execute hybrid search combining dense and sparse retrieval."""
 
    # Dense retrieval via Qdrant
    dense_results = await self._dense_search(query, top_k=top_k * 2)
 
    # Sparse retrieval via BM25
    sparse_results = await self._sparse_search(query, top_k=top_k * 2)
 
    # Reciprocal Rank Fusion (RRF)
    combined = self._reciprocal_rank_fusion(
        dense_results=dense_results,
        sparse_results=sparse_results,
        dense_weight=dense_weight,
        sparse_weight=sparse_weight,
    )
 
    return combined[:top_k]

Reciprocal Rank Fusion

RRF combines results from multiple retrieval methods:

def _reciprocal_rank_fusion(
    self,
    dense_results: list[RetrievedContext],
    sparse_results: list[RetrievedContext],
    dense_weight: float = 0.7,
    sparse_weight: float = 0.3,
    k: int = 60,
) -> list[RetrievedContext]:
    """Combine results using Reciprocal Rank Fusion."""
    scores: dict[str, float] = {}
 
    for rank, result in enumerate(dense_results):
        scores[result.source_id] = scores.get(result.source_id, 0) + (
            dense_weight / (k + rank + 1)
        )
 
    for rank, result in enumerate(sparse_results):
        scores[result.source_id] = scores.get(result.source_id, 0) + (
            sparse_weight / (k + rank + 1)
        )
 
    # Sort by fused score and map ids back to result objects
    by_id = {r.source_id: r for r in [*sparse_results, *dense_results]}
    sorted_ids = sorted(scores, key=scores.get, reverse=True)
    return [by_id[source_id] for source_id in sorted_ids]
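
The fusion formula can be checked on a toy example. This self-contained sketch fuses two rankings of plain string ids using the same weighted reciprocal-rank scoring as above:

```python
def weighted_rrf(
    dense_ids: list[str],
    sparse_ids: list[str],
    dense_weight: float = 0.7,
    sparse_weight: float = 0.3,
    k: int = 60,
) -> list[str]:
    """Fuse two rankings: each id gains weight / (k + rank + 1) per list."""
    scores: dict[str, float] = {}
    for rank, doc_id in enumerate(dense_ids):
        scores[doc_id] = scores.get(doc_id, 0.0) + dense_weight / (k + rank + 1)
    for rank, doc_id in enumerate(sparse_ids):
        scores[doc_id] = scores.get(doc_id, 0.0) + sparse_weight / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)

fused = weighted_rrf(["a", "b", "c"], ["b", "d"])
# "b" appears in both lists, so its fused score beats "a" even though
# "a" is ranked first by dense retrieval alone.
```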

Query Processing

The QueryProcessor enhances user queries before retrieval:

Query Expansion

Generates multiple variations of the query to improve recall:

class QueryProcessor:
    """Processes and expands queries for better retrieval."""
 
    async def expand_query(self, query: str) -> list[str]:
        """Expand query into multiple variations."""
        prompt = f"""Generate 3 alternative phrasings of this question.
        Keep the same meaning but use different words and structure.
 
        Question: {query}
 
        Alternatives:"""
 
        response = await self.llm_client.chat([
            {"role": "user", "content": prompt}
        ])
 
        alternatives = self._parse_alternatives(response["content"])
        return [query] + alternatives  # Original + alternatives
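
The _parse_alternatives helper is not shown in the source; one plausible implementation (an assumption, not the actual code) splits the LLM output into non-empty lines and strips list markers:

```python
import re

def parse_alternatives(text: str) -> list[str]:
    """Hypothetical parser: one alternative per line, stripping
    bullet/number markers such as '1.', '2)', '-', or '*'."""
    alternatives = []
    for line in text.splitlines():
        cleaned = re.sub(r"^\s*(?:\d+[.)]|[-*])\s*", "", line).strip()
        if cleaned:
            alternatives.append(cleaned)
    return alternatives

variants = parse_alternatives(
    "1. How much did we earn?\n2. What were the sales figures?"
)
```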

Hypothetical Document Embeddings (HyDE)

For abstract or conceptual questions, HyDE generates a hypothetical answer and uses its embedding for retrieval:

async def hyde_transform(self, query: str) -> str:
    """Generate hypothetical document for HyDE retrieval."""
    prompt = f"""Write a short, factual passage that would answer
    this question. Include specific details and numbers.
 
    Question: {query}
 
    Passage:"""
 
    response = await self.llm_client.chat([
        {"role": "user", "content": prompt}
    ])
 
    return response["content"]

This technique is particularly effective when the user's question uses different vocabulary than the stored documents.


Reranking

After retrieval, a cross-encoder reranker scores each result against the query for more precise relevance ranking:

async def rerank(
    self,
    query: str,
    results: list[RetrievedContext],
    top_k: int = 5,
) -> list[RetrievedContext]:
    """Rerank results using cross-encoder model."""
    pairs = [(query, result.content) for result in results]
    scores = self._reranker_model.predict(pairs)
 
    for result, score in zip(results, scores):
        result.relevance_score = float(score)
 
    ranked = sorted(results, key=lambda r: r.relevance_score, reverse=True)
    return ranked[:top_k]

The reranking step is more computationally expensive than initial retrieval but significantly improves precision by evaluating query-document pairs jointly rather than independently.
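
The rerank-and-truncate logic can be sketched without the model itself. Here a toy lexical-overlap scorer stands in for the cross-encoder (so the example runs without model downloads), and Chunk stands in for RetrievedContext:

```python
from dataclasses import dataclass

@dataclass
class Chunk:  # minimal stand-in for RetrievedContext
    content: str
    relevance_score: float = 0.0

def overlap_score(query: str, doc: str) -> float:
    """Toy stand-in for a cross-encoder: fraction of query terms in the doc."""
    q_terms = set(query.lower().split())
    d_terms = set(doc.lower().split())
    return len(q_terms & d_terms) / max(len(q_terms), 1)

def rerank(query: str, results: list[Chunk], top_k: int = 2) -> list[Chunk]:
    # Score each query-document pair jointly, then keep the best top_k.
    for result in results:
        result.relevance_score = overlap_score(query, result.content)
    return sorted(results, key=lambda r: r.relevance_score, reverse=True)[:top_k]

chunks = [
    Chunk("quarterly revenue grew"),
    Chunk("office party photos"),
    Chunk("revenue report q4"),
]
top = rerank("q4 revenue", chunks)
```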


Context Assembly

The context assembler builds the final context window for the LLM, respecting token budgets:

async def assemble_context(
    self,
    results: list[RetrievedContext],
    max_tokens: int = 4000,
) -> str:
    """Assemble retrieved results into LLM context."""
    context_parts = []
    current_tokens = 0
 
    for result in results:
        chunk_tokens = self._count_tokens(result.content)
        if current_tokens + chunk_tokens > max_tokens:
            break
 
        context_parts.append(
            f"[Source: {result.source_title}]\n{result.content}"
        )
        current_tokens += chunk_tokens
 
    return "\n\n---\n\n".join(context_parts)
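
The greedy budget loop can be demonstrated end to end. This sketch substitutes a crude whitespace word count for the real tokenizer (an assumption for illustration only):

```python
def count_tokens(text: str) -> int:
    """Crude stand-in for a real tokenizer: whitespace word count."""
    return len(text.split())

def assemble_context(chunks: list[tuple[str, str]], max_tokens: int = 10) -> str:
    """Greedily pack (title, content) chunks until the token budget is hit."""
    parts, used = [], 0
    for title, content in chunks:
        cost = count_tokens(content)
        if used + cost > max_tokens:
            break
        parts.append(f"[Source: {title}]\n{content}")
        used += cost
    return "\n\n---\n\n".join(parts)

context = assemble_context(
    [("Report A", "revenue grew fifteen percent"),
     ("Report B", "costs were flat year over year"),
     ("Report C", "headcount doubled")],
    max_tokens=10,
)
# Report C is dropped: 4 + 6 tokens exhaust the budget of 10.
```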

Deduplication

Before assembly, the pipeline deduplicates overlapping chunks:

| Method | Description |
|---|---|
| Exact dedup | Remove identical chunks |
| Semantic dedup | Remove chunks with cosine similarity > 0.95 |
| Source dedup | Keep only the highest-scored chunk per source document |
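
Semantic dedup amounts to a greedy pass that drops any chunk whose embedding is too close to one already kept. A minimal sketch, with hand-picked 2-d vectors standing in for real embeddings:

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))

def semantic_dedup(
    chunks: list[tuple[str, list[float]]],
    threshold: float = 0.95,
) -> list[str]:
    """Keep a chunk only if its embedding stays below the similarity
    threshold against every chunk already kept."""
    kept: list[tuple[str, list[float]]] = []
    for text, vec in chunks:
        if all(cosine(vec, kept_vec) <= threshold for _, kept_vec in kept):
            kept.append((text, vec))
    return [text for text, _ in kept]

unique = semantic_dedup([
    ("revenue grew 15%", [1.0, 0.0]),
    ("revenue rose 15%", [0.99, 0.14]),  # near-duplicate, dropped
    ("costs were flat",  [0.0, 1.0]),
])
```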

Generation Strategies

The RAG pipeline supports four generation strategies:

Strategy Enum

The available strategies are defined in the GenerationStrategy enum; SIMPLE performs single-shot generation over the assembled context:

class GenerationStrategy(Enum):
    SIMPLE = "simple"
    CHAIN_OF_THOUGHT = "cot"
    TREE_OF_THOUGHT = "tot"
    SELF_REFLECTION = "reflect"

Chain-of-Thought (Default)

Step-by-step reasoning with the retrieved context:

async def generate_cot(
    self,
    query: str,
    context: str,
) -> RAGResponse:
    """Generate response using chain-of-thought reasoning."""
    prompt = f"""Answer the following question using ONLY the provided context.
    Think step by step.
 
    Context:
    {context}
 
    Question: {query}
 
    Step-by-step reasoning:
    1."""
 
    response = await self._llm.chat([
        {"role": "system", "content": "You are a precise data analyst."},
        {"role": "user", "content": prompt},
    ], temperature=0.1)
 
    return RAGResponse(
        answer=response["content"],
        reasoning_steps=self._extract_steps(response["content"]),
        ...
    )
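
The _extract_steps helper is not shown in the source; a plausible version (an assumption, not the actual code) pulls the numbered lines out of the chain-of-thought response:

```python
import re

def extract_steps(text: str) -> list[str]:
    """Hypothetical helper: pull numbered reasoning steps
    ('1. ...', '2. ...') out of a chain-of-thought response."""
    steps = re.findall(r"^\s*\d+\.\s*(.+)$", text, flags=re.MULTILINE)
    return [s.strip() for s in steps]

steps = extract_steps(
    "1. Find the revenue line.\n2. Compare to last quarter.\nAnswer: up 15%."
)
```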

Self-Reflection

Iterative generation with self-correction:

async def generate_with_reflection(
    self,
    query: str,
    context: str,
    max_steps: int = 3,
) -> RAGResponse:
    """Generate with self-reflection and correction."""
    for step in range(max_steps):
        # Generate initial response
        draft = await self._generate_draft(query, context)
 
        # Self-reflect: check for hallucinations
        reflection = await self._reflect(query, context, draft)
 
        if reflection["is_faithful"]:
            return RAGResponse(answer=draft, ...)
 
        # Correct based on reflection
        context = self._add_reflection_to_context(
            context, reflection["issues"]
        )
 
    return RAGResponse(answer=draft, ...)

Source Attribution

When citation is enabled, the pipeline tracks which context chunks were used in the response:

async def generate_with_citations(
    self,
    query: str,
    contexts: list[RetrievedContext],
) -> RAGResponse:
    """Generate response with inline citations."""
    # Number each context chunk
    numbered_context = "\n\n".join(
        f"[{i+1}] {ctx.content}" for i, ctx in enumerate(contexts)
    )
 
    prompt = f"""Answer using the provided sources. Cite sources as [1], [2], etc.
 
    Sources:
    {numbered_context}
 
    Question: {query}"""
 
    response = await self._llm.chat([
        {"role": "user", "content": prompt}
    ])
 
    citations = self._extract_citations(response["content"], contexts)
 
    return RAGResponse(
        answer=response["content"],
        citations=citations,
        contexts=contexts,
    )

Citation Format

{
    "citations": [
        {
            "reference": "[1]",
            "source_id": "doc-123",
            "source_title": "Q4 2025 Revenue Report",
            "excerpt": "Total revenue increased by 15%...",
            "relevance_score": 0.94
        }
    ]
}
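
The _extract_citations step can be sketched as a regex pass that collects the [n] markers actually used in the answer and maps them back to the numbered sources (a simplified assumption about the real helper):

```python
import re

def extract_citations(answer: str, sources: list[dict]) -> list[dict]:
    """Collect the [n] markers that appear in the answer and map them
    back to the numbered source chunks (1-indexed)."""
    cited = sorted({int(n) for n in re.findall(r"\[(\d+)\]", answer)})
    return [
        {"reference": f"[{n}]", **sources[n - 1]}
        for n in cited
        if 1 <= n <= len(sources)
    ]

citations = extract_citations(
    "Revenue grew 15% [1], while costs were flat [3].",
    [{"source_id": "doc-123"}, {"source_id": "doc-456"}, {"source_id": "doc-789"}],
)
```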

RAG Response Structure

@dataclass
class RAGResponse:
    """Response from RAG pipeline."""
    answer: str                                    # Generated answer
    contexts: list[RetrievedContext]                # Retrieved contexts used
    reasoning_steps: list[str] = field(...)         # Chain-of-thought steps
    citations: list[dict[str, Any]] = field(...)    # Source citations
    confidence: float = 0.0                         # Overall confidence
    tokens_used: int = 0                            # Total tokens consumed
    latency_ms: int = 0                             # End-to-end latency
    metadata: dict[str, Any] = field(...)           # Additional metadata

Vector Store Integration

The RAG system uses Qdrant as its primary vector store:

Qdrant Configuration

| Parameter | Value |
|---|---|
| Vector dimensions | 768 (sentence-transformers default) |
| Distance metric | Cosine similarity |
| Index type | HNSW |
| Payload filtering | tenant_id, knowledge_base_id, doc_type |
| Collection per tenant | No (shared collection with payload filter) |
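
Restricting search on the shared collection can be expressed as a Qdrant payload filter. A sketch of the filter object in Qdrant's REST filter syntax, using the payload fields from the table above (the tenant values are illustrative):

```python
def tenant_filter(tenant_id: str, knowledge_base_id: str) -> dict:
    """Qdrant payload filter restricting search to one tenant's
    documents within the shared collection (REST filter syntax)."""
    return {
        "must": [
            {"key": "tenant_id", "match": {"value": tenant_id}},
            {"key": "knowledge_base_id", "match": {"value": knowledge_base_id}},
        ]
    }

flt = tenant_filter("tenant-abc", "kb-001")
```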

Embedding Model

The system uses sentence-transformers for generating embeddings:

from sentence_transformers import SentenceTransformer
 
model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(texts, normalize_embeddings=True)

| Model | Dimensions | Speed | Quality |
|---|---|---|---|
| all-MiniLM-L6-v2 | 384 | Fast | Good |
| all-mpnet-base-v2 | 768 | Medium | Better |
| instructor-large | 768 | Slow | Best |

Multi-Hop Reasoning

For complex questions requiring information from multiple sources, the RAG pipeline supports multi-hop reasoning:

async def multi_hop_retrieve(
    self,
    query: str,
    max_hops: int = 3,
) -> list[RetrievedContext]:
    """Retrieve context through multiple reasoning hops."""
    all_contexts = []
    current_query = query
 
    for hop in range(max_hops):
        # Retrieve for current query
        results = await self._retrieve(current_query)
        all_contexts.extend(results)
 
        # Generate follow-up query based on what we found
        follow_up = await self._generate_follow_up(
            original_query=query,
            current_query=current_query,
            found_contexts=results,
        )
 
        if not follow_up or follow_up == current_query:
            break  # No new information needed
 
        current_query = follow_up
 
    return self._deduplicate(all_contexts)

This approach is useful for questions like "What was the revenue impact of the product launched by the team that won the innovation award?" which requires chaining information from multiple sources.
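
The hop loop can be exercised with stubbed retrieval and follow-up generation. Here a tiny hand-built corpus and a lookup table stand in for the vector store and the LLM-generated follow-up query:

```python
import asyncio

# Tiny stubbed corpus: each fact points at the next thing to look up.
CORPUS = {
    "innovation award": "The Atlas team won the innovation award.",
    "atlas team": "The Atlas team launched the Nova product.",
    "nova product": "Nova added $2M in revenue.",
}

FOLLOW_UPS = {  # stand-in for the LLM-generated follow-up query
    "innovation award": "atlas team",
    "atlas team": "nova product",
    "nova product": None,
}

async def multi_hop(query: str, max_hops: int = 3) -> list[str]:
    contexts, current = [], query
    for _ in range(max_hops):
        contexts.append(CORPUS[current])          # retrieve for current query
        follow_up = FOLLOW_UPS[current]           # decide the next hop
        if not follow_up:
            break                                 # no new information needed
        current = follow_up
    return contexts

chain = asyncio.run(multi_hop("innovation award"))
```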


Performance Optimization

| Optimization | Implementation |
|---|---|
| Embedding cache | Cache query embeddings for repeated questions |
| Result cache | Cache retrieval results with TTL |
| Batch embedding | Batch multiple queries for efficient GPU utilization |
| Async retrieval | Concurrent dense + sparse retrieval |
| Token counting | Fast tokenizer for accurate budget management |
| Pre-filtering | Qdrant payload filters before vector search |
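
The embedding cache is the simplest of these to illustrate: key on a hash of the query text so repeated questions skip the encoder entirely. A minimal in-memory sketch (the real cache and encoder are assumptions for illustration):

```python
import hashlib

class EmbeddingCache:
    """In-memory cache keyed by a hash of the query text, so repeated
    questions skip the embedding model entirely."""

    def __init__(self) -> None:
        self._store: dict[str, list[float]] = {}
        self.hits = 0

    def get_or_compute(self, query: str, embed) -> list[float]:
        key = hashlib.sha256(query.encode()).hexdigest()
        if key in self._store:
            self.hits += 1                 # served from cache
        else:
            self._store[key] = embed(query)  # cache miss: run the encoder
        return self._store[key]

cache = EmbeddingCache()
fake_embed = lambda q: [float(len(q))]     # stand-in for a real encoder
cache.get_or_compute("q4 revenue?", fake_embed)
cache.get_or_compute("q4 revenue?", fake_embed)  # second call hits the cache
```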