Personalization Engine
The AI Service includes a personalization engine that learns from user interactions to deliver increasingly tailored experiences. The engine tracks conversation patterns, query preferences, visualization choices, and interaction behaviors to build per-user profiles that influence everything from query suggestions to chart formatting. This section covers the personalization architecture, user profiling, preference learning, and multi-tenant isolation.
Architecture
The personalization subsystem is organized under data-plane/ai-service/src/personalization/:
personalization/
engine.py # PersonalizationEngine - core engine
models.py # User profiles, preferences, interaction models
feedback.py # FeedbackCollector integration
router.py # Personalized routing
PersonalizationEngine
class PersonalizationEngine:
    """Core personalization engine that learns from user interactions.

    Features:
    - Conversation memory management
    - User preference learning
    - Usage pattern analysis
    - Personalized suggestions
    - Multi-tenant isolation
    """

    # Redis key prefix for persisted conversations. start_conversation builds
    # keys as f"{self._conversation_prefix}{conversation_id}", so the
    # attribute must always exist; defining it at class level guarantees that.
    _conversation_prefix = "personalization:conversation:"

    def __init__(
        self,
        redis_client: Any | None = None,
        postgres_client: Any | None = None,
        kafka_consumer: Any | None = None,
    ):
        """Store backing clients and initialize caches and learning parameters.

        Args:
            redis_client: Optional Redis client, also handed to the
                FeedbackCollector. start_conversation skips Redis persistence
                when this is None.
            postgres_client: Optional PostgreSQL client, stored for later use.
            kafka_consumer: Optional Kafka consumer, stored for later use.
        """
        self._redis = redis_client
        self._postgres = postgres_client
        self._kafka = kafka_consumer
        # NOTE(review): the collector receives only the Redis client, while
        # FeedbackCollector.collect_feedback calls
        # self._engine.learn_from_interaction -- confirm how the collector
        # obtains its engine reference.
        self._feedback_collector = FeedbackCollector(
            redis_client=redis_client,
        )

        # In-memory caches (string-keyed).
        self._user_contexts: dict[str, UserContext] = {}
        self._profiles: dict[str, PersonalizationProfile] = {}
        # conversation_id -> ConversationMemory, filled by start_conversation.
        # NOTE(review): nothing shown here evicts entries; confirm cleanup.
        self._conversations: dict[str, ConversationMemory] = {}

        # Learning parameters
        # Weight given to each new observation in the confidence EMA.
        self._learning_rate = 0.1
        # Multiplicative confidence decay applied per elapsed day.
        self._decay_factor = 0.95
        # Preferences whose confidence drops below this are removed.
        self._min_confidence = 0.3
        # Not used in the code shown; presumably caps stored history -- confirm.
        self._max_history_size = 100
# Data Models
UserContext
@dataclass
class UserContext:
"""Current context for a user."""
user_id: str
tenant_id: str
active_session: str | None = None
recent_queries: list[str] = field(default_factory=list)
frequent_tables: list[str] = field(default_factory=list)
preferred_chart_types: dict[str, int] = field(default_factory=dict)
last_activity: datetime = field(default_factory=datetime.utcnow)PersonalizationProfile
@dataclass
class PersonalizationProfile:
    """Long-term personalization profile for one user within one tenant.

    ``preferences`` maps ``"<category>:<key>"`` strings (the format built in
    learn_from_interaction) to UserPreference objects.
    """

    user_id: str
    tenant_id: str
    # UserPreference and UserInteraction are declared later in the models
    # listing. Dataclass annotations are evaluated when the class is created
    # (absent `from __future__ import annotations`), so the element types are
    # quoted forward references to avoid a NameError.
    preferences: dict[str, "UserPreference"] = field(default_factory=dict)
    interaction_history: list["UserInteraction"] = field(default_factory=list)
    skill_level: str = "intermediate"  # beginner, intermediate, advanced
    domain_expertise: list[str] = field(default_factory=list)
    communication_style: str = "detailed"  # brief, detailed, technical
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
# UserPreference
class PreferenceCategory(str, Enum):
    """Kinds of preference the engine can learn."""

    VISUALIZATION = "visualization"  # Chart type, color, format
    QUERY = "query"  # SQL style, default filters
    DATA = "data"  # Preferred tables, columns
    PRESENTATION = "presentation"  # Response format, detail level
    NOTIFICATION = "notification"  # Alert preferences


# Declared after PreferenceCategory: the `category` annotation is evaluated
# when the dataclass is created, so the enum must already exist.
@dataclass
class UserPreference:
    """A single user preference with a confidence score."""

    category: PreferenceCategory
    key: str
    value: Any
    confidence: float  # 0.0-1.0, increases with repeated observation
    observation_count: int  # Number of times this preference was observed
    last_updated: datetime
# UserInteraction
class InteractionType(str, Enum):
    """Kinds of user interaction the engine records."""

    QUERY = "query"  # Asked a question
    VISUALIZATION = "visualization"  # Interacted with a chart
    EXPORT = "export"  # Exported data
    DASHBOARD = "dashboard"  # Modified a dashboard
    FEEDBACK = "feedback"  # Provided feedback
    REFINEMENT = "refinement"  # Refined a query
    EXPLORATION = "exploration"  # Explored schema


# Declared after InteractionType so the `interaction_type` annotation resolves
# when the dataclass is created. UserFeedback comes from outside this listing;
# quoting it keeps the class importable even if that name is not yet bound.
@dataclass
class UserInteraction:
    """A recorded user interaction for learning."""

    id: str
    interaction_type: InteractionType
    timestamp: datetime
    data: dict[str, Any]
    feedback: "UserFeedback | None" = None
# Conversation Memory
The engine manages per-user conversation memory:
async def start_conversation(
    self,
    user_id: str,
    tenant_id: str,
    initial_context: dict | None = None,
) -> ConversationMemory:
    """Open a fresh conversation for ``user_id`` in ``tenant_id``.

    The conversation gets a random UUID4 id and is registered in the
    in-memory ``self._conversations`` map. When a Redis client is configured,
    it is also written to Redis as JSON with a 24-hour expiry.

    Args:
        user_id: Owner of the conversation.
        tenant_id: Tenant the conversation belongs to.
        initial_context: Optional seed context; any falsy value becomes ``{}``.

    Returns:
        The newly created ConversationMemory.
    """
    memory = ConversationMemory(
        conversation_id=str(uuid.uuid4()),
        user_id=user_id,
        tenant_id=tenant_id,
        context=initial_context or {},
    )
    conversation_id = memory.conversation_id
    self._conversations[conversation_id] = memory

    if not self._redis:
        # No Redis configured: the in-memory copy is the only one.
        return memory

    redis_key = f"{self._conversation_prefix}{conversation_id}"
    one_day_seconds = 24 * 60 * 60  # 86400 s expiry
    await self._redis.setex(redis_key, one_day_seconds, json.dumps(memory.to_dict()))
    return memory
# Preference Learning
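The engine learns preferences from user interactions using an exponential moving average (EMA). When a signal reinforces a preference already in the profile, its confidence is updated as $c \leftarrow (1-\alpha)\,c + \alpha\,s$. Here $\alpha$ is the learning rate (0.1) and $s$ is the signal's confidence. A newly seen preference starts at $\alpha s$: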
Learning Algorithm
async def learn_from_interaction(
    self,
    user_id: str,
    tenant_id: str,
    interaction: UserInteraction,
) -> None:
    """Update a user's profile from one interaction, then decay and persist it.

    Each signal from ``self._extract_signals`` is folded into the profile
    under the key ``"<category>:<key>"`` using an exponential moving average
    with weight ``lr = self._learning_rate``:

    * Same key, same value: ``confidence = (1 - lr) * confidence + lr * s``
      and ``observation_count`` is incremented.
    * Unseen key, or same key with a different value: the preference is
      (re)started at ``lr * s`` with ``observation_count = 1``.

    The reset keeps ``confidence`` meaning "evidence for the stored value".
    Overwriting only the value would let one contradicting observation
    inherit the old value's accumulated confidence.

    Args:
        user_id: User whose profile is updated.
        tenant_id: Tenant that scopes the profile.
        interaction: The interaction to learn from.
    """
    profile = await self._get_or_create_profile(user_id, tenant_id)
    rate = self._learning_rate
    # One timestamp for every preference touched by this interaction.
    now = datetime.utcnow()

    for signal in self._extract_signals(interaction):
        key = f"{signal.category.value}:{signal.key}"
        existing = profile.preferences.get(key)
        if existing is not None and existing.value == signal.value:
            # Reinforcing observation: move confidence toward the signal's.
            existing.confidence = (
                existing.confidence * (1 - rate) + signal.confidence * rate
            )
            existing.observation_count += 1
            existing.last_updated = now
        else:
            # New preference, or the observed value contradicts the stored
            # one: start accumulating evidence for this value from scratch.
            profile.preferences[key] = UserPreference(
                category=signal.category,
                key=signal.key,
                value=signal.value,
                confidence=signal.confidence * rate,
                observation_count=1,
                last_updated=now,
            )

    profile.updated_at = now
    # Apply time decay to all preferences
    self._apply_decay(profile)
    # Persist updated profile
    await self._save_profile(profile)
# Signal Extraction
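The engine extracts preference signals from different interaction types. The table lists the intended signals. The `_extract_signals` excerpt below implements only the visualization (chart type) and query (tables used) cases: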
| Interaction | Signals Extracted |
|---|---|
| Query | Tables used, aggregation patterns, filter patterns |
| Visualization | Chart type preference, color scheme, axis choices |
| Export | Preferred format (CSV, Excel, PDF), column selection |
| Dashboard | Layout preferences, widget sizes, refresh intervals |
| Refinement | What the user corrected (implicit negative signal) |
| Feedback | Direct positive/negative preference signal |
def _extract_signals(
    self,
    interaction: UserInteraction,
) -> list[UserPreference]:
    """Turn one interaction into preference signals.

    Signals are UserPreference objects carrying the signal's own confidence
    (0.7 for a chart type, 0.5 per table) and one shared timestamp.

    Only VISUALIZATION (``data["chart_type"]``) and QUERY
    (``data["tables_used"]``) interactions produce signals here. Any other
    interaction type yields an empty list.
    """
    now = datetime.utcnow()
    signals: list[UserPreference] = []

    if interaction.interaction_type == InteractionType.VISUALIZATION:
        chart_type = interaction.data.get("chart_type")
        if chart_type:
            signals.append(UserPreference(
                category=PreferenceCategory.VISUALIZATION,
                key="preferred_chart_type",
                value=chart_type,
                confidence=0.7,
                observation_count=1,
                last_updated=now,
            ))
    elif interaction.interaction_type == InteractionType.QUERY:
        # `or []` also tolerates an explicit None stored under "tables_used".
        tables = interaction.data.get("tables_used") or []
        seen_keys: set[str] = set()
        for table in tables:
            key = f"frequent_table:{table}"
            # A table listed twice in one query is a single observation;
            # otherwise the EMA would be applied twice for one interaction.
            if key in seen_keys:
                continue
            seen_keys.add(key)
            signals.append(UserPreference(
                category=PreferenceCategory.DATA,
                key=key,
                value=table,
                confidence=0.5,
                observation_count=1,
                last_updated=now,
            ))
    return signals
# Time Decay
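Preferences decay over time to adapt to changing user behavior. For each whole day since a preference was last updated, its confidence is multiplied by the decay factor (0.95). Preferences whose confidence falls below the minimum (0.3) are removed: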
def _apply_decay(self, profile: "PersonalizationProfile") -> None:
    """Decay preference confidence for elapsed whole days and prune weak ones.

    For each preference, ``age_days`` is the number of whole days since
    ``pref.last_updated``, and confidence is multiplied by
    ``self._decay_factor ** age_days``.

    ``last_updated`` is then advanced by exactly ``age_days`` days. This
    method runs on every interaction, and without that step each call
    re-applied the full decay since the last observation, so decay
    compounded. The leftover partial day is carried forward.

    Only preferences that actually decayed are removed when they drop below
    ``self._min_confidence``. Freshly observed preferences start below that
    threshold (learning rate x signal confidence) and are left to accumulate
    evidence.

    The annotation is a quoted forward reference so this function can be
    defined before PersonalizationProfile.
    """
    from datetime import timedelta

    now = datetime.utcnow()
    for key, pref in list(profile.preferences.items()):
        age_days = (now - pref.last_updated).days
        if age_days <= 0:
            continue
        pref.confidence *= self._decay_factor ** age_days
        pref.last_updated += timedelta(days=age_days)
        # Remove low-confidence preferences
        if pref.confidence < self._min_confidence:
            del profile.preferences[key]
# Personalized Suggestions
The engine generates personalized query suggestions:
async def get_suggestions(
self,
user_id: str,
tenant_id: str,
context: str | None = None,
limit: int = 5,
) -> list[dict[str, Any]]:
"""Get personalized query suggestions for a user."""
profile = await self._get_or_create_profile(user_id, tenant_id)
# Get user's frequent tables
frequent_tables = self._get_frequent_tables(profile)
# Get user's common query patterns
common_patterns = self._get_common_patterns(profile)
# Generate suggestions based on:
# 1. Frequently used tables
# 2. Common query patterns
# 3. Similar users' popular queries
# 4. Current context (if provided)
suggestions = await self._generate_suggestions(
frequent_tables=frequent_tables,
common_patterns=common_patterns,
context=context,
limit=limit,
)
return suggestionsSuggestion Types
| Type | Source | Example |
|---|---|---|
| Frequent query | User's past queries | "Show total sales (you ask this weekly)" |
| Related query | Current context | "Also try: sales by product category" |
| Trending query | Organization-wide | "Trending: Q4 budget analysis" |
| New data alert | Schema changes | "New table available: marketing_campaigns" |
Personalized Routing
The router.py module adjusts agent routing based on user preferences:
class PersonalizedRouter:
    """Routes queries with user preference consideration."""

    # A learned chart-type preference must exceed this confidence before it
    # is used as the default chart type.
    CHART_PREFERENCE_MIN_CONFIDENCE = 0.6

    async def route(
        self,
        query: str,
        user_id: str,
        tenant_id: str,
    ) -> dict[str, Any]:
        """Build routing hints for a query from the user's profile.

        Args:
            query: The user's query (not inspected by this method).
            user_id: User whose profile drives the hints.
            tenant_id: Tenant that scopes the profile lookup.

        Returns:
            A dict of hints:

            * Beginners get ``explanation_level="detailed"`` and
              ``chart_complexity="simple"``.
            * Advanced users get ``"brief"`` and ``"advanced"``.
            * Any other skill level, including the default "intermediate",
              gets neither key.
            * ``default_chart_type`` is added when the
              ``visualization:preferred_chart_type`` preference has
              confidence above ``CHART_PREFERENCE_MIN_CONFIDENCE``.
        """
        profile = await self._engine.get_profile(user_id, tenant_id)

        # Start empty: previously only "beginner"/"advanced" bound this name,
        # so the default "intermediate" level raised UnboundLocalError.
        routing_hints: dict[str, Any] = {}
        if profile.skill_level == "beginner":
            # Add more explanation, use simpler charts
            routing_hints.update(
                explanation_level="detailed",
                chart_complexity="simple",
            )
        elif profile.skill_level == "advanced":
            # Skip explanations, use advanced charts
            routing_hints.update(
                explanation_level="brief",
                chart_complexity="advanced",
            )

        # Apply visualization preferences
        chart_pref = profile.preferences.get("visualization:preferred_chart_type")
        if chart_pref and chart_pref.confidence > self.CHART_PREFERENCE_MIN_CONFIDENCE:
            routing_hints["default_chart_type"] = chart_pref.value
        return routing_hints
# Multi-Tenant Isolation
All personalization data is strictly isolated by tenant:
| Data Type | Isolation Mechanism |
|---|---|
| User profiles | Stored with (tenant_id, user_id) composite key |
| Preferences | Redis keys prefixed with personalization:profile:{tenant_id}:{user_id} |
| Conversations | Redis keys personalization:conversation:{conversation_id}. The key itself is not tenant-scoped; the conversation record stores tenant_id, which readers should verify |
| Suggestions | Generated using only the tenant's schema and data |
| Cross-user analytics | Aggregated within tenant boundary only |
Storage Architecture
| Data | Primary Store | Cache | TTL |
|---|---|---|---|
| User profiles | PostgreSQL | Redis | 1 hour |
| Active conversations | Redis | In-memory | 24 hours |
| Interaction history | PostgreSQL | None | 90 days retention |
| Preference snapshots | PostgreSQL | Redis | Updated on change |
| Suggestions cache | Redis | In-memory | 15 minutes |
Feedback Collector Integration
The personalization engine integrates with the feedback subsystem:
class FeedbackCollector:
    """Feeds explicit user feedback into the personalization engine."""

    async def collect_feedback(
        self,
        user_id: str,
        tenant_id: str,
        feedback: UserFeedback,
    ) -> None:
        """Record ``feedback`` as a FEEDBACK interaction and learn from it.

        The feedback is wrapped in a UserInteraction with a fresh UUID4 id,
        the current naive-UTC timestamp, ``feedback.to_dict()`` as its data,
        and the feedback object itself. It is then handed to
        ``self._engine.learn_from_interaction``.

        NOTE(review): ``self._engine`` is not assigned anywhere in this class
        as shown; confirm where it is set.
        """
        interaction_id = str(uuid4())
        created_at = datetime.utcnow()
        payload = feedback.to_dict()
        feedback_interaction = UserInteraction(
            id=interaction_id,
            interaction_type=InteractionType.FEEDBACK,
            timestamp=created_at,
            data=payload,
            feedback=feedback,
        )
        await self._engine.learn_from_interaction(
            user_id=user_id,
            tenant_id=tenant_id,
            interaction=feedback_interaction,
        )