Real-Time Collaboration
The MATIH frontend provides real-time capabilities through a WebSocket provider with auto-reconnect and message queuing, Server-Sent Events for streaming AI responses, presence tracking for collaborative editing, and a notification center for cross-platform event delivery. This section covers the complete real-time infrastructure at frontend/shared/src/realtime/, its integration with workbench components, and the patterns for building real-time features.
Real-Time Architecture
frontend/shared/src/realtime/
WebSocketProvider.tsx # WebSocket connection management
EventSource.tsx # Server-Sent Events for streaming
LiveUpdates.tsx # Live data update components
Presence.tsx # User presence tracking
NotificationCenter.tsx # Real-time notification delivery
queryStatus.ts # Query execution status tracking
types.ts # TypeScript interfaces
index.ts # Barrel exportsTechnology Selection
| Mechanism | Use Case | Direction | Protocol |
|---|---|---|---|
| WebSocket | Bidirectional real-time | Client to Server (bidirectional) | ws:// / wss:// |
| Server-Sent Events | Streaming AI responses | Server to Client | HTTP SSE |
| BroadcastChannel | Cross-tab synchronization | Tab to Tab (bidirectional) | Browser API |
| Polling | Fallback for WebSocket failure | Client to Server | HTTP |
WebSocket Provider
The WebSocketProvider at frontend/shared/src/realtime/WebSocketProvider.tsx manages WebSocket connections with auto-reconnect, message queuing, and channel-based subscriptions.
Configuration
interface WebSocketConfig {
url: string; // WebSocket server URL
protocols?: string | string[]; // Sub-protocols
autoReconnect?: boolean; // Auto-reconnect on disconnect (default: true)
maxReconnectAttempts?: number; // Max reconnection attempts (default: 10)
reconnectDelay?: number; // Initial reconnect delay in ms (default: 1000)
reconnectDelayMultiplier?: number; // Backoff multiplier (default: 1.5)
maxReconnectDelay?: number; // Maximum backoff delay in ms (default: 30000)
pingInterval?: number; // Heartbeat interval in ms (default: 30000)
queueSize?: number; // Max queued messages when disconnected (default: 100)
authToken?: string; // Authentication token for connection
headers?: Record<string, string>; // Additional headers as query params
onOpen?: () => void; // Connection established callback
onClose?: (event: CloseEvent) => void; // Connection closed callback
onError?: (event: Event) => void; // Connection error callback
onMessage?: (message: WebSocketMessage) => void; // Global message handler
}Default Configuration
| Parameter | Default | Description |
|---|---|---|
autoReconnect | true | Automatically attempt reconnection |
maxReconnectAttempts | 10 | Give up after 10 failed attempts |
reconnectDelay | 1,000 ms | Initial delay between reconnect attempts |
reconnectDelayMultiplier | 1.5 | Exponential backoff factor |
maxReconnectDelay | 30,000 ms | Maximum delay cap |
pingInterval | 30,000 ms | Send ping every 30 seconds |
queueSize | 100 | Buffer up to 100 messages while disconnected |
Connection Lifecycle
connect()
|
v
[CONNECTING]
|
WebSocket.onopen
|
v
[CONNECTED] <-----+
| |
onclose reconnect()
| |
v |
[DISCONNECTED] |
| |
autoReconnect? |
| |
v |
[RECONNECTING] ----+
|
attempts > max?
|
v
[DISCONNECTED] (final)Reconnection Backoff Schedule
With default configuration (1000ms base, 1.5x multiplier, 30000ms max):
| Attempt | Delay | Cumulative |
|---|---|---|
| 1 | 1,000 ms | 1,000 ms |
| 2 | 1,500 ms | 2,500 ms |
| 3 | 2,250 ms | 4,750 ms |
| 4 | 3,375 ms | 8,125 ms |
| 5 | 5,063 ms | 13,188 ms |
| 6 | 7,594 ms | 20,782 ms |
| 7 | 11,391 ms | 32,173 ms |
| 8 | 17,086 ms | 49,259 ms |
| 9 | 25,629 ms | 74,888 ms |
| 10 | 30,000 ms (capped) | 104,888 ms |
Message Format
All WebSocket messages use a consistent JSON envelope:
interface WebSocketMessage<T = unknown> {
type: string; // Message type (e.g., 'query.update', 'notification')
channel?: string; // Subscription channel
payload?: T; // Message payload
id?: string; // Unique message identifier
timestamp?: number; // Unix timestamp
}Channel Subscriptions
Components subscribe to specific channels to receive targeted messages:
import { useWebSocketChannel } from '@matih/shared/realtime';
function QueryStatusPanel({ queryId }: { queryId: string }) {
const [status, setStatus] = useState<QueryStatus>('pending');
useWebSocketChannel<QueryStatusUpdate>(
`query.${queryId}`,
(data) => {
setStatus(data.status);
},
[queryId]
);
return <StatusBadge status={status} />;
}When a component subscribes to a channel, the provider automatically sends a subscription message to the server. When all subscribers for a channel have unsubscribed, an unsubscribe message is sent.
Message Queuing
When the WebSocket is disconnected, messages are queued in memory up to the configured queueSize. Upon reconnection, queued messages are flushed in order:
Disconnected state:
send(msg1) -> [Queue: msg1]
send(msg2) -> [Queue: msg1, msg2]
send(msg3) -> [Queue: msg1, msg2, msg3]
Reconnect:
onopen -> flush queue
WebSocket.send(msg1)
WebSocket.send(msg2)
WebSocket.send(msg3)
[Queue: empty]Heartbeat
The provider sends periodic ping messages to keep the connection alive and detect stale connections:
// Sent every pingInterval (default 30s)
{
"type": "ping",
"timestamp": 1707580800000
}
// Expected response from server
{
"type": "pong",
"timestamp": 1707580800050
}Pong messages are silently consumed and not dispatched to subscribers.
Provider Setup
The WebSocketProvider wraps the application at the root level:
import { WebSocketProvider } from '@matih/shared/realtime';
function App() {
return (
<WebSocketProvider
config={{
url: `${import.meta.env.VITE_WS_URL}/ws`,
authToken: authContext.token,
headers: {
'X-Tenant-ID': tenantId,
},
onOpen: () => console.log('WebSocket connected'),
onClose: () => console.log('WebSocket disconnected'),
onError: (error) => telemetry.captureError('WebSocket error', error),
}}
>
<RouterProvider router={router} />
</WebSocketProvider>
);
}Hooks API
The real-time module exports three hooks:
| Hook | Purpose | Returns |
|---|---|---|
useWebSocket() | Full WebSocket context | { status, send, subscribe, unsubscribe, connect, disconnect, reconnect } |
useWebSocketChannel<T>(channel, handler, deps) | Subscribe to a channel | Automatic cleanup on unmount |
useWebSocketStatus() | Connection status only | WebSocketStatus ('connected' |
Server-Sent Events
The EventSource component provides SSE-based streaming for AI response generation, where the server sends a continuous stream of tokens.
Streaming Pattern
// Using SSE for AI response streaming
import { useEventSource } from '@matih/shared/realtime';
function StreamingResponse({ conversationId }: { conversationId: string }) {
const { data, error, status } = useEventSource(
`/api/ai/v1/chat/stream?conversationId=${conversationId}`,
{
onToken: (token) => appendToDisplay(token),
onComplete: (response) => finalizeMessage(response),
onError: (error) => showError(error),
}
);
return (
<div>
{status === 'streaming' && <StreamingIndicator />}
<div className="response-content">{data}</div>
</div>
);
}SSE vs WebSocket for Streaming
| Aspect | SSE | WebSocket |
|---|---|---|
| Direction | Server to client only | Bidirectional |
| Protocol | HTTP/1.1 or HTTP/2 | Dedicated protocol |
| Reconnection | Built-in auto-reconnect | Custom implementation |
| Use case | AI response streaming | General real-time communication |
| Browser support | Universal | Universal |
| Proxy compatibility | Excellent (HTTP) | Sometimes problematic |
The platform uses SSE for AI response streaming because it is unidirectional, works well with HTTP proxies and load balancers, and has built-in reconnection semantics.
Presence Tracking
The Presence component tracks which users are currently viewing or editing shared resources:
Presence Display
Dashboard: Q4 Revenue Analysis
Currently viewing: [Avatar: Alice] [Avatar: Bob] [Avatar: Carol] +2 more
Editing: [Avatar: Alice] (filter panel)Presence Protocol
// Join presence
send({
type: 'presence.join',
channel: `dashboard.${dashboardId}`,
payload: {
userId: currentUser.id,
name: currentUser.name,
avatar: currentUser.avatar,
activity: 'viewing',
},
});
// Update activity
send({
type: 'presence.update',
channel: `dashboard.${dashboardId}`,
payload: {
activity: 'editing',
component: 'filter-panel',
},
});
// Leave (sent automatically on component unmount)
send({
type: 'presence.leave',
channel: `dashboard.${dashboardId}`,
});Live Updates
The LiveUpdates component provides automatic data refresh for components that need to stay current:
| Update Type | Mechanism | Use Case |
|---|---|---|
| Push update | WebSocket message | Dashboard widget data, query results |
| Poll refresh | Configurable interval | Metrics, status panels |
| Event-triggered | Specific event subscription | Pipeline completion, model deployment |
Live Dashboard Widget
import { useLiveData } from '@matih/shared/realtime';
function RevenueWidget({ widgetId }: { widgetId: string }) {
const { data, lastUpdated } = useLiveData({
channel: `widget.${widgetId}`,
fetchFn: () => api.get(`/widgets/${widgetId}/data`),
pollInterval: 60000, // Fallback poll every 60s
enableWebSocket: true, // Prefer WebSocket push
});
return (
<div>
<MetricDisplay value={data?.revenue} />
<span className="last-updated">Updated {formatRelative(lastUpdated)}</span>
</div>
);
}Notification Center
The NotificationCenter in the real-time module handles incoming notifications from the WebSocket:
Notification Types
| Type | Source | Example |
|---|---|---|
| Query completion | Query Engine | "Your query completed in 3.2s (45,231 rows)" |
| Pipeline status | Pipeline Service | "Pipeline customer_enrichment completed successfully" |
| Model deployment | ML Service | "Model churn_predictor_v3 deployed to production" |
| Approval request | AI Service | "Action requires approval: Deploy pipeline" |
| Data quality alert | Data Quality Service | "Quality check failed: orders.amount null rate > 5%" |
| Collaboration | BI Service | "Alice shared dashboard 'Q4 Revenue' with you" |
| System alert | Platform | "Scheduled maintenance in 30 minutes" |
Notification Flow
Backend Service emits event to Kafka
|
v
Notification Service consumes event
|
v
Notification Service pushes via WebSocket
|
v
WebSocketProvider receives message
|
v
NotificationCenter component updates:
- Badge count incremented
- Toast displayed (if enabled)
- Sound played (if enabled)
- Notification added to listQuery Status Tracking
The queryStatus.ts module tracks the execution status of long-running queries via WebSocket:
Query Status States
| Status | Description | UI Treatment |
|---|---|---|
pending | Query submitted, waiting in queue | Spinner with position in queue |
running | Query executing | Progress bar with elapsed time |
completed | Query finished successfully | Result count and execution time |
failed | Query execution failed | Error message with details |
cancelled | Query cancelled by user | Cancellation confirmation |
timeout | Query exceeded time limit | Timeout message with retry option |
Connection Status Indicator
The shared useConnectionStatus hook provides application-wide connectivity awareness:
function ConnectionIndicator() {
const wsStatus = useWebSocketStatus();
const statusConfig = {
connected: { label: 'Connected', color: 'green' },
connecting: { label: 'Connecting...', color: 'yellow' },
reconnecting: { label: 'Reconnecting...', color: 'yellow' },
disconnected: { label: 'Disconnected', color: 'red' },
error: { label: 'Connection Error', color: 'red' },
};
const config = statusConfig[wsStatus];
return (
<div className="connection-indicator">
<span className={`dot dot-${config.color}`} />
<span>{config.label}</span>
</div>
);
}Key Source Files
| Component | Location |
|---|---|
| WebSocket provider | frontend/shared/src/realtime/WebSocketProvider.tsx |
| EventSource (SSE) | frontend/shared/src/realtime/EventSource.tsx |
| Live updates | frontend/shared/src/realtime/LiveUpdates.tsx |
| Presence tracking | frontend/shared/src/realtime/Presence.tsx |
| Notification center | frontend/shared/src/realtime/NotificationCenter.tsx |
| Query status | frontend/shared/src/realtime/queryStatus.ts |
| Type definitions | frontend/shared/src/realtime/types.ts |
| Barrel export | frontend/shared/src/realtime/index.ts |
| Connection status hook | frontend/shared/src/hooks/useConnectionStatus.tsx |
| Offline detection hook | frontend/shared/src/hooks/useOfflineDetection.ts |