Queue System
Comprehensive documentation of the asynchronous job processing system in the Custom PHP Framework.
Overview
The Queue System provides robust, scalable background job processing with support for multiple queue drivers, priority-based execution, retry mechanisms, dead letter queues, and comprehensive monitoring.
Core Features:
- Multi-Driver Support: File-based, Redis, and Database-backed queues
- Priority Queues: Critical, high, normal, low priority levels
- Delayed Jobs: Schedule jobs for future execution
- Retry Strategies: Exponential backoff with configurable attempts
- Dead Letter Queue: Automatic failed job management
- Job Persistence: Full job lifecycle tracking and history
- Progress Tracking: Real-time job progress monitoring
- Distributed Locking: Multi-worker coordination
- Metrics & Monitoring: Performance statistics and health checks
Queue Interface
The framework provides a unified Queue interface that all queue drivers implement:
interface Queue
{
/**
* Push a job to the queue
*/
public function push(JobPayload $payload): void;
/**
* Pop the next available job from the queue
*/
public function pop(): ?JobPayload;
/**
* Peek at the next job without removing it
*/
public function peek(): ?JobPayload;
/**
* Get the number of jobs in the queue
*/
public function size(): int;
/**
* Clear all jobs from the queue
*/
public function clear(): int;
/**
* Get queue statistics
*/
public function getStats(): array;
}
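As a rough illustration of this contract, the following self-contained sketch keeps jobs in memory using PHP's built-in SplPriorityQueue. It is not one of the framework's drivers: the class name InMemoryQueueSketch is hypothetical, and it takes a plain job value plus an integer priority instead of a JobPayload so that it runs on its own.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a driver; it mirrors the method names of the
// Queue interface but takes (job, priority) instead of a JobPayload.
final class InMemoryQueueSketch
{
    private SplPriorityQueue $heap;
    private int $serial = PHP_INT_MAX;

    public function __construct()
    {
        $this->heap = new SplPriorityQueue();
    }

    public function push(mixed $job, int $priority): void
    {
        // Array priorities compare element by element: the higher priority
        // wins, and the decreasing serial keeps FIFO order within a priority.
        $this->heap->insert($job, [$priority, $this->serial--]);
    }

    public function pop(): mixed
    {
        return $this->heap->isEmpty() ? null : $this->heap->extract();
    }

    public function peek(): mixed
    {
        return $this->heap->isEmpty() ? null : $this->heap->top();
    }

    public function size(): int
    {
        return $this->heap->count();
    }

    public function clear(): int
    {
        $removed = $this->heap->count();
        $this->heap = new SplPriorityQueue();
        return $removed;
    }
}

$queue = new InMemoryQueueSketch();
$queue->push('cleanup-logs', 250);
$queue->push('send-email-a', 750);
$queue->push('send-email-b', 750);

echo $queue->peek(), PHP_EOL;  // send-email-a
echo $queue->pop(), PHP_EOL;   // send-email-a
echo $queue->pop(), PHP_EOL;   // send-email-b
echo $queue->size(), PHP_EOL;  // 1
echo $queue->clear(), PHP_EOL; // 1
```

The serial counter is needed because SplPriorityQueue does not guarantee any order among elements with equal priority.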
JobPayload Value Object
All jobs are wrapped in a JobPayload value object that provides type safety and configuration:
final readonly class JobPayload
{
public function __construct(
public object $job, // The job instance
public QueuePriority $priority, // Priority level
public Duration $delay, // Execution delay
public ?Duration $timeout = null, // Max execution time
public ?RetryStrategy $retryStrategy = null, // Retry configuration
public ?JobMetadata $metadata = null // Additional metadata
) {}
}
Factory Methods:
// Immediate execution with high priority
$payload = JobPayload::immediate($job);
// Delayed execution
$payload = JobPayload::delayed($job, Duration::fromMinutes(5));
// Critical priority with timeout
$payload = JobPayload::critical($job);
// Background job with retries
$payload = JobPayload::background($job);
// Custom configuration
$payload = JobPayload::create(
job: $job,
priority: QueuePriority::high(),
delay: Duration::fromSeconds(30),
timeout: Duration::fromMinutes(5),
retryStrategy: new ExponentialBackoffStrategy(maxAttempts: 3)
);
Queue Drivers
File-Based Queue (Default)
Simple file-based queue implementation for development and small-scale applications:
Features:
- No external dependencies
- Persistent across restarts
- Suitable for development and small production loads
- Priority-based sorting
Usage:
use App\Framework\Queue\FileQueue;
$queue = new FileQueue(
directory: '/var/www/storage/queues',
queueName: 'default'
);
$queue->push(JobPayload::immediate($job));
Redis Queue (Production)
High-performance Redis-backed queue with advanced features:
Features:
- High throughput (10,000+ jobs/second)
- Redis sorted sets for priority queues
- Delayed job support via timestamp scoring
- Atomic operations for thread safety
- Real-time statistics
Architecture:
- Priority Queue: Redis sorted set with score = inverted priority + scaled timestamp (the lowest score is popped first)
- Delayed Queue: Redis sorted set with score = execution timestamp
- Statistics: Redis hash with counters
Usage:
use App\Framework\Queue\RedisQueue;
$queue = new RedisQueue(
connection: $redisConnection,
queueName: 'production',
serializer: new PhpSerializer()
);
$queue->push(JobPayload::immediate($job));
// Statistics
$stats = $queue->getStats();
// [
// 'total_size' => 1523,
// 'priority_queue_size' => 1200,
// 'delayed_queue_size' => 323,
// 'priority_breakdown' => ['high' => 150, 'normal' => 1050, 'low' => 323]
// ]
Score Calculation:
// Lower score = higher priority in Redis
$priorityScore = 1000 - $priority->value; // Invert priority
$timeScore = $currentTime / 1000000; // Microsecond precision for FIFO
$finalScore = $priorityScore + $timeScore;
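To see how this formula orders jobs, the sketch below evaluates it for a few cases. It assumes that $currentTime is a Unix timestamp in microseconds, which the "microsecond precision" comment suggests but the source does not state; the function name queueScore is hypothetical.

```php
<?php
declare(strict_types=1);

// Hypothetical helper mirroring the formula above; $nowMicros is assumed
// to be a Unix timestamp in microseconds.
function queueScore(int $priorityValue, int $nowMicros): float
{
    $priorityScore = 1000 - $priorityValue; // CRITICAL (1000) -> 0, LOW (250) -> 750
    $timeScore = $nowMicros / 1_000_000;    // seconds, possibly with a fraction
    return $priorityScore + $timeScore;
}

$t0 = 1_705_410_234_000_000; // arbitrary reference instant

// Same priority: the earlier job gets the lower score and is popped first (FIFO).
var_dump(queueScore(500, $t0) < queueScore(500, $t0 + 1_000_000)); // bool(true)

// Different priorities pushed at the same instant: HIGH sorts before NORMAL.
var_dump(queueScore(750, $t0) < queueScore(500, $t0)); // bool(true)

// A NORMAL job that has waited longer than the 250-point gap between
// levels (here 300 seconds) sorts ahead of a HIGH job pushed later.
$oldNormal = queueScore(500, $t0);
$newHigh   = queueScore(750, $t0 + 300 * 1_000_000);
var_dump($oldNormal < $newHigh); // bool(true)
```

Under that assumption the time component acts as a form of aging: one priority level is worth 250 seconds of waiting, so older low-priority jobs are not starved indefinitely. If strict priority ordering is required, the priority component would need a much larger weight than the time component.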
Delayed Job Processing:
// Automatic processing of ready delayed jobs
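private function processDelayedJobs(): void
{
$currentTime = time();
// Get all delayed jobs where score <= current time
$readyJobs = $this->redis->zRangeByScore($this->delayedKey, 0, $currentTime);
foreach ($readyJobs as $serializedPayload) {
// Remove from delayed queue; skip the job if another worker already moved it
if (!$this->redis->zRem($this->delayedKey, $serializedPayload)) {
continue;
}
// Recompute the priority score as described under Score Calculation;
// calculateScore() is an assumed helper name standing in for that logic
$score = $this->calculateScore($serializedPayload);
// Add to priority queue
$this->redis->zAdd($this->priorityKey, $score, $serializedPayload);
}
}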
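The promotion step can be tried without a Redis server. The sketch below models the delayed set as a plain array of payload => ready-at timestamp and moves due entries into a ready list. The function name promoteDueJobs and the array layout are assumptions for illustration, not the RedisQueue internals.

```php
<?php
declare(strict_types=1);

/**
 * Moves every entry whose ready-at timestamp has passed from $delayed
 * into $ready and returns the number of promoted entries.
 *
 * @param array<string, int> $delayed payload => Unix timestamp when it becomes due
 * @param list<string>       $ready   payloads that may be popped now
 */
function promoteDueJobs(array &$delayed, array &$ready, int $now): int
{
    $promoted = 0;
    asort($delayed); // earliest due time first, like a sorted set
    foreach ($delayed as $payload => $readyAt) {
        if ($readyAt > $now) {
            break; // everything after this entry is still in the future
        }
        unset($delayed[$payload]);
        $ready[] = $payload;
        $promoted++;
    }
    return $promoted;
}

$delayed = ['job-a' => 100, 'job-b' => 160, 'job-c' => 130];
$ready = [];

echo promoteDueJobs($delayed, $ready, 140), PHP_EOL; // 2
echo implode(',', $ready), PHP_EOL;                  // job-a,job-c
echo implode(',', array_keys($delayed)), PHP_EOL;    // job-b
```

With several workers, the remove-then-add pair has to be guarded so that a job is promoted only once, for example by checking the result of the removal or by running both steps in a single server-side script.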
Persistent Queue Decorator
Wraps any queue implementation with job persistence and tracking:
final readonly class PersistentQueue implements Queue
{
public function __construct(
private Queue $baseQueue,
private JobPersistenceLayer $persistence,
private QueueType $queueType
) {}
public function push(JobPayload $payload): void
{
// Generate job ID
$jobId = JobId::generate();
// Store in persistence layer
$this->persistence->storeJob(
jobId: $jobId,
queueType: $this->queueType,
jobData: $payload->job,
maxAttempts: $payload->retryStrategy?->getMaxAttempts() ?? 1 // No strategy: single attempt
);
// Push to base queue with job ID
$this->baseQueue->push($payload->withJobId($jobId));
}
public function markJobCompleted(JobId $jobId, array $result = []): void
{
$this->persistence->markAsCompleted($jobId, $result);
}
public function markJobFailed(JobId $jobId, string $errorMessage): void
{
$jobState = $this->persistence->markAsFailed($jobId, $errorMessage);
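// Automatic retry if applicable
if ($jobState->canRetry()) {
// $retryPayload is the failed job's payload, delayed according to its retry strategy (construction not shown in this excerpt)
$this->baseQueue->push($retryPayload);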
$this->persistence->markForRetry($jobId, $errorMessage);
}
}
}
Background Jobs
Creating a Job
Jobs are simple PHP classes with a handle() method:
final readonly class SendWelcomeEmailJob
{
public function __construct(
private UserId $userId,
private Email $email,
private UserName $userName
) {}
public function handle(): array
{
// Business logic ($this->emailService is assumed to be injected; its wiring is omitted here)
$this->emailService->send(
to: $this->email,
template: 'welcome',
data: ['name' => $this->userName->value]
);
// Return result for logging
return [
'user_id' => $this->userId->toString(),
'email' => $this->email->getMasked(),
'sent_at' => time()
];
}
public function getType(): string
{
return 'email.welcome';
}
}
Dispatching Jobs
use App\Framework\Queue\Queue;
use App\Framework\Queue\ValueObjects\JobPayload;
// Immediate execution
$job = new SendWelcomeEmailJob($userId, $email, $userName);
$queue->push(JobPayload::immediate($job));
// Delayed execution (5 minutes)
$queue->push(JobPayload::delayed($job, Duration::fromMinutes(5)));
// Background job with retries
$queue->push(JobPayload::background($job));
// Custom configuration
$queue->push(JobPayload::create(
job: $job,
priority: QueuePriority::high(),
timeout: Duration::fromMinutes(5),
retryStrategy: new ExponentialBackoffStrategy(maxAttempts: 3)
));
Processing Jobs
// Worker loop
while (true) {
$payload = $queue->pop();
if ($payload === null) {
sleep(1); // Wait for new jobs
continue;
}
try {
// Execute job
$result = $payload->job->handle();
// Mark as completed
if ($queue instanceof PersistentQueue) {
$queue->markJobCompleted($payload->getJobId(), $result);
}
} catch (\Throwable $e) {
// Mark as failed
if ($queue instanceof PersistentQueue) {
$queue->markJobFailed(
$payload->getJobId(),
$e->getMessage()
);
}
}
}
Priority System
QueuePriority Value Object
enum QueuePriority: int
{
case CRITICAL = 1000; // System-critical operations
case HIGH = 750; // User-facing operations
case NORMAL = 500; // Standard background tasks
case LOW = 250; // Non-urgent maintenance
public static function critical(): self { return self::CRITICAL; }
public static function high(): self { return self::HIGH; }
public static function normal(): self { return self::NORMAL; }
public static function low(): self { return self::LOW; }
public function toString(): string
{
return match ($this) {
self::CRITICAL => 'critical',
self::HIGH => 'high',
self::NORMAL => 'normal',
self::LOW => 'low',
};
}
}
Priority Usage Examples
// Critical: Payment processing, real-time notifications
$queue->push(JobPayload::create(
job: new ProcessPaymentJob($payment),
priority: QueuePriority::critical(),
timeout: Duration::fromSeconds(30)
));
// High: User-triggered emails, report generation
$queue->push(JobPayload::create(
job: new GenerateReportJob($userId),
priority: QueuePriority::high()
));
// Normal: Scheduled maintenance, cache warming
$queue->push(JobPayload::create(
job: new WarmCacheJob(),
priority: QueuePriority::normal()
));
// Low: Cleanup tasks, analytics aggregation
$queue->push(JobPayload::create(
job: new CleanupOldLogsJob(),
priority: QueuePriority::low()
));
Retry Mechanisms
RetryStrategy Interface
interface RetryStrategy
{
public function shouldRetry(int $attempts): bool;
public function getDelay(int $attempts): Duration;
public function getMaxAttempts(): int;
}
Exponential Backoff Strategy
final readonly class ExponentialBackoffStrategy implements RetryStrategy
{
public function __construct(
private int $maxAttempts = 3,
private int $baseDelaySeconds = 60,
private int $maxDelaySeconds = 3600
) {}
public function shouldRetry(int $attempts): bool
{
return $attempts < $this->maxAttempts;
}
public function getDelay(int $attempts): Duration
{
// 2^attempts * baseDelay, capped at maxDelay
$delaySeconds = min(
pow(2, $attempts) * $this->baseDelaySeconds,
$this->maxDelaySeconds
);
return Duration::fromSeconds($delaySeconds);
}
public function getMaxAttempts(): int
{
return $this->maxAttempts;
}
}
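The delay schedule produced by this formula with the default settings can be worked out with a stand-alone function. The name backoffDelaySeconds is hypothetical; whether the framework passes a zero-based or one-based attempt count is not stated here, so both starting points are visible in the output.

```php
<?php
declare(strict_types=1);

// Stand-alone version of the getDelay() arithmetic shown above.
// $attempts is expected to be zero or greater.
function backoffDelaySeconds(
    int $attempts,
    int $baseDelaySeconds = 60,
    int $maxDelaySeconds = 3600
): int {
    return min((2 ** $attempts) * $baseDelaySeconds, $maxDelaySeconds);
}

foreach (range(0, 6) as $attempts) {
    printf("attempts=%d delay=%ds\n", $attempts, backoffDelaySeconds($attempts));
}
// attempts=0 delay=60s
// attempts=1 delay=120s
// attempts=2 delay=240s
// attempts=3 delay=480s
// attempts=4 delay=960s
// attempts=5 delay=1920s
// attempts=6 delay=3600s
```

The cap takes effect at the sixth doubling, where the uncapped value would be 3840 seconds.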
Custom Retry Strategy
final readonly class CustomRetryStrategy implements RetryStrategy
{
public function __construct(
private array $retryDelays = [60, 300, 1800] // 1min, 5min, 30min
) {}
public function shouldRetry(int $attempts): bool
{
return $attempts < count($this->retryDelays);
}
public function getDelay(int $attempts): Duration
{
$index = min($attempts, count($this->retryDelays) - 1);
return Duration::fromSeconds($this->retryDelays[$index]);
}
public function getMaxAttempts(): int
{
return count($this->retryDelays);
}
}
Usage with Jobs
// Standard exponential backoff
$payload = JobPayload::create(
job: $job,
retryStrategy: new ExponentialBackoffStrategy(
maxAttempts: 5,
baseDelaySeconds: 60, // Start with 1 minute
maxDelaySeconds: 3600 // Cap at 1 hour
)
);
// Custom retry schedule
$payload = JobPayload::create(
job: $job,
retryStrategy: new CustomRetryStrategy([60, 300, 900, 3600])
);
// No retries
$payload = JobPayload::create(
job: $job,
retryStrategy: null // Job fails permanently on first error
);
Failed Jobs & Dead Letter Queue
JobStatus Lifecycle
enum JobStatus: string
{
case PENDING = 'pending'; // Waiting in queue
case PROCESSING = 'processing'; // Currently executing
case COMPLETED = 'completed'; // Successfully completed
case FAILED = 'failed'; // Failed with error
case RETRYING = 'retrying'; // Failed, will retry
case CANCELLED = 'cancelled'; // Manually cancelled
case EXPIRED = 'expired'; // Timeout exceeded
public function isFinal(): bool
{
return match ($this) {
self::COMPLETED, self::CANCELLED, self::EXPIRED => true,
default => false
};
}
public function isActive(): bool
{
return match ($this) {
self::PENDING, self::PROCESSING, self::RETRYING => true,
default => false
};
}
public function isFailure(): bool
{
return match ($this) {
self::FAILED, self::EXPIRED => true,
default => false
};
}
}
Dead Letter Queue Manager
When jobs exceed max retry attempts, they are automatically moved to a dead letter queue:
final readonly class DeadLetterManager
{
public function __construct(
private DeadLetterQueueInterface $deadLetterQueue,
private ProductionJobPersistenceLayer $persistenceLayer
) {}
/**
* Move failed job to dead letter queue
*/
public function moveJobToDeadLetterQueue(
JobIndexEntry $failedJob,
FailureReason $failureReason
): void {
$dlqName = DeadLetterQueueName::forQueue(
QueueName::fromString($failedJob->queueType)
);
$deadLetterJob = DeadLetterJob::fromFailedJob(
failedJob: $failedJob,
deadLetterQueueName: $dlqName,
failureReason: $failureReason
);
$this->deadLetterQueue->addFailedJob($deadLetterJob);
$this->persistenceLayer->deleteJob($failedJob->jobId);
}
/**
* Handle job failure with automatic DLQ routing
*/
public function handleJobFailure(
string $jobId,
\Throwable $exception,
int $maxAttempts = 3
): bool {
$jobEntry = $this->persistenceLayer->getJobById($jobId);
if ($jobEntry->attempts >= $maxAttempts) {
$this->moveJobToDeadLetterQueue(
failedJob: $jobEntry,
failureReason: FailureReason::fromException($exception)
);
return true;
}
return false;
}
}
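The routing decision in handleJobFailure() can be modelled in memory to make the attempt counting concrete. The following is a simplified sketch, not the DeadLetterManager itself: the class FailureRouterSketch, its public arrays, and the choice to increment the counter inside the handler are all assumptions for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory model: each job has an attempt counter, and a
// failure either leaves the job eligible for retry or parks it in a
// dead letter list.
final class FailureRouterSketch
{
    /** @var array<string, int> jobId => failed attempts so far */
    public array $attempts = [];

    /** @var array<string, string> jobId => last error message */
    public array $deadLetters = [];

    public function __construct(private int $maxAttempts = 3) {}

    /** Returns true when the job was moved to the dead letter list. */
    public function handleFailure(string $jobId, Throwable $exception): bool
    {
        $this->attempts[$jobId] = ($this->attempts[$jobId] ?? 0) + 1;

        if ($this->attempts[$jobId] >= $this->maxAttempts) {
            $this->deadLetters[$jobId] = $exception->getMessage();
            unset($this->attempts[$jobId]); // no longer tracked as active
            return true;
        }

        return false; // the caller may schedule a retry
    }
}

$router = new FailureRouterSketch(maxAttempts: 3);
$error = new RuntimeException('SMTP timeout');

var_dump($router->handleFailure('job-1', $error)); // bool(false)
var_dump($router->handleFailure('job-1', $error)); // bool(false)
var_dump($router->handleFailure('job-1', $error)); // bool(true)
echo $router->deadLetters['job-1'], PHP_EOL;       // SMTP timeout
```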
Managing Failed Jobs
use App\Framework\Queue\Services\DeadLetterManager;
// Get failed jobs
$failedJobs = $deadLetterManager->getFailedJobs(
originalQueue: QueueName::fromString('emails'),
limit: 100
);
// Retry a specific job
$deadLetterManager->retryJob($deadLetterJobId);
// Retry all jobs in a dead letter queue
$retriedCount = $deadLetterManager->retryAllJobs($deadLetterQueueName);
// Clear dead letter queue
$deletedCount = $deadLetterManager->clearDeadLetterQueue($deadLetterQueueName);
// Get statistics
$stats = $deadLetterManager->getStatistics();
// [
// 'default.dlq' => ['count' => 23, 'oldest' => '2024-01-15 10:30:00'],
// 'emails.dlq' => ['count' => 5, 'oldest' => '2024-01-16 14:20:00']
// ]
Queue Monitoring
Queue Statistics
// Get comprehensive statistics
$stats = $queue->getStats();
// Example output:
[
'total_size' => 1523,
'priority_queue_size' => 1200,
'delayed_queue_size' => 323,
'priority_breakdown' => [
'critical' => 15,
'high' => 150,
'normal' => 1050,
'low' => 308
],
'stats' => [
'total_pushed' => 125430,
'total_popped' => 123907,
'high_pushed' => 15234,
'normal_pushed' => 95678,
'low_pushed' => 14518,
'last_activity' => 1705410234
],
'persistence' => [
'total_jobs' => 125430,
'completed' => 120145,
'failed' => 3762,
'retrying' => 1523
]
]
Job Metrics
use App\Framework\Queue\Services\JobMetricsManager;
// Get job performance metrics
$metrics = $metricsManager->getJobMetrics($jobId);
// [
// 'execution_time_ms' => 1234,
// 'memory_used_mb' => 45.2,
// 'peak_memory_mb' => 52.8,
// 'attempts' => 1,
// 'last_attempt_at' => '2024-01-16 15:30:00'
// ]
// Get aggregated metrics by job type
$aggregated = $metricsManager->getAggregatedMetrics('email.welcome');
// [
// 'total_executions' => 1523,
// 'avg_execution_time_ms' => 856,
// 'p95_execution_time_ms' => 1450,
// 'p99_execution_time_ms' => 2340,
// 'success_rate' => 0.987,
// 'avg_memory_mb' => 38.4
// ]
Progress Tracking
For long-running jobs, track progress:
final readonly class LargeDataImportJob
{
public function __construct(
private ProgressManager $progressManager,
private JobId $jobId,
private array $dataFiles
) {}
public function handle(): array
{
$totalSteps = count($this->dataFiles);
foreach ($this->dataFiles as $index => $file) {
// Process file
$this->processFile($file);
// Update progress
$this->progressManager->updateProgress(
jobId: $this->jobId,
currentStep: $index + 1,
totalSteps: $totalSteps,
message: "Processing {$file}"
);
}
return ['files_processed' => $totalSteps];
}
}
// Query progress
$progress = $progressManager->getProgress($jobId);
// [
// 'current_step' => 45,
// 'total_steps' => 100,
// 'percentage' => 45.0,
// 'message' => 'Processing data_2024_01_16.csv',
// 'updated_at' => '2024-01-16 15:32:45'
// ]
Health Monitoring
use App\Framework\Queue\Services\WorkerHealthCheckService;
// Check worker health
$health = $healthCheckService->checkAllWorkers();
// [
// 'healthy_workers' => 8,
// 'unhealthy_workers' => 1,
// 'total_workers' => 9,
// 'workers' => [
// ['id' => 'worker-1', 'status' => 'healthy', 'last_heartbeat' => '2024-01-16 15:33:00'],
// ['id' => 'worker-2', 'status' => 'unhealthy', 'last_heartbeat' => '2024-01-16 15:20:15']
// ]
// ]
// Register worker heartbeat
$healthCheckService->registerHeartbeat($workerId);
// Cleanup stale workers
$cleanedCount = $healthCheckService->cleanupStaleWorkers(
staleThreshold: Duration::fromMinutes(5)
);
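The stale-worker check amounts to comparing each worker's last heartbeat with a threshold. A minimal sketch, assuming heartbeats are available as a workerId => Unix timestamp map (the function findStaleWorkers and that data shape are hypothetical and not part of WorkerHealthCheckService):

```php
<?php
declare(strict_types=1);

/**
 * @param array<string, int> $lastHeartbeats workerId => Unix timestamp of last heartbeat
 * @return list<string> IDs of workers whose heartbeat is older than the threshold
 */
function findStaleWorkers(array $lastHeartbeats, int $now, int $staleAfterSeconds): array
{
    $stale = [];
    foreach ($lastHeartbeats as $workerId => $lastSeen) {
        if ($now - $lastSeen > $staleAfterSeconds) {
            $stale[] = $workerId;
        }
    }
    return $stale;
}

$now = strtotime('2024-01-16 15:33:30 UTC');
$heartbeats = [
    'worker-1' => strtotime('2024-01-16 15:33:00 UTC'), // 30 seconds ago
    'worker-2' => strtotime('2024-01-16 15:20:15 UTC'), // over 13 minutes ago
];

// With a 5-minute threshold only worker-2 is reported.
print_r(findStaleWorkers($heartbeats, $now, 5 * 60));
```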
Best Practices
1. Job Design
Keep Jobs Small and Focused:
// ✅ Good: Single responsibility
final readonly class SendEmailJob
{
public function handle(): array
{
$this->emailService->send($this->email, $this->template);
return ['sent' => true];
}
}
// ❌ Bad: Multiple responsibilities
final readonly class SendEmailAndUpdateUserJob
{
public function handle(): array
{
$this->emailService->send($this->email);
$this->userRepository->update($this->userId); // Separate concern
$this->analyticsService->track($this->event); // Separate concern
}
}
Make Jobs Idempotent:
final readonly class ProcessPaymentJob
{
public function handle(): array
{
// Check if already processed (idempotency)
if ($this->paymentRepository->isProcessed($this->paymentId)) {
return ['status' => 'already_processed'];
}
// Process payment
$result = $this->paymentGateway->charge($this->payment);
// Mark as processed
$this->paymentRepository->markAsProcessed($this->paymentId);
return $result;
}
}
Use Value Objects for Job Data:
// ✅ Good: Type-safe value objects
final readonly class SendInvoiceJob
{
public function __construct(
private InvoiceId $invoiceId,
private Email $recipientEmail,
private Money $amount
) {}
}
// ❌ Bad: Primitive arrays
final readonly class SendInvoiceJob
{
public function __construct(
private array $data // What structure? What validation?
) {}
}
2. Priority Assignment
Critical: System-critical operations that must execute immediately
- Payment processing
- Real-time notifications
- Security alerts
High: User-facing operations with user waiting
- Password reset emails
- Welcome emails
- Report generation (user-triggered)
Normal: Standard background tasks
- Scheduled maintenance
- Cache warming
- Analytics processing
Low: Non-urgent maintenance
- Log cleanup
- Old data archival
- Non-critical aggregations
3. Retry Strategy
Use Exponential Backoff for transient failures:
// External API calls, network operations
JobPayload::create(
job: $apiCallJob,
retryStrategy: new ExponentialBackoffStrategy(
maxAttempts: 5,
baseDelaySeconds: 60,
maxDelaySeconds: 3600
)
);
No Retries for permanent failures:
// Invalid input, business logic violations
JobPayload::create(
job: $validationJob,
retryStrategy: null // Fail immediately
);
Custom Schedule for specific requirements:
// Time-sensitive operations
JobPayload::create(
job: $timeSensitiveJob,
retryStrategy: new CustomRetryStrategy([30, 60, 120]) // Quick retries
);
4. Monitoring
Track Key Metrics:
- Queue depth by priority
- Average job execution time
- Success/failure rates
- Dead letter queue size
- Worker health status
Set Up Alerts:
// Alert on high queue depth
if ($stats['total_size'] > 10000) {
$alerting->send('Queue depth exceeded threshold');
}
// Alert on high failure rate
$totalJobs = $stats['persistence']['total_jobs'];
$failureRate = $totalJobs > 0 ? $stats['persistence']['failed'] / $totalJobs : 0.0; // Avoid division by zero on an empty queue
if ($failureRate > 0.05) { // 5% failure rate
$alerting->send('High job failure rate detected');
}
// Alert on dead letter queue growth
if ($dlqStats['count'] > 100) {
$alerting->send('Dead letter queue requires attention');
}
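These checks can be collected into one pure function, which makes the thresholds easy to test. The sketch below assumes a statistics array shaped like the getStats() example in this document; the function name evaluateQueueAlerts and its default thresholds are illustrative only.

```php
<?php
declare(strict_types=1);

/**
 * @param array $stats array shaped like the getStats() example
 * @return list<string> alert messages, empty when everything is within limits
 */
function evaluateQueueAlerts(
    array $stats,
    int $maxDepth = 10000,
    float $maxFailureRate = 0.05
): array {
    $alerts = [];

    if (($stats['total_size'] ?? 0) > $maxDepth) {
        $alerts[] = 'Queue depth exceeded threshold';
    }

    $total = $stats['persistence']['total_jobs'] ?? 0;
    $failed = $stats['persistence']['failed'] ?? 0;
    // Guard against division by zero on a fresh queue.
    $failureRate = $total > 0 ? $failed / $total : 0.0;
    if ($failureRate > $maxFailureRate) {
        $alerts[] = 'High job failure rate detected';
    }

    return $alerts;
}

$stats = [
    'total_size' => 1523,
    'persistence' => ['total_jobs' => 125430, 'failed' => 3762],
];
var_dump(evaluateQueueAlerts($stats)); // empty array: 3762 / 125430 is about 3%

$stats['persistence']['failed'] = 9000; // about 7.2%
print_r(evaluateQueueAlerts($stats));   // contains 'High job failure rate detected'
```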
5. Resource Management
Implement Timeouts:
JobPayload::create(
job: $job,
timeout: Duration::fromMinutes(5) // Kill job after 5 minutes
);
Manage Memory:
final readonly class LargeDataProcessingJob
{
public function handle(): array
{
$batches = 0;
// Process in batches to avoid memory exhaustion
foreach ($this->getBatches() as $batch) {
$this->processBatch($batch);
$batches++;
// Force garbage collection
gc_collect_cycles();
}
return ['batches_processed' => $batches];
}
}
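One way to produce such batches without loading everything at once is a generator. The helper below is a sketch of what a getBatches() method might delegate to; the name inBatches is hypothetical, and it assumes a batch size of at least 1.

```php
<?php
declare(strict_types=1);

/**
 * Yields $items in chunks of at most $batchSize without building all
 * chunks up front.
 *
 * @param iterable<mixed> $items
 * @return Generator<int, list<mixed>>
 */
function inBatches(iterable $items, int $batchSize): Generator
{
    $batch = [];
    foreach ($items as $item) {
        $batch[] = $item;
        if (count($batch) === $batchSize) {
            yield $batch;
            $batch = [];
        }
    }
    if ($batch !== []) {
        yield $batch; // final, possibly smaller batch
    }
}

$processed = 0;
foreach (inBatches(range(1, 10), 4) as $batch) {
    $processed += count($batch);
    echo implode(',', $batch), PHP_EOL;
}
echo $processed, PHP_EOL;
// 1,2,3,4
// 5,6,7,8
// 9,10
// 10
```

Because only one batch is held at a time, peak memory depends on the batch size and not on the total number of items, provided the input itself is also lazy (for example a database cursor or a file read line by line).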
Connection Pooling:
// Reuse connections across jobs
final readonly class DatabaseJob
{
public function handle(): array
{
// Use connection pool instead of new connections
$connection = $this->connectionPool->acquire();
try {
// Execute queries ($sql stands for the job's query)
$result = $connection->query($sql);
return ['result' => $result];
} finally {
// Always release back to pool
$this->connectionPool->release($connection);
}
}
}
6. Error Handling
Graceful Degradation:
final readonly class EmailJob
{
public function handle(): array
{
try {
$this->emailService->send($this->email);
return ['status' => 'sent'];
} catch (EmailProviderException $e) {
// Log error but don't fail job
$this->logger->warning('Email provider unavailable', [
'error' => $e->getMessage()
]);
// Use fallback provider
$this->fallbackEmailService->send($this->email);
return ['status' => 'sent_via_fallback'];
}
}
}
Detailed Error Logging:
try {
$job->handle();
} catch (\Throwable $e) {
$this->logger->error('Job failed', [
'job_id' => $jobId,
'job_class' => get_class($job),
'error' => $e->getMessage(),
'trace' => $e->getTraceAsString(),
'attempts' => $attempts,
'metadata' => $payload->metadata?->toArray() // metadata is optional on JobPayload
]);
throw $e;
}
7. Testing
Unit Test Jobs:
it('sends welcome email', function () {
$job = new SendWelcomeEmailJob(
userId: UserId::generate(),
email: new Email('test@example.com'),
userName: new UserName('Test User')
);
$result = $job->handle();
expect($result['sent_at'])->toBeGreaterThan(0);
});
Integration Test Queue Operations:
it('processes jobs in priority order', function () {
$queue = new RedisQueue($connection);
// Push jobs with different priorities
$queue->push(JobPayload::create($lowJob, QueuePriority::low()));
$queue->push(JobPayload::create($highJob, QueuePriority::high()));
$queue->push(JobPayload::create($normalJob, QueuePriority::normal()));
// Pop and verify order
$first = $queue->pop();
expect($first->job)->toBe($highJob);
$second = $queue->pop();
expect($second->job)->toBe($normalJob);
$third = $queue->pop();
expect($third->job)->toBe($lowJob);
});
Framework Integration
With Event System
// Dispatch jobs in response to events
#[OnEvent(priority: 100)]
public function onUserRegistered(UserRegisteredEvent $event): void
{
$job = new SendWelcomeEmailJob(
userId: $event->userId,
email: $event->email,
userName: $event->userName
);
$this->queue->push(JobPayload::immediate($job));
}
With Scheduler
See Scheduler-Queue Pipeline for complete integration documentation.
// Schedule recurring job dispatch
$scheduler->schedule(
'send-daily-digest',
CronSchedule::fromExpression('0 8 * * *'), // Every day at 8 AM
fn() => $this->queue->push(JobPayload::immediate(new SendDailyDigestJob()))
);
With Command Bus
// Dispatch jobs via command handlers
final readonly class CreateUserHandler
{
public function handle(CreateUserCommand $command): User
{
$user = $this->userRepository->create($command);
// Queue welcome email
$this->queue->push(JobPayload::immediate(
new SendWelcomeEmailJob($user->id, $user->email, $user->name)
));
return $user;
}
}
Console Commands
The framework provides console commands for queue management:
# Monitor queue status
php console.php queue:status
# Process jobs (worker)
php console.php queue:work --queue=default --timeout=60
# Retry failed jobs
php console.php queue:retry --queue=default --limit=50
# Clear queue
php console.php queue:clear --queue=default
# View failed jobs
php console.php queue:failed --limit=20
# Clean up old jobs
php console.php queue:cleanup --older-than=7d
Performance Characteristics
Redis Queue:
- Throughput: 10,000+ jobs/second
- Latency: <5ms per operation
- Memory: ~200 bytes per job (serialized)
- Scalability: Horizontal (add Redis instances)
File Queue:
- Throughput: 1,000+ jobs/second
- Latency: <20ms per operation
- Memory: Minimal (disk-based)
- Scalability: Vertical (single server)
Job Execution:
- Overhead: <10ms per job (framework overhead)
- Timeout: Configurable per job
- Memory: Depends on job complexity
- Concurrency: Multiple workers supported