/** * SSE (Server-Sent Events) Client Module * * Provides real-time server push capabilities for LiveComponents. * * Features: * - Auto-reconnect with exponential backoff * - Multi-channel subscription * - Event type routing * - Heartbeat monitoring * - Connection state management * * @module sse */ /** * Connection states */ const ConnectionState = { DISCONNECTED: 'disconnected', CONNECTING: 'connecting', CONNECTED: 'connected', ERROR: 'error' }; /** * Reconnection strategy with exponential backoff */ class ReconnectionStrategy { constructor() { this.attempt = 0; this.baseDelay = 1000; // 1s this.maxDelay = 30000; // 30s this.maxAttempts = 10; } /** * Get delay for current attempt with jitter */ getDelay() { const delay = Math.min( this.baseDelay * Math.pow(2, this.attempt), this.maxDelay ); // Add jitter (±25%) const jitter = delay * 0.25 * (Math.random() - 0.5); return Math.floor(delay + jitter); } /** * Check if should retry */ shouldRetry() { return this.attempt < this.maxAttempts; } /** * Record a reconnection attempt */ recordAttempt() { this.attempt++; } /** * Reset strategy (connection successful) */ reset() { this.attempt = 0; } } /** * SSE Client * * Manages Server-Sent Events connection with auto-reconnect. */ export class SseClient { /** * @param {string[]} channels - Channels to subscribe to * @param {object} options - Configuration options */ constructor(channels = [], options = {}) { this.channels = channels; this.options = { autoReconnect: true, heartbeatTimeout: 45000, // 45s (server sends every 30s) ...options }; this.eventSource = null; this.state = ConnectionState.DISCONNECTED; this.reconnectionStrategy = new ReconnectionStrategy(); this.reconnectTimer = null; this.heartbeatTimer = null; this.lastHeartbeat = null; this.eventHandlers = new Map(); // eventType => Set this.stateChangeHandlers = new Set(); this.connectionId = null; } /** * Connect to SSE stream */ connect() { if (this.state === ConnectionState.CONNECTING || this.state === ConnectionState.CONNECTED) { console.warn('[SSE] Already connecting or connected'); return; } this.setState(ConnectionState.CONNECTING); const url = this.buildUrl(); try { this.eventSource = new EventSource(url); // Connection opened this.eventSource.addEventListener('open', () => { console.log('[SSE] Connection opened'); this.setState(ConnectionState.CONNECTED); this.reconnectionStrategy.reset(); this.startHeartbeatMonitoring(); }); // Connection error this.eventSource.addEventListener('error', (e) => { console.error('[SSE] Connection error', e); if (this.eventSource.readyState === EventSource.CLOSED) { this.handleDisconnect(); } }); // Initial connection confirmation this.eventSource.addEventListener('connected', (e) => { const data = JSON.parse(e.data); this.connectionId = data.connection_id; console.log('[SSE] Connected with ID:', this.connectionId); }); // Heartbeat event this.eventSource.addEventListener('heartbeat', (e) => { this.lastHeartbeat = Date.now(); }); // Disconnected event this.eventSource.addEventListener('disconnected', () => { console.log('[SSE] Server initiated disconnect'); this.disconnect(); }); // Error events this.eventSource.addEventListener('error', (e) => { const data = JSON.parse(e.data); console.error('[SSE] Server error:', data); }); } catch (error) { console.error('[SSE] Failed to create EventSource', error); this.setState(ConnectionState.ERROR); this.scheduleReconnect(); } } /** * Disconnect from SSE stream */ disconnect() { if (this.eventSource) { this.eventSource.close(); this.eventSource = null; } this.stopHeartbeatMonitoring(); this.clearReconnectTimer(); this.setState(ConnectionState.DISCONNECTED); this.connectionId = null; } /** * Handle disconnection */ handleDisconnect() { this.setState(ConnectionState.ERROR); this.stopHeartbeatMonitoring(); if (this.options.autoReconnect && this.reconnectionStrategy.shouldRetry()) { this.scheduleReconnect(); } else { this.setState(ConnectionState.DISCONNECTED); } } /** * Schedule reconnection attempt */ scheduleReconnect() { const delay = this.reconnectionStrategy.getDelay(); console.log(`[SSE] Reconnecting in ${delay}ms (attempt ${this.reconnectionStrategy.attempt + 1})`); this.reconnectTimer = setTimeout(() => { this.reconnectionStrategy.recordAttempt(); this.connect(); }, delay); } /** * Clear reconnect timer */ clearReconnectTimer() { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } } /** * Start heartbeat monitoring */ startHeartbeatMonitoring() { this.lastHeartbeat = Date.now(); this.heartbeatTimer = setInterval(() => { const timeSinceHeartbeat = Date.now() - this.lastHeartbeat; if (timeSinceHeartbeat > this.options.heartbeatTimeout) { console.warn('[SSE] Heartbeat timeout - connection appears dead'); this.handleDisconnect(); } }, 5000); // Check every 5s } /** * Stop heartbeat monitoring */ stopHeartbeatMonitoring() { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = null; } } /** * Build SSE stream URL with channels */ buildUrl() { const baseUrl = '/sse/stream'; const channelParam = this.channels.join(','); return `${baseUrl}?channels=${encodeURIComponent(channelParam)}`; } /** * Register event handler * * @param {string} eventType - Event type to listen for * @param {function} handler - Event handler function */ on(eventType, handler) { if (!this.eventHandlers.has(eventType)) { this.eventHandlers.set(eventType, new Set()); // Register EventSource listener if (this.eventSource) { this.eventSource.addEventListener(eventType, (e) => { this.handleEvent(eventType, e); }); } } this.eventHandlers.get(eventType).add(handler); return () => this.off(eventType, handler); } /** * Unregister event handler */ off(eventType, handler) { const handlers = this.eventHandlers.get(eventType); if (handlers) { handlers.delete(handler); } } /** * Handle incoming event */ handleEvent(eventType, event) { const handlers = this.eventHandlers.get(eventType); if (!handlers || handlers.size === 0) return; let data; try { data = JSON.parse(event.data); } catch (e) { console.error('[SSE] Failed to parse event data', e); return; } handlers.forEach(handler => { try { handler(data, event); } catch (e) { console.error('[SSE] Event handler error', e); } }); } /** * Set connection state */ setState(newState) { const oldState = this.state; this.state = newState; if (oldState !== newState) { this.stateChangeHandlers.forEach(handler => { try { handler(newState, oldState); } catch (e) { console.error('[SSE] State change handler error', e); } }); } } /** * Register state change handler */ onStateChange(handler) { this.stateChangeHandlers.add(handler); return () => this.stateChangeHandlers.delete(handler); } /** * Get current connection state */ getState() { return this.state; } /** * Check if connected */ isConnected() { return this.state === ConnectionState.CONNECTED; } /** * Get connection ID */ getConnectionId() { return this.connectionId; } /** * Subscribe to additional channels (requires reconnect) */ addChannels(...newChannels) { const added = newChannels.filter(ch => !this.channels.includes(ch)); if (added.length > 0) { this.channels.push(...added); if (this.isConnected()) { console.log('[SSE] Reconnecting with new channels:', added); this.disconnect(); this.connect(); } } } /** * Unsubscribe from channels (requires reconnect) */ removeChannels(...channelsToRemove) { const before = this.channels.length; this.channels = this.channels.filter(ch => !channelsToRemove.includes(ch)); if (this.channels.length !== before && this.isConnected()) { console.log('[SSE] Reconnecting with removed channels'); this.disconnect(); this.connect(); } } } /** * Global SSE client instance */ let globalSseClient = null; /** * Get or create global SSE client */ export function getGlobalSseClient(channels = []) { if (!globalSseClient) { globalSseClient = new SseClient(channels); } return globalSseClient; } /** * Initialize global SSE client and connect */ export function initSse(channels = [], autoConnect = true) { globalSseClient = new SseClient(channels); if (autoConnect) { globalSseClient.connect(); } return globalSseClient; } export default { SseClient, getGlobalSseClient, initSse, ConnectionState };