- Add comprehensive health check system with multiple endpoints
- Add Prometheus metrics endpoint
- Add production logging configurations (5 strategies)
- Add complete deployment documentation suite:
  * QUICKSTART.md - 30-minute deployment guide
  * DEPLOYMENT_CHECKLIST.md - Printable verification checklist
  * DEPLOYMENT_WORKFLOW.md - Complete deployment lifecycle
  * PRODUCTION_DEPLOYMENT.md - Comprehensive technical reference
  * production-logging.md - Logging configuration guide
  * ANSIBLE_DEPLOYMENT.md - Infrastructure as Code automation
  * README.md - Navigation hub
  * DEPLOYMENT_SUMMARY.md - Executive summary
- Add deployment scripts and automation
- Add DEPLOYMENT_PLAN.md - Concrete plan for immediate deployment
- Update README with production-ready features

All production infrastructure is now complete and ready for deployment.
415 lines
13 KiB
PHP
415 lines
13 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
use App\Framework\Router\Result\SseEvent;
|
|
use App\Framework\Sse\SseConnectionPool;
|
|
use App\Framework\Sse\ValueObjects\ConnectionId;
|
|
use App\Framework\Sse\ValueObjects\SseChannel;
|
|
use App\Framework\Sse\ValueObjects\SseConnection;
|
|
use Tests\Support\MockSseStream;
|
|
|
|
describe('SseConnectionPool Integration', function () {
|
|
beforeEach(function (): void {
    // Each test starts from its own empty pool so connection counts never leak between cases.
    $this->pool = new SseConnectionPool();
});
|
|
|
|
it('adds connection to pool', function () {
    // A connection handed to add() must afterwards be reported by has() and counted once.
    $id = ConnectionId::generate();

    $this->pool->add(new SseConnection(
        connectionId: $id,
        stream: new MockSseStream(),
        channels: [SseChannel::forUser('user123')],
        userId: 'user123',
    ));

    expect($this->pool->has($id))->toBeTrue()
        ->and($this->pool->count())->toBe(1);
});
|
|
|
|
it('retrieves connection by id', function () {
    // get() must return an SseConnection whose id string matches the one that was added.
    $id = ConnectionId::generate();

    $this->pool->add(new SseConnection(
        connectionId: $id,
        stream: new MockSseStream(),
        channels: [SseChannel::forUser('user123')],
    ));

    $found = $this->pool->get($id);

    expect($found)->toBeInstanceOf(SseConnection::class)
        ->and($found->connectionId->toString())->toBe($id->toString());
});
|
|
|
|
it('removes connection from pool', function () {
    $id = ConnectionId::generate();
    $pool = $this->pool;

    $pool->add(new SseConnection(
        connectionId: $id,
        stream: new MockSseStream(),
        channels: [SseChannel::system()],
    ));

    // Sanity check: the connection is really there before we remove it.
    expect($pool->count())->toBe(1);

    $pool->remove($id);

    // After removal the id is unknown and the pool is empty again.
    expect($pool->has($id))->toBeFalse();
    expect($pool->count())->toBe(0);
});
|
|
|
|
it('broadcasts event to subscribed connections', function () {
    $channel = SseChannel::forUser('user123');

    // Two separate connections, both subscribed to the same user channel.
    $stream1 = new MockSseStream();
    $connection1 = new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: $stream1,
        channels: [$channel],
        userId: 'user123',
    );

    $stream2 = new MockSseStream();
    $connection2 = new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: $stream2,
        channels: [$channel],
        userId: 'user123',
    );

    $this->pool->add($connection1);
    $this->pool->add($connection2);

    // json_encode() returns false on failure; JSON_THROW_ON_ERROR turns that into a
    // JsonException instead of passing a non-string payload to SseEvent.
    $event = new SseEvent(
        data: json_encode(['message' => 'test'], JSON_THROW_ON_ERROR),
        event: 'test-event',
    );

    $sent = $this->pool->broadcast($event, $channel);

    // broadcast() reports two deliveries and each stream recorded exactly one event.
    expect($sent)->toBe(2);
    expect($stream1->getSentEventsCount())->toBe(1);
    expect($stream2->getSentEventsCount())->toBe(1);
});
|
|
|
|
it('does not broadcast to unsubscribed connections', function () {
    $subscribedChannel = SseChannel::forUser('user123');
    $otherChannel = SseChannel::system();

    // The only pooled connection listens on the user channel.
    $stream = new MockSseStream();
    $this->pool->add(new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: $stream,
        channels: [$subscribedChannel],
    ));

    // Broadcasting on a channel nobody subscribed to must reach no one.
    $delivered = $this->pool->broadcast(
        new SseEvent(data: 'test', event: 'test'),
        $otherChannel,
    );

    expect($delivered)->toBe(0)
        ->and($stream->getSentEventsCount())->toBe(0);
});
|
|
|
|
it('removes dead connections during broadcast', function () {
    $systemChannel = SseChannel::system();
    $stream = new MockSseStream();
    $id = ConnectionId::generate();

    $this->pool->add(new SseConnection(
        connectionId: $id,
        stream: $stream,
        channels: [$systemChannel],
    ));

    // The connection goes away only after it has been registered in the pool.
    $stream->simulateDisconnect();

    $delivered = $this->pool->broadcast(
        new SseEvent(data: 'test', event: 'test'),
        $systemChannel,
    );

    // Nothing was delivered and the pool no longer knows the connection.
    expect($delivered)->toBe(0);
    expect($this->pool->has($id))->toBeFalse();
});
|
|
|
|
it('sends heartbeats to all connections', function () {
    // Both connections last saw a heartbeat 35 seconds ago, beyond the 30 second interval used below.
    $firstStream = new MockSseStream();
    $first = new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: $firstStream,
        channels: [SseChannel::system()],
        lastHeartbeat: new DateTimeImmutable('-35 seconds'),
    );

    $secondStream = new MockSseStream();
    $second = new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: $secondStream,
        channels: [SseChannel::system()],
        lastHeartbeat: new DateTimeImmutable('-35 seconds'),
    );

    foreach ([$first, $second] as $connection) {
        $this->pool->add($connection);
    }

    $heartbeatsSent = $this->pool->sendHeartbeats(heartbeatIntervalSeconds: 30);

    expect($heartbeatsSent)->toBe(2);

    // Each stream must have recorded exactly one 'heartbeat' event.
    foreach ([$firstStream, $secondStream] as $recordingStream) {
        expect(count($recordingStream->getSentEventsByType('heartbeat')))->toBe(1);
    }
});
|
|
|
|
it('does not send heartbeat if not needed', function () {
    // Last heartbeat was 10 seconds ago, inside the 30 second interval requested below.
    $stream = new MockSseStream();

    $this->pool->add(new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: $stream,
        channels: [SseChannel::system()],
        lastHeartbeat: new DateTimeImmutable('-10 seconds'),
    ));

    $heartbeatsSent = $this->pool->sendHeartbeats(heartbeatIntervalSeconds: 30);

    expect($heartbeatsSent)->toBe(0)
        ->and($stream->getSentEventsCount())->toBe(0);
});
|
|
|
|
it('cleans up dead connections', function () {
    // One connection on a live stream ...
    $liveStream = new MockSseStream();
    $live = new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: $liveStream,
        channels: [SseChannel::system()],
    );

    // ... and one whose stream is already disconnected before the connection is built.
    $goneStream = new MockSseStream();
    $goneStream->simulateDisconnect();
    $gone = new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: $goneStream,
        channels: [SseChannel::system()],
    );

    $pool = $this->pool;
    $pool->add($live);
    $pool->add($gone);

    expect($pool->count())->toBe(2);

    $removedCount = $pool->cleanup();

    // cleanup() reports one removal and exactly one connection remains.
    expect($removedCount)->toBe(1);
    expect($pool->count())->toBe(1);
});
|
|
|
|
it('enforces maximum connections limit', function () {
    $pool = new SseConnectionPool();

    // Factory for an anonymous system-channel connection with a fresh id and stream.
    $newConnection = fn (): SseConnection => new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: new MockSseStream(),
        channels: [SseChannel::system()],
    );

    // Fill the pool with 10000 connections (the max limit this test expects).
    for ($added = 0; $added < 10000; $added++) {
        $pool->add($newConnection());
    }

    expect($pool->count())->toBe(10000);

    // One more connection must be rejected.
    expect(fn () => $pool->add($newConnection()))
        ->toThrow(RuntimeException::class, 'Maximum connection limit reached');
});
|
|
|
|
it('enforces per-user connection limit', function () {
    $userId = 'user123';

    // Factory for a connection owned by $userId and subscribed to that user's channel.
    $connectionForUser = fn (): SseConnection => new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: new MockSseStream(),
        channels: [SseChannel::forUser($userId)],
        userId: $userId,
    );

    // Five connections for the same user (the max limit this test expects).
    foreach (range(1, 5) as $ignored) {
        $this->pool->add($connectionForUser());
    }

    expect($this->pool->count())->toBe(5);

    // A sixth connection for that user must be rejected.
    expect(fn () => $this->pool->add($connectionForUser()))
        ->toThrow(RuntimeException::class, 'Maximum connections per user limit reached');
});
|
|
|
|
it('provides accurate statistics', function () {
    // One connection with a user id (authenticated) ...
    $authenticated = new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: new MockSseStream(),
        channels: [SseChannel::forUser('user123')],
        userId: 'user123',
    );

    // ... and one without a user id (anonymous).
    $anonymous = new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: new MockSseStream(),
        channels: [SseChannel::system()],
    );

    $this->pool->add($authenticated);
    $this->pool->add($anonymous);

    $stats = $this->pool->getStats();

    // Expected value for every statistics key, checked strictly in this order.
    $expected = [
        'total_connections' => 2,
        'authenticated_connections' => 1,
        'anonymous_connections' => 1,
        'max_connections' => 10000,
        'max_connections_per_user' => 5,
    ];

    foreach ($expected as $key => $value) {
        expect($stats[$key])->toBe($value);
    }
});
|
|
|
|
it('tracks connections by channel', function () {
    $userChannel = SseChannel::forUser('user123');
    $systemChannel = SseChannel::system();

    // Subscription layout in insertion order: user, system, user.
    foreach ([$userChannel, $systemChannel, $userChannel] as $subscribedChannel) {
        $this->pool->add(new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: new MockSseStream(),
            channels: [$subscribedChannel],
        ));
    }

    $onUserChannel = $this->pool->getConnectionsForChannel($userChannel);
    $onSystemChannel = $this->pool->getConnectionsForChannel($systemChannel);

    expect(count($onUserChannel))->toBe(2)
        ->and(count($onSystemChannel))->toBe(1);
});
|
|
|
|
it('clears all connections', function () {
    $pool = $this->pool;

    // Seed the pool with three system-channel connections.
    foreach (range(1, 3) as $ignored) {
        $pool->add(new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: new MockSseStream(),
            channels: [SseChannel::system()],
        ));
    }

    expect($pool->count())->toBe(3);

    $pool->clear();

    expect($pool->count())->toBe(0);
});
|
|
|
|
it('handles multiple channels per connection', function () {
    $userChannel = SseChannel::forUser('user123');
    $systemChannel = SseChannel::system();
    $stream = new MockSseStream();

    // A single connection subscribed to both channels.
    $this->pool->add(new SseConnection(
        connectionId: ConnectionId::generate(),
        stream: $stream,
        channels: [$userChannel, $systemChannel],
        userId: 'user123',
    ));

    // First broadcast goes out on the user channel.
    $deliveredOnUser = $this->pool->broadcast(
        new SseEvent(data: 'user-event', event: 'test'),
        $userChannel,
    );

    // Reset the mock stream between the two broadcasts.
    $stream->clear();

    // Second broadcast goes out on the system channel.
    $deliveredOnSystem = $this->pool->broadcast(
        new SseEvent(data: 'system-event', event: 'test'),
        $systemChannel,
    );

    // The connection must be counted as a recipient for each channel.
    expect($deliveredOnUser)->toBe(1);
    expect($deliveredOnSystem)->toBe(1);
});
|
|
|
|
it('returns all active connections', function () {
    // Seed the pool with three system-channel connections.
    $remaining = 3;
    while ($remaining-- > 0) {
        $this->pool->add(new SseConnection(
            connectionId: ConnectionId::generate(),
            stream: new MockSseStream(),
            channels: [SseChannel::system()],
        ));
    }

    $everyConnection = $this->pool->all();

    // all() must return an array of three entries; the entry at index 0 is an SseConnection.
    expect($everyConnection)->toBeArray()
        ->and(count($everyConnection))->toBe(3)
        ->and($everyConnection[0])->toBeInstanceOf(SseConnection::class);
});
|
|
});
|