WebSocket and SSE Streaming
The AI Service implements real-time streaming through two complementary protocols: WebSocket for bidirectional communication and Server-Sent Events (SSE) for unidirectional response streaming. The streaming subsystem handles progressive response delivery, status updates, tool execution feedback, cancellation support, and backpressure management. This section covers the streaming architecture, event protocol, and connection management.
Streaming Architecture
Client (Browser/App)
|
+-- WebSocket (/ws/chat) --------> ConnectionManager
| |
| v
| AgentOrchestrator
| |
+-- SSE (POST /stream) ---------> StreamManager
| |
v v
+---------------------------------------------------+
| Stream Event Pipeline |
| StreamEvent -> Serialization -> Delivery |
| + Backpressure + Cancellation + Heartbeat |
+---------------------------------------------------+
Core Types
The streaming subsystem defines its event types in agents/streaming.py:
StreamEventType
class StreamEventType(str, Enum):
    """Every kind of event that can be emitted on a stream.

    Members are strings, so they serialize directly as their wire value.
    """

    # --- Content -------------------------------------------------------
    TOKEN = "token"  # one token produced by the LLM
    CHUNK = "chunk"  # a piece of text made of several tokens
    CONTENT = "content"  # a whole content block

    # --- Progress ------------------------------------------------------
    PROGRESS = "progress"  # percentage of work done
    STATUS = "status"  # human-readable status message
    STEP_START = "step_start"  # a processing step has begun
    STEP_COMPLETE = "step_complete"  # a processing step has finished

    # --- Control -------------------------------------------------------
    START = "start"  # the stream has begun
    END = "end"  # the stream has finished
    ERROR = "error"  # something went wrong
    CANCEL = "cancel"  # the stream was cancelled
    PAUSE = "pause"  # the stream was paused
    RESUME = "resume"  # the stream was resumed

    # --- Data ----------------------------------------------------------
    TOOL_CALL = "tool_call"  # the agent is invoking a tool
    TOOL_RESULT = "tool_result"  # output of a tool invocation
    CITATION = "citation"  # reference to a source
    METADATA = "metadata"  # extra metadata

    # --- Heartbeat -----------------------------------------------------
    PING = "ping"  # keep-alive ping


# StreamEvent
@dataclass
class StreamEvent(Generic[T]):
    """A single event in a stream.

    ``timestamp`` defaults to the moment the event is created, so callers
    that only supply ``id``, ``type`` and ``data`` (as the SSE endpoint
    does) no longer fail with a missing-argument ``TypeError``. Passing an
    explicit timestamp, positionally or by keyword, still works.
    """

    id: str  # Unique event ID
    type: StreamEventType  # Event type
    data: T  # Event payload
    # Creation time of the event; filled in automatically when omitted.
    timestamp: datetime = field(default_factory=datetime.now)
    sequence: int = 0  # Sequence number for ordering
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for serialization.

        Returns:
            A dict with the keys ``id``, ``type`` (the enum's string value),
            ``data``, ``timestamp`` (ISO-8601 string), ``sequence`` and
            ``metadata``.
        """
        return {
            "id": self.id,
            "type": self.type.value,
            "data": self.data,
            "timestamp": self.timestamp.isoformat(),
            "sequence": self.sequence,
            "metadata": self.metadata,
        }

    def to_sse(self) -> str:
        """Convert to Server-Sent Event format.

        Returns:
            One SSE frame: ``id``, ``event`` and ``data`` lines followed by
            the blank line that terminates the event.
        """
        # default=str stringifies payload values json cannot encode natively.
        data_json = json.dumps(self.to_dict(), default=str)
        return f"id: {self.id}\nevent: {self.type.value}\ndata: {data_json}\n\n"


# StreamStatus
class StreamStatus(str, Enum):
    """Lifecycle state of a stream."""

    # Not yet running
    PENDING = "pending"  # created but not active yet

    # Running states
    ACTIVE = "active"  # events are currently being streamed
    PAUSED = "paused"  # streaming is temporarily on hold

    # Terminal states
    COMPLETED = "completed"  # finished successfully
    CANCELLED = "cancelled"  # cancelled by the client
    ERROR = "error"  # ended because of an error


# WebSocket Protocol
The WebSocket API in agents/websocket_api.py provides bidirectional communication for agent conversations.
Message Types
class MessageType(str, Enum):
    """Kinds of messages exchanged over the WebSocket."""

    # ---- Sent by the client -------------------------------------------
    CHAT = "chat"  # a user message
    CANCEL = "cancel"  # abort the operation in progress
    FEEDBACK = "feedback"  # user feedback on a response
    APPROVAL = "approval"  # human-in-the-loop approval decision
    PING = "ping"  # keepalive from the client

    # ---- Sent by the server -------------------------------------------
    TEXT = "text"  # a chunk of response text
    TOOL_START = "tool_start"  # a tool has started executing
    TOOL_RESULT = "tool_result"  # a tool has produced its result
    STATUS = "status"  # status update
    ERROR = "error"  # error message
    APPROVAL_REQUIRED = "approval_required"  # human approval is needed
    GUARDRAIL_WARNING = "guardrail_warning"  # content safety warning
    COMPLETE = "complete"  # the response is complete
    PONG = "pong"  # keepalive reply from the server


# WebSocket Message Format
@dataclass
class WebSocketMessage:
    """A WebSocket message."""

    type: MessageType
    data: dict[str, Any]
    id: str  # Message ID for correlation
    timestamp: datetime

    def to_json(self) -> str:
        """Serialize the message to a JSON string.

        Returns:
            JSON object with ``type`` (enum value), ``data``, ``id`` and
            ``timestamp`` (ISO-8601 string).
        """
        return json.dumps({
            "type": self.type.value,
            "data": self.data,
            "id": self.id,
            "timestamp": self.timestamp.isoformat(),
        })

    @classmethod
    def from_json(cls, data: str) -> "WebSocketMessage":
        """Build a message from a JSON string received from a peer.

        Missing fields fall back to defaults: ``type`` to ``"chat"``,
        ``data`` to ``{}``, ``id`` to a fresh UUID and ``timestamp`` to the
        current time. The timestamp fallback matters because ``timestamp``
        is a required field, so omitting it from the constructor call raises
        ``TypeError`` on every parse.

        Raises:
            ValueError: if ``data`` is not valid JSON, ``type`` is not a
                known ``MessageType``, or ``timestamp`` is not ISO-8601.
        """
        parsed = json.loads(data)

        raw_timestamp = parsed.get("timestamp")
        if raw_timestamp:
            # JavaScript's toISOString() ends in "Z", which fromisoformat()
            # only accepts from Python 3.11 on; normalise it to an offset.
            if raw_timestamp.endswith("Z"):
                raw_timestamp = raw_timestamp[:-1] + "+00:00"
            timestamp = datetime.fromisoformat(raw_timestamp)
        else:
            timestamp = datetime.now()

        return cls(
            type=MessageType(parsed.get("type", "chat")),
            data=parsed.get("data", {}),
            id=parsed.get("id", str(uuid4())),
            timestamp=timestamp,
        )


# Connection Manager
The ConnectionManager handles WebSocket lifecycle:
class ConnectionManager:
    """Manages WebSocket connections.

    Tracks three maps: connection id -> socket, connection id -> session id,
    and user id -> set of that user's connection ids.
    """

    def __init__(self) -> None:
        self._connections: dict[str, WebSocket] = {}
        self._sessions: dict[str, str] = {}  # conn_id -> session_id
        self._user_connections: dict[str, set[str]] = {}  # user_id -> conn_ids

    async def connect(
        self,
        websocket: WebSocket,
        connection_id: str,
        session_id: str,
        user_id: str | None = None,
    ) -> None:
        """Accept and register a WebSocket connection.

        Args:
            websocket: The socket to accept.
            connection_id: Key under which the socket is registered.
            session_id: Session associated with this connection.
            user_id: Optional owner; when given, the connection is also
                recorded in that user's connection set.
        """
        await websocket.accept()
        self._connections[connection_id] = websocket
        self._sessions[connection_id] = session_id
        if user_id:
            self._user_connections.setdefault(user_id, set()).add(connection_id)

    async def disconnect(self, connection_id: str) -> None:
        """Remove a WebSocket connection from every registry.

        Unknown ids are ignored. The id is also dropped from the owning
        user's connection set (and the set itself once empty) so that
        per-user bookkeeping does not accumulate stale entries.
        """
        self._connections.pop(connection_id, None)
        self._sessions.pop(connection_id, None)

        # Collect owners first: the dict must not change size while iterating.
        owners = [
            user_id
            for user_id, conn_ids in self._user_connections.items()
            if connection_id in conn_ids
        ]
        for user_id in owners:
            conn_ids = self._user_connections[user_id]
            conn_ids.discard(connection_id)
            if not conn_ids:
                del self._user_connections[user_id]

    async def send_message(
        self,
        connection_id: str,
        message: WebSocketMessage,
    ) -> None:
        """Send a message to a specific connection.

        Does nothing when ``connection_id`` is not registered.
        """
        ws = self._connections.get(connection_id)
        if ws:
            await ws.send_text(message.to_json())


# WebSocket Endpoint
router = APIRouter(prefix="/ws", tags=["WebSocket"])


@router.websocket("/chat")
async def websocket_chat(
    websocket: WebSocket,
    session_id: str = Query(None),
    tenant_id: str = Query(...),
    token: str = Query(...),
):
    """WebSocket endpoint for real-time chat.

    Validates the token, registers the connection, then loops over client
    messages: ``chat`` is streamed through the orchestrator, ``cancel``
    cancels the session's stream, and ``ping`` is answered with ``pong``.
    The connection is always unregistered when the loop exits, whether the
    client disconnected or an unexpected error occurred.
    """
    # Validate JWT token
    user_id = validate_token(token)
    connection_id = str(uuid4())
    await manager.connect(websocket, connection_id, session_id, user_id)

    async def reply(kind: MessageType, payload: dict, correlation_id: str) -> None:
        # WebSocketMessage requires all four fields; replies reuse the id of
        # the client message they answer so the client can correlate them.
        await manager.send_message(
            connection_id,
            WebSocketMessage(
                type=kind,
                data=payload,
                id=correlation_id,
                timestamp=datetime.now(),
            ),
        )

    try:
        while True:
            # Receive message from client
            raw = await websocket.receive_text()
            try:
                message = WebSocketMessage.from_json(raw)
            except (ValueError, AttributeError):
                # Invalid JSON, an unknown message type, or a non-object
                # payload: report it instead of tearing down the socket.
                await reply(
                    MessageType.ERROR,
                    {"message": "Invalid message"},
                    str(uuid4()),
                )
                continue

            if message.type == MessageType.CHAT:
                # NOTE(review): the receive loop is blocked while this stream
                # runs, so a "cancel" sent on the same socket is only read
                # after the response finishes. Confirm whether streaming
                # should run in a background task instead.
                # Process through agent orchestrator with streaming
                async for chunk in orchestrator.process_message(
                    agent_id="default",
                    session_id=session_id,
                    tenant_id=tenant_id,
                    message=message.data.get("content", ""),
                    stream=True,
                ):
                    await reply(
                        MessageType.TEXT,
                        {"content": chunk.content},
                        message.id,
                    )
                # Send completion signal
                await reply(MessageType.COMPLETE, {}, message.id)
            elif message.type == MessageType.CANCEL:
                # Cancel current operation and acknowledge it to the client.
                await cancel_stream(session_id)
                await reply(
                    MessageType.STATUS,
                    {"status": "cancelled"},
                    message.id,
                )
            elif message.type == MessageType.PING:
                await reply(MessageType.PONG, {}, message.id)
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in ``finally``.
        pass
    finally:
        await manager.disconnect(connection_id)


# Server-Sent Events (SSE)
SSE provides a simpler unidirectional streaming protocol for clients that do not need to send messages during streaming:
SSE Endpoint
from fastapi.responses import StreamingResponse
@app.post("/api/v1/conversation/stream")
async def stream_conversation(
    request: ConversationRequest,
) -> StreamingResponse:
    """Stream conversation response via SSE.

    Each chunk produced by the orchestrator becomes one ``chunk`` event,
    followed by a single ``end`` event. Events carry an explicit timestamp
    (a required ``StreamEvent`` field) and an increasing sequence number so
    clients can order them.
    """
    # Function-scope import keeps this snippet self-contained.
    from datetime import datetime

    async def event_generator():
        sequence = 0
        async for chunk in orchestrator.process_message(
            agent_id=request.agent_id,
            session_id=request.session_id,
            tenant_id=request.tenant_id,
            message=request.message,
            stream=True,
        ):
            event = StreamEvent(
                id=str(uuid4()),
                type=StreamEventType.CHUNK,
                data=chunk.content,
                timestamp=datetime.now(),
                sequence=sequence,
            )
            sequence += 1
            yield event.to_sse()

        # Send final event
        yield StreamEvent(
            id=str(uuid4()),
            type=StreamEventType.END,
            data="",
            timestamp=datetime.now(),
            sequence=sequence,
        ).to_sse()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Asks nginx-style proxies not to buffer the response.
            "X-Accel-Buffering": "no",
        },
    )


# SSE Wire Format
id: evt-001
event: start
data: {"id":"evt-001","type":"start","data":"","timestamp":"2026-02-12T10:00:00"}
id: evt-002
event: step_start
data: {"id":"evt-002","type":"step_start","data":"Classifying intent..."}
id: evt-003
event: chunk
data: {"id":"evt-003","type":"chunk","data":"Based on your data,"}
id: evt-004
event: chunk
data: {"id":"evt-004","type":"chunk","data":" total revenue for Q4"}
id: evt-005
event: tool_call
data: {"id":"evt-005","type":"tool_call","data":{"name":"execute_sql","args":{}}}
id: evt-006
event: tool_result
data: {"id":"evt-006","type":"tool_result","data":{"rows":100}}
id: evt-007
event: end
data: {"id":"evt-007","type":"end","data":"","metadata":{"tokens":245}}
StreamChunk Protocol
The orchestrator produces StreamChunk objects during streaming:
@dataclass
class StreamChunk:
"""Streaming chunk from agent."""
type: str # text, tool_call, tool_result, status, error
content: str = ""
data: dict[str, Any] = field(default_factory=dict)
is_final: bool = False

| Chunk Type | Content | When Emitted |
|---|---|---|
| text | Token or text fragment | During LLM generation |
| tool_call | Tool name and arguments | When agent calls a tool |
| tool_result | Tool execution output | After tool completes |
| status | Status message | During processing steps |
| error | Error description | On processing error |
Backpressure Handling
The streaming subsystem implements backpressure to prevent overwhelming slow clients:
class BackpressureManager:
    """Manages backpressure for stream consumers.

    Events pass through a bounded queue; when it is full the producer
    waits, and ``_paused`` is True for the duration of that wait.
    """

    def __init__(self, max_buffer_size: int = 100):
        """Create the buffer.

        Args:
            max_buffer_size: Maximum number of events held before ``put``
                starts waiting for the consumer.
        """
        self._buffer: asyncio.Queue = asyncio.Queue(maxsize=max_buffer_size)
        self._paused: bool = False

    async def put(self, event: StreamEvent) -> None:
        """Put event into buffer, pausing if full."""
        try:
            self._buffer.put_nowait(event)
            return
        except asyncio.QueueFull:
            self._paused = True

        try:
            await self._buffer.put(event)  # Block until space
        finally:
            # Reset even when the waiting producer is cancelled; otherwise
            # the manager would report "paused" with nobody waiting.
            self._paused = False

    async def get(self) -> StreamEvent:
        """Get next event from buffer, waiting until one is available."""
        return await self._buffer.get()


# Cancellation Support
Clients can cancel in-flight requests through both WebSocket and HTTP:
WebSocket Cancellation
# Client sends cancel message
{"type": "cancel", "data": {"session_id": "sess-123"}, "id": "msg-456"}
# Server handles cancellation
elif message.type == MessageType.CANCEL:
await cancel_stream(session_id)
await manager.send_message(
connection_id,
WebSocketMessage(
type=MessageType.STATUS,
data={"status": "cancelled"},
),
)
HTTP Cancellation
DELETE /api/v1/conversation/stream/{session_id}
Heartbeat and Connection Health
The WebSocket subsystem implements heartbeat for connection health monitoring:
| Parameter | Value |
|---|---|
| Client ping interval | 30 seconds |
| Server pong timeout | 10 seconds |
| Idle connection timeout | 5 minutes |
| Max connections per user | 5 |
# Heartbeat task
async def heartbeat_loop(connection_id: str, interval: float = 30.0):
    """Periodically send a keepalive ``pong`` to a connection.

    Runs until the connection is no longer registered with the manager.

    Args:
        connection_id: Connection to keep alive.
        interval: Seconds to sleep between heartbeats (default 30).
    """
    while connection_id in manager._connections:
        await asyncio.sleep(interval)
        # The client may have disconnected while this task was sleeping.
        if connection_id not in manager._connections:
            break
        await manager.send_message(
            connection_id,
            # id and timestamp are required WebSocketMessage fields.
            WebSocketMessage(
                type=MessageType.PONG,
                data={},
                id=str(uuid4()),
                timestamp=datetime.now(),
            ),
        )


# Client Integration
JavaScript WebSocket Client
// Open the chat socket; tenant, session and JWT travel in the query string.
const ws = new WebSocket(
  `wss://api.example.com/ws/chat?tenant_id=acme&session_id=sess-123&token=jwt-token`
);

// Dispatch each server message on its "type" field.
ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  switch (msg.type) {
    case "text":
      appendToResponse(msg.data.content);
      break;
    case "tool_start":
      showToolIndicator(msg.data.tool_name);
      break;
    case "tool_result":
      hideToolIndicator();
      break;
    case "complete":
      finalizeResponse();
      break;
    case "error":
      showError(msg.data.message);
      break;
    default:
      // Other server message types (status, approval_required,
      // guardrail_warning, pong) are not handled in this minimal example.
      break;
  }
};

// Send user message once the socket is open: calling send() while the
// connection is still being established throws InvalidStateError.
ws.onopen = () => {
  ws.send(JSON.stringify({
    type: "chat",
    data: { content: "Show me total sales by region" },
    id: crypto.randomUUID(),
  }));
};

// JavaScript SSE Client
// Request a streamed response; the server answers with text/event-stream.
const response = await fetch("/api/v1/conversation/stream", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    session_id: "sess-123",
    tenant_id: "acme",
    message: "Show me total sales",
  }),
});
if (!response.ok || !response.body) {
  throw new Error(`Stream request failed with status ${response.status}`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
// Holds text received so far that does not yet end a complete SSE event.
let pending = "";

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // stream: true keeps multi-byte characters that straddle two network
  // chunks intact instead of decoding each half separately.
  pending += decoder.decode(value, { stream: true });

  // An SSE event ends with a blank line; only hand complete events to the
  // parser and keep the unfinished tail for the next chunk.
  const boundary = pending.lastIndexOf("\n\n");
  if (boundary === -1) continue;
  const complete = pending.slice(0, boundary + 2);
  pending = pending.slice(boundary + 2);
  // Parse SSE events from text
  parseSSEEvents(complete).forEach(handleEvent);
}

// Flush whatever the decoder and buffer still hold once the stream closes.
pending += decoder.decode();
if (pending.trim()) {
  parseSSEEvents(pending).forEach(handleEvent);
}