# SSE System Documentation **Server-Sent Events (SSE) Integration** für das Custom PHP Framework - Technische Dokumentation. ## Übersicht Das SSE-System ermöglicht **Echtzeit-Updates von Server zu Client** über HTTP-basierte unidirektionale Streams. LiveComponents können automatisch Updates empfangen und sich selbst neu rendern, ohne dass JavaScript-Code geschrieben werden muss. ### Kernfeatures - ✅ **Auto-Connection**: Components mit `getSseChannel()` verbinden sich automatisch - ✅ **Channel-Based Routing**: Typed channels für verschiedene Update-Typen - ✅ **Event Broadcasting**: EventDispatcher Integration für nahtlose Updates - ✅ **Automatic Reconnection**: Exponential Backoff mit Jitter (1s → 30s max) - ✅ **Heartbeat Monitoring**: 45s Client-Timeout-Erkennung - ✅ **DOM Patching**: Fragment-basierte partielle Rendering - ✅ **Connection Pooling**: Effizientes Teilen von Connections zwischen Components ## Architektur ``` ┌─────────────────────────────────────────────────────────────┐ │ Browser (Frontend) │ ├─────────────────────────────────────────────────────────────┤ │ LiveComponentManager │ │ ├─ setupSseConnection() - Auto-detect data-sse-channel │ │ ├─ handleSseComponentUpdate() - Full component refresh │ │ └─ handleSseComponentFragments() - Partial DOM updates │ │ │ │ SseClient (EventSource Wrapper) │ │ ├─ connect() - EventSource mit exponential backoff │ │ ├─ disconnect() - Graceful shutdown │ │ └─ Reference counting für shared connections │ └─────────────────────────────────────────────────────────────┘ ↓ SSE Stream (text/event-stream) ┌─────────────────────────────────────────────────────────────┐ │ PHP Backend │ ├─────────────────────────────────────────────────────────────┤ │ ComponentRegistry │ │ └─ renderWithWrapper() - Auto-inject data-sse-channel │ │ │ │ SseEventBroadcaster (EventListener) │ │ ├─ onComponentUpdated() - Broadcast component updates │ │ └─ Filters by channel subscription │ │ │ │ SseConnectionPool │ │ ├─ Connection Management │ │ ├─ Heartbeat Scheduling (30s interval) │ │ └─ Broadcast to filtered connections │ │ │ │ /sse/events/{channel} Route │ │ ├─ text/event-stream Response │ │ ├─ Generator-based streaming │ │ └─ Connection lifecycle management │ └─────────────────────────────────────────────────────────────┘ ``` ## Backend-Komponenten ### 1. SseChannel (Value Object) **Location**: `src/Framework/Sse/ValueObjects/SseChannel.php` Typed Channel-Identifier mit Validierung: ```php final readonly class SseChannel { public function __construct( public string $value ) { if (empty($value)) { throw new InvalidArgumentException('Channel cannot be empty'); } } public static function global(): self { return new self('global'); } public static function user(string $userId): self { return new self("user:$userId"); } public function equals(self $other): bool { return $this->value === $other->value; } } ``` ### 2. SseConnection (Value Object) **Location**: `src/Framework/Sse/ValueObjects/SseConnection.php` Repräsentiert eine aktive SSE-Verbindung: ```php final readonly class SseConnection { public function __construct( public string $id, // Unique connection ID public SseChannel $channel, // Subscribed channel public \DateTimeImmutable $connectedAt, public ?\DateTimeImmutable $lastHeartbeat = null, public array $metadata = [] // User-Agent, IP, etc. ) {} public function isExpired(int $timeoutSeconds = 60): bool { $lastActivity = $this->lastHeartbeat ?? $this->connectedAt; $now = new \DateTimeImmutable(); return ($now->getTimestamp() - $lastActivity->getTimestamp()) > $timeoutSeconds; } public function withHeartbeat(\DateTimeImmutable $timestamp): self { return new self( id: $this->id, channel: $this->channel, connectedAt: $this->connectedAt, lastHeartbeat: $timestamp, metadata: $this->metadata ); } } ``` ### 3. SseConnectionPool **Location**: `src/Framework/Sse/SseConnectionPool.php` Verwaltet alle aktiven SSE-Verbindungen: ```php final class SseConnectionPool { /** @var array */ private array $connections = []; /** @var array */ private array $generators = []; public function add(SseConnection $connection, Generator $generator): void { $this->connections[$connection->id] = $connection; $this->generators[$connection->id] = $generator; } public function remove(string $connectionId): void { unset($this->connections[$connectionId]); unset($this->generators[$connectionId]); } /** * Send event to all connections subscribed to channel */ public function broadcast(SseChannel $channel, SseEvent $event): void { foreach ($this->connections as $connectionId => $connection) { if ($connection->channel->equals($channel)) { $generator = $this->generators[$connectionId]; $generator->send($event); } } } /** * Send heartbeat to all connections */ public function sendHeartbeats(): void { $heartbeat = SseEvent::heartbeat(); foreach ($this->generators as $generator) { $generator->send($heartbeat); } } /** * Remove expired connections */ public function cleanupExpired(int $timeoutSeconds = 60): int { $removed = 0; foreach ($this->connections as $connectionId => $connection) { if ($connection->isExpired($timeoutSeconds)) { $this->remove($connectionId); $removed++; } } return $removed; } } ``` ### 4. SseEvent (Value Object) **Location**: `src/Framework/Sse/ValueObjects/SseEvent.php` SSE-Event mit Typ und Daten: ```php final readonly class SseEvent { public function __construct( public SseEventType $type, public array $data = [], public ?string $id = null, public ?int $retry = null ) {} public static function heartbeat(): self { return new self( type: SseEventType::HEARTBEAT, data: ['timestamp' => time()] ); } public static function componentUpdate( string $componentId, array $state, string $html ): self { return new self( type: SseEventType::COMPONENT_UPDATE, data: [ 'componentId' => $componentId, 'state' => $state, 'html' => $html ] ); } public static function componentFragments( string $componentId, array $fragments ): self { return new self( type: SseEventType::COMPONENT_FRAGMENTS, data: [ 'componentId' => $componentId, 'fragments' => $fragments ] ); } public function toSseFormat(): string { $output = ''; // Event type if ($this->type !== SseEventType::MESSAGE) { $output .= "event: {$this->type->value}\n"; } // Event ID (for reconnection) if ($this->id !== null) { $output .= "id: {$this->id}\n"; } // Retry interval (milliseconds) if ($this->retry !== null) { $output .= "retry: {$this->retry}\n"; } // Data (JSON-encoded) $output .= "data: " . json_encode($this->data) . "\n\n"; return $output; } } ``` ### 5. SseEventBroadcaster (Event Listener) **Location**: `src/Framework/Sse/SseEventBroadcaster.php` Hört auf `ComponentUpdatedEvent` und broadcastet SSE-Events: ```php #[EventHandler] final readonly class SseEventBroadcaster { public function __construct( private SseConnectionPool $connectionPool, private ComponentRegistry $componentRegistry ) {} public function onComponentUpdated(ComponentUpdatedEvent $event): void { // Get component instance to extract SSE channel $component = $this->componentRegistry->get($event->componentId); // Check if component supports SSE if (!method_exists($component, 'getSseChannel')) { return; // Not an SSE-capable component } $channel = new SseChannel($component->getSseChannel()); // Render updated component $html = $this->componentRegistry->render($component); // Create SSE event $sseEvent = SseEvent::componentUpdate( componentId: $event->componentId->toString(), state: $event->newState->toArray(), html: $html ); // Broadcast to all subscribed connections $this->connectionPool->broadcast($channel, $sseEvent); } } ``` ### 6. SSE Route Controller **Location**: `src/Application/Api/SseController.php` ```php final readonly class SseController { #[Route(path: '/sse/events/{channel}', method: Method::GET)] public function events(string $channel, HttpRequest $request): StreamedResponse { $sseChannel = new SseChannel($channel); $connectionId = uniqid('sse_', true); $connection = new SseConnection( id: $connectionId, channel: $sseChannel, connectedAt: new \DateTimeImmutable(), metadata: [ 'user_agent' => $request->headers->getFirst(HeaderKey::USER_AGENT), 'ip' => $request->server->getClientIp() ] ); // Generator für SSE-Stream $generator = $this->createEventGenerator(); // Connection registrieren $this->connectionPool->add($connection, $generator); // Cleanup bei Verbindungsabbruch register_shutdown_function(function() use ($connectionId) { $this->connectionPool->remove($connectionId); }); return new StreamedResponse( generator: $generator, headers: [ 'Content-Type' => 'text/event-stream', 'Cache-Control' => 'no-cache', 'X-Accel-Buffering' => 'no' ] ); } private function createEventGenerator(): Generator { // Initial connection event yield SseEvent::heartbeat(); // Keep connection open and wait for events while (true) { // Generator wird von außen mit send() gefüttert $event = yield; if ($event === null) { // Heartbeat alle 30s sleep(30); yield SseEvent::heartbeat(); } else { yield $event; } } } } ``` ## Frontend-Komponenten ### 1. SseClient **Location**: `resources/js/modules/sse/index.js` EventSource-Wrapper mit Reconnection-Logik: ```javascript export class SseClient { constructor(channels = [], options = {}) { this.channels = channels; this.baseUrl = options.baseUrl || '/sse/events'; this.autoReconnect = options.autoReconnect ?? true; this.heartbeatTimeout = options.heartbeatTimeout || 45000; this.eventSource = null; this.reconnectAttempts = 0; this.maxReconnectDelay = 30000; this.heartbeatTimer = null; this.listeners = new Map(); this.isConnected = false; } connect() { if (this.eventSource) { return; // Already connected } const channel = this.channels[0] || 'global'; const url = `${this.baseUrl}/${channel}`; this.eventSource = new EventSource(url); this.eventSource.onopen = () => { console.log(`[SSE] Connected to ${channel}`); this.isConnected = true; this.reconnectAttempts = 0; this.resetHeartbeatTimer(); this.emit('connect', { channel }); }; this.eventSource.onerror = (error) => { console.error('[SSE] Connection error:', error); this.isConnected = false; this.clearHeartbeatTimer(); if (this.eventSource.readyState === EventSource.CLOSED) { this.eventSource = null; if (this.autoReconnect) { this.scheduleReconnect(); } } }; // Event-Listener registrieren this.eventSource.addEventListener('heartbeat', () => { this.resetHeartbeatTimer(); }); this.eventSource.addEventListener('component-update', (event) => { const data = JSON.parse(event.data); this.emit('component-update', data); }); this.eventSource.addEventListener('component-fragments', (event) => { const data = JSON.parse(event.data); this.emit('component-fragments', data); }); } disconnect() { this.autoReconnect = false; this.clearHeartbeatTimer(); if (this.eventSource) { this.eventSource.close(); this.eventSource = null; } this.isConnected = false; this.reconnectAttempts = 0; } scheduleReconnect() { const delay = Math.min( 1000 * Math.pow(2, this.reconnectAttempts) + Math.random() * 1000, this.maxReconnectDelay ); console.log(`[SSE] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1})`); setTimeout(() => { this.reconnectAttempts++; this.connect(); }, delay); } resetHeartbeatTimer() { this.clearHeartbeatTimer(); this.heartbeatTimer = setTimeout(() => { console.warn('[SSE] Heartbeat timeout - connection lost'); this.disconnect(); if (this.autoReconnect) { this.scheduleReconnect(); } }, this.heartbeatTimeout); } clearHeartbeatTimer() { if (this.heartbeatTimer) { clearTimeout(this.heartbeatTimer); this.heartbeatTimer = null; } } on(eventType, handler) { if (!this.listeners.has(eventType)) { this.listeners.set(eventType, []); } this.listeners.get(eventType).push(handler); } off(eventType, handler) { if (this.listeners.has(eventType)) { const handlers = this.listeners.get(eventType); const index = handlers.indexOf(handler); if (index !== -1) { handlers.splice(index, 1); } } } emit(eventType, data) { if (this.listeners.has(eventType)) { this.listeners.get(eventType).forEach(handler => handler(data)); } } } ``` ### 2. LiveComponentManager SSE Integration **Location**: `resources/js/modules/livecomponent/index.js` ```javascript class LiveComponentManager { constructor() { this.components = new Map(); this.sseClients = new Map(); // channel → SseClient this.componentChannels = new Map(); // componentId → channel } register(element) { const componentId = element.dataset.liveComponent; // ... existing registration logic ... // Setup SSE if component has data-sse-channel attribute this.setupSseConnection(element, componentId); } setupSseConnection(element, componentId) { const sseChannel = element.dataset.sseChannel; if (!sseChannel) { return; // Component doesn't support SSE } console.log(`[LiveComponent] Setting up SSE for ${componentId} on channel ${sseChannel}`); // Track component → channel mapping this.componentChannels.set(componentId, sseChannel); // Reuse existing SSE client for this channel or create new one let sseClient = this.sseClients.get(sseChannel); if (!sseClient) { sseClient = new SseClient([sseChannel], { autoReconnect: true, heartbeatTimeout: 45000 }); // Register event handlers this.registerSseHandlers(sseClient, sseChannel); // Connect to SSE stream sseClient.connect(); // Store for reuse this.sseClients.set(sseChannel, sseClient); } } registerSseHandlers(sseClient, channel) { // Handle component updates sseClient.on('component-update', (data) => { this.handleSseComponentUpdate(data, channel); }); // Handle partial fragment updates sseClient.on('component-fragments', (data) => { this.handleSseComponentFragments(data, channel); }); // Connection status sseClient.on('connect', () => { console.log(`[LiveComponent] SSE connected: ${channel}`); }); } handleSseComponentUpdate(data, channel) { const { componentId, state, html, events } = data; const config = this.components.get(componentId); if (!config) { console.warn(`[LiveComponent] Component ${componentId} not found for SSE update`); return; } console.log(`[LiveComponent] SSE update received for ${componentId}`); // Update component HTML if (html) { config.element.innerHTML = html; this.setupActionHandlers(config.element); this.setupFileUploadHandlers(config.element); } // Update component state if (state) { config.element.dataset.state = JSON.stringify(state); } // Dispatch server-side events if (events) { this.handleServerEvents(events); } } handleSseComponentFragments(data, channel) { const { componentId, fragments } = data; const config = this.components.get(componentId); if (!config) { return; } console.log(`[LiveComponent] SSE fragment update for ${componentId}`, fragments); // Use DomPatcher for partial updates this.updateFragments(config.element, fragments); } cleanup(componentId) { const channel = this.componentChannels.get(componentId); if (!channel) { return; } // Remove channel mapping this.componentChannels.delete(componentId); // Check if any other component uses this channel const stillInUse = Array.from(this.componentChannels.values()) .some(ch => ch === channel); // Only disconnect if no other components use this channel if (!stillInUse) { const sseClient = this.sseClients.get(channel); if (sseClient) { console.log(`[LiveComponent] Disconnecting SSE: ${channel}`); sseClient.disconnect(); this.sseClients.delete(channel); } } } } ``` ## Channel-Strategien ### Global Channel **Verwendung**: System-weite Updates für alle Benutzer ```php public function getSseChannel(): string { return 'global'; } ``` **Beispiele**: - System-Announcements - Public Activity Feeds - Global Notification Center ### User-Specific Channels **Verwendung**: Benutzer-spezifische Updates ```php public function getSseChannel(): string { $instanceId = $this->id->instanceId; // "user-123" if ($instanceId === 'guest' || $instanceId === 'global') { return 'global'; } return "user:{$instanceId}"; } ``` **Beispiele**: - Private Benachrichtigungen - User-spezifische Activity Feeds - Personal Dashboards ### Context-Based Channels **Verwendung**: Kontext-spezifische Updates (Posts, Articles, Rooms) ```php public function getSseChannel(): string { $context = $this->id->instanceId; // "post-123", "article-456" return "comments:{$context}"; } ``` **Beispiele**: - Post/Article Comments - Chat Rooms - Document Collaboration ### Presence Channels **Verwendung**: Real-time Präsenz-Tracking ```php public function getSseChannel(): string { $room = $this->id->instanceId; // "lobby", "room-5" return "presence:{$room}"; } ``` **Beispiele**: - Room Presence - Active User Lists - Collaborative Editing Sessions ## Event-System Integration ### ComponentUpdatedEvent Wird automatisch dispatched wenn eine Component-Action ausgeführt wird: ```php final readonly class LiveComponentActionHandler { public function handleAction( string $componentId, string $actionName, array $parameters ): ComponentData { // Action ausführen $newState = $component->$actionName(...$parameters); // Event dispatchen für SSE Broadcasting $this->eventDispatcher->dispatch( new ComponentUpdatedEvent( componentId: ComponentId::fromString($componentId), actionName: $actionName, oldState: $component->getData(), newState: $newState ) ); return $newState; } } ``` ### Custom Events Du kannst auch manuelle SSE-Updates dispatchen: ```php // In deinem Service/Controller $event = new ComponentUpdatedEvent( componentId: ComponentId::fromString('notification-center:user-123'), actionName: 'external_notification', oldState: $oldState, newState: $newState ); $this->eventDispatcher->dispatch($event); ``` ## Performance-Aspekte ### Connection Pooling - **Shared Connections**: Mehrere Components auf demselben Channel teilen eine SSE-Verbindung - **Reference Counting**: Connection wird erst geschlossen wenn keine Components mehr subscriben - **Memory Efficiency**: Nur eine EventSource pro Channel ### Heartbeat Strategy - **Server-Side**: 30s Heartbeat-Interval - **Client-Side**: 45s Timeout-Erkennung - **Graceful Degradation**: Automatic Reconnection mit Exponential Backoff ### Reconnection Strategy ```javascript // Exponential Backoff mit Jitter delay = min( 1000 * 2^attempts + random(0, 1000), 30000 // max 30s ) ``` **Reconnection Times**: - Attempt 1: ~1s + jitter - Attempt 2: ~2s + jitter - Attempt 3: ~4s + jitter - Attempt 4: ~8s + jitter - Attempt 5: ~16s + jitter - Attempt 6+: 30s (capped) ### Bandwidth Optimization - **Partial Updates**: Fragment-basierte Updates statt Full-Render - **JSON Compression**: Minimale Payload-Größe - **Conditional Rendering**: Nur betroffene Components updaten ## Error Handling ### Server-Side Errors ```php try { $sseEvent = SseEvent::componentUpdate(...); $this->connectionPool->broadcast($channel, $sseEvent); } catch (\Exception $e) { $this->logger->error('SSE broadcast failed', [ 'channel' => $channel->value, 'error' => $e->getMessage() ]); // Connection wird im Pool behalten, Retry bei nächstem Event } ``` ### Client-Side Errors ```javascript sseClient.on('error', (error) => { console.error('[SSE] Error:', error); // Automatic reconnection wenn enabled if (this.autoReconnect) { this.scheduleReconnect(); } }); ``` ### Connection Timeout - **Heartbeat Monitoring**: 45s Client-Timeout - **Server Cleanup**: Expired Connections werden nach 60s entfernt - **Automatic Reconnect**: Client reconnected automatisch ## Best Practices ### 1. Channel Design ✅ **DO**: Spezifische Channels für verschiedene Kontexte ```php return "comments:post-{$postId}"; // Spezifisch ``` ❌ **DON'T**: Über-generalisierte Channels ```php return "all-updates"; // Zu breit ``` ### 2. Event Payload ✅ **DO**: Minimale Payloads mit nur notwendigen Daten ```php SseEvent::componentUpdate( componentId: $id, state: $state, html: $html // Nur wenn Full-Render notwendig ); ``` ❌ **DON'T**: Große Payloads mit unnötigen Daten ### 3. Connection Management ✅ **DO**: Reference Counting für Shared Connections ```javascript // Automatisch in LiveComponentManager ``` ❌ **DON'T**: Eine Connection pro Component ```javascript // Vermeiden: new SseClient() für jede Component ``` ### 4. Error Handling ✅ **DO**: Graceful Degradation ```javascript sseClient.on('error', () => { // Continue operation without real-time updates this.fallbackToPolling(); }); ``` ❌ **DON'T**: Hard Failures bei SSE-Problemen ### 5. Testing ✅ **DO**: Mock SSE in Tests ```php $mockBroadcaster = new class implements EventListener { public function onComponentUpdated($event): void { // Track calls in test } }; ``` ## Debugging ### Server-Side Logging ```php // In SseEventBroadcaster $this->logger->debug('SSE broadcast', [ 'channel' => $channel->value, 'component' => $event->componentId->toString(), 'action' => $event->actionName, 'connections' => count($this->connectionPool->getByChannel($channel)) ]); ``` ### Client-Side Logging ```javascript // Enable verbose logging const sseClient = new SseClient(['user:123'], { debug: true // Logs alle Events }); ``` ### Browser DevTools - **Network Tab**: SSE-Stream als `eventsource` Type sichtbar - **Console**: SSE-Events werden geloggt - **EventSource Status**: `readyState` Property prüfen ## Troubleshooting ### Component Updates kommen nicht an **Diagnose**: 1. Prüfe ob Component `getSseChannel()` implementiert 2. Prüfe ob `data-sse-channel` Attribute im HTML vorhanden ist 3. Prüfe Browser Console für SSE Connection-Status 4. Prüfe Server-Logs für Broadcasting-Events **Lösung**: ```php // Component muss getSseChannel() haben public function getSseChannel(): string { return "user:{$this->userId}"; } // ComponentRegistry fügt automatisch data-sse-channel hinzu ``` ### Reconnection-Loops **Ursache**: Server wirft Exceptions bei SSE-Route **Lösung**: Prüfe Server-Logs und fixe Exception-Ursache ### Memory Leaks **Ursache**: Connections werden nicht cleanup'd **Lösung**: ```php // Automatisches Cleanup alle 5 Minuten $this->scheduler->schedule( 'sse-cleanup', IntervalSchedule::every(Duration::fromMinutes(5)), fn() => $this->connectionPool->cleanupExpired() ); ``` ## Sicherheit ### Channel-Isolation - Channels sind **nicht authentifiziert** - verwende Server-Side Filtering - User-spezifische Channels müssen Server-Side validiert werden ```php #[Route('/sse/events/{channel}')] public function events(string $channel, HttpRequest $request): StreamedResponse { // Validiere dass User auf Channel zugreifen darf if (str_starts_with($channel, 'user:')) { $userId = substr($channel, 5); if ($userId !== $request->user->id) { throw new UnauthorizedException(); } } // ... rest of implementation } ``` ### XSS Prevention - Alle HTML aus SSE-Updates wird escaped - Verwende DOMPurify für User-Generated Content ### Rate Limiting - Implementiere Rate Limiting für SSE-Connections - Limit: Max 5 Connections pro User ## Framework Integration ### Mit Event System ```php // EventDispatcher → SseEventBroadcaster → SseConnectionPool $this->eventDispatcher->dispatch(new ComponentUpdatedEvent(...)); ``` ### Mit Queue System ```php // Für verzögerte Broadcasts $this->queue->push(new BroadcastSseEventJob( channel: $channel, event: $event )); ``` ### Mit Cache System ```php // Cache letzte Events für neue Connections (Replay) $this->cache->remember( CacheKey::fromString("sse:last-events:{$channel}"), fn() => $this->getLastEvents($channel), Duration::fromMinutes(5) ); ``` ## Zusammenfassung Das SSE-System bietet: ✅ **Echtzeit-Updates** ohne Polling oder WebSockets ✅ **Zero-Config** für Components mit getSseChannel() ✅ **Auto-Reconnection** mit intelligenter Backoff-Strategie ✅ **Channel-Based Routing** für flexible Update-Strategien ✅ **Event-Integration** für nahtlose Backend-Updates ✅ **Connection Pooling** für Performance-Optimierung ✅ **Framework-Compliant** mit Value Objects und Readonly Classes **Next Steps**: - Siehe [SSE Integration Guide](./sse-integration-guide.md) für praktische Implementierung - Siehe [LiveComponents System](./livecomponents-system.md) für Component-Entwicklung