MATIH Platform is in active MVP development. Documentation reflects current implementation status.
15. Workbench Architecture
Realtime

Real-Time Collaboration

The MATIH frontend provides real-time capabilities through a WebSocket provider with auto-reconnect and message queuing, Server-Sent Events for streaming AI responses, presence tracking for collaborative editing, and a notification center for cross-platform event delivery. This section covers the complete real-time infrastructure at frontend/shared/src/realtime/, its integration with workbench components, and the patterns for building real-time features.


Real-Time Architecture

frontend/shared/src/realtime/
  WebSocketProvider.tsx     # WebSocket connection management
  EventSource.tsx           # Server-Sent Events for streaming
  LiveUpdates.tsx           # Live data update components
  Presence.tsx              # User presence tracking
  NotificationCenter.tsx    # Real-time notification delivery
  queryStatus.ts            # Query execution status tracking
  types.ts                  # TypeScript interfaces
  index.ts                  # Barrel exports

Technology Selection

MechanismUse CaseDirectionProtocol
WebSocketBidirectional real-timeClient to Server (bidirectional)ws:// / wss://
Server-Sent EventsStreaming AI responsesServer to ClientHTTP SSE
BroadcastChannelCross-tab synchronizationTab to Tab (bidirectional)Browser API
PollingFallback for WebSocket failureClient to ServerHTTP

WebSocket Provider

The WebSocketProvider at frontend/shared/src/realtime/WebSocketProvider.tsx manages WebSocket connections with auto-reconnect, message queuing, and channel-based subscriptions.

Configuration

interface WebSocketConfig {
  url: string;                            // WebSocket server URL
  protocols?: string | string[];          // Sub-protocols
  autoReconnect?: boolean;                // Auto-reconnect on disconnect (default: true)
  maxReconnectAttempts?: number;          // Max reconnection attempts (default: 10)
  reconnectDelay?: number;               // Initial reconnect delay in ms (default: 1000)
  reconnectDelayMultiplier?: number;      // Backoff multiplier (default: 1.5)
  maxReconnectDelay?: number;             // Maximum backoff delay in ms (default: 30000)
  pingInterval?: number;                  // Heartbeat interval in ms (default: 30000)
  queueSize?: number;                     // Max queued messages when disconnected (default: 100)
  authToken?: string;                     // Authentication token for connection
  headers?: Record<string, string>;       // Additional headers as query params
  onOpen?: () => void;                    // Connection established callback
  onClose?: (event: CloseEvent) => void;  // Connection closed callback
  onError?: (event: Event) => void;       // Connection error callback
  onMessage?: (message: WebSocketMessage) => void; // Global message handler
}

Default Configuration

ParameterDefaultDescription
autoReconnecttrueAutomatically attempt reconnection
maxReconnectAttempts10Give up after 10 failed attempts
reconnectDelay1,000 msInitial delay between reconnect attempts
reconnectDelayMultiplier1.5Exponential backoff factor
maxReconnectDelay30,000 msMaximum delay cap
pingInterval30,000 msSend ping every 30 seconds
queueSize100Buffer up to 100 messages while disconnected

Connection Lifecycle

              connect()
                 |
                 v
          [CONNECTING]
                 |
          WebSocket.onopen
                 |
                 v
          [CONNECTED] <-----+
                 |           |
            onclose      reconnect()
                 |           |
                 v           |
         [DISCONNECTED]      |
                 |           |
      autoReconnect?         |
                 |           |
                 v           |
         [RECONNECTING] ----+
                 |
          attempts > max?
                 |
                 v
         [DISCONNECTED] (final)

Reconnection Backoff Schedule

With default configuration (1000ms base, 1.5x multiplier, 30000ms max):

AttemptDelayCumulative
11,000 ms1,000 ms
21,500 ms2,500 ms
32,250 ms4,750 ms
43,375 ms8,125 ms
55,063 ms13,188 ms
67,594 ms20,782 ms
711,391 ms32,173 ms
817,086 ms49,259 ms
925,629 ms74,888 ms
1030,000 ms (capped)104,888 ms

Message Format

All WebSocket messages use a consistent JSON envelope:

interface WebSocketMessage<T = unknown> {
  type: string;              // Message type (e.g., 'query.update', 'notification')
  channel?: string;          // Subscription channel
  payload?: T;               // Message payload
  id?: string;               // Unique message identifier
  timestamp?: number;        // Unix timestamp
}

Channel Subscriptions

Components subscribe to specific channels to receive targeted messages:

import { useWebSocketChannel } from '@matih/shared/realtime';
 
function QueryStatusPanel({ queryId }: { queryId: string }) {
  const [status, setStatus] = useState<QueryStatus>('pending');
 
  useWebSocketChannel<QueryStatusUpdate>(
    `query.${queryId}`,
    (data) => {
      setStatus(data.status);
    },
    [queryId]
  );
 
  return <StatusBadge status={status} />;
}

When a component subscribes to a channel, the provider automatically sends a subscription message to the server. When all subscribers for a channel have unsubscribed, an unsubscribe message is sent.

Message Queuing

When the WebSocket is disconnected, messages are queued in memory up to the configured queueSize. Upon reconnection, queued messages are flushed in order:

Disconnected state:
  send(msg1) -> [Queue: msg1]
  send(msg2) -> [Queue: msg1, msg2]
  send(msg3) -> [Queue: msg1, msg2, msg3]

Reconnect:
  onopen -> flush queue
  WebSocket.send(msg1)
  WebSocket.send(msg2)
  WebSocket.send(msg3)
  [Queue: empty]

Heartbeat

The provider sends periodic ping messages to keep the connection alive and detect stale connections:

// Sent every pingInterval (default 30s)
{
  "type": "ping",
  "timestamp": 1707580800000
}
 
// Expected response from server
{
  "type": "pong",
  "timestamp": 1707580800050
}

Pong messages are silently consumed and not dispatched to subscribers.


Provider Setup

The WebSocketProvider wraps the application at the root level:

import { WebSocketProvider } from '@matih/shared/realtime';
 
function App() {
  return (
    <WebSocketProvider
      config={{
        url: `${import.meta.env.VITE_WS_URL}/ws`,
        authToken: authContext.token,
        headers: {
          'X-Tenant-ID': tenantId,
        },
        onOpen: () => console.log('WebSocket connected'),
        onClose: () => console.log('WebSocket disconnected'),
        onError: (error) => telemetry.captureError('WebSocket error', error),
      }}
    >
      <RouterProvider router={router} />
    </WebSocketProvider>
  );
}

Hooks API

The real-time module exports three hooks:

HookPurposeReturns
useWebSocket()Full WebSocket context{ status, send, subscribe, unsubscribe, connect, disconnect, reconnect }
useWebSocketChannel<T>(channel, handler, deps)Subscribe to a channelAutomatic cleanup on unmount
useWebSocketStatus()Connection status onlyWebSocketStatus ('connected'

Server-Sent Events

The EventSource component provides SSE-based streaming for AI response generation, where the server sends a continuous stream of tokens.

Streaming Pattern

// Using SSE for AI response streaming
import { useEventSource } from '@matih/shared/realtime';
 
function StreamingResponse({ conversationId }: { conversationId: string }) {
  const { data, error, status } = useEventSource(
    `/api/ai/v1/chat/stream?conversationId=${conversationId}`,
    {
      onToken: (token) => appendToDisplay(token),
      onComplete: (response) => finalizeMessage(response),
      onError: (error) => showError(error),
    }
  );
 
  return (
    <div>
      {status === 'streaming' && <StreamingIndicator />}
      <div className="response-content">{data}</div>
    </div>
  );
}

SSE vs WebSocket for Streaming

AspectSSEWebSocket
DirectionServer to client onlyBidirectional
ProtocolHTTP/1.1 or HTTP/2Dedicated protocol
ReconnectionBuilt-in auto-reconnectCustom implementation
Use caseAI response streamingGeneral real-time communication
Browser supportUniversalUniversal
Proxy compatibilityExcellent (HTTP)Sometimes problematic

The platform uses SSE for AI response streaming because it is unidirectional, works well with HTTP proxies and load balancers, and has built-in reconnection semantics.


Presence Tracking

The Presence component tracks which users are currently viewing or editing shared resources:

Presence Display

Dashboard: Q4 Revenue Analysis
  Currently viewing: [Avatar: Alice] [Avatar: Bob] [Avatar: Carol] +2 more
  Editing: [Avatar: Alice] (filter panel)

Presence Protocol

// Join presence
send({
  type: 'presence.join',
  channel: `dashboard.${dashboardId}`,
  payload: {
    userId: currentUser.id,
    name: currentUser.name,
    avatar: currentUser.avatar,
    activity: 'viewing',
  },
});
 
// Update activity
send({
  type: 'presence.update',
  channel: `dashboard.${dashboardId}`,
  payload: {
    activity: 'editing',
    component: 'filter-panel',
  },
});
 
// Leave (sent automatically on component unmount)
send({
  type: 'presence.leave',
  channel: `dashboard.${dashboardId}`,
});

Live Updates

The LiveUpdates component provides automatic data refresh for components that need to stay current:

Update TypeMechanismUse Case
Push updateWebSocket messageDashboard widget data, query results
Poll refreshConfigurable intervalMetrics, status panels
Event-triggeredSpecific event subscriptionPipeline completion, model deployment

Live Dashboard Widget

import { useLiveData } from '@matih/shared/realtime';
 
function RevenueWidget({ widgetId }: { widgetId: string }) {
  const { data, lastUpdated } = useLiveData({
    channel: `widget.${widgetId}`,
    fetchFn: () => api.get(`/widgets/${widgetId}/data`),
    pollInterval: 60000,       // Fallback poll every 60s
    enableWebSocket: true,     // Prefer WebSocket push
  });
 
  return (
    <div>
      <MetricDisplay value={data?.revenue} />
      <span className="last-updated">Updated {formatRelative(lastUpdated)}</span>
    </div>
  );
}

Notification Center

The NotificationCenter in the real-time module handles incoming notifications from the WebSocket:

Notification Types

TypeSourceExample
Query completionQuery Engine"Your query completed in 3.2s (45,231 rows)"
Pipeline statusPipeline Service"Pipeline customer_enrichment completed successfully"
Model deploymentML Service"Model churn_predictor_v3 deployed to production"
Approval requestAI Service"Action requires approval: Deploy pipeline"
Data quality alertData Quality Service"Quality check failed: orders.amount null rate > 5%"
CollaborationBI Service"Alice shared dashboard 'Q4 Revenue' with you"
System alertPlatform"Scheduled maintenance in 30 minutes"

Notification Flow

Backend Service emits event to Kafka
    |
    v
Notification Service consumes event
    |
    v
Notification Service pushes via WebSocket
    |
    v
WebSocketProvider receives message
    |
    v
NotificationCenter component updates:
  - Badge count incremented
  - Toast displayed (if enabled)
  - Sound played (if enabled)
  - Notification added to list

Query Status Tracking

The queryStatus.ts module tracks the execution status of long-running queries via WebSocket:

Query Status States

StatusDescriptionUI Treatment
pendingQuery submitted, waiting in queueSpinner with position in queue
runningQuery executingProgress bar with elapsed time
completedQuery finished successfullyResult count and execution time
failedQuery execution failedError message with details
cancelledQuery cancelled by userCancellation confirmation
timeoutQuery exceeded time limitTimeout message with retry option

Connection Status Indicator

The shared useConnectionStatus hook provides application-wide connectivity awareness:

function ConnectionIndicator() {
  const wsStatus = useWebSocketStatus();
 
  const statusConfig = {
    connected: { label: 'Connected', color: 'green' },
    connecting: { label: 'Connecting...', color: 'yellow' },
    reconnecting: { label: 'Reconnecting...', color: 'yellow' },
    disconnected: { label: 'Disconnected', color: 'red' },
    error: { label: 'Connection Error', color: 'red' },
  };
 
  const config = statusConfig[wsStatus];
 
  return (
    <div className="connection-indicator">
      <span className={`dot dot-${config.color}`} />
      <span>{config.label}</span>
    </div>
  );
}

Key Source Files

ComponentLocation
WebSocket providerfrontend/shared/src/realtime/WebSocketProvider.tsx
EventSource (SSE)frontend/shared/src/realtime/EventSource.tsx
Live updatesfrontend/shared/src/realtime/LiveUpdates.tsx
Presence trackingfrontend/shared/src/realtime/Presence.tsx
Notification centerfrontend/shared/src/realtime/NotificationCenter.tsx
Query statusfrontend/shared/src/realtime/queryStatus.ts
Type definitionsfrontend/shared/src/realtime/types.ts
Barrel exportfrontend/shared/src/realtime/index.ts
Connection status hookfrontend/shared/src/hooks/useConnectionStatus.tsx
Offline detection hookfrontend/shared/src/hooks/useOfflineDetection.ts