WebSocket Communication
The AI Service uses WebSocket connections to deliver real-time streaming responses from agents to frontend clients. This enables progressive rendering of agent reasoning, intermediate results, and final answers without polling. The implementation is built on FastAPI's native WebSocket support with custom connection management, heartbeat monitoring, and graceful disconnect handling.
Connection Lifecycle
WebSocket connections follow a structured lifecycle:
- Connect: Client opens a WebSocket at `/ws/chat/:session_id` with the JWT as a query parameter
- Authenticate: Server validates the JWT and extracts tenant and user context
- Ready: Server sends a `connection_ready` event confirming the session is active
- Exchange: Client sends messages; server streams back typed events
- Heartbeat: Server sends periodic pings; client responds with pongs
- Disconnect: Either side closes the connection gracefully
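```python
from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter()
# `orchestrator` and `connection_manager` are service-level singletons defined elsewhere.


@router.websocket("/ws/chat/{session_id}")
async def chat_websocket(websocket: WebSocket, session_id: str):
    # JWT validation and the connection_ready event are omitted from this excerpt.
    await websocket.accept()
    await connection_manager.connect(session_id, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            # Only "message" frames carry "content"; other types are not handled here.
            if data.get("type") != "message":
                continue
            async for chunk in orchestrator.stream(session_id, data["content"]):
                await websocket.send_json(chunk.to_dict())
    except WebSocketDisconnect:
        connection_manager.disconnect(session_id)
```
Message Protocol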
All WebSocket messages are JSON objects with a `type` field that names the event; the remaining fields depend on the type:
Client Messages
| Type | Description | Payload Fields |
|---|---|---|
| `message` | User chat message | `content` (string) |
| `refine` | Query refinement | `refinement` (string) |
| `cancel` | Cancel current operation | none |
| `pong` | Heartbeat response | none |
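Incoming frames can be checked against the client message table before they are dispatched, so that types without a `content` field, such as `pong` and `cancel`, are recognized and routed safely. This is a minimal sketch assuming each frame is a JSON object whose `type` selects one of the four documented types; `parse_client_message`, `ProtocolError`, and `REQUIRED_FIELDS` are hypothetical names.

```python
import json

# Required string payload fields per client message type (from the table above)
REQUIRED_FIELDS: dict[str, tuple[str, ...]] = {
    "message": ("content",),
    "refine": ("refinement",),
    "cancel": (),
    "pong": (),
}


class ProtocolError(ValueError):
    """Raised for client frames that do not match the message protocol."""


def parse_client_message(raw: str) -> dict:
    """Decode a client frame and check its type and required fields."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        raise ProtocolError("frame is not valid JSON") from None
    if not isinstance(data, dict):
        raise ProtocolError("frame must be a JSON object")

    msg_type = data.get("type")
    # The isinstance check also guards against unhashable values such as lists
    if not isinstance(msg_type, str) or msg_type not in REQUIRED_FIELDS:
        raise ProtocolError(f"unknown message type: {msg_type!r}")

    for field in REQUIRED_FIELDS[msg_type]:
        if not isinstance(data.get(field), str):
            raise ProtocolError(f"{msg_type!r} requires a string {field!r} field")
    return data
```

A `ProtocolError` could then be reported to the client as an `error` event instead of terminating the receive loop with an unhandled exception.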
Server Events
| Type | Description | Payload Fields |
|---|---|---|
| `connection_ready` | Connection established | `session_id`, `capabilities` |
| `stream_start` | Agent begins processing | `agent` (string) |
| `text` | Incremental text chunk | `content` (string) |
| `sql` | Generated SQL query | `sql` (string), `dialect` (string) |
| `data` | Query result data | `columns`, `rows` |
| `visualization` | Chart specification | `type`, `config` |
| `insight` | Analytical insight | `content` (string) |
| `tool_call` | Agent invoked a tool | `tool`, `input` |
| `tool_result` | Tool execution result | `tool`, `output` |
| `status` | Processing status update | `status` (string) |
| `error` | Error during processing | `code`, `message` |
| `stream_end` | Processing complete | `execution_time_ms` |
| `ping` | Server heartbeat | none |
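The streaming loop in the handler serializes each chunk with `chunk.to_dict()`. A minimal sketch of such an event object follows, assuming payload fields are sent at the top level next to `type`, as in the error example under Error Handling. `StreamEvent` and `SERVER_EVENT_TYPES` are hypothetical names, not the service's actual classes.

```python
from dataclasses import dataclass, field
from typing import Any

# Event types from the server events table
SERVER_EVENT_TYPES = frozenset({
    "connection_ready", "stream_start", "text", "sql", "data", "visualization",
    "insight", "tool_call", "tool_result", "status", "error", "stream_end", "ping",
})


@dataclass(frozen=True)
class StreamEvent:
    """A typed server event; payload keys are flattened next to "type" on the wire."""
    type: str
    payload: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if self.type not in SERVER_EVENT_TYPES:
            raise ValueError(f"unknown server event type: {self.type!r}")
        if "type" in self.payload:
            raise ValueError('payload must not override the "type" key')

    def to_dict(self) -> dict[str, Any]:
        return {"type": self.type, **self.payload}
```

For example, a response might be streamed as `StreamEvent("stream_start", {"agent": ...})`, several `text` events, and a closing `StreamEvent("stream_end", {"execution_time_ms": ...})`. Validating the type at construction time catches misspelled event names on the server rather than in the client.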
Connection Manager
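The `ConnectionManager` class tracks active WebSocket connections keyed by session ID. Its `broadcast` method sends a message to every session belonging to a tenant, which relies on session IDs being prefixed with the tenant ID: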
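```python
from fastapi import WebSocket


class ConnectionManager:
    def __init__(self):
        # Active sockets keyed by session ID
        self._connections: dict[str, WebSocket] = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        self._connections[session_id] = websocket

    def disconnect(self, session_id: str):
        # pop() with a default makes repeated disconnects harmless
        self._connections.pop(session_id, None)

    async def broadcast(self, tenant_id: str, message: dict):
        # Iterate over a snapshot: sessions may connect or disconnect while a send is awaited
        for sid, ws in list(self._connections.items()):
            # Assumes session IDs are prefixed with their tenant ID
            if sid.startswith(tenant_id):
                await ws.send_json(message)
```
Configuration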
| Environment Variable | Default | Description |
|---|---|---|
| `WS_MAX_CONNECTIONS` | 100 | Maximum concurrent WebSocket connections |
| `WS_HEARTBEAT_INTERVAL` | 30 | Seconds between server heartbeat pings |
| `WS_IDLE_TIMEOUT` | 300 | Seconds before an idle connection is closed |
| `WS_MAX_MESSAGE_SIZE` | 65536 | Maximum message size in bytes |
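A sketch of loading these variables with the documented defaults, together with the decision a heartbeat loop might make on each tick. It assumes the settings are read once at startup and that any client frame, including `pong`, counts as activity, so the idle timeout also catches clients that stop answering pings; the document does not say whether pongs reset the idle timer. `WebSocketSettings` and `heartbeat_action` are hypothetical names.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class WebSocketSettings:
    """WS_* settings with the defaults from the table above."""
    max_connections: int = 100
    heartbeat_interval: float = 30.0   # seconds between server pings
    idle_timeout: float = 300.0        # seconds without client frames before closing
    max_message_size: int = 65536      # bytes

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "WebSocketSettings":
        env = os.environ if env is None else env
        d = cls()  # instance holding the defaults
        return cls(
            max_connections=int(env.get("WS_MAX_CONNECTIONS", d.max_connections)),
            heartbeat_interval=float(env.get("WS_HEARTBEAT_INTERVAL", d.heartbeat_interval)),
            idle_timeout=float(env.get("WS_IDLE_TIMEOUT", d.idle_timeout)),
            max_message_size=int(env.get("WS_MAX_MESSAGE_SIZE", d.max_message_size)),
        )


def heartbeat_action(now: float, last_ping: float, last_activity: float,
                     settings: WebSocketSettings) -> str:
    """Return "close", "ping", or "wait" for one tick of a heartbeat loop (times in seconds)."""
    if now - last_activity >= settings.idle_timeout:
        # Assumption: any client frame, including pong, counts as activity
        return "close"
    if now - last_ping >= settings.heartbeat_interval:
        return "ping"
    return "wait"
```

Keeping the decision in a pure function makes it testable without a live socket; an async loop would call it periodically and either send `{"type": "ping"}` or close the connection.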
Error Handling
WebSocket errors are sent as typed error events before closing the connection:
```json
{
  "type": "error",
  "code": "SESSION_NOT_FOUND",
  "message": "Session sess-abc123 not found"
}
```

| Error Code | Description |
|---|---|
| `AUTH_FAILED` | JWT validation failed |
| `SESSION_NOT_FOUND` | Requested session does not exist |
| `RATE_LIMITED` | Too many concurrent connections |
| `INTERNAL_ERROR` | Unexpected server error |
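Because the error event is sent just before the connection closes, each error code also needs a WebSocket close code. The mapping below is a hypothetical choice, not taken from the service: 1008 (Policy Violation), 1011 (Internal Error), and 1013 (Try Again Later) are registered WebSocket close codes, and 4404 sits in the 4000-4999 range reserved for application use. `CLOSE_CODES` and `error_frame` are illustrative names.

```python
import json

# Hypothetical mapping from documented error codes to WebSocket close codes
CLOSE_CODES = {
    "AUTH_FAILED": 1008,        # policy violation
    "SESSION_NOT_FOUND": 4404,  # application-defined (4000-4999 private range)
    "RATE_LIMITED": 1013,       # try again later
    "INTERNAL_ERROR": 1011,     # unexpected server condition
}


def error_frame(code: str, message: str) -> tuple[str, int]:
    """Return the JSON error event to send and the close code to use afterwards."""
    if code not in CLOSE_CODES:
        raise ValueError(f"undocumented error code: {code!r}")
    payload = json.dumps({"type": "error", "code": code, "message": message})
    return payload, CLOSE_CODES[code]
```

A handler could then call `await websocket.send_text(payload)` followed by `await websocket.close(code=close_code)`, so clients that miss the JSON event can still distinguish failures by close code.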