MATIH Platform is in active MVP development. Documentation reflects current implementation status.
12. AI Service
Streaming

WebSocket and SSE Streaming

The AI Service implements real-time streaming through two complementary protocols: WebSocket for bidirectional communication and Server-Sent Events (SSE) for unidirectional response streaming. The streaming subsystem handles progressive response delivery, status updates, tool execution feedback, cancellation support, and backpressure management. This section covers the streaming architecture, event protocol, and connection management.


Streaming Architecture

Client (Browser/App)
    |
    +-- WebSocket (/ws/chat) --------> ConnectionManager
    |                                      |
    |                                      v
    |                              AgentOrchestrator
    |                                      |
    +-- SSE (GET /stream) ----------> StreamManager
    |                                      |
    v                                      v
+---------------------------------------------------+
|              Stream Event Pipeline                 |
|  StreamEvent -> Serialization -> Delivery          |
|  + Backpressure + Cancellation + Heartbeat         |
+---------------------------------------------------+

Core Types

The streaming subsystem defines its event types in agents/streaming.py:

StreamEventType

class StreamEventType(str, Enum):
    """Enumerates every kind of event a stream can emit.

    Members are grouped by role: content fragments, progress updates,
    lifecycle control signals, structured data payloads, and keep-alive.
    """

    # --- Content: pieces of the generated answer ---
    TOKEN = "token"              # A single LLM token
    CHUNK = "chunk"              # A multi-token text fragment
    CONTENT = "content"          # A complete content block

    # --- Progress: visibility into processing ---
    PROGRESS = "progress"        # Percentage progress
    STATUS = "status"            # Human-readable status message
    STEP_START = "step_start"    # A processing step began
    STEP_COMPLETE = "step_complete"  # A processing step finished

    # --- Control: stream lifecycle transitions ---
    START = "start"              # Stream opened
    END = "end"                  # Stream finished
    ERROR = "error"              # Stream aborted by an error
    CANCEL = "cancel"            # Stream cancelled
    PAUSE = "pause"              # Stream paused
    RESUME = "resume"            # Stream resumed

    # --- Data: structured side-channel payloads ---
    TOOL_CALL = "tool_call"      # Agent invoking a tool
    TOOL_RESULT = "tool_result"  # Output of a tool invocation
    CITATION = "citation"        # Source citation
    METADATA = "metadata"        # Auxiliary metadata

    # --- Keep-alive ---
    PING = "ping"                # Heartbeat ping

StreamEvent

@dataclass
class StreamEvent(Generic[T]):
    """A single event in a stream.

    Generic over the payload type T (e.g. str for text chunks, dict for
    tool results).
    """
    id: str                        # Unique event ID
    type: StreamEventType          # Event type
    data: T                        # Event payload
    # BUG FIX: timestamp was a required field, but documented call sites
    # (e.g. the SSE endpoint) construct StreamEvent without it, which
    # raises TypeError. Defaulting it is backward compatible.
    timestamp: datetime = field(default_factory=datetime.now)
    sequence: int = 0              # Sequence number for ordering
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to a JSON-serializable dictionary."""
        return {
            "id": self.id,
            "type": self.type.value,
            "data": self.data,
            "timestamp": self.timestamp.isoformat(),
            "sequence": self.sequence,
            "metadata": self.metadata,
        }

    def to_sse(self) -> str:
        """Render as one Server-Sent Events frame (id/event/data lines
        terminated by a blank line)."""
        # default=str stringifies payload values json cannot serialize
        # natively (e.g. datetimes nested in metadata).
        data_json = json.dumps(self.to_dict(), default=str)
        return f"id: {self.id}\nevent: {self.type.value}\ndata: {data_json}\n\n"

StreamStatus

class StreamStatus(str, Enum):
    """Lifecycle state of a stream."""

    PENDING = "pending"        # Created but not yet delivering events
    ACTIVE = "active"          # Events are currently flowing
    PAUSED = "paused"          # Delivery temporarily suspended
    COMPLETED = "completed"    # Finished normally
    CANCELLED = "cancelled"    # Terminated at the client's request
    ERROR = "error"            # Terminated by a failure

WebSocket Protocol

The WebSocket API in agents/websocket_api.py provides bidirectional communication for agent conversations.

Message Types

class MessageType(str, Enum):
    """WebSocket message types, split by direction of travel."""

    # --- Client -> Server ---
    CHAT = "chat"                    # User chat message
    CANCEL = "cancel"                # Abort the in-flight operation
    FEEDBACK = "feedback"            # Feedback on a prior response
    APPROVAL = "approval"            # Human-in-the-loop approval decision
    PING = "ping"                    # Client-side keepalive

    # --- Server -> Client ---
    TEXT = "text"                    # Incremental text of the response
    TOOL_START = "tool_start"        # A tool invocation began
    TOOL_RESULT = "tool_result"      # A tool invocation finished
    STATUS = "status"                # Processing status update
    ERROR = "error"                  # Error notification
    APPROVAL_REQUIRED = "approval_required"  # HITL approval requested
    GUARDRAIL_WARNING = "guardrail_warning"  # Content-safety warning
    COMPLETE = "complete"            # Response fully delivered
    PONG = "pong"                    # Reply to a client PING

WebSocket Message Format

@dataclass
class WebSocketMessage:
    """A single WebSocket protocol message.

    BUG FIX: `id` and `timestamp` were required fields, yet `from_json`
    and every documented construction site omit them, which raises
    TypeError. Both now have backward-compatible defaults, and
    `from_json` round-trips a timestamp when one is present.
    """
    type: MessageType             # Message type discriminator
    data: dict[str, Any]          # Type-specific payload
    # Correlation ID; auto-generated when the caller does not supply one.
    id: str = field(default_factory=lambda: str(uuid4()))
    # Creation time; naive local time, matching the rest of this module.
    timestamp: datetime = field(default_factory=datetime.now)

    def to_json(self) -> str:
        """Serialize to the wire format (a JSON object string)."""
        return json.dumps({
            "type": self.type.value,
            "data": self.data,
            "id": self.id,
            "timestamp": self.timestamp.isoformat(),
        })

    @classmethod
    def from_json(cls, data: str) -> "WebSocketMessage":
        """Parse a message from its JSON wire format.

        Missing fields fall back to defaults: type -> "chat",
        data -> {}, id -> fresh UUID, timestamp -> now.
        """
        parsed = json.loads(data)
        raw_ts = parsed.get("timestamp")
        return cls(
            type=MessageType(parsed.get("type", "chat")),
            data=parsed.get("data", {}),
            id=parsed.get("id", str(uuid4())),
            timestamp=datetime.fromisoformat(raw_ts) if raw_ts else datetime.now(),
        )

Connection Manager

The ConnectionManager handles WebSocket lifecycle:

class ConnectionManager:
    """Registry of live WebSocket connections.

    Maintains three indexes that must stay in sync:
      _connections:      connection_id -> WebSocket
      _sessions:         connection_id -> session_id
      _user_connections: user_id -> set of connection_ids
    """

    def __init__(self) -> None:
        self._connections: dict[str, WebSocket] = {}
        self._sessions: dict[str, str] = {}        # conn_id -> session_id
        self._user_connections: dict[str, set[str]] = {}  # user_id -> conn_ids

    async def connect(
        self,
        websocket: WebSocket,
        connection_id: str,
        session_id: str,
        user_id: str | None = None,
    ) -> None:
        """Accept the handshake and register the connection in all indexes."""
        await websocket.accept()
        self._connections[connection_id] = websocket
        self._sessions[connection_id] = session_id
        if user_id:
            self._user_connections.setdefault(user_id, set()).add(connection_id)

    async def disconnect(self, connection_id: str) -> None:
        """Remove a connection from every index.

        BUG FIX: the original never removed the id from
        _user_connections, leaking entries and keeping per-user
        connection counts inflated for the life of the process.
        """
        self._connections.pop(connection_id, None)
        self._sessions.pop(connection_id, None)
        emptied = []
        for user_id, conn_ids in self._user_connections.items():
            conn_ids.discard(connection_id)
            if not conn_ids:
                emptied.append(user_id)
        # Prune users with no remaining connections (collected first to
        # avoid mutating the dict while iterating it).
        for user_id in emptied:
            del self._user_connections[user_id]

    async def send_message(
        self,
        connection_id: str,
        message: WebSocketMessage,
    ) -> None:
        """Send a message to one connection; no-op if it is unregistered."""
        ws = self._connections.get(connection_id)
        if ws:
            await ws.send_text(message.to_json())

WebSocket Endpoint

router = APIRouter(prefix="/ws", tags=["WebSocket"])

@router.websocket("/chat")
async def websocket_chat(
    websocket: WebSocket,
    session_id: str = Query(None),
    tenant_id: str = Query(...),
    token: str = Query(...),
):
    """WebSocket endpoint for real-time chat.

    Authenticates via the `token` query parameter, registers the
    connection, then loops forever dispatching on the incoming message
    type (CHAT / CANCEL / PING) until the client disconnects.
    """
    # Validate JWT token before accepting any traffic.
    user_id = validate_token(token)

    connection_id = str(uuid4())
    await manager.connect(websocket, connection_id, session_id, user_id)

    try:
        while True:
            # Receive message from client
            raw = await websocket.receive_text()
            message = WebSocketMessage.from_json(raw)

            if message.type == MessageType.CHAT:
                # Stream the agent's response chunk-by-chunk.
                # BUG FIX: WebSocketMessage declares id and timestamp as
                # required fields; the original constructions omitted
                # them, raising TypeError. They are now passed explicitly.
                async for chunk in orchestrator.process_message(
                    agent_id="default",
                    session_id=session_id,
                    tenant_id=tenant_id,
                    message=message.data.get("content", ""),
                    stream=True,
                ):
                    await manager.send_message(
                        connection_id,
                        WebSocketMessage(
                            type=MessageType.TEXT,
                            data={"content": chunk.content},
                            id=str(uuid4()),
                            timestamp=datetime.now(),
                        ),
                    )

                # Signal that the response is fully delivered.
                await manager.send_message(
                    connection_id,
                    WebSocketMessage(
                        type=MessageType.COMPLETE,
                        data={},
                        id=str(uuid4()),
                        timestamp=datetime.now(),
                    ),
                )

            elif message.type == MessageType.CANCEL:
                # Cancel the in-flight operation for this session.
                await cancel_stream(session_id)

            elif message.type == MessageType.PING:
                await manager.send_message(
                    connection_id,
                    WebSocketMessage(
                        type=MessageType.PONG,
                        data={},
                        id=str(uuid4()),
                        timestamp=datetime.now(),
                    ),
                )

    except WebSocketDisconnect:
        pass
    finally:
        # BUG FIX: the original unregistered only on WebSocketDisconnect,
        # leaking the connection from the manager on any other exception.
        await manager.disconnect(connection_id)

Server-Sent Events (SSE)

SSE provides a simpler unidirectional streaming protocol for clients that do not need to send messages during streaming:

SSE Endpoint

from fastapi.responses import StreamingResponse

@app.post("/api/v1/conversation/stream")
async def stream_conversation(
    request: ConversationRequest,
) -> StreamingResponse:
    """Stream a conversation response via Server-Sent Events.

    Emits one CHUNK event per fragment produced by the orchestrator,
    followed by a terminal END event so clients can distinguish normal
    completion from a dropped connection.
    """

    async def event_generator():
        # Monotonic sequence lets clients detect gaps or reordering.
        sequence = 0
        async for chunk in orchestrator.process_message(
            agent_id=request.agent_id,
            session_id=request.session_id,
            tenant_id=request.tenant_id,
            message=request.message,
            stream=True,
        ):
            # BUG FIX: StreamEvent declares timestamp as a required
            # field; the original constructions omitted it, raising
            # TypeError. It is now passed explicitly.
            event = StreamEvent(
                id=str(uuid4()),
                type=StreamEventType.CHUNK,
                data=chunk.content,
                timestamp=datetime.now(),
                sequence=sequence,
            )
            sequence += 1
            yield event.to_sse()

        # Terminal event marking normal completion.
        yield StreamEvent(
            id=str(uuid4()),
            type=StreamEventType.END,
            data="",
            timestamp=datetime.now(),
            sequence=sequence,
        ).to_sse()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Disable nginx proxy buffering so events flush immediately.
            "X-Accel-Buffering": "no",
        },
    )

SSE Wire Format

id: evt-001
event: start
data: {"id":"evt-001","type":"start","data":"","timestamp":"2026-02-12T10:00:00"}

id: evt-002
event: step_start
data: {"id":"evt-002","type":"step_start","data":"Classifying intent..."}

id: evt-003
event: chunk
data: {"id":"evt-003","type":"chunk","data":"Based on your data,"}

id: evt-004
event: chunk
data: {"id":"evt-004","type":"chunk","data":" total revenue for Q4"}

id: evt-005
event: tool_call
data: {"id":"evt-005","type":"tool_call","data":{"name":"execute_sql","args":{}}}

id: evt-006
event: tool_result
data: {"id":"evt-006","type":"tool_result","data":{"rows":100}}

id: evt-007
event: end
data: {"id":"evt-007","type":"end","data":"","metadata":{"tokens":245}}

StreamChunk Protocol

The orchestrator produces StreamChunk objects during streaming:

@dataclass
class StreamChunk:
    """One incremental unit of agent output during streaming.

    Attributes:
        type: Chunk kind -- "text", "tool_call", "tool_result",
            "status", or "error".
        content: Text payload (token or fragment).
        data: Structured payload (e.g. tool arguments or results).
        is_final: True on the last chunk of a response.
    """

    type: str
    content: str = ""
    data: dict[str, Any] = field(default_factory=dict)
    is_final: bool = False
| Chunk Type  | Content                 | When Emitted            |
|-------------|-------------------------|-------------------------|
| text        | Token or text fragment  | During LLM generation   |
| tool_call   | Tool name and arguments | When agent calls a tool |
| tool_result | Tool execution output   | After tool completes    |
| status      | Status message          | During processing steps |
| error       | Error description       | On processing error     |

Backpressure Handling

The streaming subsystem implements backpressure to prevent overwhelming slow clients:

class BackpressureManager:
    """Buffers stream events between a fast producer and a slow consumer.

    When the buffer fills up, the producer is suspended until the
    consumer drains an event, which propagates backpressure upstream.
    """

    def __init__(self, max_buffer_size: int = 100):
        self._buffer: asyncio.Queue = asyncio.Queue(maxsize=max_buffer_size)
        self._paused: bool = False

    async def put(self, event: StreamEvent) -> None:
        """Enqueue an event, marking the stream paused while blocked on
        a full buffer."""
        if self._buffer.full():
            self._paused = True
            await self._buffer.put(event)  # Suspend until space frees up
            self._paused = False
        else:
            self._buffer.put_nowait(event)

    async def get(self) -> StreamEvent:
        """Dequeue the next event, waiting if the buffer is empty."""
        return await self._buffer.get()

Cancellation Support

Clients can cancel in-flight requests through both WebSocket and HTTP:

WebSocket Cancellation

# Client sends cancel message
{"type": "cancel", "data": {"session_id": "sess-123"}, "id": "msg-456"}
 
# Server handles cancellation
elif message.type == MessageType.CANCEL:
    await cancel_stream(session_id)
    await manager.send_message(
        connection_id,
        WebSocketMessage(
            type=MessageType.STATUS,
            data={"status": "cancelled"},
        ),
    )

HTTP Cancellation

DELETE /api/v1/conversation/stream/{session_id}

Heartbeat and Connection Health

The WebSocket subsystem implements heartbeat for connection health monitoring:

| Parameter                | Value      |
|--------------------------|------------|
| Client ping interval     | 30 seconds |
| Server pong timeout      | 10 seconds |
| Idle connection timeout  | 5 minutes  |
| Max connections per user | 5          |
# Heartbeat task: periodically sends a keepalive frame so intermediaries
# (proxies, load balancers) do not drop an idle connection.
async def heartbeat_loop(connection_id: str) -> None:
    """Send a server keepalive every 30 seconds while the connection
    remains registered.

    NOTE(review): reads manager._connections directly; a public
    `is_connected()` accessor on ConnectionManager would be cleaner.
    """
    while connection_id in manager._connections:
        await asyncio.sleep(30)
        try:
            # BUG FIX: id and timestamp are required WebSocketMessage
            # fields; the original omitted them, raising TypeError.
            await manager.send_message(
                connection_id,
                WebSocketMessage(
                    type=MessageType.PONG,
                    data={},
                    id=str(uuid4()),
                    timestamp=datetime.now(),
                ),
            )
        except Exception:
            # BUG FIX: a send failure (connection torn down mid-send)
            # previously escaped the loop as an unhandled exception;
            # stop the heartbeat instead.
            break

Client Integration

JavaScript WebSocket Client

const ws = new WebSocket(
    `wss://api.example.com/ws/chat?tenant_id=acme&session_id=sess-123&token=jwt-token`
);

// Dispatch table mapping server message types to UI handlers.
const handlers = {
    text: (msg) => appendToResponse(msg.data.content),
    tool_start: (msg) => showToolIndicator(msg.data.tool_name),
    tool_result: () => hideToolIndicator(),
    complete: () => finalizeResponse(),
    error: (msg) => showError(msg.data.message),
};

ws.onmessage = (event) => {
    const msg = JSON.parse(event.data);
    const handle = handlers[msg.type];
    if (handle) handle(msg); // Unknown types are ignored, as before
};

// Send user message
ws.send(JSON.stringify({
    type: "chat",
    data: { content: "Show me total sales by region" },
    id: crypto.randomUUID(),
}));

JavaScript SSE Client

const response = await fetch("/api/v1/conversation/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
        session_id: "sess-123",
        tenant_id: "acme",
        message: "Show me total sales",
    }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // BUG FIX: { stream: true } holds back partial multi-byte characters
    // until the next chunk; the original decode() could corrupt UTF-8
    // sequences split across reads.
    buffer += decoder.decode(value, { stream: true });

    // BUG FIX: an SSE event can span multiple reads. Only hand complete
    // events (terminated by a blank line, "\n\n") to the parser and keep
    // the trailing partial event buffered.
    const boundary = buffer.lastIndexOf("\n\n");
    if (boundary === -1) continue;
    const complete = buffer.slice(0, boundary + 2);
    buffer = buffer.slice(boundary + 2);

    parseSSEEvents(complete).forEach(handleEvent);
}