LLM Operations
The LLM Operations Flink SQL job aggregates LLM API call metrics from the matih.ai.llm-ops Kafka topic into 15-minute tumbling windows. It tracks cache hit rates, token usage, costs, latency, and error rates grouped by provider, model, and tenant.
Source: infrastructure/flink/jobs/llm-operations-agg.sql
Job Configuration
| Property | Value |
|---|---|
| Source topic | matih.ai.llm-ops |
| Consumer group | flink-llm-ops-agg |
| Sink table | polaris.matih_analytics.llm_operations_metrics |
| Window type | Tumbling, 15 minutes |
| Watermark delay | 30 seconds |
| Group by | tenant_id, provider, model |
Source Schema
| Column | Type | Description |
|---|---|---|
| tenant_id | STRING | Tenant identifier (NOT NULL) |
| provider | STRING | LLM provider (openai, azure, anthropic) |
| model | STRING | Model name (gpt-4o, claude-3-opus) |
| event_type | STRING | Event classification |
| cache_hit | BOOLEAN | Whether the response was served from cache |
| tokens_input | INT | Input token count |
| tokens_output | INT | Output token count |
| latency_ms | INT | API call latency in milliseconds |
| cost_usd | DOUBLE | Estimated cost in USD |
| success | BOOLEAN | Whether the call succeeded |
| timestamp | TIMESTAMP(3) | Event timestamp with watermark |
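
As a rough sketch, the schema above could be declared in Flink SQL along the following lines. The table name `llm_ops_events`, the broker address, and the JSON format are assumptions made for illustration; the actual definition lives in the job's source file. The watermark expression reflects the 30-second delay from the job configuration, and `timestamp` is backtick-quoted because it is a reserved word in Flink SQL.

```sql
-- Hypothetical source table; name, broker address, and format are assumed.
CREATE TABLE llm_ops_events (
  tenant_id     STRING NOT NULL,
  provider      STRING,
  model         STRING,
  event_type    STRING,
  cache_hit     BOOLEAN,
  tokens_input  INT,
  tokens_output INT,
  latency_ms    INT,
  cost_usd      DOUBLE,
  success       BOOLEAN,
  `timestamp`   TIMESTAMP(3),
  -- Tolerate events arriving up to 30 seconds out of order
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'matih.ai.llm-ops',
  'properties.group.id' = 'flink-llm-ops-agg',
  'properties.bootstrap.servers' = 'kafka:9092',  -- assumed address
  'format' = 'json'                               -- assumed format
);
```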
Output Columns
| Column | Expression | Description |
|---|---|---|
| window_start | TUMBLE_START(timestamp, 15 MIN) | Window start time |
| window_end | TUMBLE_END(timestamp, 15 MIN) | Window end time |
| tenant_id | Group key | Tenant identifier |
| provider | COALESCE(provider, 'unknown') | LLM provider |
| model | COALESCE(model, 'unknown') | Model name |
| cache_hit_rate | Cache hits / total | Proportion of cached responses |
| avg_tokens_input | AVG(tokens_input) | Average input tokens per call |
| avg_tokens_output | AVG(tokens_output) | Average output tokens per call |
| total_requests | COUNT(*) | Total LLM API calls |
| total_cost_usd | SUM(cost_usd) | Total cost in window |
| avg_latency_ms | AVG(latency_ms) | Average API latency |
| error_rate | Failed / total | Error rate |
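
Taken together, the expressions in this table suggest an aggregation query shaped roughly like the one below. This is a reconstruction from the documented columns, not a copy of the job file, and the source table name `llm_ops_events` is hypothetical. The window expressions are written out in full Flink SQL interval syntax rather than the shorthand used in the table.

```sql
-- Sketch of the 15-minute tumbling-window aggregation.
-- llm_ops_events is a hypothetical name for the Kafka-backed source table.
INSERT INTO polaris.matih_analytics.llm_operations_metrics
SELECT
  TUMBLE_START(`timestamp`, INTERVAL '15' MINUTE) AS window_start,
  TUMBLE_END(`timestamp`, INTERVAL '15' MINUTE)   AS window_end,
  tenant_id,
  COALESCE(provider, 'unknown') AS provider,
  COALESCE(model, 'unknown')    AS model,
  -- Share of calls answered from cache
  CAST(SUM(CASE WHEN cache_hit = TRUE THEN 1 ELSE 0 END) AS DOUBLE)
    / GREATEST(COUNT(*), 1) AS cache_hit_rate,
  AVG(tokens_input)  AS avg_tokens_input,
  AVG(tokens_output) AS avg_tokens_output,
  COUNT(*)           AS total_requests,
  SUM(cost_usd)      AS total_cost_usd,
  AVG(latency_ms)    AS avg_latency_ms,
  -- Share of calls that did not succeed
  CAST(SUM(CASE WHEN success = FALSE THEN 1 ELSE 0 END) AS DOUBLE)
    / GREATEST(COUNT(*), 1) AS error_rate
FROM llm_ops_events
GROUP BY
  TUMBLE(`timestamp`, INTERVAL '15' MINUTE),
  tenant_id,
  provider,
  model;
```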
Cache Hit Rate Calculation
    CAST(SUM(CASE WHEN cache_hit = TRUE THEN 1 ELSE 0 END) AS DOUBLE)
      / GREATEST(COUNT(*), 1) AS cache_hit_rate

A high cache hit rate indicates effective semantic caching. Low rates may suggest cache configuration issues or highly varied queries.
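
To make the arithmetic concrete, the following self-contained query applies the same expression to five inline rows with made-up values. Two of the five rows are hits, so the result is 0.4. The row with a NULL `cache_hit` falls through to the ELSE branch, so it is counted as a miss while still contributing to the denominator.

```sql
-- Worked example on illustrative inline data: 2 hits out of 5 rows = 0.4
SELECT
  CAST(SUM(CASE WHEN cache_hit = TRUE THEN 1 ELSE 0 END) AS DOUBLE)
    / GREATEST(COUNT(*), 1) AS cache_hit_rate
FROM (
  VALUES (TRUE), (FALSE), (FALSE), (TRUE), (CAST(NULL AS BOOLEAN))
) AS t(cache_hit);
```

In a grouped aggregation a group only exists when it has at least one row, so `GREATEST(COUNT(*), 1)` is best read as a defensive guard against division by zero rather than a case expected to occur.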
Cost Analysis
The aggregated cost data supports the following cost analyses:
| Analysis | Query Pattern |
|---|---|
| Cost per tenant | Group by tenant_id, sum total_cost_usd |
| Cost per model | Group by model, sum total_cost_usd |
| Cost trend | Time series of total_cost_usd over windows |
| Cost per request | total_cost_usd / total_requests |
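
The patterns in the table translate into queries against the sink table roughly as follows. These are sketches in generic ANSI-style SQL; the engine used to read `polaris.matih_analytics.llm_operations_metrics` is not specified here, so the syntax may need adjusting.

```sql
-- Cost per tenant
SELECT tenant_id, SUM(total_cost_usd) AS cost_usd
FROM polaris.matih_analytics.llm_operations_metrics
GROUP BY tenant_id
ORDER BY cost_usd DESC;

-- Cost per model, with cost per request.
-- Divide summed cost by summed requests instead of averaging per-window
-- ratios, so that busy windows carry proportionally more weight.
SELECT
  model,
  SUM(total_cost_usd) AS cost_usd,
  SUM(total_cost_usd) / NULLIF(SUM(total_requests), 0) AS cost_per_request_usd
FROM polaris.matih_analytics.llm_operations_metrics
GROUP BY model;

-- Cost trend: one data point per 15-minute window
SELECT window_start, SUM(total_cost_usd) AS cost_usd
FROM polaris.matih_analytics.llm_operations_metrics
GROUP BY window_start
ORDER BY window_start;
```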
Error Rate Calculation
    CAST(SUM(CASE WHEN success = FALSE THEN 1 ELSE 0 END) AS DOUBLE)
      / GREATEST(COUNT(*), 1) AS error_rate

Common error categories include rate limiting (429), context length exceeded (400), and provider outages (503).
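
One possible way to consume `error_rate` downstream is sketched below in generic ANSI-style SQL. The 5% threshold and the minimum of 20 requests are hypothetical values chosen only to illustrate the pattern.

```sql
-- Windows with an elevated error rate; thresholds are illustrative only.
SELECT window_start, tenant_id, provider, model, total_requests, error_rate
FROM polaris.matih_analytics.llm_operations_metrics
WHERE error_rate > 0.05        -- hypothetical alert threshold
  AND total_requests >= 20     -- hypothetical floor to skip low-volume windows
ORDER BY window_start DESC, error_rate DESC;

-- Error rate per provider across many windows.
-- Weight each window's rate by its request count; a plain AVG(error_rate)
-- would give a 2-request window the same influence as a 2,000-request one.
SELECT
  provider,
  SUM(error_rate * total_requests) / NULLIF(SUM(total_requests), 0) AS error_rate
FROM polaris.matih_analytics.llm_operations_metrics
GROUP BY provider;
```

The source schema on this page exposes only a `success` flag, so these queries measure how often calls fail, not which of the error categories above was responsible.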
Downstream Consumers
| Consumer | Purpose |
|---|---|
| AI Service dashboard API | LLM cost and performance dashboards |
| Billing service | Tenant cost attribution |
| Grafana dashboards | Operational monitoring and alerting |
| Cost optimization engine | Model routing recommendations |
Related Pages
- Agent Performance -- Agent-level metrics
- Session Analytics -- Session metrics
- Flink Overview -- Flink architecture