- Add comprehensive health check system with multiple endpoints - Add Prometheus metrics endpoint - Add production logging configurations (5 strategies) - Add complete deployment documentation suite: * QUICKSTART.md - 30-minute deployment guide * DEPLOYMENT_CHECKLIST.md - Printable verification checklist * DEPLOYMENT_WORKFLOW.md - Complete deployment lifecycle * PRODUCTION_DEPLOYMENT.md - Comprehensive technical reference * production-logging.md - Logging configuration guide * ANSIBLE_DEPLOYMENT.md - Infrastructure as Code automation * README.md - Navigation hub * DEPLOYMENT_SUMMARY.md - Executive summary - Add deployment scripts and automation - Add DEPLOYMENT_PLAN.md - Concrete plan for immediate deployment - Update README with production-ready features All production infrastructure is now complete and ready for deployment.
822 lines
33 KiB
PHP
822 lines
33 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
use App\Framework\Core\ValueObjects\Byte;
|
|
use App\Framework\Core\ValueObjects\Duration;
|
|
use App\Framework\Core\ValueObjects\Percentage;
|
|
use App\Framework\Database\ConnectionInterface;
|
|
use App\Framework\Logging\Logger;
|
|
use App\Framework\Queue\Entities\Worker;
|
|
use App\Framework\Queue\Services\DatabaseDistributedLock;
|
|
use App\Framework\Queue\Services\FailoverRecoveryService;
|
|
use App\Framework\Queue\Services\JobDistributionService;
|
|
use App\Framework\Queue\Services\WorkerHealthCheckService;
|
|
use App\Framework\Queue\Services\WorkerRegistry;
|
|
use App\Framework\Queue\ValueObjects\JobId;
|
|
use App\Framework\Queue\ValueObjects\LockKey;
|
|
use App\Framework\Queue\ValueObjects\QueueName;
|
|
use App\Framework\Queue\ValueObjects\WorkerId;
|
|
|
|
/**
|
|
* Comprehensive integration tests for the Distributed Processing System
|
|
*/
|
|
describe('Distributed Processing System', function () {
|
|
beforeEach(function () {
    // Test doubles for the two collaborators shared by every service below.
    $connection = mock(ConnectionInterface::class);
    $logger = mock(Logger::class);
    $this->connection = $connection;
    $this->logger = $logger;

    // Build the services under test on top of the doubles, in dependency order.
    $registry = new WorkerRegistry($connection, $logger);
    $lock = new DatabaseDistributedLock($connection, $logger);
    $distribution = new JobDistributionService($registry, $lock, $connection, $logger);
    $health = new WorkerHealthCheckService($registry, $connection, $logger);

    $this->workerRegistry = $registry;
    $this->distributedLock = $lock;
    $this->jobDistribution = $distribution;
    $this->healthCheck = $health;
    $this->failoverRecovery = new FailoverRecoveryService(
        $registry,
        $distribution,
        $health,
        $lock,
        $connection,
        $logger
    );

    // Three worker fixtures with different queues, capacities and capabilities.
    $this->worker1 = Worker::register(
        hostname: 'app-server-1',
        processId: 1001,
        queues: [QueueName::emailQueue(), QueueName::defaultQueue()],
        maxJobs: 10,
        capabilities: ['email', 'pdf-generation']
    );

    $this->worker2 = Worker::register(
        hostname: 'app-server-2',
        processId: 1002,
        queues: [QueueName::defaultQueue(), QueueName::fromString('high-priority')],
        maxJobs: 5,
        capabilities: ['image-processing', 'pdf-generation']
    );

    $this->worker3 = Worker::register(
        hostname: 'app-server-3',
        processId: 1003,
        queues: [QueueName::emailQueue()],
        maxJobs: 15,
        capabilities: ['email', 'notifications']
    );

    // Job identifiers reused across the individual tests.
    $this->jobId1 = JobId::generate();
    $this->jobId2 = JobId::generate();
    $this->jobId3 = JobId::generate();
});
|
|
|
|
describe('Worker Registration and Discovery', function () {
|
|
it('can register multiple workers across different queues', function () {
    // The connection double returns itself from prepare() and reports success from execute();
    // log calls are accepted and ignored.
    $this->connection->shouldReceive('prepare')->andReturnSelf();
    $this->connection->shouldReceive('execute')->andReturn(true);
    $this->logger->shouldReceive('info')->andReturn(null);
    $this->logger->shouldReceive('debug')->andReturn(null);

    // Register the three fixture workers in order.
    foreach ([$this->worker1, $this->worker2, $this->worker3] as $fixtureWorker) {
        $this->workerRegistry->register($fixtureWorker);
    }

    // One prepare/execute pair is expected per registered worker.
    expect($this->connection)->toHaveReceived('prepare')->times(3);
    expect($this->connection)->toHaveReceived('execute')->times(3);
});
|
|
|
|
it('can find workers for specific queues', function () {
    $firstRow = $this->worker1->toArray();
    $secondRow = $this->worker3->toArray();

    // Result set: worker1, worker3, then false to signal the end of the rows.
    $statement = mock(\PDOStatement::class);
    $statement->shouldReceive('execute')->andReturn(true);
    $statement->shouldReceive('fetch')->andReturn($firstRow, $secondRow, false);

    $this->connection->shouldReceive('prepare')->andReturn($statement);
    $this->logger->shouldReceive('error')->never();

    $found = $this->workerRegistry->findWorkersForQueue(QueueName::emailQueue());

    expect($found)->toHaveCount(2);
    expect($found[0]->hostname)->toBe('app-server-1');
    expect($found[1]->hostname)->toBe('app-server-3');
});
|
|
|
|
it('can find best available worker with load balancing', function () {
    // worker1 reports 20% CPU, 512 MB and 2 running jobs (maxJobs is 10).
    $idleWorker = $this->worker1->updateHeartbeat(
        new Percentage(20),
        Byte::fromMegabytes(512),
        2
    );

    // worker2 reports 80% CPU, 1500 MB and 4 running jobs (maxJobs is 5).
    $busyWorker = $this->worker2->updateHeartbeat(
        new Percentage(80),
        Byte::fromMegabytes(1500),
        4
    );

    // The statement yields the idle worker first, the busy one second, then false.
    $statement = mock(\PDOStatement::class);
    $statement->shouldReceive('execute')->andReturn(true);
    $statement->shouldReceive('fetch')->andReturn(
        $idleWorker->toArray(),
        $busyWorker->toArray(),
        false
    );

    $this->connection->shouldReceive('prepare')->andReturn($statement);
    $this->logger->shouldReceive('error')->never();

    $selected = $this->workerRegistry->findBestWorkerForQueue(QueueName::defaultQueue());

    // The less loaded worker (app-server-1) is the expected pick.
    expect($selected)->not->toBeNull();
    expect($selected->hostname)->toBe('app-server-1');
});
|
|
|
|
it('correctly calculates worker load percentages', function () {
    // 30% CPU and 3 of 10 jobs: both load figures are 30%, so 30.0 is expected.
    $balanced = $this->worker1->updateHeartbeat(
        new Percentage(30),
        Byte::fromMegabytes(800),
        3
    );

    $balancedLoad = $balanced->getLoadPercentage();
    expect($balancedLoad->getValue())->toBe(30.0);

    // Same job count but 75% CPU: the test expects the higher figure (75.0) to be reported.
    $cpuBound = $balanced->updateHeartbeat(
        new Percentage(75),
        Byte::fromMegabytes(800),
        3
    );

    $cpuBoundLoad = $cpuBound->getLoadPercentage();
    expect($cpuBoundLoad->getValue())->toBe(75.0);
});
|
|
});
|
|
|
|
describe('Distributed Locking System', function () {
|
|
it('prevents race conditions when acquiring job locks', function () {
    $jobLock = LockKey::forJob($this->jobId1);
    $firstWorker = WorkerId::generate();
    $secondWorker = WorkerId::generate();
    $lockTtl = Duration::fromMinutes(5);

    // Statement served to the first caller: executes fine and reports one affected row.
    $acceptedStatement = mock(\PDOStatement::class);
    $acceptedStatement->shouldReceive('execute')->andReturn(true);
    $acceptedStatement->shouldReceive('rowCount')->andReturn(1);

    // Statement served to the second caller: executing it raises a duplicate-key error.
    $rejectedStatement = mock(\PDOStatement::class);
    $rejectedStatement->shouldReceive('execute')->andThrow(
        new \PDOException('Duplicate entry for key PRIMARY')
    );

    // prepare() hands the two statements out in call order.
    $this->connection->shouldReceive('prepare')->andReturn($acceptedStatement, $rejectedStatement);

    $this->logger->shouldReceive('debug')->andReturn(null);
    $this->logger->shouldReceive('info')->andReturn(null);

    // The first worker obtains the lock ...
    $firstAttempt = $this->distributedLock->acquire($jobLock, $firstWorker, $lockTtl);
    expect($firstAttempt)->toBeTrue();

    // ... and the second worker is turned away for the same key.
    $secondAttempt = $this->distributedLock->acquire($jobLock, $secondWorker, $lockTtl);
    expect($secondAttempt)->toBeFalse();
});
|
|
|
|
it('can extend lock duration for active workers', function () {
    // The prepared statement executes successfully and reports one affected row.
    $statement = mock(\PDOStatement::class);
    $statement->shouldReceive('execute')->andReturn(true);
    $statement->shouldReceive('rowCount')->andReturn(1);

    $this->connection->shouldReceive('prepare')->andReturn($statement);
    $this->logger->shouldReceive('debug')->andReturn(null);

    $wasExtended = $this->distributedLock->extend(
        LockKey::forJob($this->jobId1),
        WorkerId::generate(),
        Duration::fromMinutes(10)
    );

    expect($wasExtended)->toBeTrue();
});
|
|
|
|
it('can release locks and clean up resources', function () {
    // The prepared statement executes successfully and reports one affected row.
    $statement = mock(\PDOStatement::class);
    $statement->shouldReceive('execute')->andReturn(true);
    $statement->shouldReceive('rowCount')->andReturn(1);

    $this->connection->shouldReceive('prepare')->andReturn($statement);
    $this->logger->shouldReceive('debug')->andReturn(null);
    $this->logger->shouldReceive('info')->andReturn(null);

    $wasReleased = $this->distributedLock->release(
        LockKey::forJob($this->jobId1),
        WorkerId::generate()
    );

    expect($wasReleased)->toBeTrue();
});
|
|
|
|
it('supports lock acquisition with timeout for competing workers', function () {
    $queueLock = LockKey::forQueue(QueueName::defaultQueue());
    $contender = WorkerId::generate();
    $lockTtl = Duration::fromMinutes(5);
    $waitLimit = Duration::fromSeconds(2);

    // First prepared statement: execution fails with a duplicate-entry error.
    $failingStatement = mock(\PDOStatement::class);
    $failingStatement->shouldReceive('execute')->andThrow(
        new \PDOException('Duplicate entry')
    );

    // Second prepared statement: execution succeeds with one affected row.
    $succeedingStatement = mock(\PDOStatement::class);
    $succeedingStatement->shouldReceive('execute')->andReturn(true);
    $succeedingStatement->shouldReceive('rowCount')->andReturn(1);

    $this->connection->shouldReceive('prepare')->andReturn($failingStatement, $succeedingStatement);

    $this->logger->shouldReceive('debug')->andReturn(null);
    $this->logger->shouldReceive('info')->andReturn(null);

    $acquired = $this->distributedLock->acquireWithTimeout($queueLock, $contender, $lockTtl, $waitLimit);

    expect($acquired)->toBeTrue();
});
|
|
});
|
|
|
|
describe('Job Distribution Service', function () {
|
|
it('can distribute jobs to best available workers', function () {
    // Statement used for the distribution-lock acquisition.
    $distributionLockStatement = mock(\PDOStatement::class);
    $distributionLockStatement->shouldReceive('execute')->andReturn(true);
    $distributionLockStatement->shouldReceive('rowCount')->andReturn(1);

    // Statement reused for every job-lock step and for releasing the distribution lock.
    $jobLockStatement = mock(\PDOStatement::class);
    $jobLockStatement->shouldReceive('execute')->andReturn(true);
    $jobLockStatement->shouldReceive('rowCount')->andReturn(1);

    // Statement used when the assignment is recorded.
    $assignmentStatement = mock(\PDOStatement::class);
    $assignmentStatement->shouldReceive('execute')->andReturn(true);

    // Statement used for the worker lookup: worker1, then end of rows.
    $workerLookupStatement = mock(\PDOStatement::class);
    $workerLookupStatement->shouldReceive('execute')->andReturn(true);
    $workerLookupStatement->shouldReceive('fetch')->andReturn(
        $this->worker1->toArray(),
        false
    );

    // prepare() returns the statements in exactly this call order.
    $this->connection->shouldReceive('prepare')->andReturn(
        $distributionLockStatement, // distribution lock
        $jobLockStatement,          // job lock
        $jobLockStatement,          // job lock transfer (release old)
        $jobLockStatement,          // job lock transfer (acquire new)
        $assignmentStatement,       // job assignment
        $jobLockStatement,          // release distribution lock
        $workerLookupStatement      // find workers
    );

    // Informational logging is tolerated; warnings and errors must not occur.
    $this->logger->shouldReceive('info')->andReturn(null);
    $this->logger->shouldReceive('debug')->andReturn(null);
    $this->logger->shouldReceive('warning')->never();
    $this->logger->shouldReceive('error')->never();

    $jobPayload = ['priority' => 'normal'];
    $assignedTo = $this->jobDistribution->distributeJob(
        $this->jobId1,
        QueueName::defaultQueue(),
        $jobPayload
    );

    expect($assignedTo)->not->toBeNull();
    expect($assignedTo->toString())->toBe($this->worker1->id->toString());
});
|
|
|
|
it('calculates worker scores based on load and capabilities', function () {
    // No database expectations are configured in this test, so no worker rows are supplied
    // and the lookup is expected to come back empty (null).
    $candidate = $this->jobDistribution->findBestWorkerForJob(
        QueueName::emailQueue(),
        ['required_capabilities' => ['email', 'pdf-generation']]
    );

    expect($candidate)->toBeNull();
});
|
|
|
|
it('handles job distribution when no workers are available', function () {
    // The worker query yields no rows: fetch() returns false straight away.
    $emptyResult = mock(\PDOStatement::class);
    $emptyResult->shouldReceive('execute')->andReturn(true);
    $emptyResult->shouldReceive('fetch')->andReturn(false);

    $this->connection->shouldReceive('prepare')->andReturn($emptyResult);
    $this->logger->shouldReceive('warning')->andReturn(null);
    $this->logger->shouldReceive('error')->never();

    $candidate = $this->jobDistribution->findBestWorkerForJob(QueueName::defaultQueue());

    expect($candidate)->toBeNull();
});
|
|
|
|
it('can release jobs from workers and cleanup assignments', function () {
    // First prepared statement (job-lock release): succeeds with one affected row.
    $lockReleaseStatement = mock(\PDOStatement::class);
    $lockReleaseStatement->shouldReceive('execute')->andReturn(true);
    $lockReleaseStatement->shouldReceive('rowCount')->andReturn(1);

    // Second prepared statement (assignment removal): succeeds.
    $assignmentCleanupStatement = mock(\PDOStatement::class);
    $assignmentCleanupStatement->shouldReceive('execute')->andReturn(true);

    $this->connection->shouldReceive('prepare')->andReturn(
        $lockReleaseStatement,
        $assignmentCleanupStatement
    );

    $this->logger->shouldReceive('info')->andReturn(null);
    $this->logger->shouldReceive('debug')->andReturn(null);

    $owner = $this->worker1->id;
    $wasReleased = $this->jobDistribution->releaseJob($this->jobId1, $owner);

    expect($wasReleased)->toBeTrue();
});
|
|
});
|
|
|
|
describe('Worker Health Monitoring', function () {
|
|
it('detects unhealthy workers based on resource usage', function () {
    $cpu = new Percentage(95);           // critical CPU usage
    $memory = Byte::fromGigabytes(2.5);  // intended to exceed the memory limit enforced by Worker
    $runningJobs = 8;                    // below worker1's maxJobs of 10

    $strainedWorker = $this->worker1->updateHeartbeat($cpu, $memory, $runningJobs);

    expect($strainedWorker->isHealthy())->toBeFalse();
    expect($strainedWorker->isAvailableForJobs())->toBeFalse();
});
|
|
|
|
it('detects workers with stale heartbeats', function () {
    $template = $this->worker1;

    // Copy of worker1 with moderate resource figures; only the heartbeat is out of date.
    $fiveMinutesAgo = new \DateTimeImmutable('-5 minutes');
    $silentWorker = new Worker(
        id: $template->id,
        hostname: $template->hostname,
        processId: $template->processId,
        queues: $template->queues,
        maxJobs: $template->maxJobs,
        registeredAt: $template->registeredAt,
        lastHeartbeat: $fiveMinutesAgo,
        isActive: true,
        cpuUsage: new Percentage(30),
        memoryUsage: Byte::fromMegabytes(512),
        currentJobs: 2
    );

    expect($silentWorker->isHealthy())->toBeFalse();
});
|
|
|
|
it('identifies healthy workers correctly', function () {
    $cpu = new Percentage(45);
    $memory = Byte::fromMegabytes(800);
    $runningJobs = 3;

    // Moderate CPU, memory and job count: the worker should be healthy and accept jobs.
    $steadyWorker = $this->worker1->updateHeartbeat($cpu, $memory, $runningJobs);

    expect($steadyWorker->isHealthy())->toBeTrue();
    expect($steadyWorker->isAvailableForJobs())->toBeTrue();
});
|
|
|
|
it('considers workers at capacity as unavailable but healthy', function () {
    $cpu = new Percentage(50);
    $memory = Byte::fromMegabytes(600);
    $runningJobs = 5; // equals worker2's maxJobs

    $saturatedWorker = $this->worker2->updateHeartbeat($cpu, $memory, $runningJobs);

    // Resource figures are fine, so the worker is healthy, but it has no free job slot.
    expect($saturatedWorker->isHealthy())->toBeTrue();
    expect($saturatedWorker->isAvailableForJobs())->toBeFalse();
});
|
|
});
|
|
|
|
describe('Multi-Worker Load Distribution', function () {
|
|
it('distributes jobs across multiple workers evenly', function () {
    $fixtureWorkers = [$this->worker1, $this->worker2, $this->worker3];
    $pendingJobs = [$this->jobId1, $this->jobId2, $this->jobId3];

    // One statement double per job, each offering a different worker row followed by false.
    // NOTE(review): every iteration declares another unconstrained expectation for prepare();
    // Mockery appears to keep serving the first matching one, so the later statements are
    // probably never handed out — confirm before relying on per-job workers.
    foreach (array_keys($pendingJobs) as $position) {
        $statement = mock(\PDOStatement::class);
        $statement->shouldReceive('execute')->andReturn(true);
        $statement->shouldReceive('rowCount')->andReturn(1);
        $statement->shouldReceive('fetch')->andReturn(
            $fixtureWorkers[$position]->toArray(),
            false
        );

        $this->connection->shouldReceive('prepare')->andReturn($statement);
    }

    $this->logger->shouldReceive('info')->andReturn(null);
    $this->logger->shouldReceive('debug')->andReturn(null);

    // Distribute each job and remember the worker id of every successful assignment.
    $assignedWorkerIds = [];
    foreach ($pendingJobs as $pendingJob) {
        $assignee = $this->jobDistribution->distributeJob($pendingJob, QueueName::defaultQueue());

        if (!$assignee) {
            continue;
        }

        $assignedWorkerIds[] = $assignee->toString();
    }

    // Only checks that at least one job was assigned, not how evenly they were spread.
    expect($assignedWorkerIds)->not->toBeEmpty();
});
|
|
|
|
it('handles worker overload by selecting alternative workers', function () {
    // worker1 pushed to 95% CPU, 2.5 GB memory and 10 running jobs (its maxJobs).
    $saturated = $this->worker1->updateHeartbeat(
        new Percentage(95),
        Byte::fromGigabytes(2.5),
        10
    );

    // worker2 with 20% CPU, 400 MB memory and a single running job.
    $spare = $this->worker2->updateHeartbeat(
        new Percentage(20),
        Byte::fromMegabytes(400),
        1
    );

    // Only the worker states are checked here; no distribution call is made.
    expect($saturated->isAvailableForJobs())->toBeFalse();
    expect($spare->isAvailableForJobs())->toBeTrue();
});
|
|
});
|
|
|
|
describe('Automatic Failover and Recovery', function () {
|
|
it('detects failed workers and initiates recovery', function () {
    $template = $this->worker1;

    // Copy of worker1 with a ten-minute-old heartbeat, 99% CPU and 3 GB of memory in use.
    $tenMinutesAgo = new \DateTimeImmutable('-10 minutes');
    $crashedWorker = new Worker(
        id: $template->id,
        hostname: $template->hostname,
        processId: $template->processId,
        queues: $template->queues,
        maxJobs: $template->maxJobs,
        registeredAt: $template->registeredAt,
        lastHeartbeat: $tenMinutesAgo,
        isActive: true,
        cpuUsage: new Percentage(99),
        memoryUsage: Byte::fromGigabytes(3),
        currentJobs: 5
    );

    // Only the worker's own state is asserted; no recovery service is invoked in this test.
    expect($crashedWorker->isHealthy())->toBeFalse();
    expect($crashedWorker->isAvailableForJobs())->toBeFalse();
});
|
|
|
|
it('can reassign jobs from failed workers to healthy workers', function () {
    $failedWorkerId = WorkerId::generate();

    // The prepared statement executes successfully and reports two affected rows.
    $stmt = mock(\PDOStatement::class);
    $stmt->shouldReceive('execute')->andReturn(true);
    $stmt->shouldReceive('rowCount')->andReturn(2);

    $this->connection->shouldReceive('prepare')->andReturn($stmt);
    $this->logger->shouldReceive('info')->andReturn(null);

    // Only the release half of a reassignment is exercised: no healthy worker takes part.
    $releasedJobs = $this->jobDistribution->releaseAllWorkerJobs($failedWorkerId);

    // NOTE(review): ">= 0" holds for any non-negative count, so this mainly proves the call
    // completes without throwing; tighten it once the expected count is confirmed.
    expect($releasedJobs)->toBeGreaterThanOrEqual(0);
});
|
|
|
|
it('cleans up resources from inactive workers', function () {
    // The prepared statement executes successfully and reports three affected rows.
    $cleanupStatement = mock(\PDOStatement::class);
    $cleanupStatement->shouldReceive('execute')->andReturn(true);
    $cleanupStatement->shouldReceive('rowCount')->andReturn(3);

    $this->connection->shouldReceive('prepare')->andReturn($cleanupStatement);
    $this->logger->shouldReceive('info')->andReturn(null);

    $deactivated = $this->workerRegistry->cleanupInactiveWorkers(5);

    // The registry is expected to pass the affected-row count through.
    expect($deactivated)->toBe(3);
});
|
|
});
|
|
|
|
describe('System Resilience and Stress Testing', function () {
|
|
it('handles concurrent job distribution requests', function () {
    // Five fresh job ids, processed one after another below.
    $jobBatch = [
        JobId::generate(),
        JobId::generate(),
        JobId::generate(),
        JobId::generate(),
        JobId::generate(),
    ];

    // One statement double per job; each offers worker1 as the only row.
    foreach (array_keys($jobBatch) as $ignored) {
        $statement = mock(\PDOStatement::class);
        $statement->shouldReceive('execute')->andReturn(true);
        $statement->shouldReceive('rowCount')->andReturn(1);
        $statement->shouldReceive('fetch')->andReturn(
            $this->worker1->toArray(),
            false
        );

        $this->connection->shouldReceive('prepare')->andReturn($statement);
    }

    $this->logger->shouldReceive('info')->andReturn(null);
    $this->logger->shouldReceive('debug')->andReturn(null);

    // Count how many distribution calls returned a worker id.
    $assignedCount = 0;
    foreach ($jobBatch as $job) {
        $assignee = $this->jobDistribution->distributeJob($job, QueueName::defaultQueue());

        if ($assignee) {
            $assignedCount++;
        }
    }

    // Any non-negative count passes; the test mainly checks that no call throws.
    expect($assignedCount)->toBeGreaterThanOrEqual(0);
});
|
|
|
|
it('maintains system consistency during lock contention', function () {
    $sharedLock = LockKey::forQueue(QueueName::defaultQueue());
    $contenders = [
        WorkerId::generate(),
        WorkerId::generate(),
        WorkerId::generate(),
    ];

    // Statement that lets an acquisition through.
    $grantingStatement = mock(\PDOStatement::class);
    $grantingStatement->shouldReceive('execute')->andReturn(true);
    $grantingStatement->shouldReceive('rowCount')->andReturn(1);

    // Statement that rejects an acquisition with a duplicate-entry error.
    $denyingStatement = mock(\PDOStatement::class);
    $denyingStatement->shouldReceive('execute')->andThrow(
        new \PDOException('Duplicate entry')
    );

    // Call order: the first contender is granted the lock, the next two are denied.
    $this->connection->shouldReceive('prepare')->andReturn(
        $grantingStatement,
        $denyingStatement,
        $denyingStatement
    );

    $this->logger->shouldReceive('debug')->andReturn(null);
    $this->logger->shouldReceive('info')->andReturn(null);

    // Each contender tries in turn; collect the boolean outcomes.
    $outcomes = [];
    foreach ($contenders as $contender) {
        $outcomes[] = $this->distributedLock->acquire(
            $sharedLock,
            $contender,
            Duration::fromMinutes(5)
        );
    }

    // Summing the booleans counts the successful acquisitions; exactly one is expected.
    expect(array_sum($outcomes))->toBe(1);
});
|
|
|
|
it('recovers gracefully from database connection failures', function () {
    // Every prepare() call fails as if the connection had dropped.
    $connectionLost = new \PDOException('Connection lost');
    $this->connection->shouldReceive('prepare')->andThrow($connectionLost);

    $this->logger->shouldReceive('error')->andReturn(null);

    // The PDOException is expected to reach the caller.
    $lookup = fn () => $this->workerRegistry->findActiveWorkers();

    expect($lookup)->toThrow(\PDOException::class);
});
|
|
|
|
it('provides comprehensive system statistics for monitoring', function () {
    // Aggregate row returned by the first prepared statement.
    $aggregateRow = [
        'total_workers' => 3,
        'active_workers' => 2,
        'healthy_workers' => 2,
        'unique_hosts' => 2,
        'total_capacity' => 30,
        'current_load' => 10,
        'avg_cpu_usage' => 45.5,
        'avg_memory_usage' => 819200000, // ~800MB in bytes
    ];

    $aggregateStatement = mock(\PDOStatement::class);
    $aggregateStatement->shouldReceive('execute')->andReturn(true);
    $aggregateStatement->shouldReceive('fetch')->andReturn($aggregateRow);

    // Second prepared statement: two rows of JSON-encoded queue lists, then false.
    $queueListStatement = mock(\PDOStatement::class);
    $queueListStatement->shouldReceive('execute')->andReturn(true);
    $queueListStatement->shouldReceive('fetch')->andReturn(
        ['queues' => '["default", "email"]'],
        ['queues' => '["default", "high-priority"]'],
        false
    );

    $this->connection->shouldReceive('prepare')->andReturn(
        $aggregateStatement,
        $queueListStatement
    );

    $this->logger->shouldReceive('error')->never();

    $report = $this->workerRegistry->getWorkerStatistics();

    // Required keys are present ...
    foreach (['total_workers', 'active_workers', 'capacity_utilization'] as $requiredKey) {
        expect($report)->toHaveKey($requiredKey);
    }

    // ... and the worker counts match the aggregate row.
    expect($report['total_workers'])->toBe(3);
    expect($report['active_workers'])->toBe(2);
});
|
|
});
|
|
|
|
describe('Value Object Behavior', function () {
|
|
it('ensures WorkerId uniqueness and proper formatting', function () {
    $first = WorkerId::generate();
    $second = WorkerId::generate();

    // Two generated ids differ both as strings and via equals().
    expect($first->toString())->not->toBe($second->toString());
    expect($first->equals($second))->toBeFalse();

    // An id built from host and pid carries the host name in its string form.
    $hostBound = WorkerId::forHost('test-host', 1234);
    expect($hostBound->toString())->toContain('test-host');
});
|
|
|
|
it('validates LockKey patterns and constraints', function () {
    $forJob = LockKey::forJob($this->jobId1);
    $forQueue = LockKey::forQueue(QueueName::defaultQueue());
    $forWorker = LockKey::forWorker($this->worker1->id);

    // Each factory produces a key with its own type prefix.
    expect($forJob->toString())->toStartWith('job.');
    expect($forQueue->toString())->toStartWith('queue.');
    expect($forWorker->toString())->toStartWith('worker.');

    // withPrefix()/withSuffix() join the extra segment with a dot.
    $tenantScoped = $forJob->withPrefix('tenant-1');
    $stageScoped = $forQueue->withSuffix('processing');

    expect($tenantScoped->toString())->toStartWith('tenant-1.job.');
    expect($stageScoped->toString())->toEndWith('.processing');
});
|
|
|
|
it('validates JobId generation and uniqueness', function () {
    $first = JobId::generate();
    $second = JobId::generate();

    // Two generated ids differ both as strings and via equals().
    expect($first->toString())->not->toBe($second->toString());
    expect($first->equals($second))->toBeFalse();

    // Round trip through the string form yields an equal id.
    $restored = JobId::fromString($first->toString());
    expect($restored->equals($first))->toBeTrue();
});
|
|
|
|
it('properly handles QueueName creation and equality', function () {
    $default = QueueName::defaultQueue();
    $email = QueueName::emailQueue();
    $custom = QueueName::fromString('custom-queue');

    // String forms of the two named factories and of a custom name.
    expect($default->toString())->toBe('default');
    expect($email->toString())->toBe('email');
    expect($custom->toString())->toBe('custom-queue');

    // Equality is by value: different names differ, a second default instance matches.
    expect($default->equals($email))->toBeFalse();
    expect($default->equals(QueueName::defaultQueue()))->toBeTrue();
});
|
|
});
|
|
|
|
describe('Edge Cases and Error Scenarios', function () {
|
|
it('handles worker registration with invalid data gracefully', function () {
    // Empty hostname.
    $withEmptyHostname = fn () => Worker::register(
        hostname: '',
        processId: 1001,
        queues: [QueueName::defaultQueue()],
        maxJobs: 10
    );
    expect($withEmptyHostname)->toThrow(\InvalidArgumentException::class);

    // Empty queue list.
    $withoutQueues = fn () => Worker::register(
        hostname: 'valid-host',
        processId: 1001,
        queues: [],
        maxJobs: 10
    );
    expect($withoutQueues)->toThrow(\InvalidArgumentException::class);

    // Zero job capacity.
    $withZeroCapacity = fn () => Worker::register(
        hostname: 'valid-host',
        processId: 1001,
        queues: [QueueName::defaultQueue()],
        maxJobs: 0
    );
    expect($withZeroCapacity)->toThrow(\InvalidArgumentException::class);
});
|
|
|
|
it('handles lock key validation properly', function () {
    // Rejected inputs, in order: empty string, 256-character string, string with '@' and '!'.
    $rejectedInputs = [
        '',
        str_repeat('a', 256),
        'invalid@key!',
    ];

    foreach ($rejectedInputs as $rejectedInput) {
        expect(fn () => LockKey::fromString($rejectedInput))
            ->toThrow(\InvalidArgumentException::class);
    }
});
|
|
|
|
it('handles job distribution when all workers are at capacity', function () {
    // The worker query yields no rows at all (fetch() returns false immediately); the
    // at-capacity situation is represented only by that empty result.
    $emptyResult = mock(\PDOStatement::class);
    $emptyResult->shouldReceive('execute')->andReturn(true);
    $emptyResult->shouldReceive('fetch')->andReturn(false);

    $this->connection->shouldReceive('prepare')->andReturn($emptyResult);
    $this->logger->shouldReceive('warning')->andReturn(null);

    $candidate = $this->jobDistribution->findBestWorkerForJob(QueueName::defaultQueue());

    expect($candidate)->toBeNull();
});
|
|
|
|
it('handles lock acquisition timeout correctly', function () {
    $jobLock = LockKey::forJob($this->jobId1);
    $contender = WorkerId::generate();
    $lockTtl = Duration::fromMinutes(5);
    $waitLimit = Duration::fromMilliseconds(100); // deliberately short wait

    // Every attempt hits the same statement, whose execution always raises a duplicate-entry error.
    $alwaysFailing = mock(\PDOStatement::class);
    $alwaysFailing->shouldReceive('execute')->andThrow(
        new \PDOException('Duplicate entry')
    );

    $this->connection->shouldReceive('prepare')->andReturn($alwaysFailing);
    $this->logger->shouldReceive('debug')->andReturn(null);
    $this->logger->shouldReceive('info')->andReturn(null);

    $acquired = $this->distributedLock->acquireWithTimeout($jobLock, $contender, $lockTtl, $waitLimit);

    // The lock is never obtained, so the call reports failure.
    expect($acquired)->toBeFalse();
});
|
|
});
|
|
});
|