MATIH Platform is in active MVP development. Documentation reflects current implementation status.
12. AI Service
Session Management

Session and Context Management

The AI Service maintains conversation sessions that persist across multiple user interactions, enabling multi-turn conversations with context awareness, history, and state management. The session subsystem uses a pluggable storage backend, with Redis as the primary store and PostgreSQL as a durable fallback. This section covers the session lifecycle, storage architecture, memory management, context summarization, and privacy controls.


Session Architecture

User Request (with session_id)
    |
    v
+-------------------+
| SessionManager    |  <-- High-level conversation tracking
+-------------------+
    |
    v
+-------------------+
| SessionStore      |  <-- Pluggable storage interface
+-------------------+
    |
    +------- Redis (primary) ------> Fast, TTL-based
    |
    +------- PostgreSQL (fallback) -> Durable, queryable
    |
    +------- InMemory (dev) -------> Development only

Session Data Model

The SessionData model in session/store.py defines the core session structure:

@dataclass
class SessionData:
    """Core session data structure."""
    session_id: str
    tenant_id: str
    user_id: str | None = None
    created_at: datetime = field(default_factory=datetime.utcnow)
    last_active: datetime = field(default_factory=datetime.utcnow)
    status: SessionStatus = SessionStatus.ACTIVE
    metadata: dict[str, Any] = field(default_factory=dict)
 
class SessionStatus(str, Enum):
    ACTIVE = "active"
    EXPIRED = "expired"
    CLOSED = "closed"
    ARCHIVED = "archived"

Conversation Messages

Each session contains a history of conversation messages:

@dataclass
class ConversationMessage:
    """A message in a conversation."""
    id: str
    role: MessageRole                  # USER, ASSISTANT, SYSTEM, TOOL
    content: str
    timestamp: datetime

    # Optional context fields
    tool_calls: list[dict] = field(default_factory=list)  # Tool call requests
    tool_call_id: str | None = None    # Tool response reference
    tool_name: str | None = None       # Tool that generated this message

    # SQL context
    sql: str | None = None             # Generated SQL
    sql_executed: bool = False         # Whether SQL was executed
    sql_result: dict | None = None     # Query results

    # Visualization context
    visualization: dict | None = None  # Chart configuration
    chart_type: str | None = None      # Chart type

    # Analysis context
    insights: list[str] = field(default_factory=list)  # Generated insights
This rich message model captures not just the text of each message but also the artifacts produced during processing, enabling the UI to render SQL, charts, and insights inline with the conversation.


SessionStore Interface

The SessionStore abstract base class defines the storage contract:

class SessionStore(ABC):
    """Abstract interface for session storage."""
 
    @abstractmethod
    async def save(self, session: SessionData) -> None:
        """Save or update a session."""
        ...
 
    @abstractmethod
    async def get(self, session_id: str) -> SessionData | None:
        """Get a session by ID."""
        ...
 
    @abstractmethod
    async def delete(self, session_id: str) -> None:
        """Delete a session."""
        ...
 
    @abstractmethod
    async def list_sessions(
        self,
        tenant_id: str,
        user_id: str | None = None,
        status: SessionStatus | None = None,
        limit: int = 50,
    ) -> list[SessionData]:
        """List sessions with optional filtering."""
        ...
 
    @abstractmethod
    async def update_activity(self, session_id: str) -> None:
        """Update last_active timestamp."""
        ...

Storage Backends

Redis Backend

The primary storage backend uses Redis for fast access with automatic TTL-based expiration:

class RedisSessionStore(SessionStore):
    """Redis-backed session store."""
 
    KEY_PREFIX = "session:"
    DEFAULT_TTL = 3600 * 24  # 24 hours
 
    def __init__(self, redis_client, ttl: int | None = None):
        self._redis = redis_client
        self._ttl = ttl or self.DEFAULT_TTL
 
    async def save(self, session: SessionData) -> None:
        key = f"{self.KEY_PREFIX}{session.session_id}"
        data = json.dumps(session.to_dict())
        await self._redis.setex(key, self._ttl, data)
 
        # Maintain tenant index
        tenant_key = f"tenant:{session.tenant_id}:sessions"
        await self._redis.sadd(tenant_key, session.session_id)
 
    async def get(self, session_id: str) -> SessionData | None:
        key = f"{self.KEY_PREFIX}{session_id}"
        data = await self._redis.get(key)
        if data:
            return SessionData.from_dict(json.loads(data))
        return None

Redis Configuration   Value
Key prefix            session:{session_id}
Tenant index          tenant:{tenant_id}:sessions (SET)
Default TTL           24 hours
Serialization         JSON
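
The save/get round trip can be exercised without a live Redis by stubbing the three client calls it uses (setex, get, sadd). A sketch with a hypothetical FakeRedis:

```python
import asyncio
import json


class FakeRedis:
    """Minimal stub of the async Redis client; only the calls used above."""

    def __init__(self):
        self.data: dict[str, str] = {}
        self.sets: dict[str, set] = {}

    async def setex(self, key, ttl, value):
        self.data[key] = value  # TTL is ignored in this fake

    async def get(self, key):
        return self.data.get(key)

    async def sadd(self, key, member):
        self.sets.setdefault(key, set()).add(member)


async def demo():
    r = FakeRedis()
    session = {"session_id": "abc", "tenant_id": "t1"}
    # Same key scheme as RedisSessionStore: value under session:{id},
    # plus a tenant index set for listing.
    await r.setex(f"session:{session['session_id']}", 3600 * 24,
                  json.dumps(session))
    await r.sadd(f"tenant:{session['tenant_id']}:sessions",
                 session["session_id"])
    raw = await r.get("session:abc")
    return json.loads(raw)


restored = asyncio.run(demo())
print(restored["tenant_id"])  # t1
```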

PostgreSQL Backend

The PostgreSQL backend provides durable storage for sessions that must survive Redis restarts:

class PostgresSessionStore(SessionStore):
    """PostgreSQL-backed session store."""
 
    async def save(self, session: SessionData) -> None:
        async with self._pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO sessions (
                    session_id, tenant_id, user_id,
                    created_at, last_active, status, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (session_id)
                DO UPDATE SET
                    last_active = EXCLUDED.last_active,
                    status = EXCLUDED.status,
                    metadata = EXCLUDED.metadata
            """, session.session_id, session.tenant_id, ...)

Store Factory

The factory pattern selects the appropriate backend:

def create_session_store_from_settings() -> SessionStore:
    """Create session store based on application settings."""
    settings = get_settings()
 
    if settings.redis_url:
        try:
            redis_client = create_redis_client(settings.redis_url)
            return RedisSessionStore(redis_client)
        except Exception as e:
            logger.warning("Redis unavailable, falling back", error=str(e))
 
    if settings.database_url:
        return PostgresSessionStore(settings.database_url)
 
    logger.warning("No persistent store, using in-memory sessions")
    return InMemorySessionStore()

SessionManager

The SessionManager in session/manager.py provides high-level session management:

class SessionManager:
    """High-level session management for conversations."""
 
    async def create_session(
        self,
        tenant_id: str,
        user_id: str | None = None,
        initial_context: dict | None = None,
    ) -> str:
        """Create a new conversation session."""
        session_id = str(uuid4())
        session = SessionData(
            session_id=session_id,
            tenant_id=tenant_id,
            user_id=user_id,
            metadata={"context": initial_context or {}},
        )
        await self._store.save(session)
        return session_id
 
    async def add_message(
        self,
        session_id: str,
        message: ConversationMessage,
    ) -> None:
        """Add a message to the session history."""
        session = await self._store.get(session_id)
        if not session:
            raise SessionNotFoundError(session_id)
 
        messages = session.metadata.get("messages", [])
        messages.append(message.to_dict())
        session.metadata["messages"] = messages
        session.last_active = datetime.utcnow()
 
        await self._store.save(session)
 
    async def get_history(
        self,
        session_id: str,
        limit: int | None = None,
    ) -> list[ConversationMessage]:
        """Get conversation history for a session."""
        session = await self._store.get(session_id)
        if not session:
            return []
 
        messages = session.metadata.get("messages", [])
        if limit:
            messages = messages[-limit:]
 
        return [ConversationMessage.from_dict(m) for m in messages]

Context Tracking

Each session maintains contextual state that accumulates across turns:

Session Context Fields

Field                 Purpose                                Persistence
messages              Full conversation history              Stored in session metadata
current_tables        Tables referenced in current query     Session-scoped
active_filters        Currently applied data filters         Session-scoped
last_sql              Most recent generated SQL              For REFINE/UNDO intents
last_results          Most recent query results              For follow-up questions
visualization_prefs   Chart preferences from this session    Session-scoped
schema_context        Cached schema information              Session-scoped

Context Accumulation

The BI orchestrator accumulates context through the AgentState:

# After SQL generation
state.generated_sql = result.query.sql
state.query_results = query_results
 
# After analysis
state.analysis_results = analysis_result
state.insights = ["Revenue grew 15%", "APAC leads growth"]
 
# After visualization
state.chart_config = {"type": "bar", "x": "region", "y": "revenue"}
 
# On follow-up: "Now show it as a line chart"
# The state retains the data and analysis, only updating visualization

Conversation Summarization

The summarization.py module compresses long conversations to fit within LLM context windows:

class ConversationSummarizer:
    """Summarizes conversation history for context management."""
 
    def __init__(self, llm_client, max_summary_tokens: int = 500):
        self._llm = llm_client
        self._max_tokens = max_summary_tokens
 
    async def summarize(
        self,
        messages: list[ConversationMessage],
    ) -> str:
        """Generate a concise summary of the conversation."""
        conversation_text = "\n".join(
            f"{m.role.value}: {m.content}" for m in messages
        )
 
        response = await self._llm.chat([
            {"role": "system", "content": (
                "Summarize the following conversation between a user and "
                "a data analytics assistant. Focus on: questions asked, "
                "data explored, insights found, and current analysis state."
            )},
            {"role": "user", "content": conversation_text},
        ])
 
        return response.get("content", "")

Summarization Strategy

Conversation Length   Strategy
Under 10 messages     Send all messages as context
10-30 messages        Send summary + last 10 messages
Over 30 messages      Send summary + last 5 messages
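
The strategy table above reduces to a small selection function; a sketch (build_llm_context is a hypothetical name):

```python
def build_llm_context(messages: list[dict], summary: str) -> list[dict]:
    """Select context per the strategy table: all, summary+10, or summary+5."""
    n = len(messages)
    if n < 10:
        return messages  # short conversation: send everything
    tail = messages[-10:] if n <= 30 else messages[-5:]
    # Prepend the running summary as a system message before the recent tail.
    return [
        {"role": "system", "content": f"Conversation so far: {summary}"}
    ] + tail


short = [{"role": "user", "content": f"q{i}"} for i in range(4)]
long_history = [{"role": "user", "content": f"q{i}"} for i in range(40)]
print(len(build_llm_context(short, "")))          # 4  (all messages)
print(len(build_llm_context(long_history, "s")))  # 6  (summary + last 5)
```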

Memory Service

The memory_service.py module provides cross-cutting memory operations:

class MemoryService:
    """Manages memory across sessions and conversations."""
 
    async def get_relevant_context(
        self,
        session_id: str,
        query: str,
        max_tokens: int = 2000,
    ) -> dict[str, Any]:
        """Get relevant context for a new query."""
        session = await self._store.get(session_id)
        if not session:
            return {}

        return {
            "summary": session.metadata.get("summary", ""),
            "recent_messages": await self._get_recent_messages(
                session_id, limit=5
            ),
            "active_tables": session.metadata.get("current_tables", []),
            "active_filters": session.metadata.get("active_filters", []),
            "last_sql": session.metadata.get("last_sql"),
        }

Session Privacy

The privacy.py module handles PII in session data:

class SessionPrivacyManager:
    """Manages privacy controls for session data."""
 
    async def sanitize_for_storage(
        self,
        message: ConversationMessage,
        tenant_id: str,
    ) -> ConversationMessage:
        """Remove PII before storing in session."""
        sanitized_content = self._pii_detector.redact(
            message.content,
            categories=["email", "phone", "ssn", "credit_card"],
        )
        return ConversationMessage(
            role=message.role,
            content=sanitized_content,
            **{k: v for k, v in message.__dict__.items()
               if k not in ("role", "content")},
        )
 
    async def cleanup_expired(
        self,
        tenant_id: str,
        retention_days: int = 30,
    ) -> int:
        """Delete sessions older than retention period."""
        cutoff = datetime.utcnow() - timedelta(days=retention_days)
        return await self._store.delete_before(tenant_id, cutoff)
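
The actual PII detector is not shown here; a minimal regex-based stand-in illustrates the redaction behavior (the patterns are simplified assumptions and would miss many real-world formats):

```python
import re

# Hypothetical patterns standing in for the real PII detector's categories.
PII_PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"),
    "phone": re.compile(r"\b\d{3}[-.]\d{3}[-.]\d{4}\b"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


def redact(text: str) -> str:
    """Replace each PII match with a category placeholder before storage."""
    for category, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[{category.upper()}]", text)
    return text


print(redact("Contact me at jane@example.com or 555-123-4567"))
# Contact me at [EMAIL] or [PHONE]
```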

Data Retention

Tenant Tier   Session Retention   History Retention
Free          7 days              Last 50 messages
Standard      30 days             Last 200 messages
Enterprise    90 days             Unlimited
Compliance    Configurable        Per compliance policy

Cross-Session Context

The cross_session_persistence.py module enables sharing context across sessions for the same user:

class CrossSessionPersistence:
    """Persists user context across conversation sessions."""
 
    async def get_user_context(
        self,
        tenant_id: str,
        user_id: str,
    ) -> dict[str, Any]:
        """Get accumulated user context from past sessions."""
        return {
            "frequent_tables": await self._get_frequent_tables(
                tenant_id, user_id
            ),
            "common_queries": await self._get_common_queries(
                tenant_id, user_id
            ),
            "preferences": await self._get_preferences(
                tenant_id, user_id
            ),
            "last_session_summary": await self._get_last_summary(
                tenant_id, user_id
            ),
        }

This enables scenarios like "Continue where I left off" or "Show me the same report as last time."
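
As a sketch, the aggregation behind _get_frequent_tables might rank tables by how often past sessions touched them (the function name and input shapes are assumptions):

```python
from collections import Counter


def frequent_tables(past_sessions: list[dict], top_n: int = 3) -> list[str]:
    """Rank tables by how many past-session contexts referenced them."""
    counts = Counter()
    for session in past_sessions:
        counts.update(session.get("current_tables", []))
    return [table for table, _ in counts.most_common(top_n)]


history = [
    {"current_tables": ["sales", "regions"]},
    {"current_tables": ["sales", "products"]},
    {"current_tables": ["sales"]},
]
print(frequent_tables(history))  # ['sales', 'regions', 'products']
```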


Session Lifecycle

Create Session
    |
    v
+----------+    Message    +----------+
| ACTIVE   | <-----------> | ACTIVE   |  (loop: add messages, update state)
+----------+               +----------+
    |
    | (24h inactivity)
    v
+----------+
| EXPIRED  |
+----------+
    |
    | (retention period)
    v
+----------+
| ARCHIVED | --> Deleted after retention
+----------+

Transition            Trigger            Action
Create -> Active      API call           Initialize session state
Active -> Active      User message       Update last_active, add message
Active -> Expired     TTL timeout        Mark as expired, no new messages
Active -> Closed      User closes        Explicit close, generate summary
Expired -> Archived   Cleanup job        Move to cold storage
Archived -> Deleted   Retention policy   Permanent deletion
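
The status transitions above can be enforced with a simple allow-list guard (a sketch; session creation is an API action rather than a status transition, so it is omitted):

```python
# Allowed (current, target) status pairs, mirroring the transition table.
ALLOWED_TRANSITIONS = {
    ("active", "active"),      # user message loop
    ("active", "expired"),     # TTL timeout
    ("active", "closed"),      # explicit close
    ("expired", "archived"),   # cleanup job
    ("archived", "deleted"),   # retention policy
}


def can_transition(current: str, target: str) -> bool:
    """Return True only for transitions listed in the lifecycle table."""
    return (current, target) in ALLOWED_TRANSITIONS


print(can_transition("active", "expired"))   # True
print(can_transition("expired", "active"))   # False: expired sessions
                                             # cannot be reactivated
```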