Session Analytics
The Session Analytics Flink SQL job aggregates conversational session metrics from the matih.ai.state-changes Kafka topic into 15-minute tumbling windows. It tracks session counts, duration, completion rates, and user activity per tenant.
Source: infrastructure/flink/jobs/session-analytics.sql
Job Configuration
| Property | Value |
|---|---|
| Source topic | matih.ai.state-changes |
| Consumer group | flink-session-analytics |
| Sink table | polaris.matih_analytics.session_analytics |
| Window type | Tumbling, 15 minutes |
| Watermark delay | 30 seconds |
| Filter | entity_type = 'session_phase' |
Source Schema
The job reads from the matih.ai.state-changes Kafka topic with the following schema:
| Column | Type | Description |
|---|---|---|
| event_id | STRING | Unique event identifier |
| event_type | STRING | Event classification |
| tenant_id | STRING | Tenant identifier (NOT NULL) |
| entity_type | STRING | Entity type (filtered to session_phase) |
| entity_id | STRING | Session identifier |
| from_state | STRING | Previous state |
| to_state | STRING | New state |
| triggered_by | STRING | User or system that triggered the transition |
| metadata | STRING | JSON string with session-level metrics |
| timestamp | TIMESTAMP(3) | Event timestamp with watermark |
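To make the schema concrete, the following is a minimal sketch of how such a source table might be declared in Flink SQL. The columns, topic, consumer group, and 30-second watermark delay come from the tables above; the table name `state_changes`, the broker address, the startup mode, and the JSON format are assumptions for illustration and may differ from the actual job file.

```sql
-- Sketch only. Assumed: table name, broker address, startup mode, format.
CREATE TABLE state_changes (
    event_id     STRING,
    event_type   STRING,
    tenant_id    STRING NOT NULL,
    entity_type  STRING,
    entity_id    STRING,
    from_state   STRING,
    to_state     STRING,
    triggered_by STRING,
    metadata     STRING,        -- JSON string, parsed later with JSON_VALUE()
    `timestamp`  TIMESTAMP(3),  -- reserved word, so it is quoted with backticks
    -- Event-time attribute with the 30-second watermark delay from Job Configuration
    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'matih.ai.state-changes',
    'properties.bootstrap.servers' = 'kafka:9092',  -- assumed address
    'properties.group.id' = 'flink-session-analytics',
    'scan.startup.mode' = 'latest-offset',          -- assumed
    'format' = 'json'                               -- assumed
);
```

With a 30-second delay, a 15-minute window is emitted once the watermark passes its end, that is, once an event stamped at least 30 seconds after the window end has been seen.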
Output Columns
| Column | Expression | Description |
|---|---|---|
| window_start | TUMBLE_START(timestamp, 15 MIN) | Window start time |
| window_end | TUMBLE_END(timestamp, 15 MIN) | Window end time |
| tenant_id | Group key | Tenant identifier |
| total_sessions | COUNT(DISTINCT entity_id) | Unique sessions in window |
| avg_duration_ms | AVG(metadata.duration_ms) | Average session duration |
| avg_turns | AVG(metadata.turn_count) | Average conversation turns |
| completion_rate | Completed / total | Fraction reaching completed state |
| unique_users | COUNT(DISTINCT triggered_by) | Unique users in window |
| top_agents | From metadata.agent_id | Agents active in window |
| top_intents | From metadata.intent | Intents detected in window |
Metadata Extraction
The metadata field is a JSON string. Values are extracted using JSON_VALUE():
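    AVG(CAST(JSON_VALUE(metadata, '$.duration_ms') AS DOUBLE)) AS avg_duration_ms,
    AVG(CAST(JSON_VALUE(metadata, '$.turn_count') AS DOUBLE)) AS avg_turns

The transition to the completed state should include duration_ms and turn_count in its metadata payload. JSON_VALUE() returns NULL when a path is absent and AVG() ignores NULL inputs, so events without these fields do not contribute to the averages.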
Completion Rate Calculation
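    CAST(SUM(CASE WHEN to_state = 'completed' THEN 1 ELSE 0 END) AS DOUBLE)
      / GREATEST(COUNT(DISTINCT entity_id), 1) AS completion_rate

GREATEST(..., 1) prevents division by zero when COUNT(DISTINCT entity_id) is 0, that is, when no sessions are counted in a window. Note that the numerator counts transition events while the denominator counts distinct sessions, so a session that emits more than one transition to completed within the same window is counted more than once in the numerator.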
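As an illustration of how the fragments above fit into the windowed aggregation, here is a minimal sketch of a single query. It assumes a source table named `state_changes` (a hypothetical name) with the columns listed under Source Schema, and it uses the group-window form of TUMBLE to match the TUMBLE_START and TUMBLE_END expressions in the Output Columns table. The top_agents and top_intents columns are left out because this page does not state how they are aggregated, so this is not the full job.

```sql
-- Sketch only. Assumed: source table name state_changes.
-- Omitted: top_agents and top_intents (aggregation not specified on this page).
SELECT
    TUMBLE_START(`timestamp`, INTERVAL '15' MINUTE) AS window_start,
    TUMBLE_END(`timestamp`, INTERVAL '15' MINUTE)   AS window_end,
    tenant_id,
    COUNT(DISTINCT entity_id) AS total_sessions,
    -- Missing JSON paths yield NULL, which AVG skips
    AVG(CAST(JSON_VALUE(metadata, '$.duration_ms') AS DOUBLE)) AS avg_duration_ms,
    AVG(CAST(JSON_VALUE(metadata, '$.turn_count') AS DOUBLE))  AS avg_turns,
    -- Completed transitions divided by distinct sessions, guarded against zero
    CAST(SUM(CASE WHEN to_state = 'completed' THEN 1 ELSE 0 END) AS DOUBLE)
        / GREATEST(COUNT(DISTINCT entity_id), 1) AS completion_rate,
    COUNT(DISTINCT triggered_by) AS unique_users
FROM state_changes
WHERE entity_type = 'session_phase'   -- filter from Job Configuration
GROUP BY
    TUMBLE(`timestamp`, INTERVAL '15' MINUTE),
    tenant_id;
```

For example, if a tenant's window contains events for 4 distinct sessions and 3 of those events have to_state = 'completed', the expression evaluates to 3.0 / 4 = 0.75.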
Downstream Consumers
The session_analytics Iceberg table is queried by:
| Consumer | Purpose |
|---|---|
| AI Service dashboard API | Session metrics in BI dashboards |
| Grafana dashboards | Operational monitoring |
| Trino ad-hoc queries | Analytics investigations |
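For the ad-hoc case, a query along the following lines could roll the 15-minute rows up to daily figures per tenant. This is a hedged sketch in Trino SQL: it assumes the Trino catalog and schema path mirrors the sink table name from Job Configuration, which may not hold in a given deployment, and the output aliases are made up for illustration.

```sql
-- Trino sketch. Assumed: catalog/schema path matches the Flink sink table name.
SELECT
    tenant_id,
    date_trunc('day', window_start) AS day_start,
    sum(total_sessions) AS sessions_summed,
    -- Weight each window's rate by its session count instead of averaging rates directly
    sum(completion_rate * total_sessions)
        / nullif(sum(total_sessions), 0) AS weighted_completion_rate
FROM polaris.matih_analytics.session_analytics
WHERE window_start >= current_timestamp - INTERVAL '7' DAY
GROUP BY tenant_id, date_trunc('day', window_start)
ORDER BY day_start, tenant_id;
```

Weighting by total_sessions keeps sparsely populated windows from dominating the daily rate. Because total_sessions is a distinct count per window, a session with events in several windows is included once per window in sessions_summed.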
Related Pages
- Agent Performance -- Agent-level metrics
- LLM Operations -- LLM usage analytics
- Flink Overview -- Flink architecture