database = $this->createTestDatabase();
        $this->workerRegistry = new WorkerRegistry($this->database);
        $this->loadBalancer = new LoadBalancer($this->workerRegistry);
        $this->distributionService = new JobDistributionService(
            $this->database,
            $this->workerRegistry
        );

        $this->cleanupTestData();
        PerformanceTestHelper::warmupDatabase($this->database->getConnection());
    }

    /**
     * Empties the workers and jobs tables once a test has finished.
     */
    protected function tearDown(): void
    {
        $this->cleanupTestData();
    }

    /**
     * Times 1000 consecutive LoadBalancer::selectWorker() calls with five
     * registered workers and checks the avg / P95 / P99 of the samples.
     *
     * The limits (5, 10, 20) are milliseconds according to the assertion
     * messages; the unit returned by measureTime() is not visible here.
     */
    public function testWorkerSelectionLatency(): void
    {
        // Create workers with different capacities
        $workers = [
            PerformanceTestHelper::createTestWorker('worker_1', 20),
            PerformanceTestHelper::createTestWorker('worker_2', 15),
            PerformanceTestHelper::createTestWorker('worker_3', 10),
            PerformanceTestHelper::createTestWorker('worker_4', 25),
            PerformanceTestHelper::createTestWorker('worker_5', 30),
        ];
        $this->registerWorkers($workers);

        $selectionTimes = [];
        $iterations = 1000;

        for ($i = 0; $i < $iterations; $i++) {
            $time = PerformanceTestHelper::measureTime(function () {
                $this->loadBalancer->selectWorker(new QueueName('test_queue'));
            });
            $selectionTimes[] = $time;
        }

        $stats = PerformanceTestHelper::calculateStatistics($selectionTimes);

        // Validate performance benchmarks
        $this->assertLessThan(5.0, $stats['avg'], 'Average worker selection time exceeds 5ms');
        $this->assertLessThan(10.0, $stats['p95'], 'P95 worker selection time exceeds 10ms');
        $this->assertLessThan(20.0, $stats['p99'], 'P99 worker selection time exceeds 20ms');

        echo "\nWorker Selection Latency Results:\n";
        echo "Iterations: {$iterations}\n";
        echo "Performance: " . PerformanceTestHelper::formatStatistics($stats) . "\n";

        PerformanceTestHelper::assertPerformance(
            $selectionTimes,
            5.0,
            10.0,
            'Worker selection'
        );
    }

    /**
     * Times 500 JobDistributionService::distributeJob() calls, each with a
     * freshly created job, against ten workers of capacity 20, and checks the
     * avg / P95 / P99 of the samples.
     */
    public function testJobDistributionLatency(): void
    {
        $workers = $this->createBalancedWorkers(10, 20);
        $this->registerWorkers($workers);

        $distributionTimes = [];
        $iterations = 500;

        for ($i = 0; $i < $iterations; $i++) {
            // The job is built outside the timed closure, so only distributeJob() is measured.
            $job = PerformanceTestHelper::createTestJob("dist_job_{$i}");
            $time = PerformanceTestHelper::measureTime(function () use ($job) {
                $this->distributionService->distributeJob($job);
            });
            $distributionTimes[] = $time;
        }

        $stats = PerformanceTestHelper::calculateStatistics($distributionTimes);

        // Validate performance benchmarks
        $this->assertLessThan(10.0, $stats['avg'], 'Average job distribution time exceeds 10ms');
        $this->assertLessThan(20.0, $stats['p95'], 'P95 job distribution time exceeds 20ms');
        $this->assertLessThan(50.0, $stats['p99'], 'P99 job distribution time exceeds 50ms');

        echo "\nJob Distribution Latency Results:\n";
        echo "Iterations: {$iterations}\n";
        echo "Performance: " . PerformanceTestHelper::formatStatistics($stats) . "\n";

        PerformanceTestHelper::assertPerformance(
            $distributionTimes,
            10.0,
            20.0,
            'Job distribution'
        );
    }

    /**
     * Runs PerformanceTestHelper::simulateLoad() with a job-distribution
     * callback and checks the reported throughput and per-call timings.
     *
     * Requires more than 80% of the 1000 jobs/second target, an average
     * below 30 and a P95 below 100.
     */
    public function testHighThroughputDistribution(): void
    {
        $workers = $this->createBalancedWorkers(15, 25);
        $this->registerWorkers($workers);

        $jobsPerSecond = 1000;
        $testDuration = 10; // seconds
        $totalJobs = $jobsPerSecond * $testDuration;

        echo "\nHigh Throughput Distribution Test:\n";
        echo "Target: {$jobsPerSecond} jobs/second for {$testDuration} seconds\n";

        $loadResult = PerformanceTestHelper::simulateLoad(
            function ($index) {
                $job = PerformanceTestHelper::createTestJob("load_job_{$index}");

                return $this->distributionService->distributeJob($job);
            },
            $totalJobs,
            50, // High concurrency
            $testDuration
        );

        $actualThroughput = $loadResult['throughput_ops_per_sec'];
        $distributionTimes = array_column($loadResult['results'], 'time_ms');
        $stats = PerformanceTestHelper::calculateStatistics($distributionTimes);

        echo "Actual Throughput: {$actualThroughput} jobs/second\n";
        echo "Distribution Performance: " . PerformanceTestHelper::formatStatistics($stats) . "\n";

        // Should achieve at least 80% of target throughput
        $this->assertGreaterThan(
            $jobsPerSecond * 0.8,
            $actualThroughput,
            'High throughput distribution below 80% of target'
        );

        // Distribution times should remain reasonable under high load
        $this->assertLessThan(30.0, $stats['avg'], 'Average distribution time too high under load');
        $this->assertLessThan(100.0, $stats['p95'], 'P95 distribution time too high under load');
    }

    /**
     * Distributes 1000 jobs across five workers of differing capacity and
     * checks that each worker's share of jobs is within 80%-120% of its share
     * of the total capacity.
     *
     * Shares are computed against the number of jobs attempted ($jobCount),
     * so any job for which distributeJob() returns a falsy value lowers every
     * worker's measured efficiency.
     */
    public function testFairDistributionUnderLoad(): void
    {
        $workers = [
            PerformanceTestHelper::createTestWorker('worker_1', 10),
            PerformanceTestHelper::createTestWorker('worker_2', 20),
            PerformanceTestHelper::createTestWorker('worker_3', 30),
            PerformanceTestHelper::createTestWorker('worker_4', 15),
            PerformanceTestHelper::createTestWorker('worker_5', 25),
        ];
        $this->registerWorkers($workers);

        $jobCount = 1000;
        $workerAssignments = [];

        for ($i = 0; $i < $jobCount; $i++) {
            $job = PerformanceTestHelper::createTestJob("fair_job_{$i}");
            $selectedWorker = $this->distributionService->distributeJob($job);

            if ($selectedWorker) {
                // Tally assignments per worker id
                $workerId = $selectedWorker->id->toString();
                $workerAssignments[$workerId] = ($workerAssignments[$workerId] ?? 0) + 1;
            }
        }

        echo "\nFair Distribution Results:\n";

        $totalCapacity = array_sum(array_map(fn ($w) => $w->capacity->value, $workers));

        foreach ($workers as $worker) {
            $workerId = $worker->id->toString();
            $assignments = $workerAssignments[$workerId] ?? 0;
            $expectedRatio = $worker->capacity->value / $totalCapacity;
            $actualRatio = $assignments / $jobCount;
            // 100 means the worker received exactly its capacity-proportional share
            $efficiency = ($actualRatio / $expectedRatio) * 100;

            echo sprintf(
                "Worker %s: Capacity=%d, Assignments=%d, Expected=%.1f%%, Actual=%.1f%%, Efficiency=%.1f%%\n",
                $workerId,
                $worker->capacity->value,
                $assignments,
                $expectedRatio * 100,
                $actualRatio * 100,
                $efficiency
            );

            // Each worker should get jobs roughly proportional to their capacity
            // Allow 20% variance for fair distribution
            $this->assertGreaterThan(
                80.0,
                $efficiency,
                "Worker {$workerId} received fewer jobs than expected (efficiency: {$efficiency}%)"
            );
            $this->assertLessThan(
                120.0,
                $efficiency,
                "Worker {$workerId} received more jobs than expected (efficiency: {$efficiency}%)"
            );
        }
    }

    /**
     * Times 500 LoadBalancer::selectWorker() calls with five workers whose
     * capacities range from 5 to 100, and checks the avg and P95 of the samples.
     */
    public function testMixedCapacityLoadBalancing(): void
    {
        // Create workers with very different capacities
        $workers = [
            PerformanceTestHelper::createTestWorker('small_worker', 5),
            PerformanceTestHelper::createTestWorker('medium_worker_1', 15),
            PerformanceTestHelper::createTestWorker('medium_worker_2', 20),
            PerformanceTestHelper::createTestWorker('large_worker', 50),
            PerformanceTestHelper::createTestWorker('xlarge_worker', 100),
        ];
        $this->registerWorkers($workers);

        $selectionTimes = [];
        $iterations = 500;

        for ($i = 0; $i < $iterations; $i++) {
            $time = PerformanceTestHelper::measureTime(function () {
                $this->loadBalancer->selectWorker(new QueueName('test_queue'));
            });
            $selectionTimes[] = $time;
        }

        $stats = PerformanceTestHelper::calculateStatistics($selectionTimes);

        echo "\nMixed Capacity Load Balancing Results:\n";
        echo "Worker capacities: 5, 15, 20, 50, 100\n";
        echo "Selection performance: " . PerformanceTestHelper::formatStatistics($stats) . "\n";

        // Selection should still be fast even with mixed capacities
        $this->assertLessThan(8.0, $stats['avg'], 'Mixed capacity selection average time too high');
        $this->assertLessThan(15.0, $stats['p95'], 'Mixed capacity selection P95 time too high');
    }

    /**
     * Times 100 distributeJob() calls for each of the four JobPriority cases
     * and checks that the average for every priority stays below 15.
     */
    public function testPriorityJobDistribution(): void
    {
        $workers = $this->createBalancedWorkers(8, 15);
        $this->registerWorkers($workers);

        $priorities = [JobPriority::LOW, JobPriority::NORMAL, JobPriority::HIGH, JobPriority::CRITICAL];
        $distributionTimes = [];
        // Same sample size for every priority; set once rather than on each outer iteration.
        $iterationsPerPriority = 100;

        foreach ($priorities as $priority) {
            for ($i = 0; $i < $iterationsPerPriority; $i++) {
                $job = PerformanceTestHelper::createTestJob(
                    "priority_job_{$priority->value}_{$i}",
                    $priority
                );

                $time = PerformanceTestHelper::measureTime(function () use ($job) {
                    $this->distributionService->distributeJob($job);
                });

                // Samples are grouped by the priority's backing value
                $distributionTimes[$priority->value][] = $time;
            }
        }

        echo "\nPriority Job Distribution Results:\n";

        foreach ($priorities as $priority) {
            $stats = PerformanceTestHelper::calculateStatistics($distributionTimes[$priority->value]);
            echo "Priority {$priority->value}: " . PerformanceTestHelper::formatStatistics($stats) . "\n";

            // All priorities should have similar distribution performance
            $this->assertLessThan(15.0, $stats['avg'], "Priority {$priority->value} distribution too slow");
        }
    }

    /**
     * Submits more jobs than the registered workers have combined capacity
     * for and checks that at least "combined capacity" jobs are distributed
     * and that the average distribution time stays below 20.
     *
     * A distribution counts as successful when distributeJob() returns a
     * non-null value.
     */
    public function testWorkerOverloadHandling(): void
    {
        // Create workers that will quickly become overloaded
        $workers = [
            PerformanceTestHelper::createTestWorker('worker_1', 2),
            PerformanceTestHelper::createTestWorker('worker_2', 3),
            PerformanceTestHelper::createTestWorker('worker_3', 2),
        ];
        $this->registerWorkers($workers);

        // Derived from the fixture above (currently 2 + 3 + 2 = 7) so the
        // assertion below cannot drift out of sync when the fixture changes.
        $totalCapacity = array_sum(
            array_map(static fn ($worker) => $worker->capacity->value, $workers)
        );

        $distributionTimes = [];
        $successfulDistributions = 0;
        $failedDistributions = 0;

        // Try to distribute more jobs than total worker capacity
        $jobCount = 20;

        for ($i = 0; $i < $jobCount; $i++) {
            $job = PerformanceTestHelper::createTestJob("overload_job_{$i}");

            $result = PerformanceTestHelper::measureTimeWithResult(function () use ($job) {
                return $this->distributionService->distributeJob($job);
            });

            $distributionTimes[] = $result['time_ms'];

            if ($result['result'] !== null) {
                $successfulDistributions++;
            } else {
                $failedDistributions++;
            }
        }

        $stats = PerformanceTestHelper::calculateStatistics($distributionTimes);

        echo "\nWorker Overload Handling Results:\n";
        echo "Total jobs: {$jobCount}\n";
        echo "Successful distributions: {$successfulDistributions}\n";
        echo "Failed distributions: {$failedDistributions}\n";
        echo "Distribution performance: " . PerformanceTestHelper::formatStatistics($stats) . "\n";

        // Should successfully distribute up to total capacity
        $this->assertGreaterThanOrEqual(
            $totalCapacity,
            $successfulDistributions,
            "Should distribute at least {$totalCapacity} jobs"
        );

        // Distribution times should remain reasonable even when workers are overloaded
        $this->assertLessThan(20.0, $stats['avg'], 'Distribution time too high during overload');
    }

    /**
     * Builds $count test workers that all share the same capacity.
     *
     * Workers are named "balanced_worker_1" .. "balanced_worker_{$count}" and
     * are created with WorkerStatus::AVAILABLE.
     *
     * @param int $count    Number of workers to create.
     * @param int $capacity Capacity passed to every created worker.
     *
     * @return array Workers as returned by PerformanceTestHelper::createTestWorker().
     */
    private function createBalancedWorkers(int $count, int $capacity): array
    {
        $workers = [];

        for ($i = 1; $i <= $count; $i++) {
            $workers[] = PerformanceTestHelper::createTestWorker(
                "balanced_worker_{$i}",
                $capacity,
                WorkerStatus::AVAILABLE
            );
        }

        return $workers;
    }

    /**
     * Registers every given worker with the test's WorkerRegistry.
     *
     * @param array $workers Workers to pass to WorkerRegistry::registerWorker().
     */
    private function registerWorkers(array $workers): void
    {
        foreach ($workers as $worker) {
            $this->workerRegistry->registerWorker($worker);
        }
    }

    /**
     * Creates an in-memory SQLite database containing the workers and jobs
     * tables plus four indexes, and wraps the connection in a DatabaseManager.
     *
     * The PDO connection is switched to exception error mode.
     */
    private function createTestDatabase(): DatabaseManager
    {
        $pdo = new \PDO('sqlite::memory:');
        $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);

        // Create required tables with indexes for performance
        $pdo->exec(' CREATE TABLE workers ( id TEXT PRIMARY KEY, queue_names TEXT NOT NULL, capacity INTEGER NOT NULL, status TEXT NOT NULL, last_heartbeat TEXT NOT NULL, metadata TEXT ) ');
        $pdo->exec(' CREATE TABLE jobs ( id TEXT PRIMARY KEY, type TEXT NOT NULL, payload TEXT NOT NULL, queue_name TEXT NOT NULL, priority INTEGER NOT NULL, status TEXT NOT NULL, worker_id TEXT, created_at TEXT NOT NULL, started_at TEXT, completed_at TEXT, attempts INTEGER DEFAULT 0, error_message TEXT ) ');

        // Performance-optimized indexes
        $pdo->exec('CREATE INDEX idx_workers_status_capacity ON workers(status, capacity DESC)');
        $pdo->exec('CREATE INDEX idx_workers_queue_status ON workers(queue_names, status)');
        $pdo->exec('CREATE INDEX idx_jobs_worker_status ON jobs(worker_id, status)');
        $pdo->exec('CREATE INDEX idx_jobs_priority_created ON jobs(priority DESC, created_at)');

        return new DatabaseManager($pdo);
    }

    /**
     * Deletes every row from the workers and jobs tables.
     */
    private function cleanupTestData(): void
    {
        $pdo = $this->database->getConnection();
        $pdo->exec('DELETE FROM workers');
        $pdo->exec('DELETE FROM jobs');
    }
}