MATIH Platform is in active MVP development. Documentation reflects current implementation status.

Personalization Engine

The AI Service includes a personalization engine that learns from user interactions to deliver increasingly tailored experiences. The engine tracks conversation patterns, query preferences, visualization choices, and interaction behaviors to build per-user profiles that influence everything from query suggestions to chart formatting. This section covers the personalization architecture, user profiling, preference learning, and multi-tenant isolation.


Architecture

The personalization subsystem is organized under data-plane/ai-service/src/personalization/:

personalization/
  engine.py          # PersonalizationEngine - core engine
  models.py          # User profiles, preferences, interaction models
  feedback.py        # FeedbackCollector integration
  router.py          # Personalized routing

PersonalizationEngine

class PersonalizationEngine:
    """Core personalization engine that learns from user interactions.
 
    Features:
    - Conversation memory management
    - User preference learning
    - Usage pattern analysis
    - Personalized suggestions
    - Multi-tenant isolation
    """
 
    def __init__(
        self,
        redis_client: Any | None = None,
        postgres_client: Any | None = None,
        kafka_consumer: Any | None = None,
    ):
        self._redis = redis_client
        self._postgres = postgres_client
        self._kafka = kafka_consumer
        self._feedback_collector = FeedbackCollector(
            redis_client=redis_client,
        )
 
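        # Redis key prefix for conversations (see Multi-Tenant Isolation);
        # start_conversation() reads this attribute when building keys
        self._conversation_prefix = "personalization:conversation:"

        # In-memory caches
        self._user_contexts: dict[str, UserContext] = {}
        self._profiles: dict[str, PersonalizationProfile] = {}
        self._conversations: dict[str, ConversationMemory] = {}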
 
        # Learning parameters
        self._learning_rate = 0.1
        self._decay_factor = 0.95
        self._min_confidence = 0.3
        self._max_history_size = 100

Data Models

UserContext

@dataclass
class UserContext:
    """Current context for a user."""
    user_id: str
    tenant_id: str
    active_session: str | None = None
    recent_queries: list[str] = field(default_factory=list)
    frequent_tables: list[str] = field(default_factory=list)
    preferred_chart_types: dict[str, int] = field(default_factory=dict)
    last_activity: datetime = field(default_factory=datetime.utcnow)

PersonalizationProfile

@dataclass
class PersonalizationProfile:
    """Long-term user personalization profile."""
    user_id: str
    tenant_id: str
    preferences: dict[str, UserPreference] = field(default_factory=dict)
    interaction_history: list[UserInteraction] = field(default_factory=list)
    skill_level: str = "intermediate"     # beginner, intermediate, advanced
    domain_expertise: list[str] = field(default_factory=list)
    communication_style: str = "detailed"  # brief, detailed, technical
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)

UserPreference

@dataclass
class UserPreference:
    """A single user preference with confidence."""
    category: PreferenceCategory
    key: str
    value: Any
    confidence: float        # 0.0-1.0, increases with repeated observation
    observation_count: int   # Number of times this preference was observed
    last_updated: datetime
 
class PreferenceCategory(str, Enum):
    VISUALIZATION = "visualization"   # Chart type, color, format
    QUERY = "query"                   # SQL style, default filters
    DATA = "data"                     # Preferred tables, columns
    PRESENTATION = "presentation"     # Response format, detail level
    NOTIFICATION = "notification"     # Alert preferences

UserInteraction

@dataclass
class UserInteraction:
    """A recorded user interaction for learning."""
    id: str
    interaction_type: InteractionType
    timestamp: datetime
    data: dict[str, Any]
    feedback: UserFeedback | None = None
 
class InteractionType(str, Enum):
    QUERY = "query"                 # Asked a question
    VISUALIZATION = "visualization"  # Interacted with a chart
    EXPORT = "export"               # Exported data
    DASHBOARD = "dashboard"         # Modified a dashboard
    FEEDBACK = "feedback"           # Provided feedback
    REFINEMENT = "refinement"       # Refined a query
    EXPLORATION = "exploration"     # Explored schema

Conversation Memory

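The engine manages per-user conversation memory. Each conversation is held in an in-process dictionary keyed by conversation ID and, when a Redis client is configured, mirrored to Redis with a 24-hour TTL: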

async def start_conversation(
    self,
    user_id: str,
    tenant_id: str,
    initial_context: dict | None = None,
) -> ConversationMemory:
    """Start a new conversation for a user."""
    conversation = ConversationMemory(
        conversation_id=str(uuid.uuid4()),
        user_id=user_id,
        tenant_id=tenant_id,
        context=initial_context or {},
    )
 
    self._conversations[conversation.conversation_id] = conversation
 
    # Store in Redis for persistence
    if self._redis:
        key = f"{self._conversation_prefix}{conversation.conversation_id}"
        await self._redis.setex(
            key,
            86400,  # 24 hour TTL
            json.dumps(conversation.to_dict()),
        )
 
    return conversation

Preference Learning

The engine learns preferences from user interactions using an exponential moving average (EMA) of signal confidence:

Learning Algorithm

async def learn_from_interaction(
    self,
    user_id: str,
    tenant_id: str,
    interaction: UserInteraction,
) -> None:
    """Update user profile based on a new interaction."""
    profile = await self._get_or_create_profile(user_id, tenant_id)
 
    # Extract preference signals
    signals = self._extract_signals(interaction)
 
    for signal in signals:
        key = f"{signal.category.value}:{signal.key}"
        existing = profile.preferences.get(key)
 
        if existing:
            # Update with exponential moving average
            new_confidence = (
                existing.confidence * (1 - self._learning_rate) +
                signal.confidence * self._learning_rate
            )
            existing.value = signal.value
            existing.confidence = new_confidence
            existing.observation_count += 1
            existing.last_updated = datetime.utcnow()
        else:
            # New preference
            profile.preferences[key] = UserPreference(
                category=signal.category,
                key=signal.key,
                value=signal.value,
                confidence=signal.confidence * self._learning_rate,
                observation_count=1,
                last_updated=datetime.utcnow(),
            )
 
    # Apply time decay to all preferences
    self._apply_decay(profile)
 
    # Persist updated profile
    await self._save_profile(profile)

Signal Extraction

The engine extracts preference signals from different interaction types:

| Interaction | Signals Extracted |
|---|---|
| Query | Tables used, aggregation patterns, filter patterns |
| Visualization | Chart type preference, color scheme, axis choices |
| Export | Preferred format (CSV, Excel, PDF), column selection |
| Dashboard | Layout preferences, widget sizes, refresh intervals |
| Refinement | What the user corrected (implicit negative signal) |
| Feedback | Direct positive/negative preference signal |

def _extract_signals(
    self,
    interaction: UserInteraction,
) -> list[UserPreference]:
    """Extract preference signals from an interaction."""
    signals = []
 
    if interaction.interaction_type == InteractionType.VISUALIZATION:
        chart_type = interaction.data.get("chart_type")
        if chart_type:
            signals.append(UserPreference(
                category=PreferenceCategory.VISUALIZATION,
                key="preferred_chart_type",
                value=chart_type,
                confidence=0.7,
                observation_count=1,
                last_updated=datetime.utcnow(),
            ))
 
    elif interaction.interaction_type == InteractionType.QUERY:
        tables = interaction.data.get("tables_used", [])
        for table in tables:
            signals.append(UserPreference(
                category=PreferenceCategory.DATA,
                key=f"frequent_table:{table}",
                value=table,
                confidence=0.5,
                observation_count=1,
                last_updated=datetime.utcnow(),
            ))
 
    return signals

Time Decay

Preferences decay over time to adapt to changing user behavior:

def _apply_decay(self, profile: PersonalizationProfile) -> None:
    """Apply time decay to preferences."""
    now = datetime.utcnow()
    for key, pref in list(profile.preferences.items()):
        age_days = (now - pref.last_updated).days
        if age_days > 0:
            decay = self._decay_factor ** age_days
            pref.confidence *= decay
 
            # Remove low-confidence preferences
            if pref.confidence < self._min_confidence:
                del profile.preferences[key]

Personalized Suggestions

The engine generates personalized query suggestions:

async def get_suggestions(
    self,
    user_id: str,
    tenant_id: str,
    context: str | None = None,
    limit: int = 5,
) -> list[dict[str, Any]]:
    """Get personalized query suggestions for a user."""
    profile = await self._get_or_create_profile(user_id, tenant_id)
 
    # Get user's frequent tables
    frequent_tables = self._get_frequent_tables(profile)
 
    # Get user's common query patterns
    common_patterns = self._get_common_patterns(profile)
 
    # Generate suggestions based on:
    # 1. Frequently used tables
    # 2. Common query patterns
    # 3. Similar users' popular queries
    # 4. Current context (if provided)
    suggestions = await self._generate_suggestions(
        frequent_tables=frequent_tables,
        common_patterns=common_patterns,
        context=context,
        limit=limit,
    )
 
    return suggestions
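
The body of `_get_frequent_tables` is not shown here. One plausible reading, sketched below as an assumption, is that it ranks the `frequent_table:*` preferences written by `learn_from_interaction` (stored under keys of the form `data:frequent_table:{table}`) by confidence. The `Pref` class is a trimmed stand-in for `UserPreference`, and the `min_confidence` and `limit` parameters are hypothetical.

```python
from dataclasses import dataclass
from typing import Any

# Key prefix produced by f"{signal.category.value}:{signal.key}" for table signals
TABLE_KEY_PREFIX = "data:frequent_table:"


@dataclass
class Pref:
    """Trimmed stand-in for UserPreference (only the fields used here)."""
    key: str
    value: Any
    confidence: float


def get_frequent_tables(
    preferences: dict[str, Pref],
    min_confidence: float = 0.3,
    limit: int = 5,
) -> list[str]:
    """Return table names ordered by descending confidence."""
    table_prefs = [
        pref
        for key, pref in preferences.items()
        if key.startswith(TABLE_KEY_PREFIX) and pref.confidence >= min_confidence
    ]
    table_prefs.sort(key=lambda pref: pref.confidence, reverse=True)
    return [pref.value for pref in table_prefs[:limit]]


if __name__ == "__main__":
    prefs = {
        "data:frequent_table:orders": Pref("frequent_table:orders", "orders", 0.45),
        "data:frequent_table:customers": Pref("frequent_table:customers", "customers", 0.8),
        "visualization:preferred_chart_type": Pref("preferred_chart_type", "bar", 0.9),
    }
    print(get_frequent_tables(prefs))
```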

Suggestion Types

| Type | Source | Example |
|---|---|---|
| Frequent query | User's past queries | "Show total sales (you ask this weekly)" |
| Related query | Current context | "Also try: sales by product category" |
| Trending query | Organization-wide | "Trending: Q4 budget analysis" |
| New data alert | Schema changes | "New table available: marketing_campaigns" |

Personalized Routing

The router.py module adjusts agent routing based on user preferences:

class PersonalizedRouter:
    """Routes queries with user preference consideration."""

    def __init__(self, engine: PersonalizationEngine) -> None:
        # The engine supplies the user profiles consulted in route()
        self._engine = engine

    async def route(
        self,
        query: str,
        user_id: str,
        tenant_id: str,
    ) -> dict[str, Any]:
        """Route query considering user preferences."""
        profile = await self._engine.get_profile(user_id, tenant_id)

        # Start with no hints so the default "intermediate" skill level,
        # which matches neither branch below, still returns a valid dict
        routing_hints: dict[str, Any] = {}

        # Adjust routing based on skill level
        if profile.skill_level == "beginner":
            # Add more explanation, use simpler charts
            routing_hints = {
                "explanation_level": "detailed",
                "chart_complexity": "simple",
            }
        elif profile.skill_level == "advanced":
            # Skip explanations, use advanced charts
            routing_hints = {
                "explanation_level": "brief",
                "chart_complexity": "advanced",
            }

        # Apply visualization preferences once confidence is high enough
        chart_pref = profile.preferences.get(
            "visualization:preferred_chart_type"
        )
        if chart_pref and chart_pref.confidence > 0.6:
            routing_hints["default_chart_type"] = chart_pref.value

        return routing_hints

Multi-Tenant Isolation

All personalization data is strictly isolated by tenant:

| Data Type | Isolation Mechanism |
|---|---|
| User profiles | Stored with (tenant_id, user_id) composite key |
| Preferences | Redis keys prefixed with personalization:profile:{tenant_id}:{user_id} |
| Conversations | Redis keys prefixed with personalization:conversation:{conversation_id}; the key has no tenant component, so the tenant is identified by the tenant_id field on ConversationMemory |
| Suggestions | Generated using only the tenant's schema and data |
| Cross-user analytics | Aggregated within tenant boundary only |
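
The key formats in the table can be centralized in small builder functions so that every caller composes keys the same way. This is a sketch, not the service's code: the function names and the rule that IDs must not contain `:` are assumptions added here. The check guards against two different (tenant, user) pairs producing the same key, for example tenant `a:b` with user `c` and tenant `a` with user `b:c`.

```python
PROFILE_PREFIX = "personalization:profile:"
CONVERSATION_PREFIX = "personalization:conversation:"


def _check_id(name: str, value: str) -> str:
    """Reject empty IDs and IDs containing the ':' separator (assumed rule)."""
    if not value or ":" in value:
        raise ValueError(f"{name} must be non-empty and must not contain ':'")
    return value


def profile_key(tenant_id: str, user_id: str) -> str:
    """Build personalization:profile:{tenant_id}:{user_id}."""
    tenant = _check_id("tenant_id", tenant_id)
    user = _check_id("user_id", user_id)
    return f"{PROFILE_PREFIX}{tenant}:{user}"


def conversation_key(conversation_id: str) -> str:
    """Build personalization:conversation:{conversation_id}."""
    return f"{CONVERSATION_PREFIX}{_check_id('conversation_id', conversation_id)}"


if __name__ == "__main__":
    print(profile_key("acme", "u1"))
    print(conversation_key("c-42"))
```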

Storage Architecture

| Data | Primary Store | Cache | TTL / Retention |
|---|---|---|---|
| User profiles | PostgreSQL | Redis | 1 hour |
| Active conversations | Redis | In-memory | 24 hours |
| Interaction history | PostgreSQL | None | 90 days retention |
| Preference snapshots | PostgreSQL | Redis | Updated on change |
| Suggestions cache | Redis | In-memory | 15 minutes |

Feedback Collector Integration

The personalization engine integrates with the feedback subsystem:

class FeedbackCollector:
    """Collects user feedback for personalization."""
 
    async def collect_feedback(
        self,
        user_id: str,
        tenant_id: str,
        feedback: UserFeedback,
    ) -> None:
        """Process user feedback for personalization."""
        # Create interaction from feedback
        interaction = UserInteraction(
            id=str(uuid.uuid4()),
            interaction_type=InteractionType.FEEDBACK,
            timestamp=datetime.utcnow(),
            data=feedback.to_dict(),
            feedback=feedback,
        )
 
        # Learn from the feedback
        await self._engine.learn_from_interaction(
            user_id=user_id,
            tenant_id=tenant_id,
            interaction=interaction,
        )