docs: consolidate documentation into organized structure

- Move 12 markdown files from root to docs/ subdirectories
- Organize documentation by category:
  • docs/troubleshooting/ (1 file)  - Technical troubleshooting guides
  • docs/deployment/      (4 files) - Deployment and security documentation
  • docs/guides/          (3 files) - Feature-specific guides
  • docs/planning/        (4 files) - Planning and improvement proposals

Root directory cleanup:
- Reduced from 16 to 4 markdown files in root
- Only essential project files remain:
  • CLAUDE.md (AI instructions)
  • README.md (Main project readme)
  • CLEANUP_PLAN.md (Current cleanup plan)
  • SRC_STRUCTURE_IMPROVEMENTS.md (Structure improvements)

This improves:
 Documentation discoverability
 Logical organization by purpose
 Clean root directory
 Better maintainability
This commit is contained in:
2025-10-05 11:05:04 +02:00
parent 887847dde6
commit 5050c7d73a
36686 changed files with 196456 additions and 12398919 deletions

View File

@@ -0,0 +1,372 @@
<?php
declare(strict_types=1);
use App\Framework\Queue\Contracts\QueueInterface;
use App\Framework\Queue\Contracts\JobDependencyManagerInterface;
use App\Framework\Queue\Contracts\JobChainManagerInterface;
use App\Framework\Queue\Services\JobMetricsManager;
use App\Framework\Queue\Services\DependencyResolutionEngine;
use App\Framework\Queue\Services\JobChainExecutionCoordinator;
use App\Framework\Queue\ValueObjects\JobDependency;
use App\Framework\Queue\ValueObjects\JobChain;
use App\Framework\Queue\ValueObjects\JobMetrics;
use App\Framework\Queue\ValueObjects\ChainExecutionMode;
use App\Framework\Queue\Entities\JobProgressEntry;
use App\Framework\Queue\Entities\JobProgressStep;
use App\Framework\Database\EntityManagerInterface;
use App\Framework\Logging\Logger;
use App\Framework\Core\Application;
use App\Framework\DI\Container;
use App\Framework\Core\ValueObjects\Percentage;
beforeEach(function () {
// Set up test container
$this->container = createTestContainer();
// Get services from container
$this->queue = $this->container->get(QueueInterface::class);
$this->dependencyManager = $this->container->get(JobDependencyManagerInterface::class);
$this->chainManager = $this->container->get(JobChainManagerInterface::class);
$this->metricsManager = $this->container->get(JobMetricsManager::class);
$this->resolutionEngine = $this->container->get(DependencyResolutionEngine::class);
$this->chainCoordinator = $this->container->get(JobChainExecutionCoordinator::class);
$this->entityManager = $this->container->get(EntityManagerInterface::class);
$this->logger = $this->container->get(Logger::class);
});
function createTestJob(string $id, string $data): object
{
return new class($id, $data) {
public function __construct(
public readonly string $id,
public readonly string $data
) {}
};
}
test('complete queue workflow with dependencies and metrics', function () {
// 1. Create test jobs
$job1 = createTestJob('job-1', 'Test Job 1');
$job2 = createTestJob('job-2', 'Test Job 2');
$job3 = createTestJob('job-3', 'Test Job 3');
// 2. Set up dependencies: job2 depends on job1, job3 depends on job2
$dependency1 = JobDependency::completion('job-2', 'job-1');
$dependency2 = JobDependency::success('job-3', 'job-2');
// Add dependencies
$this->dependencyManager->addDependency($dependency1);
$this->dependencyManager->addDependency($dependency2);
// 3. Add jobs to queue
$this->queue->push($job1);
$this->queue->push($job2);
$this->queue->push($job3);
// 4. Create and record metrics for job execution
$job1Metrics = new JobMetrics(
jobId: 'job-1',
queueName: 'default',
status: 'completed',
attempts: 1,
maxAttempts: 3,
executionTimeMs: 150.5,
memoryUsageBytes: 1024 * 1024,
errorMessage: null,
createdAt: date('Y-m-d H:i:s'),
startedAt: date('Y-m-d H:i:s'),
completedAt: date('Y-m-d H:i:s'),
failedAt: null,
metadata: ['test' => true]
);
$this->metricsManager->recordJobMetrics($job1Metrics);
// 5. Test dependency resolution
$readyJobs = $this->resolutionEngine->getJobsReadyForExecution();
expect($readyJobs)->toHaveCount(1)
->and($readyJobs[0])->toBe('job-1');
// 6. Mark job1 as completed and check dependencies
$this->dependencyManager->markJobCompleted('job-1');
$readyJobsAfterJob1 = $this->resolutionEngine->getJobsReadyForExecution();
expect($readyJobsAfterJob1)->toContain('job-2');
// 7. Test metrics retrieval
$retrievedMetrics = $this->metricsManager->getJobMetrics('job-1');
expect($retrievedMetrics)->not()->toBeNull()
->and($retrievedMetrics->jobId)->toBe('job-1')
->and($retrievedMetrics->status)->toBe('completed')
->and($retrievedMetrics->executionTimeMs)->toBe(150.5);
// 8. Test queue metrics calculation
$queueMetrics = $this->metricsManager->getQueueMetrics('default', '1 hour');
expect($queueMetrics->queueName)->toBe('default')
->and($queueMetrics->totalJobs)->toBeGreaterThan(0);
});
test('job chain execution with sequential mode', function () {
// 1. Create jobs for chain
$jobs = [
createTestJob('chain-job-1', 'Chain Job 1'),
createTestJob('chain-job-2', 'Chain Job 2'),
createTestJob('chain-job-3', 'Chain Job 3')
];
// 2. Create job chain
$chain = JobChain::sequential('test-chain', ['chain-job-1', 'chain-job-2', 'chain-job-3']);
// 3. Add chain to manager
$this->chainManager->createChain($chain);
// 4. Execute chain
$this->chainCoordinator->executeChain('test-chain');
// 5. Verify chain was created
$retrievedChain = $this->chainManager->getChain('test-chain');
expect($retrievedChain)->not()->toBeNull()
->and($retrievedChain->name)->toBe('test-chain')
->and($retrievedChain->executionMode)->toBe(ChainExecutionMode::SEQUENTIAL)
->and($retrievedChain->jobIds)->toHaveCount(3);
});
test('job chain failure handling', function () {
// 1. Create jobs for chain with one that will fail
$jobs = [
createTestJob('fail-job-1', 'Job 1'),
createTestJob('fail-job-2', 'Job 2 (will fail)'),
createTestJob('fail-job-3', 'Job 3')
];
// 2. Create job chain with stop on failure
$chain = JobChain::sequential('fail-chain', ['fail-job-1', 'fail-job-2', 'fail-job-3']);
$this->chainManager->createChain($chain);
// 3. Simulate job failure
$failureMetrics = new JobMetrics(
jobId: 'fail-job-2',
queueName: 'default',
status: 'failed',
attempts: 3,
maxAttempts: 3,
executionTimeMs: 50.0,
memoryUsageBytes: 512 * 1024,
errorMessage: 'Simulated failure',
createdAt: date('Y-m-d H:i:s'),
startedAt: date('Y-m-d H:i:s'),
completedAt: null,
failedAt: date('Y-m-d H:i:s'),
metadata: []
);
$this->metricsManager->recordJobMetrics($failureMetrics);
// 4. Test failure detection
$failedJobs = $this->metricsManager->getFailedJobs('default', '1 hour');
expect($failedJobs)->toHaveCount(1)
->and($failedJobs[0]->jobId)->toBe('fail-job-2')
->and($failedJobs[0]->status)->toBe('failed');
});
test('circular dependency detection', function () {
// 1. Create circular dependencies: A depends on B, B depends on C, C depends on A
$depA = JobDependency::completion('job-a', 'job-b');
$depB = JobDependency::completion('job-b', 'job-c');
$depC = JobDependency::completion('job-c', 'job-a');
// 2. Add dependencies
$this->dependencyManager->addDependency($depA);
$this->dependencyManager->addDependency($depB);
// 3. Adding the third dependency should throw an exception or be handled
expect(fn() => $this->dependencyManager->addDependency($depC))
->toThrow(\InvalidArgumentException::class);
});
test('conditional dependencies', function () {
// 1. Create jobs
$job1 = createTestJob('cond-job-1', 'Conditional Job 1');
$job2 = createTestJob('cond-job-2', 'Conditional Job 2');
// 2. Create success-based dependency
$successDep = JobDependency::success('cond-job-2', 'cond-job-1');
$this->dependencyManager->addDependency($successDep);
// 3. Test that job2 is not ready when job1 failed
$failureMetrics = new JobMetrics(
jobId: 'cond-job-1',
queueName: 'default',
status: 'failed',
attempts: 3,
maxAttempts: 3,
executionTimeMs: 100.0,
memoryUsageBytes: 1024,
errorMessage: 'Test failure',
createdAt: date('Y-m-d H:i:s'),
startedAt: date('Y-m-d H:i:s'),
completedAt: null,
failedAt: date('Y-m-d H:i:s'),
metadata: []
);
$this->metricsManager->recordJobMetrics($failureMetrics);
// 4. Check that dependent job is not ready
$readyJobs = $this->resolutionEngine->getJobsReadyForExecution();
expect($readyJobs)->not()->toContain('cond-job-2');
});
test('queue metrics calculation', function () {
// 1. Create multiple job metrics
$metrics = [
new JobMetrics(
jobId: 'metric-job-1',
queueName: 'metrics-queue',
status: 'completed',
attempts: 1,
maxAttempts: 3,
executionTimeMs: 100.0,
memoryUsageBytes: 1024 * 1024,
errorMessage: null,
createdAt: date('Y-m-d H:i:s'),
startedAt: date('Y-m-d H:i:s'),
completedAt: date('Y-m-d H:i:s'),
failedAt: null,
metadata: []
),
new JobMetrics(
jobId: 'metric-job-2',
queueName: 'metrics-queue',
status: 'completed',
attempts: 1,
maxAttempts: 3,
executionTimeMs: 200.0,
memoryUsageBytes: 2 * 1024 * 1024,
errorMessage: null,
createdAt: date('Y-m-d H:i:s'),
startedAt: date('Y-m-d H:i:s'),
completedAt: date('Y-m-d H:i:s'),
failedAt: null,
metadata: []
),
new JobMetrics(
jobId: 'metric-job-3',
queueName: 'metrics-queue',
status: 'failed',
attempts: 3,
maxAttempts: 3,
executionTimeMs: 50.0,
memoryUsageBytes: 512 * 1024,
errorMessage: 'Test failure',
createdAt: date('Y-m-d H:i:s'),
startedAt: date('Y-m-d H:i:s'),
completedAt: null,
failedAt: date('Y-m-d H:i:s'),
metadata: []
)
];
// 2. Record all metrics
foreach ($metrics as $metric) {
$this->metricsManager->recordJobMetrics($metric);
}
// 3. Calculate queue metrics
$queueMetrics = $this->metricsManager->getQueueMetrics('metrics-queue', '1 hour');
// 4. Verify calculations
expect($queueMetrics->queueName)->toBe('metrics-queue')
->and($queueMetrics->totalJobs)->toBe(3)
->and($queueMetrics->completedJobs)->toBe(2)
->and($queueMetrics->failedJobs)->toBe(1)
->and($queueMetrics->successRate->toFloat())->toBe(66.67);
});
test('dead letter queue functionality', function () {
// 1. Create a job that exceeds max attempts
$deadLetterMetrics = new JobMetrics(
jobId: 'dead-letter-job',
queueName: 'default',
status: 'failed',
attempts: 3,
maxAttempts: 3,
executionTimeMs: 25.0,
memoryUsageBytes: 256 * 1024,
errorMessage: 'Max attempts exceeded',
createdAt: date('Y-m-d H:i:s'),
startedAt: date('Y-m-d H:i:s'),
completedAt: null,
failedAt: date('Y-m-d H:i:s'),
metadata: ['dead_letter' => true]
);
// 2. Record metrics
$this->metricsManager->recordJobMetrics($deadLetterMetrics);
// 3. Verify dead letter detection
$failedJobs = $this->metricsManager->getFailedJobs('default', '1 hour');
$deadLetterJob = array_filter($failedJobs, fn($job) => $job->jobId === 'dead-letter-job')[0] ?? null;
expect($deadLetterJob)->not()->toBeNull()
->and($deadLetterJob->attempts)->toBe(3)
->and($deadLetterJob->maxAttempts)->toBe(3)
->and($deadLetterJob->status)->toBe('failed');
});
test('system health monitoring', function () {
// 1. Create mixed job metrics for health calculation
$healthMetrics = [
// Healthy jobs
new JobMetrics('health-1', 'health-queue', 'completed', 1, 3, 50.0, 1024, null, date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), null, []),
new JobMetrics('health-2', 'health-queue', 'completed', 1, 3, 75.0, 1024, null, date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), null, []),
new JobMetrics('health-3', 'health-queue', 'completed', 1, 3, 100.0, 1024, null, date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), null, []),
// One failed job
new JobMetrics('health-4', 'health-queue', 'failed', 2, 3, 25.0, 1024, 'Health test failure', date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), null, date('Y-m-d H:i:s'), [])
];
// 2. Record all metrics
foreach ($healthMetrics as $metric) {
$this->metricsManager->recordJobMetrics($metric);
}
// 3. Get system overview
$overview = $this->metricsManager->getSystemOverview();
// 4. Verify system health calculation
expect($overview)->toHaveKey('system_health_score')
->and($overview['total_jobs'])->toBeGreaterThan(0)
->and($overview['overall_success_rate'])->toBeGreaterThan(0);
});
test('performance and throughput metrics', function () {
// 1. Create performance test metrics with varying execution times
$performanceMetrics = [
new JobMetrics('perf-1', 'perf-queue', 'completed', 1, 3, 50.0, 1024 * 1024, null, date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), null, []),
new JobMetrics('perf-2', 'perf-queue', 'completed', 1, 3, 150.0, 2 * 1024 * 1024, null, date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), null, []),
new JobMetrics('perf-3', 'perf-queue', 'completed', 1, 3, 300.0, 4 * 1024 * 1024, null, date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), null, [])
];
// 2. Record performance metrics
foreach ($performanceMetrics as $metric) {
$this->metricsManager->recordJobMetrics($metric);
}
// 3. Get performance statistics
$performanceStats = $this->metricsManager->getPerformanceStats('perf-queue', '1 hour');
// 4. Verify performance calculations
expect($performanceStats)->toHaveKey('average_execution_time_ms')
->and($performanceStats['average_execution_time_ms'])->toBe(166.67)
->and($performanceStats)->toHaveKey('average_memory_usage_mb')
->and($performanceStats['total_jobs'])->toBe(3);
// 5. Get throughput statistics
$throughputStats = $this->metricsManager->getThroughputStats('perf-queue', '1 hour');
// 6. Verify throughput calculations
expect($throughputStats)->toHaveKey('total_completed')
->and($throughputStats['total_completed'])->toBe(3)
->and($throughputStats)->toHaveKey('average_throughput_per_hour');
});

View File

@@ -0,0 +1,583 @@
<?php
declare(strict_types=1);
use App\Framework\Queue\Services\WorkerRegistry;
use App\Framework\Queue\Services\DatabaseDistributedLock;
use App\Framework\Queue\Services\JobDistributionService;
use App\Framework\Queue\Services\WorkerHealthCheckService;
use App\Framework\Queue\Services\FailoverRecoveryService;
use App\Framework\Queue\Entities\Worker;
use App\Framework\Queue\ValueObjects\WorkerId;
use App\Framework\Queue\ValueObjects\JobId;
use App\Framework\Queue\ValueObjects\LockKey;
use App\Framework\Queue\ValueObjects\QueueName;
use App\Framework\Core\ValueObjects\Percentage;
use App\Framework\Core\ValueObjects\Byte;
use App\Framework\Core\ValueObjects\Duration;
use App\Framework\Database\ConnectionInterface;
use App\Framework\Logging\Logger;
/**
* Real-world scenario tests for the Distributed Processing System
* These tests simulate complex real-world scenarios to validate system behavior
*/
describe('Distributed Processing Real-World Scenarios', function () {
beforeEach(function () {
// Mock connection and logger
$this->connection = mock(ConnectionInterface::class);
$this->logger = mock(Logger::class);
// Setup services
$this->workerRegistry = new WorkerRegistry($this->connection, $this->logger);
$this->distributedLock = new DatabaseDistributedLock($this->connection, $this->logger);
$this->jobDistribution = new JobDistributionService(
$this->workerRegistry,
$this->distributedLock,
$this->connection,
$this->logger
);
// Mock default logger behavior
$this->logger->shouldReceive('info')->andReturn(null);
$this->logger->shouldReceive('debug')->andReturn(null);
$this->logger->shouldReceive('warning')->andReturn(null);
$this->logger->shouldReceive('error')->andReturn(null);
});
describe('E-commerce Order Processing Scenario', function () {
it('handles peak shopping season with multiple worker types', function () {
// Scenario: Black Friday traffic with specialized workers
// Create specialized workers for different tasks
$emailWorkers = [
Worker::register(
hostname: 'email-server-1',
processId: 2001,
queues: [QueueName::emailQueue()],
maxJobs: 20,
capabilities: ['email', 'newsletter', 'notifications']
),
Worker::register(
hostname: 'email-server-2',
processId: 2002,
queues: [QueueName::emailQueue()],
maxJobs: 20,
capabilities: ['email', 'newsletter', 'notifications']
)
];
$imageWorkers = [
Worker::register(
hostname: 'image-server-1',
processId: 3001,
queues: [QueueName::fromString('image-processing')],
maxJobs: 5, // Resource intensive
capabilities: ['image-resize', 'thumbnail', 'watermark']
)
];
$generalWorkers = [
Worker::register(
hostname: 'app-server-1',
processId: 1001,
queues: [
QueueName::defaultQueue(),
QueueName::fromString('reports')
],
maxJobs: 15,
capabilities: ['pdf-generation', 'reporting', 'exports']
),
Worker::register(
hostname: 'app-server-2',
processId: 1002,
queues: [
QueueName::defaultQueue(),
QueueName::fromString('reports')
],
maxJobs: 15,
capabilities: ['pdf-generation', 'reporting', 'exports']
)
];
$allWorkers = array_merge($emailWorkers, $imageWorkers, $generalWorkers);
// Mock worker registration
foreach ($allWorkers as $worker) {
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$this->workerRegistry->register($worker);
}
// Simulate different job types being distributed
$jobs = [
// Email jobs (high volume, low resource)
['id' => JobId::generate(), 'queue' => QueueName::emailQueue(), 'type' => 'order-confirmation'],
['id' => JobId::generate(), 'queue' => QueueName::emailQueue(), 'type' => 'shipping-notification'],
['id' => JobId::generate(), 'queue' => QueueName::emailQueue(), 'type' => 'newsletter'],
// Image processing jobs (low volume, high resource)
['id' => JobId::generate(), 'queue' => QueueName::fromString('image-processing'), 'type' => 'product-thumbnails'],
// General processing jobs
['id' => JobId::generate(), 'queue' => QueueName::defaultQueue(), 'type' => 'invoice-generation'],
['id' => JobId::generate(), 'queue' => QueueName::fromString('reports'), 'type' => 'sales-report']
];
// Mock job distribution
foreach ($jobs as $job) {
// Mock finding workers for queue
$workerStmt = mock(\PDOStatement::class);
$workerStmt->shouldReceive('execute')->andReturn(true);
$workerStmt->shouldReceive('fetch')->andReturn(false); // Simplified for test
// Mock lock operations
$lockStmt = mock(\PDOStatement::class);
$lockStmt->shouldReceive('execute')->andReturn(true);
$lockStmt->shouldReceive('rowCount')->andReturn(1);
$this->connection->shouldReceive('prepare')->andReturn($workerStmt, $lockStmt);
$assignedWorker = $this->jobDistribution->findBestWorkerForJob($job['queue']);
// In real scenario, would validate worker assignment logic
}
// Verify system can handle the load
expect(count($allWorkers))->toBe(5);
expect(count($jobs))->toBe(6);
});
it('handles worker failure during peak traffic gracefully', function () {
// Scenario: Worker crashes during high load, jobs need redistribution
$healthyWorker = Worker::register(
hostname: 'stable-server',
processId: 1001,
queues: [QueueName::defaultQueue()],
maxJobs: 10,
capabilities: ['email', 'pdf-generation']
);
$failingWorker = Worker::register(
hostname: 'failing-server',
processId: 1002,
queues: [QueueName::defaultQueue()],
maxJobs: 10,
capabilities: ['email', 'pdf-generation']
);
// Simulate worker failure (stale heartbeat, high resource usage)
$failedWorker = new Worker(
id: $failingWorker->id,
hostname: $failingWorker->hostname,
processId: $failingWorker->processId,
queues: $failingWorker->queues,
maxJobs: $failingWorker->maxJobs,
registeredAt: $failingWorker->registeredAt,
lastHeartbeat: new \DateTimeImmutable('-10 minutes'), // Stale
isActive: false, // Marked as failed
cpuUsage: new Percentage(99), // Critical
memoryUsage: Byte::fromGigabytes(4), // Over limit
currentJobs: 5
);
// Verify failure detection
expect($failedWorker->isHealthy())->toBeFalse();
expect($failedWorker->isAvailableForJobs())->toBeFalse();
// Verify healthy worker is still available
expect($healthyWorker->isHealthy())->toBeTrue();
expect($healthyWorker->isAvailableForJobs())->toBeTrue();
// Mock job redistribution
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('rowCount')->andReturn(3); // 3 jobs released
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$releasedJobs = $this->jobDistribution->releaseAllWorkerJobs($failedWorker->id);
expect($releasedJobs)->toBe(3);
});
});
describe('Media Processing Pipeline Scenario', function () {
it('handles resource-intensive media processing with proper load balancing', function () {
// Scenario: Video streaming service processing uploads
// GPU-enabled workers for video processing
$videoWorkers = [
Worker::register(
hostname: 'gpu-server-1',
processId: 4001,
queues: [QueueName::fromString('video-processing')],
maxJobs: 2, // Very resource intensive
capabilities: ['video-encode', 'gpu-acceleration', 'h264', 'h265']
),
Worker::register(
hostname: 'gpu-server-2',
processId: 4002,
queues: [QueueName::fromString('video-processing')],
maxJobs: 2,
capabilities: ['video-encode', 'gpu-acceleration', 'h264', 'h265']
)
];
// CPU workers for audio processing
$audioWorkers = [
Worker::register(
hostname: 'audio-server-1',
processId: 5001,
queues: [QueueName::fromString('audio-processing')],
maxJobs: 8,
capabilities: ['audio-encode', 'mp3', 'aac', 'flac']
)
];
// Thumbnail generation workers
$thumbnailWorkers = [
Worker::register(
hostname: 'image-server-1',
processId: 6001,
queues: [QueueName::fromString('thumbnail-generation')],
maxJobs: 10,
capabilities: ['image-resize', 'ffmpeg', 'thumbnail']
),
Worker::register(
hostname: 'image-server-2',
processId: 6002,
queues: [QueueName::fromString('thumbnail-generation')],
maxJobs: 10,
capabilities: ['image-resize', 'ffmpeg', 'thumbnail']
)
];
$allWorkers = array_merge($videoWorkers, $audioWorkers, $thumbnailWorkers);
// Simulate different resource usage patterns
$videoWorkerUnderLoad = $videoWorkers[0]->updateHeartbeat(
new Percentage(85), // High CPU for video encoding
Byte::fromGigabytes(3), // High memory usage
2 // At capacity
);
$audioWorkerLightLoad = $audioWorkers[0]->updateHeartbeat(
new Percentage(30), // Moderate CPU
Byte::fromMegabytes(800),
3 // 3/8 jobs
);
$thumbnailWorkerIdle = $thumbnailWorkers[0]->updateHeartbeat(
new Percentage(5), // Very low CPU
Byte::fromMegabytes(200),
0 // No current jobs
);
// Verify load distribution logic
expect($videoWorkerUnderLoad->isAvailableForJobs())->toBeFalse(); // At capacity
expect($audioWorkerLightLoad->isAvailableForJobs())->toBeTrue();
expect($thumbnailWorkerIdle->isAvailableForJobs())->toBeTrue();
// Check load percentages
expect($videoWorkerUnderLoad->getLoadPercentage()->getValue())->toBe(100.0); // 2/2 jobs = 100%
expect($audioWorkerLightLoad->getLoadPercentage()->getValue())->toBe(37.5); // 3/8 = 37.5%
expect($thumbnailWorkerIdle->getLoadPercentage()->getValue())->toBe(5.0); // CPU load only
expect(count($allWorkers))->toBe(5);
});
it('prevents resource exhaustion through proper capability matching', function () {
// Worker without GPU capabilities trying to handle video processing
$cpuOnlyWorker = Worker::register(
hostname: 'cpu-server',
processId: 7001,
queues: [QueueName::fromString('video-processing')],
maxJobs: 10,
capabilities: ['cpu-encoding'] // Missing GPU capability
);
$gpuWorker = Worker::register(
hostname: 'gpu-server',
processId: 7002,
queues: [QueueName::fromString('video-processing')],
maxJobs: 2,
capabilities: ['gpu-acceleration', 'video-encode', 'h264']
);
// Job requiring GPU acceleration
$jobData = [
'required_capabilities' => ['gpu-acceleration', 'h264'],
'resource_requirements' => [
'gpu_memory' => '4GB',
'encoding_quality' => 'high'
]
];
// Mock worker scoring (would normally be done by JobDistributionService)
// CPU-only worker should get score 0 (missing required capability)
expect($cpuOnlyWorker->hasCapability('gpu-acceleration'))->toBeFalse();
expect($gpuWorker->hasCapability('gpu-acceleration'))->toBeTrue();
expect($gpuWorker->hasCapability('h264'))->toBeTrue();
});
});
describe('Financial Transaction Processing Scenario', function () {
it('ensures transaction consistency with distributed locking', function () {
// Scenario: Banking system processing concurrent transactions
$transactionWorkers = [
Worker::register(
hostname: 'transaction-server-1',
processId: 8001,
queues: [QueueName::fromString('transactions')],
maxJobs: 50, // High throughput for financial data
capabilities: ['payment-processing', 'fraud-detection', 'pci-compliant']
),
Worker::register(
hostname: 'transaction-server-2',
processId: 8002,
queues: [QueueName::fromString('transactions')],
maxJobs: 50,
capabilities: ['payment-processing', 'fraud-detection', 'pci-compliant']
)
];
// Simulate concurrent transaction processing
$accountId = 'account-12345';
$transactionLock = LockKey::forResource('account', $accountId);
// Mock lock acquisition for account processing
$lockStmt = mock(\PDOStatement::class);
$lockStmt->shouldReceive('execute')->andReturn(true);
$lockStmt->shouldReceive('rowCount')->andReturn(1);
$failLockStmt = mock(\PDOStatement::class);
$failLockStmt->shouldReceive('execute')->andThrow(
new \PDOException('Duplicate entry for key PRIMARY')
);
$this->connection->shouldReceive('prepare')->andReturn(
$lockStmt, // First worker gets lock
$failLockStmt // Second worker fails
);
// First worker should acquire lock successfully
$worker1 = $transactionWorkers[0];
$worker2 = $transactionWorkers[1];
$firstLockResult = $this->distributedLock->acquire(
$transactionLock,
$worker1->id,
Duration::fromMinutes(5)
);
$secondLockResult = $this->distributedLock->acquire(
$transactionLock,
$worker2->id,
Duration::fromMinutes(5)
);
expect($firstLockResult)->toBeTrue();
expect($secondLockResult)->toBeFalse(); // Should fail due to existing lock
});
it('handles high-frequency trading with minimal latency', function () {
// High-performance workers for trading operations
$tradingWorker = Worker::register(
hostname: 'trading-server-hft',
processId: 9001,
queues: [QueueName::fromString('high-frequency-trading')],
maxJobs: 1000, // Very high throughput
capabilities: ['ultra-low-latency', 'market-data', 'order-execution']
);
// Simulate high load but healthy performance
$performantWorker = $tradingWorker->updateHeartbeat(
new Percentage(60), // Moderate CPU despite high load
Byte::fromGigabytes(1.5), // Efficient memory usage
800 // High job count but within limits
);
expect($performantWorker->isHealthy())->toBeTrue();
expect($performantWorker->isAvailableForJobs())->toBeTrue(); // Still has capacity
expect($performantWorker->getLoadPercentage()->getValue())->toBe(80.0); // 800/1000 jobs
});
});
describe('Content Delivery Network Scenario', function () {
it('distributes cache warming jobs across geographic regions', function () {
// Workers in different geographic regions
$usEastWorkers = [
Worker::register(
hostname: 'cdn-us-east-1',
processId: 10001,
queues: [QueueName::fromString('cache-warming')],
maxJobs: 25,
capabilities: ['cdn-management', 'us-east-region', 'edge-caching']
),
Worker::register(
hostname: 'cdn-us-east-2',
processId: 10002,
queues: [QueueName::fromString('cache-warming')],
maxJobs: 25,
capabilities: ['cdn-management', 'us-east-region', 'edge-caching']
)
];
$europeWorkers = [
Worker::register(
hostname: 'cdn-eu-west-1',
processId: 11001,
queues: [QueueName::fromString('cache-warming')],
maxJobs: 20,
capabilities: ['cdn-management', 'eu-west-region', 'edge-caching']
)
];
$asiaWorkers = [
Worker::register(
hostname: 'cdn-asia-pacific-1',
processId: 12001,
queues: [QueueName::fromString('cache-warming')],
maxJobs: 15,
capabilities: ['cdn-management', 'asia-pacific-region', 'edge-caching']
)
];
$allCdnWorkers = array_merge($usEastWorkers, $europeWorkers, $asiaWorkers);
// Verify regional distribution
$usEastCount = count(array_filter($allCdnWorkers,
fn($w) => $w->hasCapability('us-east-region')));
$europeCount = count(array_filter($allCdnWorkers,
fn($w) => $w->hasCapability('eu-west-region')));
$asiaCount = count(array_filter($allCdnWorkers,
fn($w) => $w->hasCapability('asia-pacific-region')));
expect($usEastCount)->toBe(2);
expect($europeCount)->toBe(1);
expect($asiaCount)->toBe(1);
// Verify all workers can handle cache warming
foreach ($allCdnWorkers as $worker) {
expect($worker->hasCapability('cdn-management'))->toBeTrue();
expect($worker->hasCapability('edge-caching'))->toBeTrue();
}
});
it('handles regional worker failure with graceful degradation', function () {
// Scenario: Entire region goes offline, traffic redistributed
$primaryWorker = Worker::register(
hostname: 'cdn-primary',
processId: 13001,
queues: [QueueName::fromString('content-delivery')],
maxJobs: 100,
capabilities: ['primary-region', 'high-capacity']
);
$backupWorkers = [
Worker::register(
hostname: 'cdn-backup-1',
processId: 13002,
queues: [QueueName::fromString('content-delivery')],
maxJobs: 50,
capabilities: ['backup-region', 'medium-capacity']
),
Worker::register(
hostname: 'cdn-backup-2',
processId: 13003,
queues: [QueueName::fromString('content-delivery')],
maxJobs: 50,
capabilities: ['backup-region', 'medium-capacity']
)
];
// Simulate primary region failure
$failedPrimary = $primaryWorker->markInactive();
expect($failedPrimary->isAvailableForJobs())->toBeFalse();
// Backup workers should still be available
foreach ($backupWorkers as $backup) {
expect($backup->isAvailableForJobs())->toBeTrue();
}
// Total backup capacity should handle reduced load
$totalBackupCapacity = array_sum(array_map(fn($w) => $w->maxJobs, $backupWorkers));
expect($totalBackupCapacity)->toBe(100); // Same as primary capacity
});
});
describe('Machine Learning Training Pipeline Scenario', function () {
it('manages resource-intensive ML training jobs efficiently', function () {
// Specialized workers for different ML tasks
$gpuTrainingWorker = Worker::register(
hostname: 'ml-gpu-cluster-1',
processId: 14001,
queues: [QueueName::fromString('ml-training')],
maxJobs: 1, // One intensive job at a time
capabilities: ['gpu-cluster', 'tensorflow', 'pytorch', 'cuda']
);
$dataPreprocessingWorkers = [
Worker::register(
hostname: 'ml-preprocessing-1',
processId: 14002,
queues: [QueueName::fromString('data-preprocessing')],
maxJobs: 10,
capabilities: ['data-cleaning', 'feature-engineering', 'pandas', 'numpy']
),
Worker::register(
hostname: 'ml-preprocessing-2',
processId: 14003,
queues: [QueueName::fromString('data-preprocessing')],
maxJobs: 10,
capabilities: ['data-cleaning', 'feature-engineering', 'pandas', 'numpy']
)
];
$inferenceWorkers = [
Worker::register(
hostname: 'ml-inference-1',
processId: 14004,
queues: [QueueName::fromString('ml-inference')],
maxJobs: 50, // High throughput for inference
capabilities: ['model-serving', 'tensorflow-lite', 'onnx']
)
];
// Simulate GPU worker under heavy load
$trainingWorkerLoaded = $gpuTrainingWorker->updateHeartbeat(
new Percentage(95), // High GPU utilization
Byte::fromGigabytes(15), // High memory for large models
1 // At capacity
);
// Preprocessing workers with moderate load
$preprocessingWorkerActive = $dataPreprocessingWorkers[0]->updateHeartbeat(
new Percentage(70), // Active data processing
Byte::fromGigabytes(4),
6 // 6/10 jobs
);
// Inference worker with light load
$inferenceWorkerIdle = $inferenceWorkers[0]->updateHeartbeat(
new Percentage(20), // Low CPU for inference
Byte::fromMegabytes(800),
5 // 5/50 jobs
);
// Verify resource allocation patterns
expect($trainingWorkerLoaded->isAvailableForJobs())->toBeFalse(); // At capacity
expect($preprocessingWorkerActive->isAvailableForJobs())->toBeTrue();
expect($inferenceWorkerIdle->isAvailableForJobs())->toBeTrue();
// Check load patterns match expected ML workloads
expect($trainingWorkerLoaded->getLoadPercentage()->getValue())->toBe(100.0); // At capacity
expect($preprocessingWorkerActive->getLoadPercentage()->getValue())->toBe(70.0); // CPU bound
expect($inferenceWorkerIdle->getLoadPercentage()->getValue())->toBe(20.0); // Light load
});
});
});

View File

@@ -0,0 +1,820 @@
<?php
declare(strict_types=1);
use App\Framework\Queue\Services\WorkerRegistry;
use App\Framework\Queue\Services\DatabaseDistributedLock;
use App\Framework\Queue\Services\JobDistributionService;
use App\Framework\Queue\Services\WorkerHealthCheckService;
use App\Framework\Queue\Services\FailoverRecoveryService;
use App\Framework\Queue\Entities\Worker;
use App\Framework\Queue\ValueObjects\WorkerId;
use App\Framework\Queue\ValueObjects\JobId;
use App\Framework\Queue\ValueObjects\LockKey;
use App\Framework\Queue\ValueObjects\QueueName;
use App\Framework\Core\ValueObjects\Percentage;
use App\Framework\Core\ValueObjects\Byte;
use App\Framework\Core\ValueObjects\Duration;
use App\Framework\Database\ConnectionInterface;
use App\Framework\Logging\Logger;
/**
* Comprehensive integration tests for the Distributed Processing System
*/
describe('Distributed Processing System', function () {
beforeEach(function () {
// Mock connection for database operations
$this->connection = mock(ConnectionInterface::class);
$this->logger = mock(Logger::class);
// Create services
$this->workerRegistry = new WorkerRegistry($this->connection, $this->logger);
$this->distributedLock = new DatabaseDistributedLock($this->connection, $this->logger);
$this->jobDistribution = new JobDistributionService(
$this->workerRegistry,
$this->distributedLock,
$this->connection,
$this->logger
);
$this->healthCheck = new WorkerHealthCheckService(
$this->workerRegistry,
$this->connection,
$this->logger
);
$this->failoverRecovery = new FailoverRecoveryService(
$this->workerRegistry,
$this->jobDistribution,
$this->healthCheck,
$this->distributedLock,
$this->connection,
$this->logger
);
// Create test workers
$this->worker1 = Worker::register(
hostname: 'app-server-1',
processId: 1001,
queues: [
QueueName::emailQueue(),
QueueName::defaultQueue()
],
maxJobs: 10,
capabilities: ['email', 'pdf-generation']
);
$this->worker2 = Worker::register(
hostname: 'app-server-2',
processId: 1002,
queues: [
QueueName::defaultQueue(),
QueueName::fromString('high-priority')
],
maxJobs: 5,
capabilities: ['image-processing', 'pdf-generation']
);
$this->worker3 = Worker::register(
hostname: 'app-server-3',
processId: 1003,
queues: [
QueueName::emailQueue()
],
maxJobs: 15,
capabilities: ['email', 'notifications']
);
// Test job IDs
$this->jobId1 = JobId::generate();
$this->jobId2 = JobId::generate();
$this->jobId3 = JobId::generate();
});
describe('Worker Registration and Discovery', function () {
it('can register multiple workers across different queues', function () {
// Mock successful database operations for worker registration
$this->connection->shouldReceive('prepare')->andReturnSelf();
$this->connection->shouldReceive('execute')->andReturn(true);
$this->logger->shouldReceive('info')->andReturn(null);
$this->logger->shouldReceive('debug')->andReturn(null);
// Register workers
$this->workerRegistry->register($this->worker1);
$this->workerRegistry->register($this->worker2);
$this->workerRegistry->register($this->worker3);
// Verify registration calls were made
expect($this->connection)->toHaveReceived('prepare')->times(3);
expect($this->connection)->toHaveReceived('execute')->times(3);
});
it('can find workers for specific queues', function () {
// Mock database query for finding workers by queue
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('fetch')->andReturn(
$this->worker1->toArray(),
$this->worker3->toArray(),
false // End of results
);
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$this->logger->shouldReceive('error')->never();
$workers = $this->workerRegistry->findWorkersForQueue(QueueName::emailQueue());
expect($workers)->toHaveCount(2);
expect($workers[0]->hostname)->toBe('app-server-1');
expect($workers[1]->hostname)->toBe('app-server-3');
});
it('can find best available worker with load balancing', function () {
// Create workers with different load levels
$lightlyLoadedWorker = $this->worker1->updateHeartbeat(
new Percentage(20), // Low CPU
Byte::fromMegabytes(512), // Low memory
2 // 2 out of 10 jobs
);
$heavilyLoadedWorker = $this->worker2->updateHeartbeat(
new Percentage(80), // High CPU
Byte::fromMegabytes(1500), // High memory
4 // 4 out of 5 jobs = 80% load
);
// Mock database to return workers with different loads
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('fetch')->andReturn(
$lightlyLoadedWorker->toArray(),
$heavilyLoadedWorker->toArray(),
false
);
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$this->logger->shouldReceive('error')->never();
$bestWorker = $this->workerRegistry->findBestWorkerForQueue(QueueName::defaultQueue());
expect($bestWorker)->not->toBeNull();
expect($bestWorker->hostname)->toBe('app-server-1'); // Should pick lightly loaded worker
});
it('correctly calculates worker load percentages', function () {
$worker = $this->worker1->updateHeartbeat(
new Percentage(30), // 30% CPU
Byte::fromMegabytes(800),
3 // 3 out of 10 jobs = 30% job load
);
$loadPercentage = $worker->getLoadPercentage();
// Should take the higher of CPU load (30%) or job load (30%)
expect($loadPercentage->getValue())->toBe(30.0);
// Test with higher CPU load
$workerHighCpu = $worker->updateHeartbeat(
new Percentage(75), // 75% CPU
Byte::fromMegabytes(800),
3 // Still 30% job load
);
expect($workerHighCpu->getLoadPercentage()->getValue())->toBe(75.0);
});
});
describe('Distributed Locking System', function () {
it('prevents race conditions when acquiring job locks', function () {
$lockKey = LockKey::forJob($this->jobId1);
$workerId1 = WorkerId::generate();
$workerId2 = WorkerId::generate();
$ttl = Duration::fromMinutes(5);
// Mock first worker successfully acquiring lock
$stmt1 = mock(\PDOStatement::class);
$stmt1->shouldReceive('execute')->andReturn(true);
$stmt1->shouldReceive('rowCount')->andReturn(1); // Successful insert
// Mock second worker failing to acquire same lock (duplicate key)
$stmt2 = mock(\PDOStatement::class);
$stmt2->shouldReceive('execute')->andThrow(
new \PDOException('Duplicate entry for key PRIMARY')
);
$this->connection->shouldReceive('prepare')
->andReturn($stmt1, $stmt2);
$this->logger->shouldReceive('debug')->andReturn(null);
$this->logger->shouldReceive('info')->andReturn(null);
// First worker should successfully acquire lock
$result1 = $this->distributedLock->acquire($lockKey, $workerId1, $ttl);
expect($result1)->toBeTrue();
// Second worker should fail to acquire same lock
$result2 = $this->distributedLock->acquire($lockKey, $workerId2, $ttl);
expect($result2)->toBeFalse();
});
it('can extend lock duration for active workers', function () {
$lockKey = LockKey::forJob($this->jobId1);
$workerId = WorkerId::generate();
$extension = Duration::fromMinutes(10);
// Mock successful lock extension
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('rowCount')->andReturn(1); // Lock was extended
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$this->logger->shouldReceive('debug')->andReturn(null);
$result = $this->distributedLock->extend($lockKey, $workerId, $extension);
expect($result)->toBeTrue();
});
it('can release locks and clean up resources', function () {
$lockKey = LockKey::forJob($this->jobId1);
$workerId = WorkerId::generate();
// Mock successful lock release
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('rowCount')->andReturn(1); // Lock was deleted
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$this->logger->shouldReceive('debug')->andReturn(null);
$this->logger->shouldReceive('info')->andReturn(null);
$result = $this->distributedLock->release($lockKey, $workerId);
expect($result)->toBeTrue();
});
it('supports lock acquisition with timeout for competing workers', function () {
$lockKey = LockKey::forQueue(QueueName::defaultQueue());
$workerId = WorkerId::generate();
$ttl = Duration::fromMinutes(5);
$timeout = Duration::fromSeconds(2);
// Mock first attempt fails, second attempt succeeds
$stmt1 = mock(\PDOStatement::class);
$stmt1->shouldReceive('execute')->andThrow(
new \PDOException('Duplicate entry')
);
$stmt2 = mock(\PDOStatement::class);
$stmt2->shouldReceive('execute')->andReturn(true);
$stmt2->shouldReceive('rowCount')->andReturn(1);
$this->connection->shouldReceive('prepare')
->andReturn($stmt1, $stmt2);
$this->logger->shouldReceive('debug')->andReturn(null);
$this->logger->shouldReceive('info')->andReturn(null);
$result = $this->distributedLock->acquireWithTimeout($lockKey, $workerId, $ttl, $timeout);
expect($result)->toBeTrue();
});
});
describe('Job Distribution Service', function () {
it('can distribute jobs to best available workers', function () {
// Mock distribution lock acquisition
$distributionStmt = mock(\PDOStatement::class);
$distributionStmt->shouldReceive('execute')->andReturn(true);
$distributionStmt->shouldReceive('rowCount')->andReturn(1);
// Mock job lock acquisition
$jobStmt = mock(\PDOStatement::class);
$jobStmt->shouldReceive('execute')->andReturn(true);
$jobStmt->shouldReceive('rowCount')->andReturn(1);
// Mock job assignment recording
$assignmentStmt = mock(\PDOStatement::class);
$assignmentStmt->shouldReceive('execute')->andReturn(true);
// Mock finding workers for queue
$workerStmt = mock(\PDOStatement::class);
$workerStmt->shouldReceive('execute')->andReturn(true);
$workerStmt->shouldReceive('fetch')->andReturn(
$this->worker1->toArray(),
false
);
$this->connection->shouldReceive('prepare')->andReturn(
$distributionStmt, // Distribution lock
$jobStmt, // Job lock
$jobStmt, // Job lock transfer (release old)
$jobStmt, // Job lock transfer (acquire new)
$assignmentStmt, // Job assignment
$jobStmt, // Release distribution lock
$workerStmt // Find workers
);
$this->logger->shouldReceive('info')->andReturn(null);
$this->logger->shouldReceive('debug')->andReturn(null);
$this->logger->shouldReceive('warning')->never();
$this->logger->shouldReceive('error')->never();
$assignedWorkerId = $this->jobDistribution->distributeJob(
$this->jobId1,
QueueName::defaultQueue(),
['priority' => 'normal']
);
expect($assignedWorkerId)->not->toBeNull();
expect($assignedWorkerId->toString())->toBe($this->worker1->id->toString());
});
it('calculates worker scores based on load and capabilities', function () {
$jobData = [
'required_capabilities' => ['email', 'pdf-generation']
];
$bestWorker = $this->jobDistribution->findBestWorkerForJob(
QueueName::emailQueue(),
$jobData
);
// Should return null when no workers are mocked in database
// In real scenario, would return worker with matching capabilities and lowest load
expect($bestWorker)->toBeNull();
});
it('handles job distribution when no workers are available', function () {
// Mock empty worker result set
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('fetch')->andReturn(false); // No workers found
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$this->logger->shouldReceive('warning')->andReturn(null);
$this->logger->shouldReceive('error')->never();
$result = $this->jobDistribution->findBestWorkerForJob(QueueName::defaultQueue());
expect($result)->toBeNull();
});
it('can release jobs from workers and cleanup assignments', function () {
// Mock successful job release
$lockStmt = mock(\PDOStatement::class);
$lockStmt->shouldReceive('execute')->andReturn(true);
$lockStmt->shouldReceive('rowCount')->andReturn(1);
// Mock assignment cleanup
$assignmentStmt = mock(\PDOStatement::class);
$assignmentStmt->shouldReceive('execute')->andReturn(true);
$this->connection->shouldReceive('prepare')->andReturn(
$lockStmt, // Release job lock
$assignmentStmt // Delete assignment
);
$this->logger->shouldReceive('info')->andReturn(null);
$this->logger->shouldReceive('debug')->andReturn(null);
$result = $this->jobDistribution->releaseJob($this->jobId1, $this->worker1->id);
expect($result)->toBeTrue();
});
});
describe('Worker Health Monitoring', function () {
it('detects unhealthy workers based on resource usage', function () {
// Create worker with critical resource usage
$unhealthyWorker = $this->worker1->updateHeartbeat(
new Percentage(95), // Critical CPU usage
Byte::fromGigabytes(2.5), // Exceeds memory limit
8 // High job count but within limits
);
expect($unhealthyWorker->isHealthy())->toBeFalse();
expect($unhealthyWorker->isAvailableForJobs())->toBeFalse();
});
it('detects workers with stale heartbeats', function () {
// Create worker with old heartbeat
$staleWorker = new Worker(
id: $this->worker1->id,
hostname: $this->worker1->hostname,
processId: $this->worker1->processId,
queues: $this->worker1->queues,
maxJobs: $this->worker1->maxJobs,
registeredAt: $this->worker1->registeredAt,
lastHeartbeat: new \DateTimeImmutable('-5 minutes'), // Stale heartbeat
isActive: true,
cpuUsage: new Percentage(30),
memoryUsage: Byte::fromMegabytes(512),
currentJobs: 2
);
expect($staleWorker->isHealthy())->toBeFalse();
});
it('identifies healthy workers correctly', function () {
$healthyWorker = $this->worker1->updateHeartbeat(
new Percentage(45), // Normal CPU usage
Byte::fromMegabytes(800), // Normal memory usage
3 // Normal job count
);
expect($healthyWorker->isHealthy())->toBeTrue();
expect($healthyWorker->isAvailableForJobs())->toBeTrue();
});
it('considers workers at capacity as unavailable but healthy', function () {
$atCapacityWorker = $this->worker2->updateHeartbeat(
new Percentage(50), // Normal CPU
Byte::fromMegabytes(600), // Normal memory
5 // At max capacity (5/5 jobs)
);
expect($atCapacityWorker->isHealthy())->toBeTrue();
expect($atCapacityWorker->isAvailableForJobs())->toBeFalse(); // At capacity
});
});
describe('Multi-Worker Load Distribution', function () {
it('distributes jobs across multiple workers evenly', function () {
$workers = [$this->worker1, $this->worker2, $this->worker3];
$jobs = [$this->jobId1, $this->jobId2, $this->jobId3];
// Mock successful distribution for all jobs
foreach ($jobs as $index => $jobId) {
// Each job gets distributed to a different worker
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('rowCount')->andReturn(1);
$stmt->shouldReceive('fetch')->andReturn(
$workers[$index]->toArray(),
false
);
$this->connection->shouldReceive('prepare')->andReturn($stmt);
}
$this->logger->shouldReceive('info')->andReturn(null);
$this->logger->shouldReceive('debug')->andReturn(null);
$assignments = [];
foreach ($jobs as $jobId) {
$workerId = $this->jobDistribution->distributeJob(
$jobId,
QueueName::defaultQueue()
);
if ($workerId) {
$assignments[] = $workerId->toString();
}
}
// Should have distributed jobs (exact distribution depends on mocking)
expect($assignments)->not->toBeEmpty();
});
it('handles worker overload by selecting alternative workers', function () {
// Create overloaded worker
$overloadedWorker = $this->worker1->updateHeartbeat(
new Percentage(95), // Critical CPU
Byte::fromGigabytes(2.5), // Over memory limit
10 // At max capacity
);
// Create available alternative worker
$availableWorker = $this->worker2->updateHeartbeat(
new Percentage(20), // Low CPU
Byte::fromMegabytes(400), // Low memory
1 // Low job count
);
// Verify overloaded worker is not available
expect($overloadedWorker->isAvailableForJobs())->toBeFalse();
// Verify alternative worker is available
expect($availableWorker->isAvailableForJobs())->toBeTrue();
});
});
describe('Automatic Failover and Recovery', function () {
it('detects failed workers and initiates recovery', function () {
// Create failed worker (old heartbeat, high resource usage)
$failedWorker = new Worker(
id: $this->worker1->id,
hostname: $this->worker1->hostname,
processId: $this->worker1->processId,
queues: $this->worker1->queues,
maxJobs: $this->worker1->maxJobs,
registeredAt: $this->worker1->registeredAt,
lastHeartbeat: new \DateTimeImmutable('-10 minutes'), // Very stale
isActive: true,
cpuUsage: new Percentage(99), // Critical CPU
memoryUsage: Byte::fromGigabytes(3), // Over limit
currentJobs: 5
);
expect($failedWorker->isHealthy())->toBeFalse();
expect($failedWorker->isAvailableForJobs())->toBeFalse();
});
it('can reassign jobs from failed workers to healthy workers', function () {
$failedWorkerId = WorkerId::generate();
$healthyWorkerId = WorkerId::generate();
// Mock job reassignment operations
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('rowCount')->andReturn(2); // 2 jobs reassigned
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$this->logger->shouldReceive('info')->andReturn(null);
$releasedJobs = $this->jobDistribution->releaseAllWorkerJobs($failedWorkerId);
expect($releasedJobs)->toBeGreaterThanOrEqual(0);
});
it('cleans up resources from inactive workers', function () {
// Mock worker cleanup operations
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('rowCount')->andReturn(3); // 3 workers deactivated
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$this->logger->shouldReceive('info')->andReturn(null);
$cleanedCount = $this->workerRegistry->cleanupInactiveWorkers(5);
expect($cleanedCount)->toBe(3);
});
});
describe('System Resilience and Stress Testing', function () {
it('handles concurrent job distribution requests', function () {
$concurrentJobs = [
JobId::generate(),
JobId::generate(),
JobId::generate(),
JobId::generate(),
JobId::generate()
];
// Mock successful distribution for all concurrent jobs
foreach ($concurrentJobs as $jobId) {
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('rowCount')->andReturn(1);
$stmt->shouldReceive('fetch')->andReturn(
$this->worker1->toArray(),
false
);
$this->connection->shouldReceive('prepare')->andReturn($stmt);
}
$this->logger->shouldReceive('info')->andReturn(null);
$this->logger->shouldReceive('debug')->andReturn(null);
$successfulDistributions = 0;
foreach ($concurrentJobs as $jobId) {
$workerId = $this->jobDistribution->distributeJob(
$jobId,
QueueName::defaultQueue()
);
if ($workerId) {
$successfulDistributions++;
}
}
expect($successfulDistributions)->toBeGreaterThanOrEqual(0);
});
it('maintains system consistency during lock contention', function () {
$lockKey = LockKey::forQueue(QueueName::defaultQueue());
$workers = [
WorkerId::generate(),
WorkerId::generate(),
WorkerId::generate()
];
// Simulate lock contention - only first worker succeeds
$successStmt = mock(\PDOStatement::class);
$successStmt->shouldReceive('execute')->andReturn(true);
$successStmt->shouldReceive('rowCount')->andReturn(1);
$failStmt = mock(\PDOStatement::class);
$failStmt->shouldReceive('execute')->andThrow(
new \PDOException('Duplicate entry')
);
$this->connection->shouldReceive('prepare')->andReturn(
$successStmt, // First worker succeeds
$failStmt, // Second worker fails
$failStmt // Third worker fails
);
$this->logger->shouldReceive('debug')->andReturn(null);
$this->logger->shouldReceive('info')->andReturn(null);
$results = [];
foreach ($workers as $workerId) {
$result = $this->distributedLock->acquire(
$lockKey,
$workerId,
Duration::fromMinutes(5)
);
$results[] = $result;
}
// Only one worker should succeed
$successCount = array_sum($results);
expect($successCount)->toBe(1);
});
it('recovers gracefully from database connection failures', function () {
// Mock database connection failure
$this->connection->shouldReceive('prepare')->andThrow(
new \PDOException('Connection lost')
);
$this->logger->shouldReceive('error')->andReturn(null);
// System should handle gracefully and throw appropriate exception
expect(fn() => $this->workerRegistry->findActiveWorkers())
->toThrow(\PDOException::class);
});
it('provides comprehensive system statistics for monitoring', function () {
// Mock statistics queries
$statsStmt = mock(\PDOStatement::class);
$statsStmt->shouldReceive('execute')->andReturn(true);
$statsStmt->shouldReceive('fetch')->andReturn([
'total_workers' => 3,
'active_workers' => 2,
'healthy_workers' => 2,
'unique_hosts' => 2,
'total_capacity' => 30,
'current_load' => 10,
'avg_cpu_usage' => 45.5,
'avg_memory_usage' => 819200000 // ~800MB in bytes
]);
$queueStmt = mock(\PDOStatement::class);
$queueStmt->shouldReceive('execute')->andReturn(true);
$queueStmt->shouldReceive('fetch')->andReturn(
['queues' => '["default", "email"]'],
['queues' => '["default", "high-priority"]'],
false
);
$this->connection->shouldReceive('prepare')->andReturn(
$statsStmt, $queueStmt
);
$this->logger->shouldReceive('error')->never();
$statistics = $this->workerRegistry->getWorkerStatistics();
expect($statistics)->toHaveKey('total_workers');
expect($statistics)->toHaveKey('active_workers');
expect($statistics)->toHaveKey('capacity_utilization');
expect($statistics['total_workers'])->toBe(3);
expect($statistics['active_workers'])->toBe(2);
});
});
describe('Value Object Behavior', function () {
it('ensures WorkerId uniqueness and proper formatting', function () {
$workerId1 = WorkerId::generate();
$workerId2 = WorkerId::generate();
expect($workerId1->toString())->not->toBe($workerId2->toString());
expect($workerId1->equals($workerId2))->toBeFalse();
// Test host-based WorkerId
$hostWorkerId = WorkerId::forHost('test-host', 1234);
expect($hostWorkerId->toString())->toContain('test-host');
});
it('validates LockKey patterns and constraints', function () {
$jobLock = LockKey::forJob($this->jobId1);
$queueLock = LockKey::forQueue(QueueName::defaultQueue());
$workerLock = LockKey::forWorker($this->worker1->id);
expect($jobLock->toString())->toStartWith('job.');
expect($queueLock->toString())->toStartWith('queue.');
expect($workerLock->toString())->toStartWith('worker.');
// Test lock key modifications
$prefixedLock = $jobLock->withPrefix('tenant-1');
$suffixedLock = $queueLock->withSuffix('processing');
expect($prefixedLock->toString())->toStartWith('tenant-1.job.');
expect($suffixedLock->toString())->toEndWith('.processing');
});
it('validates JobId generation and uniqueness', function () {
$jobId1 = JobId::generate();
$jobId2 = JobId::generate();
expect($jobId1->toString())->not->toBe($jobId2->toString());
expect($jobId1->equals($jobId2))->toBeFalse();
// Test string conversion
$jobIdFromString = JobId::fromString($jobId1->toString());
expect($jobIdFromString->equals($jobId1))->toBeTrue();
});
it('properly handles QueueName creation and equality', function () {
$queue1 = QueueName::defaultQueue();
$queue2 = QueueName::emailQueue();
$queue3 = QueueName::fromString('custom-queue');
expect($queue1->toString())->toBe('default');
expect($queue2->toString())->toBe('email');
expect($queue3->toString())->toBe('custom-queue');
expect($queue1->equals($queue2))->toBeFalse();
expect($queue1->equals(QueueName::defaultQueue()))->toBeTrue();
});
});
describe('Edge Cases and Error Scenarios', function () {
it('handles worker registration with invalid data gracefully', function () {
expect(fn() => Worker::register(
hostname: '', // Invalid empty hostname
processId: 1001,
queues: [QueueName::defaultQueue()],
maxJobs: 10
))->toThrow(\InvalidArgumentException::class);
expect(fn() => Worker::register(
hostname: 'valid-host',
processId: 1001,
queues: [], // Invalid empty queues
maxJobs: 10
))->toThrow(\InvalidArgumentException::class);
expect(fn() => Worker::register(
hostname: 'valid-host',
processId: 1001,
queues: [QueueName::defaultQueue()],
maxJobs: 0 // Invalid max jobs
))->toThrow(\InvalidArgumentException::class);
});
it('handles lock key validation properly', function () {
expect(fn() => LockKey::fromString(''))
->toThrow(\InvalidArgumentException::class);
expect(fn() => LockKey::fromString(str_repeat('a', 256))) // Too long
->toThrow(\InvalidArgumentException::class);
expect(fn() => LockKey::fromString('invalid@key!')) // Invalid characters
->toThrow(\InvalidArgumentException::class);
});
it('handles job distribution when all workers are at capacity', function () {
// Mock database returning workers at full capacity
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andReturn(true);
$stmt->shouldReceive('fetch')->andReturn(false); // No available workers
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$this->logger->shouldReceive('warning')->andReturn(null);
$result = $this->jobDistribution->findBestWorkerForJob(QueueName::defaultQueue());
expect($result)->toBeNull();
});
it('handles lock acquisition timeout correctly', function () {
$lockKey = LockKey::forJob($this->jobId1);
$workerId = WorkerId::generate();
$ttl = Duration::fromMinutes(5);
$shortTimeout = Duration::fromMilliseconds(100); // Very short timeout
// Mock all acquisition attempts fail
$stmt = mock(\PDOStatement::class);
$stmt->shouldReceive('execute')->andThrow(
new \PDOException('Duplicate entry')
);
$this->connection->shouldReceive('prepare')->andReturn($stmt);
$this->logger->shouldReceive('debug')->andReturn(null);
$this->logger->shouldReceive('info')->andReturn(null);
$result = $this->distributedLock->acquireWithTimeout(
$lockKey,
$workerId,
$ttl,
$shortTimeout
);
expect($result)->toBeFalse();
});
});
});