getJobId() ?? JobId::generate();

        // Store job in persistence layer
        $this->persistence->storeJob(
            jobId: $jobId,
            queueType: $this->queueType,
            jobData: $payload->job,
            maxAttempts: $payload->retryStrategy->maxAttempts,
            metadata: $payload->metadata
        );

        // Add job ID to payload and push to base queue
        $payloadWithId = $payload->withJobId($jobId);
        $this->baseQueue->push($payloadWithId);
    }

    /**
     * {@inheritdoc}
     *
     * Pops the next payload from the base queue. When the payload carries a
     * job ID, the job is marked as processing in the persistence layer before
     * the payload is handed to the caller.
     *
     * @return JobPayload|null The popped payload, or null when the base queue returned null.
     *
     * @throws \Throwable Rethrows whatever markAsProcessing() threw, after the
     *                    payload has been pushed back onto the base queue.
     */
    public function pop(): ?JobPayload
    {
        $payload = $this->baseQueue->pop();

        if ($payload === null) {
            return null;
        }

        $jobId = $payload->getJobId();

        // Payloads without a job ID are returned as-is; there is nothing to mark.
        if ($jobId !== null) {
            try {
                $this->persistence->markAsProcessing($jobId);
            } catch (\Throwable $e) {
                // If we can't mark as processing, put it back in the queue so the
                // payload is not dropped, then surface the original error.
                // NOTE(review): if this push() itself throws, that exception
                // propagates instead and $e is never rethrown.
                $this->baseQueue->push($payload);

                throw $e;
            }
        }

        return $payload;
    }

    /**
     * {@inheritdoc}
     *
     * Delegates directly to the base queue; the persistence layer is not touched.
     */
    public function peek(): ?JobPayload
    {
        return $this->baseQueue->peek();
    }

    /**
     * {@inheritdoc}
     *
     * Reports the size of the base queue only.
     */
    public function size(): int
    {
        return $this->baseQueue->size();
    }

    /**
     * {@inheritdoc}
     *
     * Clears the base queue only and returns whatever count it reports.
     */
    public function clear(): int
    {
        $count = $this->baseQueue->clear();

        // Note: We don't clear the persistence layer here as it maintains job history.
        // Use separate cleanup methods for persistence if needed.

        return $count;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the base queue's stats with two extra entries: 'persistence'
     * (the persistence layer's stats) and 'queue_type' (the queue type's value).
     * Because array_merge() is used, these two entries overwrite same-named
     * string keys coming from the base queue.
     */
    public function getStats(): array
    {
        $baseStats = $this->baseQueue->getStats();
        $persistenceStats = $this->persistence->getStats();

        return array_merge($baseStats, [
            'persistence' => $persistenceStats,
            'queue_type' => $this->queueType->value,
        ]);
    }

    /**
     * Mark job as completed
     *
     * @param JobId $jobId  Job to mark.
     * @param array $result Result data forwarded unchanged to the persistence layer.
     */
    public function markJobCompleted(JobId $jobId, array $result = []): void
    {
        $this->persistence->markAsCompleted($jobId, $result);
    }

    /**
     * Mark job as failed
     *
     * Records the failure in the persistence layer. If the resulting job state
     * reports that it can be retried, the job is pushed back onto the base queue
     * and marked for retry with the same error message. When the state's
     * metadata has no 'job_data' entry, nothing is re-queued.
     *
     * @param JobId           $jobId        Job that failed.
     * @param string          $errorMessage Failure description; also used as the retry reason.
     * @param \Throwable|null $exception    Optional cause, forwarded to the persistence layer.
     */
    public function markJobFailed(JobId $jobId, string $errorMessage, ?\Throwable $exception = null): void
    {
        $jobState = $this->persistence->markAsFailed($jobId, $errorMessage, $exception);

        // If job can be retried, put it back in the queue
        if ($jobState->canRetry()) {
            $this->requeueFromState($jobId, $jobState, $errorMessage);
        }
    }

    /**
     * Get job state from persistence
     *
     * @return \App\Framework\Queue\ValueObjects\JobState|null Whatever the persistence layer returns for this ID.
     */
    public function getJobState(JobId $jobId): ?\App\Framework\Queue\ValueObjects\JobState
    {
        return $this->persistence->getJobState($jobId);
    }

    /**
     * Get the underlying base queue
     */
    public function getBaseQueue(): Queue
    {
        return $this->baseQueue;
    }

    /**
     * Get the persistence layer
     */
    public function getPersistenceLayer(): JobPersistenceLayer
    {
        return $this->persistence;
    }

    /**
     * Get retryable failed jobs and re-queue them
     *
     * @param int $limit Passed to the persistence layer's getRetryableJobs().
     *
     * @return int Number of jobs actually pushed back onto the base queue; states
     *             whose metadata has no 'job_data' entry are skipped and not counted.
     */
    public function requeueFailedJobs(int $limit = 50): int
    {
        $retryableJobs = $this->persistence->getRetryableJobs($limit);
        $requeued = 0;

        foreach ($retryableJobs as $jobState) {
            if ($this->requeueFromState($jobState->jobId, $jobState, 'Manual retry')) {
                $requeued++;
            }
        }

        return $requeued;
    }

    /**
     * Clean up old job records
     *
     * @param \App\Framework\Core\ValueObjects\Duration|null $olderThan Forwarded unchanged to the
     *                                                                  persistence layer's cleanup().
     *
     * @return int The value returned by the persistence layer's cleanup().
     */
    public function cleanupJobs(?\App\Framework\Core\ValueObjects\Duration $olderThan = null): int
    {
        return $this->persistence->cleanup($olderThan);
    }

    /**
     * Rebuild a payload from a persisted job state and push it back onto the base queue.
     *
     * Shared by markJobFailed() and requeueFailedJobs() so both retry paths build
     * the payload and update persistence in exactly the same way: push first,
     * then markForRetry().
     *
     * @param JobId  $jobId    ID attached to the retry payload and passed to markForRetry().
     * @param object $jobState State object exposing `metadata` and `priority`
     *                         (presumably a JobState; the producing methods' return
     *                         types are not visible here, so it is not type-hinted more narrowly).
     * @param string $reason   Message passed to markForRetry().
     *
     * @return bool True when the job was re-queued, false when the state's metadata
     *              has no 'job_data' entry to rebuild the payload from.
     */
    private function requeueFromState(JobId $jobId, object $jobState, string $reason): bool
    {
        // Get original job data from metadata
        $jobData = $jobState->metadata['job_data'] ?? null;

        if ($jobData === null) {
            return false;
        }

        $retryPayload = JobPayload::create($jobData, $jobState->priority)
            ->withJobId($jobId)
            ->withMetadata($jobState->metadata);

        $this->baseQueue->push($retryPayload);
        $this->persistence->markForRetry($jobId, $reason);

        return true;
    }
}