MATIH Platform is in active MVP development. Documentation reflects current implementation status.
12. AI Service

Feedback and RLHF

The AI Service implements a comprehensive feedback pipeline that collects signals from multiple sources, processes them through learning loops, and uses them to improve model responses over time. The feedback subsystem supports explicit user ratings, implicit behavioral signals, correction data, and system-generated quality metrics. This section covers the feedback data models, collection mechanisms, processing pipeline, and RLHF integration.


Feedback Architecture

+-------------------+     +--------------------+     +-------------------+
| Explicit Feedback |     | Implicit Signals   |     | System Metrics    |
| (ratings, comments|     | (clicks, dwell     |     | (latency, errors, |
|  corrections)     |     |  time, refinements)|     |  token usage)     |
+--------+----------+     +---------+----------+     +--------+----------+
         |                          |                         |
         v                          v                         v
+----------------------------------------------------------------+
|                       Feedback Collector                        |
|       Normalizes, validates, and routes feedback events         |
+--------------------------------+-------------------------------+
                                 |
                                 v
+--------------------------------+-------------------------------+
|                  Kafka Topic: feedback-events                   |
+--------------------------------+-------------------------------+
                                 |
                +----------------+----------------+
                |                |                |
                v                v                v
      +---------+----+  +-------+------+  +------+--------+
      | Learning     |  | Insights     |  | Quality       |
      | Pipeline     |  | Engine       |  | Dashboard     |
      +---------+----+  +-------+------+  +------+--------+
                |                |                |
                v                v                v
      +----------------------------------------------------+
      |  Model Improvement: Prompt tuning, fine-tuning,    |
      |  example selection, routing optimization           |
      +----------------------------------------------------+

Feedback Data Models

The feedback models are defined in feedback/models.py:

FeedbackEvent

class FeedbackEvent(BaseModel):
    """Base feedback event model."""
    id: UUID = Field(default_factory=uuid4)
    type: FeedbackType
    source: FeedbackSource
    severity: FeedbackSeverity = FeedbackSeverity.INFO
    category: FeedbackCategory
    tenant_id: str
    user_id: str | None = None
    session_id: str | None = None
    message_id: str | None = None
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    data: dict[str, Any] = Field(default_factory=dict)
    tags: list[str] = Field(default_factory=list)

FeedbackType

class FeedbackType(str, Enum):
    LOG = "log"                    # System log event
    METRIC = "metric"              # Numeric metric
    INTERACTION = "interaction"    # User interaction
    QUERY = "query"                # Query-level feedback
    ERROR = "error"                # Error report
    PERFORMANCE = "performance"    # Performance measurement
    QUALITY = "quality"            # Quality assessment
    USER_EXPLICIT = "user_explicit"  # Explicit user feedback
    EXPLICIT = "explicit"          # Direct rating/comment
    IMPLICIT = "implicit"          # Behavioral signal

FeedbackSource

class FeedbackSource(str, Enum):
    # System sources
    APPLICATION_LOG = "application_log"
    SYSTEM_METRIC = "system_metric"
    TRACE = "trace"
 
    # User sources
    USER_RATING = "user_rating"
    USER_COMMENT = "user_comment"
    USER_CORRECTION = "user_correction"
 
    # Agent sources
    QUERY_RESULT = "query_result"
    MODEL_RESPONSE = "model_response"
    PIPELINE_OUTPUT = "pipeline_output"
 
    # External sources
    API_RESPONSE = "api_response"
    DATABASE_QUERY = "database_query"
    EXTERNAL_SERVICE = "external_service"

FeedbackCategory

class FeedbackCategory(str, Enum):
    ACCURACY = "accuracy"              # Response correctness
    LATENCY = "latency"                # Response speed
    COST = "cost"                      # Token/compute cost
    ERROR = "error"                    # Error occurrence
    USER_SATISFACTION = "user_satisfaction"  # User happiness
    QUALITY = "quality"                # Overall quality
    SECURITY = "security"              # Security concern
    COMPLIANCE = "compliance"          # Compliance issue
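
As a concrete illustration of these models, the sketch below assembles an explicit rating event. It uses a trimmed-down dataclass stand-in for the pydantic `FeedbackEvent` (with only the fields the example needs), so it runs without the service's dependencies; the enums are reduced to single members for the same reason.

```python
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
from typing import Any
from uuid import UUID, uuid4

# Trimmed-down stand-ins for the enums above (illustration only).
class FeedbackType(str, Enum):
    EXPLICIT = "explicit"

class FeedbackSource(str, Enum):
    USER_RATING = "user_rating"

class FeedbackCategory(str, Enum):
    ACCURACY = "accuracy"

@dataclass
class FeedbackEvent:
    """Dataclass stand-in for the pydantic model in feedback/models.py."""
    type: FeedbackType
    source: FeedbackSource
    category: FeedbackCategory
    tenant_id: str
    id: UUID = field(default_factory=uuid4)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    data: dict[str, Any] = field(default_factory=dict)
    tags: list[str] = field(default_factory=list)

# A thumbs-up rating on a generated SQL response.
event = FeedbackEvent(
    type=FeedbackType.EXPLICIT,
    source=FeedbackSource.USER_RATING,
    category=FeedbackCategory.ACCURACY,
    tenant_id="tenant-1",
    data={"rating": "positive", "message_id": "msg-123"},
)
payload = asdict(event)  # dict ready for JSON serialization / Kafka publish
```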

Feedback Collection

Explicit Feedback

Users provide explicit feedback through the UI:

# Thumbs up/down rating
{
    "type": "explicit",
    "source": "user_rating",
    "category": "accuracy",
    "data": {
        "rating": "positive",        # positive, negative
        "message_id": "msg-123",
        "query": "Show total sales",
        "response": "SELECT SUM(amount)...",
    }
}
 
# Text comment
{
    "type": "explicit",
    "source": "user_comment",
    "category": "quality",
    "data": {
        "comment": "The chart should show percentages, not raw numbers",
        "message_id": "msg-123",
    }
}
 
# SQL correction
{
    "type": "explicit",
    "source": "user_correction",
    "category": "accuracy",
    "data": {
        "original_sql": "SELECT SUM(amount) FROM orders",
        "corrected_sql": "SELECT SUM(amount) FROM orders WHERE status = 'completed'",
        "explanation": "Should only count completed orders",
    }
}

Implicit Feedback

Behavioral signals collected automatically:

Signal              | Indicator                       | Collection Method
--------------------|---------------------------------|--------------------------
Query refinement    | User refines the question       | Detected by REFINE intent
Result interaction  | User clicks/expands results     | Frontend event tracking
Dwell time          | Time spent viewing response     | Frontend timer
Follow-up questions | User asks for more detail       | Session analysis
Export action       | User exports results            | Export event tracking
Dashboard addition  | User adds to dashboard          | Dashboard event tracking
Abandonment         | User leaves without interaction | Session timeout analysis
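
One way these discrete signals can be consumed downstream is as a per-session engagement score. The weights below are illustrative assumptions, not values from the codebase; dwell time is omitted because it is a continuous measurement rather than a discrete event.

```python
# Illustrative weights only -- these values are assumptions, not taken
# from the codebase. Positive signals raise the score, negative lower it.
SIGNAL_WEIGHTS = {
    "result_interaction": 1.0,   # user clicked/expanded results
    "export_action": 2.0,        # user exported results
    "dashboard_addition": 2.0,   # user added results to a dashboard
    "follow_up_question": 0.5,   # user asked for more detail
    "query_refinement": -0.5,    # user had to refine the question
    "abandonment": -2.0,         # user left without interacting
}

def engagement_score(signals: list[str]) -> float:
    """Aggregate a session's implicit signals into one engagement score."""
    return sum(SIGNAL_WEIGHTS.get(s, 0.0) for s in signals)
```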

System Metrics

Automatically collected system performance data:

# Performance feedback
{
    "type": "performance",
    "source": "system_metric",
    "category": "latency",
    "data": {
        "endpoint": "/api/v1/text-to-sql",
        "latency_ms": 1250,
        "tokens_used": 3400,
        "llm_provider": "openai",
        "model": "gpt-4",
    }
}
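
A lightweight way to produce such events is to time the request at the endpoint boundary. The helper below is hypothetical (not part of the codebase) and appends to a plain list where the real service would hand the event to the Feedback Collector.

```python
import time
from contextlib import contextmanager

@contextmanager
def record_latency(events: list, endpoint: str, tenant_id: str):
    """Hypothetical helper (not in the codebase): time a request and append
    a performance feedback event shaped like the example above."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the wrapped call raises, so failed requests get timed too.
        events.append({
            "type": "performance",
            "source": "system_metric",
            "category": "latency",
            "tenant_id": tenant_id,
            "data": {
                "endpoint": endpoint,
                "latency_ms": round((time.perf_counter() - start) * 1000),
            },
        })

# Usage: collected events would normally flow to the Feedback Collector.
events: list[dict] = []
with record_latency(events, "/api/v1/text-to-sql", "tenant-1"):
    pass  # call the LLM / run the query here
```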

Decision Feedback

The decision_feedback.py module tracks decisions made by the agent system:

class DecisionFeedback:
    """Tracks agent decision quality."""
 
    async def record_decision(
        self,
        session_id: str,
        tenant_id: str,
        decision_type: str,          # routing, sql_generation, visualization
        decision_data: dict[str, Any],
        outcome: str,                 # success, failure, correction
        confidence: float,
    ) -> None:
        """Record an agent decision and its outcome."""
        event = FeedbackEvent(
            type=FeedbackType.QUERY,
            source=FeedbackSource.MODEL_RESPONSE,
            category=FeedbackCategory.ACCURACY,
            tenant_id=tenant_id,
            session_id=session_id,
            data={
                "decision_type": decision_type,
                "decision_data": decision_data,
                "outcome": outcome,
                "confidence": confidence,
            },
        )
        await self._collector.collect(event)

Decision Types Tracked

Decision        | Data Recorded                 | Outcome Signals
----------------|-------------------------------|-----------------------------------
Intent routing  | Classified intent, confidence | Correct route vs. user correction
SQL generation  | Generated SQL, confidence     | Execution success vs. error
Table selection | Selected tables               | Tables actually needed
Chart type      | Recommended chart             | User acceptance or override
Analysis type   | Statistical method            | Insight usefulness rating
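
For illustration, here is how `record_decision()` might be exercised end to end. The collector below is a stand-in for the real feedback collector, and the `DecisionFeedback` body is condensed from the class above, using plain dicts in place of `FeedbackEvent`.

```python
import asyncio

class StubCollector:
    """Stand-in for the real feedback collector (assumption for this demo)."""
    def __init__(self) -> None:
        self.events: list[dict] = []

    async def collect(self, event: dict) -> None:
        self.events.append(event)

class DecisionFeedback:
    """Condensed version of the class above, emitting plain dicts."""
    def __init__(self, collector: StubCollector) -> None:
        self._collector = collector

    async def record_decision(self, session_id, tenant_id, decision_type,
                              decision_data, outcome, confidence) -> None:
        await self._collector.collect({
            "type": "query",
            "source": "model_response",
            "category": "accuracy",
            "tenant_id": tenant_id,
            "session_id": session_id,
            "data": {
                "decision_type": decision_type,
                "decision_data": decision_data,
                "outcome": outcome,
                "confidence": confidence,
            },
        })

collector = StubCollector()
feedback = DecisionFeedback(collector)

# Record a successful SQL-generation decision.
asyncio.run(feedback.record_decision(
    session_id="sess-42",
    tenant_id="tenant-1",
    decision_type="sql_generation",
    decision_data={"sql": "SELECT SUM(amount) FROM orders"},
    outcome="success",
    confidence=0.92,
))
```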

Feedback Pipeline

The feedback pipeline processes events through multiple stages:

class FeedbackPipeline:
    """Multi-stage feedback processing pipeline."""
 
    async def process(self, event: FeedbackEvent) -> None:
        """Process a feedback event through the pipeline."""
 
        # Stage 1: Validation
        validated = await self._validate(event)
        if not validated:
            return
 
        # Stage 2: Enrichment
        enriched = await self._enrich(event)
 
        # Stage 3: Classification
        classified = await self._classify(enriched)
 
        # Stage 4: Storage
        await self._store(classified)
 
        # Stage 5: Routing
        await self._route(classified)

Pipeline Stages

Stage          | Purpose                           | Implementation
---------------|-----------------------------------|--------------------------------
Validation     | Schema validation, deduplication  | Pydantic model validation
Enrichment     | Add session context, user profile | Join with session/user data
Classification | Categorize and score              | Rule-based + ML classification
Storage        | Persist for analysis              | PostgreSQL + Kafka publish
Routing        | Route to learning systems         | Topic-based routing
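
The deduplication side of the Validation stage can be sketched as a bounded seen-ID window: events whose `id` was already observed are dropped. The cache size and eviction policy here are assumptions; the real stage also applies pydantic schema validation.

```python
from collections import OrderedDict

class DedupWindow:
    """Bounded window of recently seen event IDs (sketch, not the real stage)."""

    def __init__(self, max_size: int = 10_000) -> None:
        self._seen: OrderedDict[str, None] = OrderedDict()
        self._max_size = max_size

    def is_duplicate(self, event_id: str) -> bool:
        """Return True if the ID was already seen; otherwise record it."""
        if event_id in self._seen:
            return True
        self._seen[event_id] = None
        if len(self._seen) > self._max_size:
            self._seen.popitem(last=False)  # evict the oldest ID
        return False
```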

Kafka Integration

Feedback events are published to Kafka for downstream processing:

# feedback/integration/kafka/config.py
FEEDBACK_TOPICS = {
    "feedback-events": "All feedback events",
    "feedback-ratings": "User ratings only",
    "feedback-corrections": "SQL corrections",
    "feedback-metrics": "System performance metrics",
}

Topic                | Partition Strategy | Consumers
---------------------|--------------------|------------------------------
feedback-events      | By tenant_id       | Analytics, learning pipeline
feedback-ratings     | By session_id      | Quality dashboard
feedback-corrections | By tenant_id       | SQL fine-tuning pipeline
feedback-metrics     | By endpoint        | Observability stack
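
The routing implied by this table can be sketched as a pure function: every event goes to the `feedback-events` firehose, and some also fan out to a specialized topic with the listed partition key. The event-dict shape follows the payload examples earlier in this section and is an assumption.

```python
def route_event(event: dict) -> list[tuple[str, str]]:
    """Return (topic, partition_key) pairs for an event, per the table above.
    Sketch only: the real producer wiring lives in feedback/integration/kafka/."""
    # All feedback lands on the firehose topic, keyed by tenant.
    routes = [("feedback-events", event["tenant_id"])]
    source = event.get("source")
    if source == "user_rating":
        routes.append(("feedback-ratings", event["session_id"]))
    elif source == "user_correction":
        routes.append(("feedback-corrections", event["tenant_id"]))
    elif event.get("type") == "performance":
        routes.append(("feedback-metrics", event["data"]["endpoint"]))
    return routes
```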

Learning Pipeline

The learning pipeline in feedback/learning/ uses feedback to improve system performance:

Prompt Tuning

Feedback drives prompt optimization:

class FeedbackDrivenPromptTuner:
    """Tunes prompts based on feedback signals."""
 
    async def analyze_and_tune(
        self,
        tenant_id: str,
        window_days: int = 7,
    ) -> list[PromptUpdate]:
        """Analyze recent feedback and suggest prompt updates."""
 
        # Collect negative feedback patterns
        negative = await self._get_negative_feedback(
            tenant_id, days=window_days
        )
 
        # Identify common failure patterns
        patterns = self._identify_patterns(negative)
 
        # Generate prompt improvements
        updates = []
        for pattern in patterns:
            improvement = await self._generate_improvement(pattern)
            updates.append(improvement)
 
        return updates
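
The `_identify_patterns` step can be sketched as frequency counting over failure tags: a pattern is worth acting on only if it recurs. Grouping by the event's `tags` field and the threshold value are assumptions for illustration.

```python
from collections import Counter

def identify_patterns(negative_events: list[dict], min_count: int = 3) -> list[str]:
    """Sketch of _identify_patterns(): count recurring failure tags across
    negative feedback events and keep those that repeat at least min_count
    times, most frequent first."""
    counts = Counter(
        tag for event in negative_events for tag in event.get("tags", [])
    )
    return [tag for tag, n in counts.most_common() if n >= min_count]
```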

Few-Shot Example Selection

Corrections are used to build better few-shot examples:

class FewShotSelector:
    """Selects few-shot examples based on feedback."""
 
    async def select_examples(
        self,
        query: str,
        tenant_id: str,
        k: int = 3,
    ) -> list[FewShotExample]:
        """Select the best few-shot examples for a query."""
 
        # Get positively-rated query pairs
        positive_pairs = await self._get_positive_pairs(tenant_id)
 
        # Find most similar to current query
        similar = self._find_similar(query, positive_pairs, k=k)
 
        return [
            FewShotExample(
                input_text=pair["question"],
                output_text=pair["sql"],
                explanation=pair.get("explanation"),
            )
            for pair in similar
        ]
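
The similarity step can be sketched with simple token overlap. This is a stand-in for the `_find_similar()` helper: the production selector would more plausibly use embeddings, but Jaccard similarity over tokens keeps the sketch dependency-free.

```python
def find_similar(query: str, pairs: list[dict], k: int = 3) -> list[dict]:
    """Stand-in for _find_similar(): rank positively-rated question/SQL
    pairs by token-overlap (Jaccard) similarity to the incoming query."""
    query_tokens = set(query.lower().split())

    def score(pair: dict) -> float:
        pair_tokens = set(pair["question"].lower().split())
        union = query_tokens | pair_tokens
        return len(query_tokens & pair_tokens) / len(union) if union else 0.0

    # Highest-overlap pairs first; keep the top k.
    return sorted(pairs, key=score, reverse=True)[:k]
```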

RLHF Integration

The system supports reinforcement learning from human feedback:

RLHF Component   | Implementation
-----------------|---------------------------------------------
Reward signal    | User ratings mapped to numeric rewards
Preference pairs | Positive vs. negative response pairs
Training data    | Corrections formatted as preference data
Evaluation       | A/B testing of updated vs. baseline prompts
Deployment       | Gradual rollout via prompt A/B testing
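
Two of these ingredients can be sketched directly: mapping ratings to scalar rewards, and turning a SQL correction into a preference pair. The reward values are illustrative assumptions, and so is the `query` key on the correction payload (the correction example earlier carries only the SQL fields).

```python
# Illustrative reward mapping -- values are assumptions, not from the codebase.
REWARDS = {"positive": 1.0, "negative": -1.0}

def to_reward(rating: str) -> float:
    """Map a user rating to a scalar reward; unknown ratings are neutral."""
    return REWARDS.get(rating, 0.0)

def correction_to_preference(correction: dict) -> dict:
    """Turn a SQL correction into (prompt, chosen, rejected) preference data:
    the corrected SQL is preferred over the originally generated SQL."""
    return {
        "prompt": correction["query"],  # "query" key is an assumption
        "chosen": correction["corrected_sql"],
        "rejected": correction["original_sql"],
    }
```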

Feedback Insights

The feedback/insights/ module generates analytics from collected feedback:

class FeedbackInsights:
    """Generate insights from feedback data."""
 
    async def get_quality_report(
        self,
        tenant_id: str,
        period_days: int = 30,
    ) -> dict[str, Any]:
        return {
            "overall_satisfaction": await self._satisfaction_score(
                tenant_id, period_days
            ),
            "accuracy_rate": await self._accuracy_rate(
                tenant_id, period_days
            ),
            "top_failure_patterns": await self._failure_patterns(
                tenant_id, period_days
            ),
            "improvement_trend": await self._improvement_trend(
                tenant_id, period_days
            ),
            "most_corrected_queries": await self._most_corrected(
                tenant_id, period_days
            ),
        }

Quality Metrics Dashboard

Metric             | Calculation                              | Target
-------------------|------------------------------------------|-------
Satisfaction score | Positive ratings / total ratings         | > 85%
SQL accuracy       | Executed without error / total generated | > 90%
Correction rate    | Corrections / total responses            | < 10%
Refinement rate    | Refinements / total queries              | < 20%
Abandonment rate   | Abandoned sessions / total sessions      | < 15%
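
The first three dashboard metrics reduce to simple ratios over raw counters. The input key names in the sketch below are illustrative assumptions.

```python
def quality_metrics(counts: dict) -> dict:
    """Compute dashboard ratios from raw counters (key names are assumed)."""
    def rate(numerator: int, denominator: int) -> float:
        # Guard against empty periods (no ratings/queries yet).
        return numerator / denominator if denominator else 0.0

    return {
        "satisfaction_score": rate(counts["positive_ratings"], counts["total_ratings"]),
        "sql_accuracy": rate(counts["executed_ok"], counts["total_generated"]),
        "correction_rate": rate(counts["corrections"], counts["total_responses"]),
    }
```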

API Endpoints

POST /api/v1/feedback                       # Submit feedback event
GET  /api/v1/feedback/insights/{tenant}     # Get feedback insights
GET  /api/v1/feedback/quality/{tenant}      # Get quality metrics
POST /api/v1/feedback/correction            # Submit SQL correction
GET  /api/v1/feedback/decisions/{session}   # Get decision feedback