MATIH Platform is in active MVP development. Documentation reflects current implementation status.

WebSocket Communication

The AI Service uses WebSocket connections to deliver real-time streaming responses from agents to frontend clients. This enables progressive rendering of agent reasoning, intermediate results, and final answers without polling. The implementation is built on FastAPI's native WebSocket support with custom connection management, heartbeat monitoring, and graceful disconnect handling.


Connection Lifecycle

WebSocket connections follow a structured lifecycle:

  1. Connect: Client opens a WebSocket at /ws/chat/:session_id, passing a JWT as a query parameter
  2. Authenticate: Server validates the JWT and extracts tenant and user context
  3. Ready: Server sends a connection_ready event confirming the session is active
  4. Exchange: Client sends messages, server streams back typed events
  5. Heartbeat: Server sends periodic pings; client responds with pongs
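  6. Disconnect: Either side closes the connection gracefully

The core of the handler covers the exchange and disconnect steps; authentication, the connection_ready event, and heartbeats are not shown:

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter()

# `orchestrator` and `connection_manager` are assumed to be module-level
# objects defined elsewhere in the service.
@router.websocket("/ws/chat/{session_id}")
async def chat_websocket(websocket: WebSocket, session_id: str):
    await websocket.accept()
    # Register the socket so the later disconnect() has an entry to remove.
    await connection_manager.connect(session_id, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            # Forward each agent chunk to the client as a typed event.
            async for chunk in orchestrator.stream(session_id, data["content"]):
                await websocket.send_json(chunk.to_dict())
    except WebSocketDisconnect:
        connection_manager.disconnect(session_id)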
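
The heartbeat step of the lifecycle can be sketched as a small background task. This is a minimal sketch, not the service's implementation: `HeartbeatMonitor`, its `touch` method, and the rule that any received client message (including `pong`) resets the idle clock are assumptions. The default values mirror the documented `WS_HEARTBEAT_INTERVAL` and `WS_IDLE_TIMEOUT` settings.

```python
import asyncio
import time
from typing import Awaitable, Callable


class HeartbeatMonitor:
    """Hypothetical helper: pings a client and reports when it goes idle."""

    def __init__(
        self,
        send_json: Callable[[dict], Awaitable[None]],
        interval: float = 30.0,
        idle_timeout: float = 300.0,
    ) -> None:
        self._send_json = send_json
        self._interval = interval
        self._idle_timeout = idle_timeout
        self._last_seen = time.monotonic()

    def touch(self) -> None:
        """Record client activity; call this for every received message."""
        self._last_seen = time.monotonic()

    async def run(self) -> str:
        """Ping every `interval` seconds; return once the client is idle."""
        while True:
            await asyncio.sleep(self._interval)
            if time.monotonic() - self._last_seen > self._idle_timeout:
                return "idle_timeout"
            await self._send_json({"type": "ping"})
```

In a handler, `run()` would typically be started with `asyncio.create_task` next to the receive loop, with `touch()` called on each received message and the socket closed once `run()` returns.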

Message Protocol

All WebSocket messages follow a typed event structure:

Client Messages

| Type | Description | Payload Fields |
|------|-------------|----------------|
| message | User chat message | content (string) |
| refine | Query refinement | refinement (string) |
| cancel | Cancel current operation | none |
| pong | Heartbeat response | none |
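
The client message types above lend themselves to validation before dispatch. The sketch below is illustrative only: `parse_client_message` and `ProtocolError` are hypothetical names, and it assumes the message type travels in a top-level `type` key with payload fields beside it, as in the error event example in this document.

```python
from typing import Any

# Required string fields per client message type, taken from the table above.
CLIENT_MESSAGE_FIELDS: dict[str, tuple[str, ...]] = {
    "message": ("content",),
    "refine": ("refinement",),
    "cancel": (),
    "pong": (),
}


class ProtocolError(ValueError):
    """Raised when a client message does not match the protocol."""


def parse_client_message(raw: Any) -> tuple[str, dict[str, str]]:
    """Return (type, payload) for a decoded JSON message, or raise ProtocolError."""
    if not isinstance(raw, dict):
        raise ProtocolError("message must be a JSON object")
    msg_type = raw.get("type")
    if not isinstance(msg_type, str) or msg_type not in CLIENT_MESSAGE_FIELDS:
        raise ProtocolError(f"unknown message type: {msg_type!r}")
    payload: dict[str, str] = {}
    for field in CLIENT_MESSAGE_FIELDS[msg_type]:
        value = raw.get(field)
        if not isinstance(value, str):
            raise ProtocolError(f"{msg_type!r} requires string field {field!r}")
        payload[field] = value
    return msg_type, payload
```

Rejecting malformed input at this boundary keeps the receive loop from failing on a missing key such as `content`.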

Server Events

| Type | Description | Payload Fields |
|------|-------------|----------------|
| connection_ready | Connection established | session_id, capabilities |
| stream_start | Agent begins processing | agent (string) |
| text | Incremental text chunk | content (string) |
| sql | Generated SQL query | sql (string), dialect (string) |
| data | Query result data | columns, rows |
| visualization | Chart specification | type, config |
| insight | Analytical insight | content (string) |
| tool_call | Agent invoked a tool | tool, input |
| tool_result | Tool execution result | tool, output |
| status | Processing status update | status (string) |
| error | Error during processing | code, message |
| stream_end | Processing complete | execution_time_ms |
| ping | Server heartbeat | none |
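
On the receiving side, a client folds these events into a single answer. The following is a rough sketch of that idea, assuming events arrive as dictionaries with a top-level `type` key. `collect_answer` is a hypothetical helper, and it handles only a subset of the event types.

```python
from typing import Any, Iterable


def collect_answer(events: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Fold one stream (stream_start through stream_end) into a summary."""
    result: dict[str, Any] = {
        "agent": None,
        "text": "",
        "sql": None,
        "error": None,
        "execution_time_ms": None,
    }
    for event in events:
        kind = event.get("type")
        if kind == "stream_start":
            result["agent"] = event.get("agent")
        elif kind == "text":
            # Text arrives incrementally; concatenate chunks in order.
            result["text"] += event.get("content", "")
        elif kind == "sql":
            result["sql"] = event.get("sql")
        elif kind == "error":
            result["error"] = (event.get("code"), event.get("message"))
            break
        elif kind == "stream_end":
            result["execution_time_ms"] = event.get("execution_time_ms")
            break
        # Other event types (data, status, ping, ...) are ignored here.
    return result
```

A real client would render each `text` chunk as it arrives instead of waiting for `stream_end`; the fold is shown in one function only to make the event ordering explicit.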

Connection Manager

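The ConnectionManager class tracks active WebSocket connections by session ID. Its broadcast method reaches a tenant's sessions by matching the tenant ID as a prefix of the session ID, so it relies on session IDs beginning with the tenant ID: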

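from fastapi import WebSocket


class ConnectionManager:
    def __init__(self):
        # Active connections keyed by session ID.
        self._connections: dict[str, WebSocket] = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        self._connections[session_id] = websocket

    def disconnect(self, session_id: str):
        # pop() with a default makes repeated disconnects harmless.
        self._connections.pop(session_id, None)

    async def broadcast(self, tenant_id: str, message: dict):
        # Iterate over a snapshot: a connect or disconnect that runs while
        # this coroutine awaits would otherwise mutate the dict mid-iteration.
        for sid, ws in list(self._connections.items()):
            if sid.startswith(tenant_id):
                await ws.send_json(message)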
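
Prefix matching can leak a broadcast across tenants whose IDs share a prefix (for example `acme` and `acme-eu`). One alternative, shown here as a hedged sketch and not as the service's code, is to store the tenant ID explicitly. `TenantConnectionRegistry` is a hypothetical name, and the socket only needs an async `send_json` method.

```python
from collections import defaultdict
from typing import Any


class TenantConnectionRegistry:
    """Hypothetical registry that maps tenants to their sessions explicitly."""

    def __init__(self) -> None:
        self._by_session: dict[str, tuple[str, Any]] = {}
        self._by_tenant: defaultdict[str, set[str]] = defaultdict(set)

    def connect(self, tenant_id: str, session_id: str, websocket: Any) -> None:
        self._by_session[session_id] = (tenant_id, websocket)
        self._by_tenant[tenant_id].add(session_id)

    def disconnect(self, session_id: str) -> None:
        entry = self._by_session.pop(session_id, None)
        if entry is None:
            return
        tenant_id = entry[0]
        sessions = self._by_tenant.get(tenant_id)
        if sessions is not None:
            sessions.discard(session_id)
            if not sessions:
                del self._by_tenant[tenant_id]

    async def broadcast(self, tenant_id: str, message: dict) -> int:
        """Send to every session of one tenant; return the delivery count."""
        delivered = 0
        # Snapshot the session IDs so disconnects during an await are safe.
        for session_id in list(self._by_tenant.get(tenant_id, ())):
            entry = self._by_session.get(session_id)
            if entry is None:
                continue
            try:
                await entry[1].send_json(message)
                delivered += 1
            except Exception:
                # A failed send is treated as a dead connection.
                self.disconnect(session_id)
        return delivered
```

The second index costs a little bookkeeping on connect and disconnect, but broadcast then touches only the target tenant's sessions.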

Configuration

| Environment Variable | Default | Description |
|----------------------|---------|-------------|
| WS_MAX_CONNECTIONS | 100 | Maximum concurrent WebSocket connections |
| WS_HEARTBEAT_INTERVAL | 30 | Seconds between server heartbeat pings |
| WS_IDLE_TIMEOUT | 300 | Seconds before idle connection is closed |
| WS_MAX_MESSAGE_SIZE | 65536 | Maximum message size in bytes |
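
These settings could be loaded along the following lines. This is a sketch under assumptions: `WebSocketSettings` and `from_env` are hypothetical names, and rejecting non-positive values is an added safeguard, not documented behavior. The variable names and defaults come from the table above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class WebSocketSettings:
    """Hypothetical settings holder for the documented WS_* variables."""

    max_connections: int = 100
    heartbeat_interval: int = 30   # seconds
    idle_timeout: int = 300        # seconds
    max_message_size: int = 65536  # bytes

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "WebSocketSettings":
        source = os.environ if env is None else env

        def read(name: str, default: int) -> int:
            raw = source.get(name)
            if raw is None or raw.strip() == "":
                return default
            value = int(raw)  # raises ValueError on non-numeric input
            if value <= 0:
                raise ValueError(f"{name} must be positive, got {value}")
            return value

        return cls(
            max_connections=read("WS_MAX_CONNECTIONS", 100),
            heartbeat_interval=read("WS_HEARTBEAT_INTERVAL", 30),
            idle_timeout=read("WS_IDLE_TIMEOUT", 300),
            max_message_size=read("WS_MAX_MESSAGE_SIZE", 65536),
        )
```

Passing a plain dictionary as `env` makes the loader easy to exercise in tests without touching the process environment.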

Error Handling

WebSocket errors are sent as typed error events before closing the connection:

{
  "type": "error",
  "code": "SESSION_NOT_FOUND",
  "message": "Session sess-abc123 not found"
}

| Error Code | Description |
|------------|-------------|
| AUTH_FAILED | JWT validation failed |
| SESSION_NOT_FOUND | Requested session does not exist |
| RATE_LIMITED | Too many concurrent connections |
| INTERNAL_ERROR | Unexpected server error |
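
The send-then-close pattern might look like the sketch below. The helper name `send_error_and_close` and the mapping from error codes to WebSocket close codes are assumptions; the document does not say which close codes the service uses. The numeric codes themselves are standard registered values (1008 policy violation, 1011 internal error, 1013 try again later).

```python
from typing import Any

# Assumed mapping from documented error codes to WebSocket close codes.
CLOSE_CODES: dict[str, int] = {
    "AUTH_FAILED": 1008,        # policy violation
    "SESSION_NOT_FOUND": 1008,  # policy violation
    "RATE_LIMITED": 1013,       # try again later
    "INTERNAL_ERROR": 1011,     # internal error
}


async def send_error_and_close(websocket: Any, code: str, message: str) -> None:
    """Send a typed error event, then close with a matching close code."""
    await websocket.send_json({"type": "error", "code": code, "message": message})
    # Unknown error codes fall back to 1011 (internal error).
    await websocket.close(code=CLOSE_CODES.get(code, 1011))
```

With FastAPI, the socket has to be accepted before `send_json` can be used, so an `AUTH_FAILED` event delivered this way implies accepting the connection first and closing it immediately afterwards.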