MATIH Platform is in active MVP development. Documentation reflects current implementation status.
Session Analytics

The Session Analytics Flink SQL job aggregates conversational session metrics from the matih.ai.state-changes Kafka topic into 15-minute tumbling windows. It tracks session counts, duration, completion rates, and user activity per tenant.

Source: infrastructure/flink/jobs/session-analytics.sql


Job Configuration

| Property | Value |
|---|---|
| Source topic | matih.ai.state-changes |
| Consumer group | flink-session-analytics |
| Sink table | polaris.matih_analytics.session_analytics |
| Window type | Tumbling, 15 minutes |
| Watermark delay | 30 seconds |
| Filter | entity_type = 'session_phase' |
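The sink name is a three-part catalog.database.table identifier. Assuming polaris is registered in Flink as an Iceberg REST catalog (Apache Polaris implements the Iceberg REST catalog API), the registration could look like the sketch below. The uri and warehouse values are placeholders, not values taken from the MATIH deployment.

```sql
-- Sketch: register an Iceberg REST catalog named polaris in the Flink session.
-- Hypothetical: the uri and warehouse values below are placeholders.
CREATE CATALOG polaris WITH (
  'type'         = 'iceberg',
  'catalog-type' = 'rest',
  'uri'          = 'http://polaris:8181/api/catalog',
  'warehouse'    = 'matih'
);
```

Once the catalog is registered, the job can address the sink by its fully qualified name, polaris.matih_analytics.session_analytics. A real deployment would typically also pass authentication properties for the catalog.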

Source Schema

The job reads from the matih.ai.state-changes Kafka topic with the following schema:

| Column | Type | Description |
|---|---|---|
| event_id | STRING | Unique event identifier |
| event_type | STRING | Event classification |
| tenant_id | STRING | Tenant identifier (NOT NULL) |
| entity_type | STRING | Entity type (filtered to session_phase) |
| entity_id | STRING | Session identifier |
| from_state | STRING | Previous state |
| to_state | STRING | New state |
| triggered_by | STRING | User or system that triggered the transition |
| metadata | STRING | JSON string with session-level metrics |
| timestamp | TIMESTAMP(3) | Event time; the 30-second watermark is defined on this column |
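A Flink source table matching this schema could be declared roughly as follows. This is a sketch, not the contents of session-analytics.sql: the table name state_changes, the bootstrap server address, and the offset settings are assumptions, and it assumes each Kafka record is a flat JSON object whose metadata field holds a serialized JSON string.

```sql
-- Sketch of a Kafka-backed source table for matih.ai.state-changes.
-- Hypothetical: table name, bootstrap servers, and offset settings.
CREATE TABLE state_changes (
  event_id     STRING,
  event_type   STRING,
  tenant_id    STRING NOT NULL,
  entity_type  STRING,
  entity_id    STRING,
  from_state   STRING,
  to_state     STRING,
  triggered_by STRING,
  metadata     STRING,        -- serialized JSON with session-level metrics
  `timestamp`  TIMESTAMP(3),  -- reserved word in Flink SQL, so it is quoted
  -- Event-time watermark trailing the largest observed timestamp by 30 seconds
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'matih.ai.state-changes',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink-session-analytics',
  -- Resume from committed group offsets; fall back to earliest if none exist
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  'format' = 'json'
);
```

With this watermark, a 15-minute window is emitted only after the watermark passes the window end, that is, once the job has seen an event timestamped about 30 seconds past that end. Events that arrive after their window has been emitted are dropped by the group window aggregation. Because the watermark is the minimum across Kafka partitions, an idle partition can hold windows open unless an idle timeout such as table.exec.source.idle-timeout is configured.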

Output Columns

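| Column | Expression | Description |
|---|---|---|
| window_start | TUMBLE_START(timestamp, INTERVAL '15' MINUTE) | Window start time (inclusive) |
| window_end | TUMBLE_END(timestamp, INTERVAL '15' MINUTE) | Window end time (exclusive) |
| tenant_id | Group key | Tenant identifier |
| total_sessions | COUNT(DISTINCT entity_id) | Distinct sessions with at least one event in the window |
| avg_duration_ms | AVG of metadata $.duration_ms | Average session duration, in milliseconds |
| avg_turns | AVG of metadata $.turn_count | Average number of conversation turns |
| completion_rate | Completions / total_sessions | Fraction of sessions reaching the completed state |
| unique_users | COUNT(DISTINCT triggered_by) | Distinct users (or system actors) that triggered transitions in the window |
| top_agents | Derived from metadata $.agent_id | Agents active in the window |
| top_intents | Derived from metadata $.intent | Intents detected in the window |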
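The expressions above can be assembled into a grouped window aggregation along these lines. This is a sketch assembled from the expressions on this page, assuming a source table named state_changes; top_agents and top_intents are left out because this page does not specify how they are aggregated.

```sql
-- Sketch of the windowed aggregation using Flink's group-window syntax.
-- Assumes a source table named state_changes; top_agents/top_intents omitted.
SELECT
  TUMBLE_START(`timestamp`, INTERVAL '15' MINUTE) AS window_start,
  TUMBLE_END(`timestamp`, INTERVAL '15' MINUTE)   AS window_end,
  tenant_id,
  COUNT(DISTINCT entity_id)                       AS total_sessions,
  AVG(CAST(JSON_VALUE(metadata, '$.duration_ms') AS DOUBLE)) AS avg_duration_ms,
  AVG(CAST(JSON_VALUE(metadata, '$.turn_count') AS DOUBLE))  AS avg_turns,
  CAST(SUM(CASE WHEN to_state = 'completed' THEN 1 ELSE 0 END) AS DOUBLE)
    / GREATEST(COUNT(DISTINCT entity_id), 1)      AS completion_rate,
  COUNT(DISTINCT triggered_by)                    AS unique_users
FROM state_changes
WHERE entity_type = 'session_phase'   -- only session lifecycle transitions
GROUP BY
  TUMBLE(`timestamp`, INTERVAL '15' MINUTE),      -- one row per window and tenant
  tenant_id;
```

Newer Flink releases recommend the window table-valued function form (TUMBLE(TABLE ..., DESCRIPTOR(...), ...) grouped by window_start and window_end) over this group-window syntax; both produce one row per tenant per 15-minute window.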

Metadata Extraction

The metadata field is a JSON string. Values are extracted using JSON_VALUE():

```sql
AVG(CAST(JSON_VALUE(metadata, '$.duration_ms') AS DOUBLE)) AS avg_duration_ms
AVG(CAST(JSON_VALUE(metadata, '$.turn_count') AS DOUBLE))  AS avg_turns
```

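Events that move a session into the completed state should carry both duration_ms and turn_count in their metadata payload. If either field is absent (or metadata is not valid JSON), JSON_VALUE returns NULL and AVG ignores that value, so each average covers only the events that report the metric.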
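To catch completed transitions that omit these fields, a continuous check over the source could look like this. It is a sketch that assumes the source table is named state_changes.

```sql
-- Sketch: surface completed transitions whose metadata lacks the expected metrics.
-- Assumes a source table named state_changes.
SELECT tenant_id, entity_id, `timestamp`, metadata
FROM state_changes
WHERE entity_type = 'session_phase'
  AND to_state = 'completed'
  AND (JSON_VALUE(metadata, '$.duration_ms') IS NULL
       OR JSON_VALUE(metadata, '$.turn_count') IS NULL);
```

Run against the streaming source, this is a continuous query that emits each offending event as it arrives.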


Completion Rate Calculation

```sql
CAST(SUM(CASE WHEN to_state = 'completed' THEN 1 ELSE 0 END) AS DOUBLE)
    / GREATEST(COUNT(DISTINCT entity_id), 1) AS completion_rate
```

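The GREATEST(..., 1) guard keeps the denominator at 1 or more, so the division is never by zero. Because Flink only emits a result for windows that contain at least one matching event, the guard is defensive; it matters, for example, when every event in a group has a NULL entity_id, since COUNT(DISTINCT ...) skips NULLs.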
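The numerator counts transitions into completed rather than sessions, so the rate assumes each session enters completed at most once per window. If a session could emit that transition more than once, a distinct-session variant such as the one below would keep the rate within [0, 1]. This is an alternative sketch, not the expression the job uses.

```sql
-- Alternative sketch: count distinct completed sessions instead of completion events.
-- CASE yields NULL for non-completed rows, and COUNT(DISTINCT ...) skips NULLs.
CAST(COUNT(DISTINCT CASE WHEN to_state = 'completed' THEN entity_id END) AS DOUBLE)
    / GREATEST(COUNT(DISTINCT entity_id), 1) AS completion_rate
```

For example, a window with 40 distinct sessions, 12 of which reach completed, yields 12 / 40 = 0.3 under either expression when each session completes once.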


Downstream Consumers

The session_analytics Iceberg table is queried by:

| Consumer | Purpose |
|---|---|
| AI Service dashboard API | Session metrics in BI dashboards |
| Grafana dashboards | Operational monitoring |
| Trino ad-hoc queries | Analytics investigations |
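For ad-hoc investigation, a Trino query along these lines summarizes the last 24 hours per tenant. It is a sketch; the Trino catalog name iceberg is an assumption, so substitute whichever catalog your Trino deployment maps to Polaris.

```sql
-- Sketch (Trino dialect): per-tenant session summary over the last 24 hours.
-- Hypothetical: the Trino catalog name "iceberg".
SELECT
  tenant_id,
  sum(total_sessions) AS sessions,
  -- Weight each window's rate by its session count instead of averaging rates
  sum(completion_rate * total_sessions) / nullif(sum(total_sessions), 0) AS completion_rate
FROM iceberg.matih_analytics.session_analytics
WHERE window_start >= current_timestamp - INTERVAL '1' DAY
GROUP BY tenant_id
ORDER BY sessions DESC;
```

Weighting by total_sessions recovers completions over sessions for the whole period; a plain avg(completion_rate) would give a quiet 15-minute window the same influence as a busy one.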

Related Pages