Feedback and RLHF
The AI Service implements a comprehensive feedback pipeline that collects signals from multiple sources, processes them through learning loops, and uses them to improve model responses over time. The feedback subsystem supports explicit user ratings, implicit behavioral signals, correction data, and system-generated quality metrics. This section covers the feedback data models, collection mechanisms, processing pipeline, and RLHF integration.
Feedback Architecture
+-------------------+ +-------------------+ +-------------------+
| Explicit Feedback | | Implicit Signals | | System Metrics |
| (ratings, comments| | (clicks, dwell | | (latency, errors, |
| corrections) | | time, refinements)| | token usage) |
+--------+----------+ +--------+----------+ +--------+----------+
| | |
v v v
+----------------------------------------------------------------+
| Feedback Collector |
| Normalizes, validates, and routes feedback events |
+----------------------------+-----------------------------------+
|
v
+----------------------------+-----------------------------------+
| Kafka Topic: feedback-events |
+----------------------------+-----------------------------------+
|
+--------------+----------------+
| | |
v v v
+---------+----+ +------+-------+ +------+--------+
| Learning | | Insights | | Quality |
| Pipeline | | Engine | | Dashboard |
+---------+----+ +------+-------+ +------+--------+
| | |
v v v
+----------------------------------------------------+
| Model Improvement: Prompt tuning, fine-tuning, |
| example selection, routing optimization |
+----------------------------------------------------+

Feedback Data Models
The feedback models are defined in feedback/models.py:
FeedbackEvent
from datetime import datetime
from enum import Enum
from typing import Any
from uuid import UUID, uuid4

from pydantic import BaseModel, Field

class FeedbackEvent(BaseModel):
"""Base feedback event model."""
id: UUID = Field(default_factory=uuid4)
type: FeedbackType
source: FeedbackSource
severity: FeedbackSeverity = FeedbackSeverity.INFO
category: FeedbackCategory
tenant_id: str
user_id: str | None = None
session_id: str | None = None
message_id: str | None = None
timestamp: datetime = Field(default_factory=datetime.utcnow)
data: dict[str, Any] = Field(default_factory=dict)
    tags: list[str] = Field(default_factory=list)

FeedbackType
class FeedbackType(str, Enum):
LOG = "log" # System log event
METRIC = "metric" # Numeric metric
INTERACTION = "interaction" # User interaction
QUERY = "query" # Query-level feedback
ERROR = "error" # Error report
PERFORMANCE = "performance" # Performance measurement
QUALITY = "quality" # Quality assessment
USER_EXPLICIT = "user_explicit" # Explicit user feedback
EXPLICIT = "explicit" # Direct rating/comment
    IMPLICIT = "implicit"  # Behavioral signal

FeedbackSource
class FeedbackSource(str, Enum):
# System sources
APPLICATION_LOG = "application_log"
SYSTEM_METRIC = "system_metric"
TRACE = "trace"
# User sources
USER_RATING = "user_rating"
USER_COMMENT = "user_comment"
USER_CORRECTION = "user_correction"
# Agent sources
QUERY_RESULT = "query_result"
MODEL_RESPONSE = "model_response"
PIPELINE_OUTPUT = "pipeline_output"
# External sources
API_RESPONSE = "api_response"
DATABASE_QUERY = "database_query"
    EXTERNAL_SERVICE = "external_service"

FeedbackCategory
class FeedbackCategory(str, Enum):
ACCURACY = "accuracy" # Response correctness
LATENCY = "latency" # Response speed
COST = "cost" # Token/compute cost
ERROR = "error" # Error occurrence
USER_SATISFACTION = "user_satisfaction" # User happiness
QUALITY = "quality" # Overall quality
SECURITY = "security" # Security concern
    COMPLIANCE = "compliance"  # Compliance issue

Feedback Collection
Explicit Feedback
Users provide explicit feedback through the UI:
# Thumbs up/down rating
{
"type": "explicit",
"source": "user_rating",
"category": "accuracy",
"data": {
"rating": "positive", # positive, negative
"message_id": "msg-123",
"query": "Show total sales",
"response": "SELECT SUM(amount)...",
}
}
# Text comment
{
"type": "explicit",
"source": "user_comment",
"category": "quality",
"data": {
"comment": "The chart should show percentages, not raw numbers",
"message_id": "msg-123",
}
}
# SQL correction
{
"type": "explicit",
"source": "user_correction",
"category": "accuracy",
"data": {
"original_sql": "SELECT SUM(amount) FROM orders",
"corrected_sql": "SELECT SUM(amount) FROM orders WHERE status = 'completed'",
"explanation": "Should only count completed orders",
}
}

Implicit Feedback
Behavioral signals collected automatically:
| Signal | Indicator | Collection Method |
|---|---|---|
| Query refinement | User refines the question | Detected by REFINE intent |
| Result interaction | User clicks/expands results | Frontend event tracking |
| Dwell time | Time spent viewing response | Frontend timer |
| Follow-up questions | User asks for more detail | Session analysis |
| Export action | User exports results | Export event tracking |
| Dashboard addition | User adds to dashboard | Dashboard event tracking |
| Abandonment | User leaves without interaction | Session timeout analysis |
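A behavioral signal such as dwell time can be normalized into the same event shape as explicit feedback before it reaches the collector. A minimal sketch, assuming the field names from the models above; the helper and the 2-second threshold are illustrative, not the service's actual rule:

```python
from typing import Any

# Assumed threshold: under 2 s of viewing is treated as a weak negative signal.
DWELL_NEGATIVE_MS = 2000

def implicit_event_from_dwell(
    tenant_id: str,
    session_id: str,
    message_id: str,
    dwell_ms: int,
) -> dict[str, Any]:
    """Normalize a frontend dwell-time signal into a feedback event payload."""
    return {
        "type": "implicit",
        "category": "user_satisfaction",
        "tenant_id": tenant_id,
        "session_id": session_id,
        "message_id": message_id,
        "data": {
            "signal": "dwell_time",
            "dwell_ms": dwell_ms,
            "polarity": "negative" if dwell_ms < DWELL_NEGATIVE_MS else "positive",
        },
    }
```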
System Metrics
Automatically collected system performance data:
# Performance feedback
{
"type": "performance",
"source": "system_metric",
"category": "latency",
"data": {
"endpoint": "/api/v1/text-to-sql",
"latency_ms": 1250,
"tokens_used": 3400,
"llm_provider": "openai",
"model": "gpt-4",
}
}

Decision Feedback
The decision_feedback.py module tracks decisions made by the agent system:
class DecisionFeedback:
"""Tracks agent decision quality."""
async def record_decision(
self,
session_id: str,
tenant_id: str,
decision_type: str, # routing, sql_generation, visualization
decision_data: dict[str, Any],
outcome: str, # success, failure, correction
confidence: float,
) -> None:
"""Record an agent decision and its outcome."""
event = FeedbackEvent(
type=FeedbackType.QUERY,
source=FeedbackSource.MODEL_RESPONSE,
category=FeedbackCategory.ACCURACY,
tenant_id=tenant_id,
session_id=session_id,
data={
"decision_type": decision_type,
"decision_data": decision_data,
"outcome": outcome,
"confidence": confidence,
},
)
        await self._collector.collect(event)

Decision Types Tracked
| Decision | Data Recorded | Outcome Signals |
|---|---|---|
| Intent routing | Classified intent, confidence | Correct route vs. user correction |
| SQL generation | Generated SQL, confidence | Execution success vs. error |
| Table selection | Selected tables | Tables actually needed |
| Chart type | Recommended chart | User acceptance or override |
| Analysis type | Statistical method | Insight usefulness rating |
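A recording call for one of these decisions might look like the following sketch, which stands an in-memory collector in for the service's real one; the StubCollector, decision data, and confidence value are all illustrative:

```python
import asyncio
from typing import Any

class StubCollector:
    """Hypothetical in-memory stand-in for the service's feedback collector."""

    def __init__(self) -> None:
        self.events: list[dict[str, Any]] = []

    async def collect(self, event: dict[str, Any]) -> None:
        self.events.append(event)

async def report_routing_decision(collector: StubCollector) -> None:
    # Mirrors record_decision above for an intent-routing decision;
    # the intent, route, outcome, and confidence are example values.
    await collector.collect({
        "decision_type": "routing",
        "decision_data": {"intent": "text_to_sql", "route": "sql_generation"},
        "outcome": "success",
        "confidence": 0.92,
    })

collector = StubCollector()
asyncio.run(report_routing_decision(collector))
```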
Feedback Pipeline
The feedback pipeline processes events through multiple stages:
class FeedbackPipeline:
"""Multi-stage feedback processing pipeline."""
async def process(self, event: FeedbackEvent) -> None:
"""Process a feedback event through the pipeline."""
# Stage 1: Validation
validated = await self._validate(event)
if not validated:
return
# Stage 2: Enrichment
enriched = await self._enrich(event)
# Stage 3: Classification
classified = await self._classify(enriched)
# Stage 4: Storage
await self._store(classified)
# Stage 5: Routing
        await self._route(classified)

Pipeline Stages
| Stage | Purpose | Implementation |
|---|---|---|
| Validation | Schema validation, deduplication | Pydantic model validation |
| Enrichment | Add session context, user profile | Join with session/user data |
| Classification | Categorize and score | Rule-based + ML classification |
| Storage | Persist for analysis | PostgreSQL + Kafka publish |
| Routing | Route to learning systems | Topic-based routing |
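The deduplication part of the validation stage can be sketched as a content-hash check. The key fields and the in-memory set are assumptions; a production pipeline would more likely use a TTL cache or a database constraint, alongside the Pydantic schema validation noted above:

```python
import hashlib

class Deduplicator:
    """Illustrative dedup check for the validation stage."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    def is_duplicate(self, event: dict) -> bool:
        # Events with the same tenant, message, type, and source are
        # treated as retransmissions of one logical event.
        raw = "|".join(
            str(event.get(k)) for k in ("tenant_id", "message_id", "type", "source")
        )
        key = hashlib.sha256(raw.encode("utf-8")).hexdigest()
        if key in self._seen:
            return True
        self._seen.add(key)
        return False
```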
Kafka Integration
Feedback events are published to Kafka for downstream processing:
# feedback/integration/kafka/config.py
FEEDBACK_TOPICS = {
"feedback-events": "All feedback events",
"feedback-ratings": "User ratings only",
"feedback-corrections": "SQL corrections",
"feedback-metrics": "System performance metrics",
}

| Topic | Partition Strategy | Consumers |
|---|---|---|
| feedback-events | By tenant_id | Analytics, learning pipeline |
| feedback-ratings | By session_id | Quality dashboard |
| feedback-corrections | By tenant_id | SQL fine-tuning pipeline |
| feedback-metrics | By endpoint | Observability stack |
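The partition strategies above can be expressed as a key-selection helper. This is a sketch: the topic-to-field mapping follows the table, and the fallback into the `data` payload reflects that metrics events carry `endpoint` inside `data` rather than at the top level:

```python
# Illustrative mapping of topic to partition-key field (from the table above).
PARTITION_KEYS = {
    "feedback-events": "tenant_id",
    "feedback-ratings": "session_id",
    "feedback-corrections": "tenant_id",
    "feedback-metrics": "endpoint",
}

def partition_key(topic: str, event: dict) -> bytes:
    """Derive the Kafka message key so related events land on one partition."""
    field = PARTITION_KEYS[topic]
    # Fall back to the data payload for fields like "endpoint".
    value = event.get(field) or event.get("data", {}).get(field, "")
    return str(value).encode("utf-8")
```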
Learning Pipeline
The learning pipeline in feedback/learning/ uses feedback to improve system performance:
Prompt Tuning
Feedback drives prompt optimization:
class FeedbackDrivenPromptTuner:
"""Tunes prompts based on feedback signals."""
async def analyze_and_tune(
self,
tenant_id: str,
window_days: int = 7,
) -> list[PromptUpdate]:
"""Analyze recent feedback and suggest prompt updates."""
# Collect negative feedback patterns
negative = await self._get_negative_feedback(
tenant_id, days=window_days
)
# Identify common failure patterns
patterns = self._identify_patterns(negative)
# Generate prompt improvements
updates = []
for pattern in patterns:
improvement = await self._generate_improvement(pattern)
updates.append(improvement)
        return updates

Few-Shot Example Selection
Corrections are used to build better few-shot examples:
class FewShotSelector:
"""Selects few-shot examples based on feedback."""
async def select_examples(
self,
query: str,
tenant_id: str,
k: int = 3,
) -> list[FewShotExample]:
"""Select the best few-shot examples for a query."""
# Get positively-rated query pairs
positive_pairs = await self._get_positive_pairs(tenant_id)
# Find most similar to current query
similar = self._find_similar(query, positive_pairs, k=k)
return [
FewShotExample(
input_text=pair["question"],
output_text=pair["sql"],
explanation=pair.get("explanation"),
)
for pair in similar
        ]

RLHF Integration
The system supports reinforcement learning from human feedback:
| RLHF Component | Implementation |
|---|---|
| Reward signal | User ratings mapped to numeric rewards |
| Preference pairs | Positive vs. negative response pairs |
| Training data | Corrections formatted as preference data |
| Evaluation | A/B testing of updated vs. baseline prompts |
| Deployment | Gradual rollout via prompt A/B testing |
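As a sketch of how two of these components fit together, a user rating can map to a scalar reward, and a SQL correction can become a preference pair in which the user's corrected SQL is preferred over the generated one. The reward values and record keys here are assumptions, not the service's actual training schema:

```python
# Assumed reward mapping for explicit ratings.
RATING_REWARDS = {"positive": 1.0, "negative": -1.0}

def preference_pair_from_correction(correction: dict) -> dict:
    """Build a preference record: corrected SQL is 'chosen', generated SQL
    is 'rejected'; the user's explanation is kept as context."""
    data = correction["data"]
    return {
        "chosen": data["corrected_sql"],
        "rejected": data["original_sql"],
        "context": data.get("explanation", ""),
    }
```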
Feedback Insights
The feedback/insights/ module generates analytics from collected feedback:
class FeedbackInsights:
"""Generate insights from feedback data."""
async def get_quality_report(
self,
tenant_id: str,
period_days: int = 30,
) -> dict[str, Any]:
return {
"overall_satisfaction": await self._satisfaction_score(
tenant_id, period_days
),
"accuracy_rate": await self._accuracy_rate(
tenant_id, period_days
),
"top_failure_patterns": await self._failure_patterns(
tenant_id, period_days
),
"improvement_trend": await self._improvement_trend(
tenant_id, period_days
),
"most_corrected_queries": await self._most_corrected(
tenant_id, period_days
),
        }

Quality Metrics Dashboard
| Metric | Calculation | Target |
|---|---|---|
| Satisfaction score | Positive ratings / total ratings | > 85% |
| SQL accuracy | Executed without error / total generated | > 90% |
| Correction rate | Corrections / total responses | < 10% |
| Refinement rate | Refinements / total queries | < 20% |
| Abandonment rate | Abandoned sessions / total sessions | < 15% |
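The table's ratios can be computed from raw counts. A minimal sketch, in which the count keys are illustrative and empty windows return 0.0 rather than dividing by zero:

```python
def quality_metrics(counts: dict[str, int]) -> dict[str, float]:
    """Compute dashboard ratios from raw counts over a reporting window."""

    def ratio(num: str, den: str) -> float:
        d = counts.get(den, 0)
        return counts.get(num, 0) / d if d else 0.0

    return {
        "satisfaction_score": ratio("positive_ratings", "total_ratings"),
        "sql_accuracy": ratio("successful_executions", "total_generated"),
        "correction_rate": ratio("corrections", "total_responses"),
        "refinement_rate": ratio("refinements", "total_queries"),
        "abandonment_rate": ratio("abandoned_sessions", "total_sessions"),
    }
```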
API Endpoints
POST /api/v1/feedback # Submit feedback event
GET /api/v1/feedback/insights/{tenant} # Get feedback insights
GET /api/v1/feedback/quality/{tenant} # Get quality metrics
POST /api/v1/feedback/correction # Submit SQL correction
GET /api/v1/feedback/decisions/{session} # Get decision feedback
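A client submits feedback by POSTing an event body to /api/v1/feedback. A sketch of building that body for a thumbs-up rating, following the explicit-feedback example earlier; the helper name and its validation are illustrative, and the dict can be sent with any HTTP client:

```python
from typing import Any

def build_rating_payload(
    tenant_id: str,
    message_id: str,
    rating: str,
) -> dict[str, Any]:
    """Request body for POST /api/v1/feedback, shaped like the explicit
    rating example above. Rating must be 'positive' or 'negative'."""
    if rating not in ("positive", "negative"):
        raise ValueError("rating must be 'positive' or 'negative'")
    return {
        "type": "explicit",
        "source": "user_rating",
        "category": "accuracy",
        "tenant_id": tenant_id,
        "data": {"rating": rating, "message_id": message_id},
    }
```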