pool = new SseConnectionPool();
    });

    // A connection passed to add() is afterwards visible through has() and count().
    it('adds connection to pool', function () {
        $id = ConnectionId::generate();

        $this->pool->add(new SseConnection(
            connectionId: $id,
            stream: new MockSseStream(),
            channels: [SseChannel::forUser('user123')],
            userId: 'user123',
        ));

        expect($this->pool->has($id))->toBeTrue()
            ->and($this->pool->count())->toBe(1);
    });

    // get() hands back an SseConnection carrying the id it was stored under.
    it('retrieves connection by id', function () {
        $id = ConnectionId::generate();

        $this->pool->add(new SseConnection(
            connectionId: $id,
            stream: new MockSseStream(),
            channels: [SseChannel::forUser('user123')],
        ));

        $found = $this->pool->get($id);

        expect($found)->toBeInstanceOf(SseConnection::class)
            ->and($found->connectionId->toString())->toBe($id->toString());
    });

    // remove() drops the connection: has() turns false and count() returns to zero.
    it('removes connection from pool', function () {
        $id = ConnectionId::generate();

        $this->pool->add(new SseConnection(
            connectionId: $id,
            stream: new MockSseStream(),
            channels: [SseChannel::system()],
        ));

        expect($this->pool->count())->toBe(1);

        $this->pool->remove($id);

        expect($this->pool->has($id))->toBeFalse()
            ->and($this->pool->count())->toBe(0);
    });

    // Two connections on the same channel each receive exactly one copy of a broadcast.
    it('broadcasts event to subscribed connections', function () {
        $sharedChannel = SseChannel::forUser('user123');
        $streams = [new MockSseStream(), new MockSseStream()];

        foreach ($streams as $stream) {
            $this->pool->add(new SseConnection(
                connectionId: ConnectionId::generate(),
                stream: $stream,
                channels: [$sharedChannel],
                userId: 'user123',
            ));
        }

        $payload = new SseEvent(
            data: json_encode(['message' => 'test']),
            event: 'test-event',
        );

        $delivered = $this->pool->broadcast($payload, $sharedChannel);

        expect($delivered)->toBe(2)
            ->and($streams[0]->getSentEventsCount())->toBe(1)
            ->and($streams[1]->getSentEventsCount())->toBe(1);
    });

    // Broadcasting on a channel nobody subscribed to reaches no stream.
    it('does not broadcast to unsubscribed connections', function () {
        $subscribedChannel = SseChannel::forUser('user123');
        $otherChannel = SseChannel::system();
        $listenerStream = new MockSseStream();

        $this->pool->add(new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: $listenerStream,
            channels: [$subscribedChannel],
        ));

        $delivered = $this->pool->broadcast(
            new SseEvent(data: 'test', event: 'test'),
            $otherChannel,
        );

        expect($delivered)->toBe(0)
            ->and($listenerStream->getSentEventsCount())->toBe(0);
    });

    // A stream that disconnected after being added is not counted as a
    // recipient, and its connection is gone from the pool after the broadcast.
    it('removes dead connections during broadcast', function () {
        $systemChannel = SseChannel::system();
        $brokenStream = new MockSseStream();
        $id = ConnectionId::generate();

        $this->pool->add(new SseConnection(
            connectionId: $id,
            stream: $brokenStream,
            channels: [$systemChannel],
        ));

        // The disconnect happens only after the connection was accepted.
        $brokenStream->simulateDisconnect();

        $delivered = $this->pool->broadcast(
            new SseEvent(data: 'test', event: 'test'),
            $systemChannel,
        );

        expect($delivered)->toBe(0)
            ->and($this->pool->has($id))->toBeFalse();
    });

    // Connections whose last heartbeat is 35s old get one with a 30s interval.
    it('sends heartbeats to all connections', function () {
        $streams = [new MockSseStream(), new MockSseStream()];

        foreach ($streams as $stream) {
            $this->pool->add(new SseConnection(
                connectionId: ConnectionId::generate(),
                stream: $stream,
                channels: [SseChannel::system()],
                lastHeartbeat: new DateTimeImmutable('-35 seconds'),
            ));
        }

        $heartbeats = $this->pool->sendHeartbeats(heartbeatIntervalSeconds: 30);

        expect($heartbeats)->toBe(2)
            ->and(count($streams[0]->getSentEventsByType('heartbeat')))->toBe(1)
            ->and(count($streams[1]->getSentEventsByType('heartbeat')))->toBe(1);
    });

    // A heartbeat only 10s old is still within the 30s interval: nothing is sent.
    it('does not send heartbeat if not needed', function () {
        $recentStream = new MockSseStream();

        $this->pool->add(new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: $recentStream,
            channels: [SseChannel::system()],
            lastHeartbeat: new DateTimeImmutable('-10 seconds'),
        ));

        $heartbeats = $this->pool->sendHeartbeats(heartbeatIntervalSeconds: 30);

        expect($heartbeats)->toBe(0)
            ->and($recentStream->getSentEventsCount())->toBe(0);
    });

    // cleanup() reports one removal and leaves one of the two connections behind.
    it('cleans up dead connections', function () {
        $healthy = new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: new MockSseStream(),
            channels: [SseChannel::system()],
        );

        // This stream is already disconnected before its connection is built.
        $brokenStream = new MockSseStream();
        $brokenStream->simulateDisconnect();
        $broken = new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: $brokenStream,
            channels: [SseChannel::system()],
        );

        $this->pool->add($healthy);
        $this->pool->add($broken);

        expect($this->pool->count())->toBe(2);

        $purged = $this->pool->cleanup();

        expect($purged)->toBe(1)
            ->and($this->pool->count())->toBe(1);
    });

    // The pool accepts 10000 connections and rejects the one after that.
    it('enforces maximum connections limit', function () {
        $pool = new SseConnectionPool();

        // Fill a fresh pool with 10000 connections (the max limit).
        for ($n = 1; $n <= 10000; $n++) {
            $pool->add(new SseConnection(
                connectionId: ConnectionId::generate(),
                stream: new MockSseStream(),
                channels: [SseChannel::system()],
            ));
        }

        expect($pool->count())->toBe(10000);

        // One more connection must be refused with an exception.
        expect(fn () => $pool->add(new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: new MockSseStream(),
            channels: [SseChannel::system()],
        )))->toThrow(RuntimeException::class, 'Maximum connection limit reached');
    });

    // One user may hold 5 connections; the 6th is rejected.
    it('enforces per-user connection limit', function () {
        $userId = 'user123';

        // Open 5 connections for the same user (the max limit).
        for ($n = 1; $n <= 5; $n++) {
            $this->pool->add(new SseConnection(
                connectionId: ConnectionId::generate(),
                stream: new MockSseStream(),
                channels: [SseChannel::forUser($userId)],
                userId: $userId,
            ));
        }

        expect($this->pool->count())->toBe(5);

        // A 6th connection for that user must be refused with an exception.
        expect(fn () => $this->pool->add(new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: new MockSseStream(),
            channels: [SseChannel::forUser($userId)],
            userId: $userId,
        )))->toThrow(RuntimeException::class, 'Maximum connections per user limit reached');
    });

    // getStats() splits connections by whether a userId was given and exposes both limits.
    it('provides accurate statistics', function () {
        // Connection created with a userId.
        $withUser = new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: new MockSseStream(),
            channels: [SseChannel::forUser('user123')],
            userId: 'user123',
        );

        // Connection created without a userId.
        $withoutUser = new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: new MockSseStream(),
            channels: [SseChannel::system()],
        );

        $this->pool->add($withUser);
        $this->pool->add($withoutUser);

        $report = $this->pool->getStats();

        expect($report['total_connections'])->toBe(2)
            ->and($report['authenticated_connections'])->toBe(1)
            ->and($report['anonymous_connections'])->toBe(1)
            ->and($report['max_connections'])->toBe(10000)
            ->and($report['max_connections_per_user'])->toBe(5);
    });

    // getConnectionsForChannel() returns only the connections subscribed to that channel.
    it('tracks connections by channel', function () {
        $userChannel = SseChannel::forUser('user123');
        $systemChannel = SseChannel::system();

        // Two subscribers on the user channel, one on the system channel.
        foreach ([$userChannel, $systemChannel, $userChannel] as $subscribedTo) {
            $this->pool->add(new SseConnection(
                connectionId: ConnectionId::generate(),
                stream: new MockSseStream(),
                channels: [$subscribedTo],
            ));
        }

        $onUserChannel = $this->pool->getConnectionsForChannel($userChannel);
        $onSystemChannel = $this->pool->getConnectionsForChannel($systemChannel);

        expect(count($onUserChannel))->toBe(2)
            ->and(count($onSystemChannel))->toBe(1);
    });

    // clear() empties a pool that held three connections.
    it('clears all connections', function () {
        for ($n = 1; $n <= 3; $n++) {
            $this->pool->add(new SseConnection(
                connectionId: ConnectionId::generate(),
                stream: new MockSseStream(),
                channels: [SseChannel::system()],
            ));
        }

        expect($this->pool->count())->toBe(3);

        $this->pool->clear();

        expect($this->pool->count())->toBe(0);
    });

    // A connection subscribed to two channels is reached by a broadcast on either one.
    it('handles multiple channels per connection', function () {
        $userChannel = SseChannel::forUser('user123');
        $systemChannel = SseChannel::system();
        $sharedStream = new MockSseStream();

        $this->pool->add(new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: $sharedStream,
            channels: [$userChannel, $systemChannel],
            userId: 'user123',
        ));

        $viaUserChannel = $this->pool->broadcast(
            new SseEvent(data: 'user-event', event: 'test'),
            $userChannel,
        );

        // Clear the mock stream between the two broadcasts.
        $sharedStream->clear();

        $viaSystemChannel = $this->pool->broadcast(
            new SseEvent(data: 'system-event', event: 'test'),
            $systemChannel,
        );

        expect($viaUserChannel)->toBe(1)
            ->and($viaSystemChannel)->toBe(1);
    });

    // all() returns an array holding every pooled SseConnection.
    it('returns all active connections', function () {
        for ($n = 1; $n <= 3; $n++) {
            $this->pool->add(new SseConnection(
                connectionId: ConnectionId::generate(),
                stream: new MockSseStream(),
                channels: [SseChannel::system()],
            ));
        }

        $everyConnection = $this->pool->all();

        expect($everyConnection)->toBeArray()
            ->and(count($everyConnection))->toBe(3)
            ->and($everyConnection[0])->toBeInstanceOf(SseConnection::class);
    });
});