# Queue System

Comprehensive documentation of the asynchronous job processing system in the Custom PHP Framework.

## Overview

The Queue System provides robust, scalable background job processing with support for multiple queue drivers, priority-based execution, retry mechanisms, dead letter queues, and comprehensive monitoring.

**Core Features**:
- **Multi-Driver Support**: File-based, Redis, and Database-backed queues
- **Priority Queues**: Critical, high, normal, low priority levels
- **Delayed Jobs**: Schedule jobs for future execution
- **Retry Strategies**: Exponential backoff with configurable attempts
- **Dead Letter Queue**: Automatic failed job management
- **Job Persistence**: Full job lifecycle tracking and history
- **Progress Tracking**: Real-time job progress monitoring
- **Distributed Locking**: Multi-worker coordination
- **Metrics & Monitoring**: Performance statistics and health checks

## Queue Interface

The framework provides a unified `Queue` interface that all queue drivers implement:

```php
interface Queue
{
    /**
     * Push a job to the queue
     */
    public function push(JobPayload $payload): void;

    /**
     * Pop the next available job from the queue
     */
    public function pop(): ?JobPayload;

    /**
     * Peek at the next job without removing it
     */
    public function peek(): ?JobPayload;

    /**
     * Get the number of jobs in the queue
     */
    public function size(): int;

    /**
     * Clear all jobs from the queue
     */
    public function clear(): int;

    /**
     * Get queue statistics
     */
    public function getStats(): array;
}
```

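To make the contract concrete, here is a minimal in-memory sketch that follows the same method set. It is not one of the framework's drivers: `DemoPayload` and `InMemoryQueue` are hypothetical names, and the payload is reduced to a job plus an integer priority so the block runs on its own.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for JobPayload: only the fields this sketch needs.
final readonly class DemoPayload
{
    public function __construct(
        public object $job,
        public int $priority = 500, // mirrors the NORMAL priority value
    ) {}
}

// Hypothetical in-memory queue exposing the same methods as the Queue interface.
final class InMemoryQueue
{
    /** @var list<DemoPayload> */
    private array $items = [];

    public function push(DemoPayload $payload): void
    {
        $this->items[] = $payload;
        // usort() is stable since PHP 8.0: higher priority first, FIFO within a level.
        usort($this->items, fn (DemoPayload $a, DemoPayload $b) => $b->priority <=> $a->priority);
    }

    public function pop(): ?DemoPayload
    {
        // array_shift() returns null on an empty array.
        return array_shift($this->items);
    }

    public function peek(): ?DemoPayload
    {
        return $this->items[0] ?? null;
    }

    public function size(): int
    {
        return count($this->items);
    }

    public function clear(): int
    {
        $removed = count($this->items);
        $this->items = [];

        return $removed; // number of jobs that were discarded
    }

    public function getStats(): array
    {
        return ['total_size' => $this->size()];
    }
}
```

A queue like this can be handy in unit tests, where a file or Redis backend would only add setup cost.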
### JobPayload Value Object

All jobs are wrapped in a `JobPayload` value object that provides type safety and configuration:

```php
final readonly class JobPayload
{
    public function __construct(
        public object $job,                          // The job instance
        public QueuePriority $priority,              // Priority level
        public Duration $delay,                      // Execution delay
        public ?Duration $timeout = null,            // Max execution time
        public ?RetryStrategy $retryStrategy = null, // Retry configuration
        public ?JobMetadata $metadata = null         // Additional metadata
    ) {}
}
```

**Factory Methods**:
```php
// Immediate execution with high priority
$payload = JobPayload::immediate($job);

// Delayed execution
$payload = JobPayload::delayed($job, Duration::fromMinutes(5));

// Critical priority with timeout
$payload = JobPayload::critical($job);

// Background job with retries
$payload = JobPayload::background($job);

// Custom configuration
$payload = JobPayload::create(
    job: $job,
    priority: QueuePriority::high(),
    delay: Duration::fromSeconds(30),
    timeout: Duration::fromMinutes(5),
    retryStrategy: new ExponentialBackoffStrategy(maxAttempts: 3)
);
```

## Queue Drivers

### File-Based Queue (Default)

Simple file-based queue implementation for development and small-scale applications:

**Features**:
- No external dependencies
- Persistent across restarts
- Suitable for development and small production loads
- Priority-based sorting

**Usage**:
```php
use App\Framework\Queue\FileQueue;

$queue = new FileQueue(
    directory: '/var/www/storage/queues',
    queueName: 'default'
);

$queue->push(JobPayload::immediate($job));
```

### Redis Queue (Production)

High-performance Redis-backed queue with advanced features:

**Features**:
- High throughput (10,000+ jobs/second)
- Redis sorted sets for priority queues
- Delayed job support via timestamp scoring
- Atomic operations for thread safety
- Real-time statistics

**Architecture**:
- Priority Queue: Redis sorted set with score = priority + timestamp
- Delayed Queue: Redis sorted set with score = execution timestamp
- Statistics: Redis hash with counters

**Usage**:
```php
use App\Framework\Queue\RedisQueue;

$queue = new RedisQueue(
    connection: $redisConnection,
    queueName: 'production',
    serializer: new PhpSerializer()
);

$queue->push(JobPayload::immediate($job));

// Statistics
$stats = $queue->getStats();
// [
//     'total_size' => 1523,
//     'priority_queue_size' => 1200,
//     'delayed_queue_size' => 323,
//     'priority_breakdown' => ['high' => 150, 'normal' => 1050, 'low' => 323]
// ]
```

**Score Calculation**:
```php
// Lower score = higher priority in Redis
$priorityScore = 1000 - $priority->value; // Invert priority
$timeScore = $currentTime / 1000000;      // Microsecond precision for FIFO
$finalScore = $priorityScore + $timeScore;
```

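The score formula can be checked with a small worked example. The sketch below assumes `$currentTime` is a Unix timestamp in seconds, since the source does not state the unit. `queueScore()` is a hypothetical helper, not part of the framework.

```php
<?php
declare(strict_types=1);

// Hypothetical helper reproducing the score formula.
// Assumption: $unixSeconds is a Unix timestamp in seconds.
function queueScore(int $priority, int $unixSeconds): float
{
    $priorityScore = 1000 - $priority;     // CRITICAL (1000) -> 0, LOW (250) -> 750
    $timeScore = $unixSeconds / 1_000_000; // small fractional tiebreaker

    return $priorityScore + $timeScore;
}

$now = 1705410234;
$scores = [
    'low, pushed first'  => queueScore(250, $now),
    'high, pushed later' => queueScore(750, $now + 60),
    'high, pushed first' => queueScore(750, $now),
];

asort($scores); // ascending: the lowest score would be popped first
print_r(array_keys($scores));
// Order: 'high, pushed first', 'high, pushed later', 'low, pushed first'
```

Under this assumption the time term stays near 1705 and grows by 0.000001 per second, so it only breaks ties inside one priority level. If the timestamp were in microseconds instead, the time term would grow by 1 per second and could outweigh the 250-point gap between adjacent priority levels after roughly 250 seconds, so the unit is worth confirming against the actual implementation.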
**Delayed Job Processing**:
```php
// Automatic processing of ready delayed jobs
private function processDelayedJobs(): void
{
    $currentTime = time();

    // Get all delayed jobs where score <= current time
    $readyJobs = $this->redis->zRangeByScore($this->delayedKey, 0, $currentTime);

    foreach ($readyJobs as $serializedPayload) {
        // Remove from delayed queue
        $this->redis->zRem($this->delayedKey, $serializedPayload);

        // Recompute the priority score (see "Score Calculation").
        // calculateScore() is a hypothetical helper; the original excerpt
        // uses $score without showing where it is computed.
        $score = $this->calculateScore($serializedPayload);

        // Add to priority queue
        $this->redis->zAdd($this->priorityKey, $score, $serializedPayload);
    }
}
```

### Persistent Queue Decorator

Wraps any queue implementation with job persistence and tracking:

```php
final readonly class PersistentQueue implements Queue
{
    public function __construct(
        private Queue $baseQueue,
        private JobPersistenceLayer $persistence,
        private QueueType $queueType
    ) {}

    public function push(JobPayload $payload): void
    {
        // Generate job ID
        $jobId = JobId::generate();

        // Store in persistence layer
        $this->persistence->storeJob(
            jobId: $jobId,
            queueType: $this->queueType,
            jobData: $payload->job,
            // retryStrategy is nullable: no strategy means a single attempt
            maxAttempts: $payload->retryStrategy?->getMaxAttempts() ?? 1
        );

        // Push to base queue with job ID
        $this->baseQueue->push($payload->withJobId($jobId));
    }

    public function markJobCompleted(JobId $jobId, array $result = []): void
    {
        $this->persistence->markAsCompleted($jobId, $result);
    }

    public function markJobFailed(JobId $jobId, string $errorMessage): void
    {
        $jobState = $this->persistence->markAsFailed($jobId, $errorMessage);

        // Automatic retry if applicable
        if ($jobState->canRetry()) {
            // $retryPayload: payload rebuilt from the persisted job state
            // (its construction is omitted in this excerpt)
            $this->baseQueue->push($retryPayload);
            $this->persistence->markForRetry($jobId, $errorMessage);
        }
    }

    // The remaining Queue methods (pop, peek, size, clear, getStats)
    // are omitted in this excerpt.
}
```

## Background Jobs

### Creating a Job

Jobs are simple PHP classes with a `handle()` method:

```php
final readonly class SendWelcomeEmailJob
{
    public function __construct(
        private UserId $userId,
        private Email $email,
        private UserName $userName
    ) {}

    public function handle(): array
    {
        // Business logic
        // Note: $this->emailService is not declared in the constructor above;
        // how the service reaches the job is not shown in this excerpt.
        $this->emailService->send(
            to: $this->email,
            template: 'welcome',
            data: ['name' => $this->userName->value]
        );

        // Return result for logging
        return [
            'user_id' => $this->userId->toString(),
            'email' => $this->email->getMasked(),
            'sent_at' => time()
        ];
    }

    public function getType(): string
    {
        return 'email.welcome';
    }
}
```

### Dispatching Jobs

```php
use App\Framework\Queue\Queue;
use App\Framework\Queue\ValueObjects\JobPayload;

// Immediate execution
$job = new SendWelcomeEmailJob($userId, $email, $userName);
$queue->push(JobPayload::immediate($job));

// Delayed execution (5 minutes)
$queue->push(JobPayload::delayed($job, Duration::fromMinutes(5)));

// Background job with retries
$queue->push(JobPayload::background($job));

// Custom configuration
$queue->push(JobPayload::create(
    job: $job,
    priority: QueuePriority::high(),
    timeout: Duration::fromMinutes(5),
    retryStrategy: new ExponentialBackoffStrategy(maxAttempts: 3)
));
```

### Processing Jobs

```php
// Worker loop
while (true) {
    $payload = $queue->pop();

    if ($payload === null) {
        sleep(1); // Wait for new jobs
        continue;
    }

    try {
        // Execute job
        $result = $payload->job->handle();

        // Mark as completed
        if ($queue instanceof PersistentQueue) {
            $queue->markJobCompleted($payload->getJobId(), $result);
        }
    } catch (\Throwable $e) {
        // Mark as failed (markJobFailed() takes the job ID and an error message)
        if ($queue instanceof PersistentQueue) {
            $queue->markJobFailed(
                $payload->getJobId(),
                $e->getMessage()
            );
        }
    }
}
```

## Priority System

### QueuePriority Value Object

```php
enum QueuePriority: int
{
    case CRITICAL = 1000; // System-critical operations
    case HIGH = 750;      // User-facing operations
    case NORMAL = 500;    // Standard background tasks
    case LOW = 250;       // Non-urgent maintenance

    public static function critical(): self { return self::CRITICAL; }
    public static function high(): self { return self::HIGH; }
    public static function normal(): self { return self::NORMAL; }
    public static function low(): self { return self::LOW; }

    public function toString(): string
    {
        return match ($this) {
            self::CRITICAL => 'critical',
            self::HIGH => 'high',
            self::NORMAL => 'normal',
            self::LOW => 'low',
        };
    }
}
```

### Priority Usage Examples

```php
// Critical: Payment processing, real-time notifications
$queue->push(JobPayload::create(
    job: new ProcessPaymentJob($payment),
    priority: QueuePriority::critical(),
    timeout: Duration::fromSeconds(30)
));

// High: User-triggered emails, report generation
$queue->push(JobPayload::create(
    job: new GenerateReportJob($userId),
    priority: QueuePriority::high()
));

// Normal: Scheduled maintenance, cache warming
$queue->push(JobPayload::create(
    job: new WarmCacheJob(),
    priority: QueuePriority::normal()
));

// Low: Cleanup tasks, analytics aggregation
$queue->push(JobPayload::create(
    job: new CleanupOldLogsJob(),
    priority: QueuePriority::low()
));
```

## Retry Mechanisms

### RetryStrategy Interface

```php
interface RetryStrategy
{
    public function shouldRetry(int $attempts): bool;
    public function getDelay(int $attempts): Duration;
    public function getMaxAttempts(): int;
}
```

### Exponential Backoff Strategy

```php
final readonly class ExponentialBackoffStrategy implements RetryStrategy
{
    public function __construct(
        private int $maxAttempts = 3,
        private int $baseDelaySeconds = 60,
        private int $maxDelaySeconds = 3600
    ) {}

    public function shouldRetry(int $attempts): bool
    {
        return $attempts < $this->maxAttempts;
    }

    public function getDelay(int $attempts): Duration
    {
        // 2^attempts * baseDelay, capped at maxDelay
        $delaySeconds = min(
            pow(2, $attempts) * $this->baseDelaySeconds,
            $this->maxDelaySeconds
        );

        return Duration::fromSeconds($delaySeconds);
    }

    public function getMaxAttempts(): int
    {
        return $this->maxAttempts;
    }
}
```

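To see what schedule the default parameters produce, the delay formula can be evaluated on its own. `backoffDelaySeconds()` below is a hypothetical free function that mirrors `getDelay()` but returns plain seconds instead of a `Duration`. Whether the framework counts attempts from 0 or 1 is not stated in the source, so the loop start is an assumption.

```php
<?php
declare(strict_types=1);

// Hypothetical helper mirroring ExponentialBackoffStrategy::getDelay(),
// returning plain seconds so the sketch needs no framework classes.
function backoffDelaySeconds(int $attempts, int $baseDelaySeconds = 60, int $maxDelaySeconds = 3600): int
{
    // 2^attempts * baseDelay, capped at maxDelay
    return min((2 ** $attempts) * $baseDelaySeconds, $maxDelaySeconds);
}

$schedule = [];
for ($attempt = 1; $attempt <= 7; $attempt++) {
    $schedule[$attempt] = backoffDelaySeconds($attempt);
}

print_r($schedule);
// 1 => 120, 2 => 240, 3 => 480, 4 => 960, 5 => 1920, 6 => 3600, 7 => 3600
```

With a 60-second base, the one-hour cap is reached at the sixth attempt (the uncapped value would be 3840 seconds), so raising `maxAttempts` beyond that only adds hourly retries.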
### Custom Retry Strategy

```php
final readonly class CustomRetryStrategy implements RetryStrategy
{
    public function __construct(
        private array $retryDelays = [60, 300, 1800] // 1min, 5min, 30min
    ) {}

    public function shouldRetry(int $attempts): bool
    {
        return $attempts < count($this->retryDelays);
    }

    public function getDelay(int $attempts): Duration
    {
        $index = min($attempts, count($this->retryDelays) - 1);
        return Duration::fromSeconds($this->retryDelays[$index]);
    }

    public function getMaxAttempts(): int
    {
        return count($this->retryDelays);
    }
}
```

### Usage with Jobs

```php
// Standard exponential backoff
$payload = JobPayload::create(
    job: $job,
    retryStrategy: new ExponentialBackoffStrategy(
        maxAttempts: 5,
        baseDelaySeconds: 60,  // Start with 1 minute
        maxDelaySeconds: 3600  // Cap at 1 hour
    )
);

// Custom retry schedule
$payload = JobPayload::create(
    job: $job,
    retryStrategy: new CustomRetryStrategy([60, 300, 900, 3600])
);

// No retries
$payload = JobPayload::create(
    job: $job,
    retryStrategy: null // Job fails permanently on first error
);
```

## Failed Jobs & Dead Letter Queue

### JobStatus Lifecycle

```php
enum JobStatus: string
{
    case PENDING = 'pending';       // Waiting in queue
    case PROCESSING = 'processing'; // Currently executing
    case COMPLETED = 'completed';   // Successfully completed
    case FAILED = 'failed';         // Failed with error
    case RETRYING = 'retrying';     // Failed, will retry
    case CANCELLED = 'cancelled';   // Manually cancelled
    case EXPIRED = 'expired';       // Timeout exceeded

    public function isFinal(): bool
    {
        return match ($this) {
            self::COMPLETED, self::CANCELLED, self::EXPIRED => true,
            default => false
        };
    }

    public function isActive(): bool
    {
        return match ($this) {
            self::PENDING, self::PROCESSING, self::RETRYING => true,
            default => false
        };
    }

    public function isFailure(): bool
    {
        return match ($this) {
            self::FAILED, self::EXPIRED => true,
            default => false
        };
    }
}
```

### Dead Letter Queue Manager

When jobs exceed max retry attempts, they are automatically moved to a dead letter queue:

```php
final readonly class DeadLetterManager
{
    public function __construct(
        private DeadLetterQueueInterface $deadLetterQueue,
        private ProductionJobPersistenceLayer $persistenceLayer
    ) {}

    /**
     * Move failed job to dead letter queue
     */
    public function moveJobToDeadLetterQueue(
        JobIndexEntry $failedJob,
        FailureReason $failureReason
    ): void {
        $dlqName = DeadLetterQueueName::forQueue(
            QueueName::fromString($failedJob->queueType)
        );

        $deadLetterJob = DeadLetterJob::fromFailedJob(
            failedJob: $failedJob,
            deadLetterQueueName: $dlqName,
            failureReason: $failureReason
        );

        $this->deadLetterQueue->addFailedJob($deadLetterJob);
        $this->persistenceLayer->deleteJob($failedJob->jobId);
    }

    /**
     * Handle job failure with automatic DLQ routing
     */
    public function handleJobFailure(
        string $jobId,
        \Throwable $exception,
        int $maxAttempts = 3
    ): bool {
        $jobEntry = $this->persistenceLayer->getJobById($jobId);

        if ($jobEntry->attempts >= $maxAttempts) {
            $this->moveJobToDeadLetterQueue(
                failedJob: $jobEntry,
                failureReason: FailureReason::fromException($exception)
            );

            return true;
        }

        return false;
    }
}
```

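The routing rule in `handleJobFailure()` comes down to one comparison, which is easy to isolate and test. The sketch below uses a hypothetical `FailureOutcome` enum and `decideFailureOutcome()` function; neither exists in the framework, they only restate the `attempts >= maxAttempts` check.

```php
<?php
declare(strict_types=1);

// Hypothetical outcome labels; the framework's own types are not used here.
enum FailureOutcome: string
{
    case Retry = 'retry';
    case DeadLetter = 'dead_letter';
}

// Same comparison as handleJobFailure(): a job at or past the attempt
// limit goes to the dead letter queue, anything below it may be retried.
function decideFailureOutcome(int $attempts, int $maxAttempts = 3): FailureOutcome
{
    return $attempts >= $maxAttempts
        ? FailureOutcome::DeadLetter
        : FailureOutcome::Retry;
}

foreach ([1, 2, 3, 4] as $attempts) {
    echo $attempts, ' attempt(s): ', decideFailureOutcome($attempts)->value, PHP_EOL;
}
```

With the default limit of 3, the third failed attempt is the one that lands in the dead letter queue.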
### Managing Failed Jobs

```php
use App\Framework\Queue\Services\DeadLetterManager;

// Get failed jobs
$failedJobs = $deadLetterManager->getFailedJobs(
    originalQueue: QueueName::fromString('emails'),
    limit: 100
);

// Retry a specific job
$deadLetterManager->retryJob($deadLetterJobId);

// Retry all jobs in a dead letter queue
$retriedCount = $deadLetterManager->retryAllJobs($deadLetterQueueName);

// Clear dead letter queue
$deletedCount = $deadLetterManager->clearDeadLetterQueue($deadLetterQueueName);

// Get statistics
$stats = $deadLetterManager->getStatistics();
// [
//     'default.dlq' => ['count' => 23, 'oldest' => '2024-01-15 10:30:00'],
//     'emails.dlq' => ['count' => 5, 'oldest' => '2024-01-16 14:20:00']
// ]
```

## Queue Monitoring

### Queue Statistics

```php
// Get comprehensive statistics
$stats = $queue->getStats();

// Example output:
// [
//     'total_size' => 1523,
//     'priority_queue_size' => 1200,
//     'delayed_queue_size' => 323,
//     'priority_breakdown' => [
//         'critical' => 15,
//         'high' => 150,
//         'normal' => 1050,
//         'low' => 308
//     ],
//     'stats' => [
//         'total_pushed' => 125430,
//         'total_popped' => 123907,
//         'high_pushed' => 15234,
//         'normal_pushed' => 95678,
//         'low_pushed' => 14518,
//         'last_activity' => 1705410234
//     ],
//     'persistence' => [
//         'total_jobs' => 125430,
//         'completed' => 120145,
//         'failed' => 3762,
//         'retrying' => 1523
//     ]
// ]
```

### Job Metrics

```php
use App\Framework\Queue\Services\JobMetricsManager;

// Get job performance metrics
$metrics = $metricsManager->getJobMetrics($jobId);

// [
//     'execution_time_ms' => 1234,
//     'memory_used_mb' => 45.2,
//     'peak_memory_mb' => 52.8,
//     'attempts' => 1,
//     'last_attempt_at' => '2024-01-16 15:30:00'
// ]

// Get aggregated metrics by job type
$aggregated = $metricsManager->getAggregatedMetrics('email.welcome');

// [
//     'total_executions' => 1523,
//     'avg_execution_time_ms' => 856,
//     'p95_execution_time_ms' => 1450,
//     'p99_execution_time_ms' => 2340,
//     'success_rate' => 0.987,
//     'avg_memory_mb' => 38.4
// ]
```

### Progress Tracking

For long-running jobs, track progress:

```php
final readonly class LargeDataImportJob
{
    public function __construct(
        private ProgressManager $progressManager,
        private JobId $jobId,
        private array $dataFiles
    ) {}

    public function handle(): array
    {
        $totalSteps = count($this->dataFiles);

        foreach ($this->dataFiles as $index => $file) {
            // Process file
            $this->processFile($file);

            // Update progress
            $this->progressManager->updateProgress(
                jobId: $this->jobId,
                currentStep: $index + 1,
                totalSteps: $totalSteps,
                message: "Processing {$file}"
            );
        }

        return ['files_processed' => $totalSteps];
    }
}

// Query progress
$progress = $progressManager->getProgress($jobId);

// [
//     'current_step' => 45,
//     'total_steps' => 100,
//     'percentage' => 45.0,
//     'message' => 'Processing data_2024_01_16.csv',
//     'updated_at' => '2024-01-16 15:32:45'
// ]
```

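The `percentage` field in the progress output can be derived from the two step counters. The following sketch shows one way to do that; `progressPercentage()` is a hypothetical helper, and the clamping and one-decimal rounding are assumptions rather than documented behaviour of `ProgressManager`.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: derives a percentage from step counters.
// Assumptions: values are clamped to 0..100 and rounded to one decimal.
function progressPercentage(int $currentStep, int $totalSteps): float
{
    if ($totalSteps <= 0) {
        return 0.0; // avoid division by zero for empty jobs
    }

    // Keep the step inside the valid range before dividing.
    $clamped = max(0, min($currentStep, $totalSteps));

    return round($clamped / $totalSteps * 100, 1);
}

echo progressPercentage(45, 100), PHP_EOL; // prints 45
echo progressPercentage(1, 3), PHP_EOL;    // prints 33.3
```

Guarding against a zero step count matters for jobs such as `LargeDataImportJob` when the file list is empty.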
### Health Monitoring

```php
use App\Framework\Queue\Services\WorkerHealthCheckService;

// Check worker health
$health = $healthCheckService->checkAllWorkers();

// [
//     'healthy_workers' => 8,
//     'unhealthy_workers' => 1,
//     'total_workers' => 9,
//     'workers' => [
//         ['id' => 'worker-1', 'status' => 'healthy', 'last_heartbeat' => '2024-01-16 15:33:00'],
//         ['id' => 'worker-2', 'status' => 'unhealthy', 'last_heartbeat' => '2024-01-16 15:20:15']
//     ]
// ]

// Register worker heartbeat
$healthCheckService->registerHeartbeat($workerId);

// Cleanup stale workers
$cleanedCount = $healthCheckService->cleanupStaleWorkers(
    staleThreshold: Duration::fromMinutes(5)
);
```

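Stale-worker detection boils down to comparing each worker's last heartbeat with a threshold. The sketch below illustrates the idea with plain arrays and Unix timestamps. `findStaleWorkers()` is a hypothetical function, not the implementation behind `cleanupStaleWorkers()`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns the IDs of workers whose last heartbeat
 * is older than the threshold.
 *
 * @param array<string, int> $lastHeartbeats worker ID => Unix timestamp of last heartbeat
 * @return list<string>
 */
function findStaleWorkers(array $lastHeartbeats, int $now, int $staleThresholdSeconds = 300): array
{
    $stale = [];

    foreach ($lastHeartbeats as $workerId => $lastSeen) {
        if ($now - $lastSeen > $staleThresholdSeconds) {
            $stale[] = $workerId;
        }
    }

    return $stale;
}

$now = 1_000;
$heartbeats = [
    'worker-1' => $now - 30,  // seen 30 seconds ago
    'worker-2' => $now - 765, // seen 12 minutes 45 seconds ago
];

print_r(findStaleWorkers($heartbeats, $now)); // lists only 'worker-2'
```

The 300-second default matches the five-minute `staleThreshold` used in the cleanup call. The gap for `worker-2` mirrors the heartbeat timestamps in the sample output.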
## Best Practices

### 1. Job Design

**Keep Jobs Small and Focused**:
```php
// ✅ Good: Single responsibility
final readonly class SendEmailJob
{
    public function handle(): array
    {
        $this->emailService->send($this->email, $this->template);
        return ['sent' => true];
    }
}

// ❌ Bad: Multiple responsibilities
final readonly class SendEmailAndUpdateUserJob
{
    public function handle(): array
    {
        $this->emailService->send($this->email);
        $this->userRepository->update($this->userId); // Separate concern
        $this->analyticsService->track($this->event); // Separate concern
    }
}
```

**Make Jobs Idempotent**:
```php
final readonly class ProcessPaymentJob
{
    public function handle(): array
    {
        // Check if already processed (idempotency)
        if ($this->paymentRepository->isProcessed($this->paymentId)) {
            return ['status' => 'already_processed'];
        }

        // Process payment
        $result = $this->paymentGateway->charge($this->payment);

        // Mark as processed
        $this->paymentRepository->markAsProcessed($this->paymentId);

        return $result;
    }
}
```

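The idempotency check can be exercised without a database or a payment gateway. In the sketch below, `InMemoryProcessedPayments` and `processPaymentOnce()` are hypothetical stand-ins for the repository and the job, and the `$charge` callable replaces the gateway call so the side effect can be counted.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for a payment repository.
final class InMemoryProcessedPayments
{
    /** @var array<string, true> */
    private array $processed = [];

    public function isProcessed(string $paymentId): bool
    {
        return isset($this->processed[$paymentId]);
    }

    public function markAsProcessed(string $paymentId): void
    {
        $this->processed[$paymentId] = true;
    }
}

// Hypothetical handler: $charge stands in for the payment gateway call.
function processPaymentOnce(InMemoryProcessedPayments $repo, string $paymentId, callable $charge): array
{
    if ($repo->isProcessed($paymentId)) {
        return ['status' => 'already_processed']; // skip the side effect
    }

    $charge($paymentId);
    $repo->markAsProcessed($paymentId);

    return ['status' => 'processed'];
}

$repo = new InMemoryProcessedPayments();
$charges = 0;
$charge = function (string $paymentId) use (&$charges): void {
    $charges++; // count how often the "gateway" is hit
};

$first = processPaymentOnce($repo, 'pay-1', $charge);
$second = processPaymentOnce($repo, 'pay-1', $charge); // simulated retry
```

Running the handler twice for the same ID charges only once. Note that a check followed by a write is not atomic, so with several workers a real implementation would likely need a lock or a unique constraint as well.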
**Use Value Objects for Job Data**:
```php
// ✅ Good: Type-safe value objects
final readonly class SendInvoiceJob
{
    public function __construct(
        private InvoiceId $invoiceId,
        private Email $recipientEmail,
        private Money $amount
    ) {}
}

// ❌ Bad: Primitive arrays
final readonly class SendInvoiceJob
{
    public function __construct(
        private array $data // What structure? What validation?
    ) {}
}
```

### 2. Priority Assignment

**Critical**: System-critical operations that must execute immediately
- Payment processing
- Real-time notifications
- Security alerts

**High**: User-facing operations where a user is waiting
- Password reset emails
- Welcome emails
- Report generation (user-triggered)

**Normal**: Standard background tasks
- Scheduled maintenance
- Cache warming
- Analytics processing

**Low**: Non-urgent maintenance
- Log cleanup
- Old data archival
- Non-critical aggregations

### 3. Retry Strategy

**Use Exponential Backoff** for transient failures:
```php
// External API calls, network operations
JobPayload::create(
    job: $apiCallJob,
    retryStrategy: new ExponentialBackoffStrategy(
        maxAttempts: 5,
        baseDelaySeconds: 60,
        maxDelaySeconds: 3600
    )
);
```

**No Retries** for permanent failures:
```php
// Invalid input, business logic violations
JobPayload::create(
    job: $validationJob,
    retryStrategy: null // Fail immediately
);
```

**Custom Schedule** for specific requirements:
```php
// Time-sensitive operations
JobPayload::create(
    job: $timeSensitiveJob,
    retryStrategy: new CustomRetryStrategy([30, 60, 120]) // Quick retries
);
```

### 4. Monitoring

**Track Key Metrics**:
- Queue depth by priority
- Average job execution time
- Success/failure rates
- Dead letter queue size
- Worker health status

**Set Up Alerts**:
```php
// Alert on high queue depth
if ($stats['total_size'] > 10000) {
    $alerting->send('Queue depth exceeded threshold');
}

// Alert on high failure rate (guard against division by zero on an empty history)
$totalJobs = $stats['persistence']['total_jobs'];
$failureRate = $totalJobs > 0 ? $stats['persistence']['failed'] / $totalJobs : 0.0;
if ($failureRate > 0.05) { // 5% failure rate
    $alerting->send('High job failure rate detected');
}

// Alert on dead letter queue growth
if ($dlqStats['count'] > 100) {
    $alerting->send('Dead letter queue requires attention');
}
```

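As a worked example of the failure-rate check, the sketch below applies the same ratio to the sample `persistence` figures from the Queue Statistics section. `failureRate()` is a hypothetical helper, and the array keys are taken from that sample output.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: failed / total, returning 0.0 when no jobs are recorded.
function failureRate(array $persistenceStats): float
{
    $total = (int) ($persistenceStats['total_jobs'] ?? 0);
    $failed = (int) ($persistenceStats['failed'] ?? 0);

    return $total > 0 ? $failed / $total : 0.0;
}

// Sample figures from the statistics example.
$persistence = [
    'total_jobs' => 125430,
    'completed' => 120145,
    'failed' => 3762,
    'retrying' => 1523,
];

$rate = failureRate($persistence);
$shouldAlert = $rate > 0.05; // 5% threshold

printf("Failure rate: %.2f%%, alert: %s\n", $rate * 100, $shouldAlert ? 'yes' : 'no');
```

For these figures the rate is about 3%, which stays below the 5% threshold, so no alert would fire.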
### 5. Resource Management

**Implement Timeouts**:
```php
JobPayload::create(
    job: $job,
    timeout: Duration::fromMinutes(5) // Kill job after 5 minutes
);
```

**Manage Memory**:
```php
final readonly class LargeDataProcessingJob
{
    public function handle(): array
    {
        // Process in batches to avoid memory exhaustion
        foreach ($this->getBatches() as $batch) {
            $this->processBatch($batch);

            // Force garbage collection
            gc_collect_cycles();
        }

        // handle() is declared to return an array
        return ['status' => 'completed'];
    }
}
```

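Batching is straightforward to implement with a generator, which keeps only one batch in memory at a time. The sketch below shows one possible shape for a helper like `getBatches()`. `inBatches()` is a hypothetical function and is not taken from the framework.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: lazily splits any iterable into fixed-size batches.
 *
 * @param iterable<mixed> $items
 * @return Generator<int, list<mixed>>
 */
function inBatches(iterable $items, int $batchSize): Generator
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1');
    }

    $batch = [];

    foreach ($items as $item) {
        $batch[] = $item;

        if (count($batch) === $batchSize) {
            yield $batch;
            $batch = []; // release the previous batch
        }
    }

    if ($batch !== []) {
        yield $batch; // final, possibly smaller batch
    }
}

foreach (inBatches(range(1, 5), 2) as $batch) {
    echo implode(',', $batch), PHP_EOL;
}
// Prints "1,2", then "3,4", then "5"
```

Because the source can itself be a generator (for example, rows streamed from a file), the full data set never has to be loaded at once.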
**Connection Pooling**:
```php
// Reuse connections across jobs
final readonly class DatabaseJob
{
    public function handle(): array
    {
        // Use connection pool instead of new connections
        $connection = $this->connectionPool->acquire();

        try {
            // Execute queries ($sql is a placeholder for the job's query)
            $result = $connection->query($sql);

            // handle() is declared to return an array
            return ['result' => $result];
        } finally {
            // Always release back to pool
            $this->connectionPool->release($connection);
        }
    }
}
```

### 6. Error Handling

**Graceful Degradation**:
```php
final readonly class EmailJob
{
    public function handle(): array
    {
        try {
            $this->emailService->send($this->email);
            return ['status' => 'sent'];
        } catch (EmailProviderException $e) {
            // Log error but don't fail job
            $this->logger->warning('Email provider unavailable', [
                'error' => $e->getMessage()
            ]);

            // Use fallback provider
            $this->fallbackEmailService->send($this->email);
            return ['status' => 'sent_via_fallback'];
        }
    }
}
```

**Detailed Error Logging**:
```php
try {
    $job->handle();
} catch (\Throwable $e) {
    $this->logger->error('Job failed', [
        'job_id' => $jobId,
        'job_class' => get_class($job),
        'error' => $e->getMessage(),
        'trace' => $e->getTraceAsString(),
        'attempts' => $attempts,
        'metadata' => $payload->metadata->toArray()
    ]);

    throw $e;
}
```

### 7. Testing

**Unit Test Jobs**:
```php
it('sends welcome email', function () {
    $job = new SendWelcomeEmailJob(
        userId: UserId::generate(),
        email: new Email('test@example.com'),
        userName: new UserName('Test User')
    );

    $result = $job->handle();

    expect($result['sent_at'])->toBeGreaterThan(0);
});
```

**Integration Test Queue Operations**:
```php
it('processes jobs in priority order', function () {
    $queue = new RedisQueue($connection);

    // Push jobs with different priorities
    $queue->push(JobPayload::create($lowJob, QueuePriority::low()));
    $queue->push(JobPayload::create($highJob, QueuePriority::high()));
    $queue->push(JobPayload::create($normalJob, QueuePriority::normal()));

    // Pop and verify order.
    // toEqual() compares by value: jobs popped from Redis are unserialized
    // copies, so an identity check with toBe() would fail.
    $first = $queue->pop();
    expect($first->job)->toEqual($highJob);

    $second = $queue->pop();
    expect($second->job)->toEqual($normalJob);

    $third = $queue->pop();
    expect($third->job)->toEqual($lowJob);
});
```

## Framework Integration

### With Event System

```php
// Dispatch jobs in response to events
#[OnEvent(priority: 100)]
public function onUserRegistered(UserRegisteredEvent $event): void
{
    $job = new SendWelcomeEmailJob(
        userId: $event->userId,
        email: $event->email,
        userName: $event->userName
    );

    $this->queue->push(JobPayload::immediate($job));
}
```

### With Scheduler

See [Scheduler-Queue Pipeline](scheduler-queue-pipeline.md) for complete integration documentation.

```php
// Schedule recurring job dispatch
$scheduler->schedule(
    'send-daily-digest',
    CronSchedule::fromExpression('0 8 * * *'), // Every day at 8 AM
    fn() => $this->queue->push(JobPayload::immediate(new SendDailyDigestJob()))
);
```

### With Command Bus

```php
// Dispatch jobs via command handlers
final readonly class CreateUserHandler
{
    public function handle(CreateUserCommand $command): User
    {
        $user = $this->userRepository->create($command);

        // Queue welcome email
        $this->queue->push(JobPayload::immediate(
            new SendWelcomeEmailJob($user->id, $user->email, $user->name)
        ));

        return $user;
    }
}
```

## Console Commands

The framework provides console commands for queue management:

```bash
# Monitor queue status
php console.php queue:status

# Process jobs (worker)
php console.php queue:work --queue=default --timeout=60

# Retry failed jobs
php console.php queue:retry --queue=default --limit=50

# Clear queue
php console.php queue:clear --queue=default

# View failed jobs
php console.php queue:failed --limit=20

# Clean up old jobs
php console.php queue:cleanup --older-than=7d
```

## Performance Characteristics

**Redis Queue**:
- Throughput: 10,000+ jobs/second
- Latency: <5ms per operation
- Memory: ~200 bytes per job (serialized)
- Scalability: Horizontal (add Redis instances)

**File Queue**:
- Throughput: 1,000+ jobs/second
- Latency: <20ms per operation
- Memory: Minimal (disk-based)
- Scalability: Vertical (single server)

**Job Execution**:
- Overhead: <10ms per job (framework overhead)
- Timeout: Configurable per job
- Memory: Depends on job complexity
- Concurrency: Multiple workers supported