Session and Context Management
The AI Service maintains conversation sessions that persist across multiple user interactions, enabling multi-turn conversations with context awareness, conversation history, and state management. The session subsystem uses a pluggable storage backend with Redis as the primary store and PostgreSQL as a durable fallback. This section covers the session lifecycle, storage architecture, memory management, context summarization, and privacy controls.
Session Architecture
```
User Request (with session_id)
        |
        v
+-------------------+
|  SessionManager   |  <-- High-level conversation tracking
+-------------------+
        |
        v
+-------------------+
|   SessionStore    |  <-- Pluggable storage interface
+-------------------+
        |
        +------- Redis (primary) ------> Fast, TTL-based
        |
        +------- PostgreSQL (fallback) -> Durable, queryable
        |
        +------- InMemory (dev) -------> Development only
```

Session Data Model
The SessionData model in session/store.py defines the core session structure:
```python
@dataclass
class SessionData:
    """Core session data structure."""

    session_id: str
    tenant_id: str
    user_id: str | None = None
    created_at: datetime = field(default_factory=datetime.utcnow)
    last_active: datetime = field(default_factory=datetime.utcnow)
    status: SessionStatus = SessionStatus.ACTIVE
    metadata: dict[str, Any] = field(default_factory=dict)


class SessionStatus(str, Enum):
    ACTIVE = "active"
    EXPIRED = "expired"
    CLOSED = "closed"
    ARCHIVED = "archived"
```

Conversation Messages
Each session contains a history of conversation messages:
```python
@dataclass
class ConversationMessage:
    """A message in a conversation."""

    id: str
    role: MessageRole             # USER, ASSISTANT, SYSTEM, TOOL
    content: str
    timestamp: datetime

    # Optional context fields
    tool_calls: list[dict]        # Tool call requests
    tool_call_id: str | None      # Tool response reference
    tool_name: str | None         # Tool that generated this message

    # SQL context
    sql: str | None               # Generated SQL
    sql_executed: bool            # Whether SQL was executed
    sql_result: dict | None       # Query results

    # Visualization context
    visualization: dict | None    # Chart configuration
    chart_type: str | None        # Chart type

    # Analysis context
    insights: list[str]           # Generated insights
```

This rich message model captures not just the text of each message but also the artifacts produced during processing, enabling the UI to render SQL, charts, and insights inline with the conversation.
SessionStore Interface
The SessionStore abstract base class defines the storage contract:
```python
class SessionStore(ABC):
    """Abstract interface for session storage."""

    @abstractmethod
    async def save(self, session: SessionData) -> None:
        """Save or update a session."""
        ...

    @abstractmethod
    async def get(self, session_id: str) -> SessionData | None:
        """Get a session by ID."""
        ...

    @abstractmethod
    async def delete(self, session_id: str) -> None:
        """Delete a session."""
        ...

    @abstractmethod
    async def list_sessions(
        self,
        tenant_id: str,
        user_id: str | None = None,
        status: SessionStatus | None = None,
        limit: int = 50,
    ) -> list[SessionData]:
        """List sessions with optional filtering."""
        ...

    @abstractmethod
    async def update_activity(self, session_id: str) -> None:
        """Update last_active timestamp."""
        ...
```

Storage Backends
Redis Backend
The primary storage backend uses Redis for fast access with automatic TTL-based expiration:
```python
class RedisSessionStore(SessionStore):
    """Redis-backed session store."""

    KEY_PREFIX = "session:"
    DEFAULT_TTL = 3600 * 24  # 24 hours

    def __init__(self, redis_client, ttl: int | None = None):
        self._redis = redis_client
        self._ttl = ttl or self.DEFAULT_TTL

    async def save(self, session: SessionData) -> None:
        key = f"{self.KEY_PREFIX}{session.session_id}"
        data = json.dumps(session.to_dict())
        await self._redis.setex(key, self._ttl, data)
        # Maintain tenant index
        tenant_key = f"tenant:{session.tenant_id}:sessions"
        await self._redis.sadd(tenant_key, session.session_id)

    async def get(self, session_id: str) -> SessionData | None:
        key = f"{self.KEY_PREFIX}{session_id}"
        data = await self._redis.get(key)
        if data:
            return SessionData.from_dict(json.loads(data))
        return None
```

| Redis Configuration | Value |
|---|---|
| Key prefix | session:{session_id} |
| Tenant index | tenant:{tenant_id}:sessions (SET) |
| Default TTL | 24 hours |
| Serialization | JSON |
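The key and serialization conventions in the table can be sketched in isolation. The `SessionRecord` stand-in below is hypothetical (the real store serializes `SessionData` via `to_dict`/`from_dict`); only the key formats and the JSON round-trip mirror the table above:

```python
import json
from dataclasses import asdict, dataclass, field

KEY_PREFIX = "session:"


@dataclass
class SessionRecord:
    """Hypothetical stand-in for SessionData's JSON-serializable fields."""
    session_id: str
    tenant_id: str
    metadata: dict = field(default_factory=dict)


def redis_key(session_id: str) -> str:
    # session:{session_id}, as in the table above
    return f"{KEY_PREFIX}{session_id}"


def tenant_index_key(tenant_id: str) -> str:
    # tenant:{tenant_id}:sessions, the SET used as a tenant index
    return f"tenant:{tenant_id}:sessions"


# JSON round-trip, matching the store's serialization choice
rec = SessionRecord("abc123", "acme", {"messages": []})
payload = json.dumps(asdict(rec))
restored = SessionRecord(**json.loads(payload))

print(redis_key("abc123"))       # session:abc123
print(tenant_index_key("acme"))  # tenant:acme:sessions
print(restored == rec)           # True
```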
PostgreSQL Backend
The PostgreSQL backend provides durable storage for sessions that must survive Redis restarts:
```python
class PostgresSessionStore(SessionStore):
    """PostgreSQL-backed session store."""

    async def save(self, session: SessionData) -> None:
        async with self._pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO sessions (
                    session_id, tenant_id, user_id,
                    created_at, last_active, status, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (session_id)
                DO UPDATE SET
                    last_active = EXCLUDED.last_active,
                    status = EXCLUDED.status,
                    metadata = EXCLUDED.metadata
            """, session.session_id, session.tenant_id, ...)
```

Store Factory
The factory pattern selects the appropriate backend:
```python
def create_session_store_from_settings() -> SessionStore:
    """Create session store based on application settings."""
    settings = get_settings()
    if settings.redis_url:
        try:
            redis_client = create_redis_client(settings.redis_url)
            return RedisSessionStore(redis_client)
        except Exception as e:
            logger.warning("Redis unavailable, falling back", error=str(e))
    if settings.database_url:
        return PostgresSessionStore(settings.database_url)
    logger.warning("No persistent store, using in-memory sessions")
    return InMemorySessionStore()
```

SessionManager
The SessionManager in session/manager.py provides high-level session management:
```python
class SessionManager:
    """High-level session management for conversations."""

    async def create_session(
        self,
        tenant_id: str,
        user_id: str | None = None,
        initial_context: dict | None = None,
    ) -> str:
        """Create a new conversation session."""
        session_id = str(uuid4())
        session = SessionData(
            session_id=session_id,
            tenant_id=tenant_id,
            user_id=user_id,
            metadata={"context": initial_context or {}},
        )
        await self._store.save(session)
        return session_id

    async def add_message(
        self,
        session_id: str,
        message: ConversationMessage,
    ) -> None:
        """Add a message to the session history."""
        session = await self._store.get(session_id)
        if not session:
            raise SessionNotFoundError(session_id)
        messages = session.metadata.get("messages", [])
        messages.append(message.to_dict())
        session.metadata["messages"] = messages
        session.last_active = datetime.utcnow()
        await self._store.save(session)

    async def get_history(
        self,
        session_id: str,
        limit: int | None = None,
    ) -> list[ConversationMessage]:
        """Get conversation history for a session."""
        session = await self._store.get(session_id)
        if not session:
            return []
        messages = session.metadata.get("messages", [])
        if limit:
            messages = messages[-limit:]
        return [ConversationMessage.from_dict(m) for m in messages]
```

Context Tracking
Each session maintains contextual state that accumulates across turns:
Session Context Fields
| Field | Purpose | Persistence |
|---|---|---|
| messages | Full conversation history | Stored in session metadata |
| current_tables | Tables referenced in current query | Session-scoped |
| active_filters | Currently applied data filters | Session-scoped |
| last_sql | Most recent generated SQL | For REFINE/UNDO intents |
| last_results | Most recent query results | For follow-up questions |
| visualization_prefs | Chart preferences from this session | Session-scoped |
| schema_context | Cached schema information | Session-scoped |
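As an illustrative sketch of how a turn might fold its artifacts into these metadata fields (the helper name and copy-before-write behavior are assumptions, not the orchestrator's actual code):

```python
def record_turn_context(metadata: dict, sql: str, tables: list[str]) -> dict:
    """Hypothetical helper: update last_sql and current_tables for one turn."""
    metadata = dict(metadata)  # work on a copy; the real manager re-saves the session
    metadata["last_sql"] = sql
    seen = set(metadata.get("current_tables", []))
    # Append only tables not already referenced this session
    metadata["current_tables"] = metadata.get("current_tables", []) + [
        t for t in tables if t not in seen
    ]
    return metadata


meta = record_turn_context({}, "SELECT * FROM sales", ["sales"])
meta = record_turn_context(
    meta,
    "SELECT * FROM sales JOIN regions USING (region_id)",
    ["sales", "regions"],
)
print(meta["current_tables"])  # ['sales', 'regions']
```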
Context Accumulation
The BI orchestrator accumulates context through the AgentState:
```python
# After SQL generation
state.generated_sql = result.query.sql
state.query_results = query_results

# After analysis
state.analysis_results = analysis_result
state.insights = ["Revenue grew 15%", "APAC leads growth"]

# After visualization
state.chart_config = {"type": "bar", "x": "region", "y": "revenue"}

# On follow-up: "Now show it as a line chart"
# The state retains the data and analysis, only updating visualization
```

Conversation Summarization
The summarization.py module compresses long conversations to fit within LLM context windows:
```python
class ConversationSummarizer:
    """Summarizes conversation history for context management."""

    def __init__(self, llm_client, max_summary_tokens: int = 500):
        self._llm = llm_client
        self._max_tokens = max_summary_tokens

    async def summarize(
        self,
        messages: list[ConversationMessage],
    ) -> str:
        """Generate a concise summary of the conversation."""
        conversation_text = "\n".join(
            f"{m.role.value}: {m.content}" for m in messages
        )
        response = await self._llm.chat([
            {"role": "system", "content": (
                "Summarize the following conversation between a user and "
                "a data analytics assistant. Focus on: questions asked, "
                "data explored, insights found, and current analysis state."
            )},
            {"role": "user", "content": conversation_text},
        ])
        return response.get("content", "")
```

Summarization Strategy
| Conversation Length | Strategy |
|---|---|
| Under 10 messages | Send all messages as context |
| 10-30 messages | Send summary + last 10 messages |
| Over 30 messages | Send summary + last 5 messages |
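The tiers above reduce to a small selection function. The sketch below is hypothetical (the helper name and string-based message representation are illustrative, not the module's API):

```python
def build_context(messages: list[str], summary: str) -> list[str]:
    """Hypothetical helper applying the tiered strategy above."""
    n = len(messages)
    if n < 10:
        # Short conversation: send everything
        return messages
    if n <= 30:
        # Medium: summary plus the last 10 messages
        return [summary] + messages[-10:]
    # Long: summary plus only the last 5 messages
    return [summary] + messages[-5:]


msgs = [f"m{i}" for i in range(35)]
ctx = build_context(msgs, "SUMMARY")
print(len(ctx))  # 6
print(ctx[0])    # SUMMARY
print(ctx[-1])   # m34
```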
Memory Service
The memory_service.py module provides cross-cutting memory operations:

```python
class MemoryService:
    """Manages memory across sessions and conversations."""

    async def get_relevant_context(
        self,
        session_id: str,
        query: str,
        max_tokens: int = 2000,
    ) -> dict[str, Any]:
        """Get relevant context for a new query."""
        session = await self._store.get(session_id)
        if not session:
            return {}
        return {
            "summary": session.metadata.get("summary", ""),
            "recent_messages": await self._get_recent_messages(
                session_id, limit=5
            ),
            "active_tables": session.metadata.get("current_tables", []),
            "active_filters": session.metadata.get("active_filters", []),
            "last_sql": session.metadata.get("last_sql"),
        }
```

Session Privacy
The privacy.py module handles PII in session data:
```python
class SessionPrivacyManager:
    """Manages privacy controls for session data."""

    async def sanitize_for_storage(
        self,
        message: ConversationMessage,
        tenant_id: str,
    ) -> ConversationMessage:
        """Remove PII before storing in session."""
        sanitized_content = self._pii_detector.redact(
            message.content,
            categories=["email", "phone", "ssn", "credit_card"],
        )
        return ConversationMessage(
            role=message.role,
            content=sanitized_content,
            **{k: v for k, v in message.__dict__.items()
               if k not in ("role", "content")},
        )

    async def cleanup_expired(
        self,
        tenant_id: str,
        retention_days: int = 30,
    ) -> int:
        """Delete sessions older than retention period."""
        cutoff = datetime.utcnow() - timedelta(days=retention_days)
        return await self._store.delete_before(tenant_id, cutoff)
```

Data Retention
| Tenant Tier | Session Retention | History Retention |
|---|---|---|
| Free | 7 days | Last 50 messages |
| Standard | 30 days | Last 200 messages |
| Enterprise | 90 days | Unlimited |
| Compliance | Configurable | Per compliance policy |
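The retention tiers above could be applied with a lookup like the following sketch. The table and helper names are illustrative assumptions; the Compliance tier is omitted because its limits are configurable per policy rather than fixed:

```python
RETENTION_POLICY = {
    # tier: (session_retention_days, max_history_messages); None = unlimited
    "free": (7, 50),
    "standard": (30, 200),
    "enterprise": (90, None),
}


def trim_history(messages: list[dict], tier: str) -> list[dict]:
    """Hypothetical helper: apply the per-tier history cap from the table."""
    _, cap = RETENTION_POLICY[tier]
    return messages if cap is None else messages[-cap:]


history = [{"id": i} for i in range(300)]
print(len(trim_history(history, "free")))        # 50
print(len(trim_history(history, "standard")))    # 200
print(len(trim_history(history, "enterprise")))  # 300
```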
Cross-Session Context
The cross_session_persistence.py module enables sharing context across sessions for the same user:
```python
class CrossSessionPersistence:
    """Persists user context across conversation sessions."""

    async def get_user_context(
        self,
        tenant_id: str,
        user_id: str,
    ) -> dict[str, Any]:
        """Get accumulated user context from past sessions."""
        return {
            "frequent_tables": await self._get_frequent_tables(
                tenant_id, user_id
            ),
            "common_queries": await self._get_common_queries(
                tenant_id, user_id
            ),
            "preferences": await self._get_preferences(
                tenant_id, user_id
            ),
            "last_session_summary": await self._get_last_summary(
                tenant_id, user_id
            ),
        }
```

This enables scenarios like "Continue where I left off" or "Show me the same report as last time."
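As a sketch of how a helper like `_get_frequent_tables` might aggregate table usage across a user's past sessions (the session shape and function below are assumptions, not the module's actual implementation):

```python
from collections import Counter


def frequent_tables(past_sessions: list[dict], top_n: int = 3) -> list[str]:
    """Hypothetical sketch: count table references across past sessions."""
    counts: Counter[str] = Counter()
    for session in past_sessions:
        counts.update(session.get("current_tables", []))
    return [table for table, _ in counts.most_common(top_n)]


sessions = [
    {"current_tables": ["sales", "regions"]},
    {"current_tables": ["sales"]},
    {"current_tables": ["sales", "customers"]},
]
print(frequent_tables(sessions)[0])  # sales
```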
Session Lifecycle
```
Create Session
      |
      v
+----------+     Message     +----------+
|  ACTIVE  | <-------------> |  ACTIVE  |  (loop: add messages, update state)
+----------+                 +----------+
      |
      | (24h inactivity)
      v
+----------+
| EXPIRED  |
+----------+
      |
      | (retention period)
      v
+----------+
| ARCHIVED | --> Deleted after retention
+----------+
```

| Transition | Trigger | Action |
|---|---|---|
| Create -> Active | API call | Initialize session state |
| Active -> Active | User message | Update last_active, add message |
| Active -> Expired | TTL timeout | Mark as expired, no new messages |
| Active -> Closed | User closes | Explicit close, generate summary |
| Expired -> Archived | Cleanup job | Move to cold storage |
| Archived -> Deleted | Retention policy | Permanent deletion |
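The transition table lends itself to a simple validity check. The sketch below (names assumed, not from the source) encodes the allowed moves as a set of state pairs:

```python
# Allowed lifecycle transitions, taken from the table above.
ALLOWED_TRANSITIONS = {
    ("active", "active"),     # user message keeps the session live
    ("active", "expired"),    # TTL timeout
    ("active", "closed"),     # explicit close by the user
    ("expired", "archived"),  # cleanup job moves to cold storage
    ("archived", "deleted"),  # retention policy: permanent deletion
}


def can_transition(current: str, target: str) -> bool:
    """Return True if the lifecycle permits moving current -> target."""
    return (current, target) in ALLOWED_TRANSITIONS


print(can_transition("active", "expired"))  # True
print(can_transition("expired", "active"))  # False
```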