Scheduler-Queue Pipeline
Complete documentation of the Scheduler-Queue integration pipeline in the Custom PHP Framework.
Overview
The Scheduler-Queue Pipeline is a fully integrated solution for time-based task scheduling and asynchronous job execution. It combines the framework's Scheduler System with the extended Queue System for robust, scalable background processing.
Architecture
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Scheduler │───▶│ Job Dispatch │───▶│ Queue System │───▶│ Background Exec │
│ System │ │ & Validation │ │ (13 Tables) │ │ & Logging │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │ │
Cron/Interval JobPayload FileQueue/Redis Named Job Classes
OneTime/Manual Value Objects w/ Metrics w/ handle() Method
Schedule Types Type Safety & Monitoring Result Logging
Pipeline Components
1. Scheduler System
Location: src/Framework/Scheduler/
Schedule Types:
- CronSchedule: Traditional cron-based scheduling
- IntervalSchedule: Repeated execution at fixed intervals
- OneTimeSchedule: One-time execution at a specific time
- ManualSchedule: Execution started by a manual trigger
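To illustrate how an interval-based schedule can decide whether a task is due, here is a minimal sketch. `SimpleIntervalSchedule`, `isDue()` and `nextRunAt()` are hypothetical stand-ins and not the framework's `IntervalSchedule` API; timestamps are plain Unix seconds instead of the framework's `Timestamp` and `Duration` value objects.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for an interval-based schedule (not the framework class).
final class SimpleIntervalSchedule
{
    public function __construct(private readonly int $intervalSeconds)
    {
        if ($intervalSeconds <= 0) {
            throw new InvalidArgumentException('Interval must be positive.');
        }
    }

    // A task is due if it has never run or the interval has elapsed since the last run.
    public function isDue(?int $lastRunAt, int $now): bool
    {
        return $lastRunAt === null || ($now - $lastRunAt) >= $this->intervalSeconds;
    }

    // Next planned execution time; a task that never ran is due immediately.
    public function nextRunAt(?int $lastRunAt, int $now): int
    {
        return $lastRunAt === null ? $now : $lastRunAt + $this->intervalSeconds;
    }
}

$everyTenMinutes = new SimpleIntervalSchedule(600);
$now = 1_700_000_000;

var_dump($everyTenMinutes->isDue(null, $now));       // never ran before
var_dump($everyTenMinutes->isDue($now - 300, $now)); // ran 5 minutes ago
var_dump($everyTenMinutes->isDue($now - 600, $now)); // ran exactly 10 minutes ago
```

A cron or one-time schedule could expose the same two methods, which would let a scheduler loop treat all schedule types uniformly.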
Core Services:
- SchedulerService: Main service for task management
- TaskExecutionResult: Value object for execution results
- Timestamp/Duration: Time value objects with framework compliance
2. Job Dispatch & Validation
Integration layer between Scheduler and Queue
Key Features:
- Type Safety: Uses JobPayload value objects
- Named Job Classes: Avoids anonymous classes (ClassName restrictions)
- Validation: Input validation before queue dispatch
- Error Handling: Graceful fallbacks on dispatch errors
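The features above can be sketched as a small dispatcher that validates a job before it reaches the queue and turns dispatch errors into a status result. `DispatchableJob`, `PingJob` and `SafeDispatcher` are hypothetical names for illustration; the framework's real dispatch layer and `JobPayload` API may look different.

```php
<?php
declare(strict_types=1);

// Hypothetical minimal job contract; the framework's own interface may differ.
interface DispatchableJob
{
    public function getType(): string;
    public function handle(): array;
}

final class PingJob implements DispatchableJob
{
    public function __construct(private readonly string $target) {}

    public function getType(): string
    {
        return 'ping';
    }

    public function handle(): array
    {
        return ['status' => 'completed', 'target' => $this->target];
    }
}

// Validates a job before handing it to the queue and never lets a
// dispatch failure escape into the scheduler loop.
final class SafeDispatcher
{
    // $push is any closure that puts the job onto a queue.
    public function __construct(private readonly Closure $push) {}

    public function dispatch(object $job): array
    {
        if (!$job instanceof DispatchableJob) {
            return ['status' => 'rejected', 'reason' => 'not a job class'];
        }
        if ((new ReflectionClass($job))->isAnonymous()) {
            return ['status' => 'rejected', 'reason' => 'anonymous classes are not allowed'];
        }
        try {
            ($this->push)($job);
            return ['status' => 'queued', 'type' => $job->getType()];
        } catch (Throwable $e) {
            // Graceful fallback: report the failure instead of crashing.
            return ['status' => 'failed', 'reason' => $e->getMessage()];
        }
    }
}

$queued = [];
$dispatcher = new SafeDispatcher(function (DispatchableJob $job) use (&$queued): void {
    $queued[] = $job;
});

$ok = $dispatcher->dispatch(new PingJob('example'));
$rejected = $dispatcher->dispatch(new stdClass());
$anonymous = $dispatcher->dispatch(new class implements DispatchableJob {
    public function getType(): string { return 'anon'; }
    public function handle(): array { return []; }
});

$failing = new SafeDispatcher(function (): void {
    throw new RuntimeException('queue unavailable');
});
$failed = $failing->dispatch(new PingJob('example'));
```

Returning a status array instead of throwing keeps one broken task from interrupting the other scheduled tasks in the same run.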
3. Queue System
Location: src/Framework/Queue/
Database Schema (13 Specialized Tables):
jobs -- Main job queue
job_batches -- Batch job management
job_history -- Execution history
job_metrics -- Performance metrics
dead_letter_jobs -- Failed jobs
job_priorities -- Priority-based scheduling
job_dependencies -- Inter-job dependencies
delayed_jobs -- Time-delayed execution
worker_processes -- Worker management
job_locks -- Distributed locking
job_progress -- Progress tracking
recurring_jobs -- Recurring patterns
job_tags -- Categorization & filtering
Core Features:
- Multi-Driver Support: FileQueue, Redis, Database
- Priority Scheduling: High/Medium/Low Priority Queues
- Metrics & Monitoring: Real-time Performance Tracking
- Dead Letter Queue: Automatic Failed Job Management
- Distributed Locking: Multi-Worker Coordination
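As a rough illustration of priority scheduling combined with a dead letter queue, the following in-memory sketch uses PHP's built-in `SplPriorityQueue`. `JobPriority`, `InMemoryPriorityQueue` and the retry limit are assumptions made for this example; the framework's drivers (FileQueue, Redis, Database) are not shown here.

```php
<?php
declare(strict_types=1);

// Hypothetical priority levels; the framework's Priority value object may differ.
enum JobPriority: int
{
    case LOW = 1;
    case MEDIUM = 5;
    case HIGH = 10;
}

final class InMemoryPriorityQueue
{
    private SplPriorityQueue $jobs;
    /** @var list<array{job: callable, error: string}> */
    private array $deadLetters = [];
    private int $sequence = PHP_INT_MAX;

    public function __construct(private readonly int $maxAttempts = 3)
    {
        $this->jobs = new SplPriorityQueue();
    }

    public function push(callable $job, JobPriority $priority, int $attempt = 1): void
    {
        $entry = ['job' => $job, 'priority' => $priority, 'attempt' => $attempt];
        // The decreasing sequence number keeps FIFO order within one priority level.
        $this->jobs->insert($entry, [$priority->value, $this->sequence--]);
    }

    // Runs the next job; returns false when the queue is empty.
    public function processNext(): bool
    {
        if ($this->jobs->isEmpty()) {
            return false;
        }
        $entry = $this->jobs->extract();
        try {
            ($entry['job'])();
        } catch (Throwable $e) {
            if ($entry['attempt'] >= $this->maxAttempts) {
                // Retries exhausted: park the job in the dead letter list.
                $this->deadLetters[] = ['job' => $entry['job'], 'error' => $e->getMessage()];
            } else {
                $this->push($entry['job'], $entry['priority'], $entry['attempt'] + 1);
            }
        }
        return true;
    }

    public function deadLetters(): array
    {
        return $this->deadLetters;
    }
}

$queue = new InMemoryPriorityQueue(maxAttempts: 2);
$order = [];
$queue->push(function () use (&$order): void { $order[] = 'low'; }, JobPriority::LOW);
$queue->push(function () use (&$order): void { $order[] = 'high'; }, JobPriority::HIGH);
$queue->push(function (): void { throw new RuntimeException('boom'); }, JobPriority::MEDIUM);

while ($queue->processNext()) {
    // drain the queue
}
```

In this run the high-priority job executes first, the failing job is retried once and then moved to the dead letter list, and the low-priority job runs last.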
4. Background Execution & Logging
Job processing with comprehensive logging
Execution Pattern:
final class ScheduledBackgroundJob
{
public function handle(): array
{
// Business Logic Execution
$result = $this->performWork();
// Automatic Logging
$this->logExecution($result);
return $result;
}
}
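The execution pattern leaves `logExecution()` open. One possible shape, sketched under assumptions, is a wrapper that runs any object with a `handle()` method and returns a log record with outcome and duration. `runWithLogging()`, `SampleJob` and `BrokenJob` are hypothetical names, and the record is returned rather than written to the framework's logger.

```php
<?php
declare(strict_types=1);

/**
 * Runs a job's handle() method and returns a log record.
 *
 * @return array{job: string, success: bool, duration_ms: float, result: ?array, error: ?string}
 */
function runWithLogging(object $job): array
{
    $startedAt = hrtime(true); // monotonic clock in nanoseconds
    $result = null;
    $error = null;

    try {
        $result = $job->handle();
    } catch (Throwable $e) {
        $error = $e->getMessage();
    }

    return [
        'job' => $job::class,
        'success' => $error === null,
        'duration_ms' => (hrtime(true) - $startedAt) / 1e6,
        'result' => $result,
        'error' => $error,
    ];
}

// Two hypothetical jobs used only to exercise the wrapper.
final class SampleJob
{
    public function handle(): array
    {
        return ['status' => 'completed'];
    }
}

final class BrokenJob
{
    public function handle(): array
    {
        throw new RuntimeException('disk full');
    }
}

$ok = runWithLogging(new SampleJob());
$failed = runWithLogging(new BrokenJob());
```

Keeping the timing and error capture outside the job keeps `handle()` focused on business logic and gives every job the same log shape.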
Usage
Basic Scheduler-Queue Integration
use App\Framework\Scheduler\Services\SchedulerService;
use App\Framework\Scheduler\Schedules\IntervalSchedule;
use App\Framework\Queue\Queue;
use App\Framework\Queue\ValueObjects\JobPayload;
use App\Framework\Core\ValueObjects\Duration;
// 1. Schedule Setup
$scheduler = $container->get(SchedulerService::class);
$queue = $container->get(Queue::class);
// 2. Create Interval Schedule (every 10 minutes)
$schedule = IntervalSchedule::every(Duration::fromMinutes(10));
// 3. Register Task with Queue Dispatch
$scheduler->schedule('email-cleanup', $schedule, function() use ($queue) {
$job = new EmailCleanupJob(
olderThan: Duration::fromDays(30),
batchSize: 100
);
$payload = JobPayload::immediate($job);
$queue->push($payload);
return ['status' => 'queued', 'timestamp' => time()];
});
Advanced Pipeline Configuration
// Complex Multi-Stage Pipeline
final class ReportGenerationPipeline
{
public function __construct(
private readonly SchedulerService $scheduler,
private readonly Queue $queue
) {}
public function setupDailyReports(): void
{
// Stage 1: Data Collection (Daily at 2 AM)
$this->scheduler->schedule(
'daily-data-collection',
CronSchedule::fromExpression('0 2 * * *'),
fn() => $this->dispatchDataCollection()
);
// Stage 2: Report Generation (After data collection)
$this->scheduler->schedule(
'daily-report-generation',
CronSchedule::fromExpression('30 2 * * *'),
fn() => $this->dispatchReportGeneration()
);
// Stage 3: Distribution (After generation)
$this->scheduler->schedule(
'daily-report-distribution',
CronSchedule::fromExpression('0 3 * * *'),
fn() => $this->dispatchReportDistribution()
);
}
private function dispatchDataCollection(): array
{
$job = new DataCollectionJob(
sources: ['database', 'analytics', 'external_apis'],
target_date: Timestamp::yesterday()
);
$payload = JobPayload::withPriority($job, Priority::HIGH);
$this->queue->push($payload);
return ['stage' => 'data_collection', 'queued_at' => time()];
}
}
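The three stages above are ordered only by their cron times, so a slow data collection could still be running when report generation starts. A minimal sketch of explicit ordering, where each stage runs only if the previous one succeeded, might look as follows. `runStagesInOrder()` is a hypothetical helper and does not use the framework's `job_dependencies` table.

```php
<?php
declare(strict_types=1);

/**
 * Runs stages in order and stops at the first failure.
 *
 * @param array<string, callable(): bool> $stages stage name => work returning success
 * @return array<string, string> stage name => 'completed' | 'failed' | 'skipped'
 */
function runStagesInOrder(array $stages): array
{
    $statuses = [];
    $blocked = false;

    foreach ($stages as $name => $stage) {
        if ($blocked) {
            // An earlier stage failed, so later stages must not run.
            $statuses[$name] = 'skipped';
            continue;
        }
        try {
            $ok = $stage();
        } catch (Throwable) {
            $ok = false;
        }
        $statuses[$name] = $ok ? 'completed' : 'failed';
        $blocked = !$ok;
    }

    return $statuses;
}

$statuses = runStagesInOrder([
    'data_collection' => fn (): bool => true,
    'report_generation' => fn (): bool => false, // simulated failure
    'report_distribution' => fn (): bool => true,
]);

print_r($statuses);
```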
Named Job Classes (Best Practice)
// ✅ Framework-Compliant Job Class
final class EmailCleanupJob
{
public function __construct(
private readonly Duration $olderThan,
private readonly int $batchSize = 100
) {}
public function handle(): array
{
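// Assumes an $emailService property and a logCleanupResults() helper that are omitted from this excerpt
$deleted = $this->emailService->deleteOldEmails(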
$this->olderThan,
$this->batchSize
);
$this->logCleanupResults($deleted);
return [
'deleted_count' => count($deleted),
'batch_size' => $this->batchSize,
'cleanup_threshold' => $this->olderThan->toDays() . ' days'
];
}
public function getType(): string
{
return 'email-cleanup';
}
}
Pipeline Monitoring
Health Checks
// Pipeline Health Verification
final class PipelineHealthChecker
{
public function checkPipelineHealth(): PipelineHealthReport
{
return new PipelineHealthReport([
'scheduler_status' => $this->checkSchedulerHealth(),
'queue_status' => $this->checkQueueHealth(),
'integration_status' => $this->checkIntegrationHealth(),
'performance_metrics' => $this->gatherPerformanceMetrics()
]);
}
private function checkSchedulerHealth(): array
{
$dueTasks = $this->scheduler->getDueTasks();
$nextExecution = $this->scheduler->getNextExecutionTime();
return [
'due_tasks_count' => count($dueTasks),
'next_execution' => $nextExecution?->format('Y-m-d H:i:s'),
'status' => count($dueTasks) < 100 ? 'healthy' : 'overloaded'
];
}
private function checkQueueHealth(): array
{
$stats = $this->queue->getStats();
return [
'queue_size' => $stats['total_size'],
'priority_distribution' => $stats['priority_breakdown'],
'processing_rate' => $this->calculateProcessingRate(),
'status' => $stats['total_size'] < 1000 ? 'healthy' : 'backlog'
];
}
}
Performance Metrics
// Real-time Pipeline Performance
final class PipelineMetricsCollector
{
public function collectMetrics(string $timeframe = '1hour'): array
{
return [
'scheduler_metrics' => [
'tasks_executed' => $this->getExecutedTasksCount($timeframe),
'average_execution_time' => $this->getAverageExecutionTime($timeframe),
'success_rate' => $this->getSchedulerSuccessRate($timeframe)
],
'queue_metrics' => [
'jobs_processed' => $this->getProcessedJobsCount($timeframe),
'average_wait_time' => $this->getAverageWaitTime($timeframe),
'throughput' => $this->calculateThroughput($timeframe)
],
'integration_metrics' => [
'dispatch_success_rate' => $this->getDispatchSuccessRate($timeframe),
'end_to_end_latency' => $this->getEndToEndLatency($timeframe),
'error_rate' => $this->getIntegrationErrorRate($timeframe)
]
];
}
}
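The helper methods called by the collector are not shown. As a sketch of the arithmetic behind values such as success rate, average wait time and throughput, the following function summarizes a list of finished jobs. `summarizeJobHistory()` and the record keys `success` and `wait_ms` are assumptions for this example and do not reflect the actual `job_history` or `job_metrics` columns.

```php
<?php
declare(strict_types=1);

/**
 * Summarizes jobs that finished inside a time window.
 *
 * @param list<array{success: bool, wait_ms: int}> $records
 * @return array{jobs_processed: int, success_rate: float, average_wait_ms: float, throughput_per_minute: float}
 */
function summarizeJobHistory(array $records, int $windowSeconds): array
{
    if ($windowSeconds <= 0) {
        throw new InvalidArgumentException('Window must be positive.');
    }

    $count = count($records);
    if ($count === 0) {
        return [
            'jobs_processed' => 0,
            'success_rate' => 0.0,
            'average_wait_ms' => 0.0,
            'throughput_per_minute' => 0.0,
        ];
    }

    $succeeded = count(array_filter($records, fn (array $r): bool => $r['success']));
    $totalWait = array_sum(array_column($records, 'wait_ms'));

    return [
        'jobs_processed' => $count,
        'success_rate' => $succeeded / (float) $count,
        'average_wait_ms' => $totalWait / (float) $count,
        'throughput_per_minute' => $count * 60.0 / $windowSeconds,
    ];
}

$summary = summarizeJobHistory([
    ['success' => true, 'wait_ms' => 100],
    ['success' => true, 'wait_ms' => 200],
    ['success' => false, 'wait_ms' => 300],
    ['success' => true, 'wait_ms' => 400],
], 120);

print_r($summary);
```

For the four sample records in a 120-second window this yields a success rate of 0.75, an average wait of 250 ms and a throughput of 2 jobs per minute.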
Troubleshooting
Common Problems
1. Jobs are not executed
# Diagnose Queue Status
docker exec php php console.php queue:status
# Check Scheduler Tasks
docker exec php php console.php scheduler:status
# Verify Integration
docker exec php php tests/debug/test-scheduler-queue-integration-fixed.php
2. Memory leaks with large jobs
// Memory-efficient job implementation
final class LargeDataProcessingJob
{
public function handle(): array
{
// Batch Processing to prevent memory exhaustion
$batch = $this->dataSource->getBatch($this->batchSize);
while (!empty($batch)) {
$this->processBatch($batch);
// Keep the running total that handle() returns
$this->totalProcessed += count($batch);
// Force garbage collection
gc_collect_cycles();
$batch = $this->dataSource->getNextBatch($this->batchSize);
}
return ['processed' => $this->totalProcessed];
}
}
3. Queue Backlog Management
// Automatic Backlog Resolution
final class BacklogManager
{
public function resolveBacklog(): void
{
$stats = $this->queue->getStats();
if ($stats['total_size'] > $this->backlogThreshold) {
// Scale up workers
$this->workerManager->scaleUp($this->calculateRequiredWorkers($stats));
// Prioritize critical jobs
$this->queue->reprioritize(['high_priority_types']);
// Alert operations team
$this->alertManager->sendBacklogAlert($stats);
}
}
}
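`calculateRequiredWorkers()` is called above but not defined. One way to estimate the worker count, sketched under assumptions, is to divide the backlog by what a single worker can process within a target drain time. The signature below, with explicit throughput, drain time and worker cap, is hypothetical and differs from the `$stats`-based call in the example above.

```php
<?php
declare(strict_types=1);

/**
 * Estimates how many workers are needed to drain a backlog in time.
 * All parameters are assumptions chosen for this sketch.
 */
function calculateRequiredWorkers(
    int $queueSize,
    float $jobsPerWorkerPerMinute,
    float $targetDrainMinutes,
    int $maxWorkers
): int {
    if ($jobsPerWorkerPerMinute <= 0.0 || $targetDrainMinutes <= 0.0 || $maxWorkers < 1) {
        throw new InvalidArgumentException('Throughput, drain time and worker cap must be positive.');
    }

    // Jobs a single worker can finish within the target drain time.
    $capacityPerWorker = $jobsPerWorkerPerMinute * $targetDrainMinutes;
    $needed = (int) ceil($queueSize / $capacityPerWorker);

    // Always keep one worker running and never exceed the configured cap.
    return max(1, min($maxWorkers, $needed));
}

echo calculateRequiredWorkers(5000, 100.0, 10.0, 16), PHP_EOL;
echo calculateRequiredWorkers(50000, 100.0, 10.0, 16), PHP_EOL;
```

With 5000 queued jobs, 100 jobs per worker per minute and a 10-minute target, five workers are enough; the second call hits the cap of 16.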
Testing
Integration Tests
// Complete Pipeline Testing
describe('Scheduler Queue Pipeline', function () {
it('handles full pipeline flow', function () {
// 1. Setup Scheduler Task
$schedule = IntervalSchedule::every(Duration::fromSeconds(1));
$this->scheduler->schedule('test-task', $schedule, function() {
$job = new TestPipelineJob('pipeline-test');
$payload = JobPayload::immediate($job);
$this->queue->push($payload);
return ['dispatched' => true];
});
// 2. Execute Scheduler
$results = $this->scheduler->executeDueTasks();
expect($results)->toHaveCount(1);
expect($results[0]->success)->toBeTrue();
// 3. Process Queue
$jobPayload = $this->queue->pop();
expect($jobPayload)->not->toBeNull();
$result = $jobPayload->job->handle();
expect($result['status'])->toBe('completed');
// 4. Verify End-to-End
expect($this->queue->size())->toBe(0);
});
});
Performance Tests
// Pipeline Performance Benchmarks
describe('Pipeline Performance', function () {
it('processes 1000 jobs within 30 seconds', function () {
$startTime = microtime(true);
// Dispatch 1000 jobs directly to the queue
for ($i = 0; $i < 1000; $i++) {
$job = new BenchmarkJob("job-{$i}");
$payload = JobPayload::immediate($job);
$this->queue->push($payload);
}
// Process all jobs
while ($this->queue->size() > 0) {
$jobPayload = $this->queue->pop();
$jobPayload->job->handle();
}
$executionTime = microtime(true) - $startTime;
expect($executionTime)->toBeLessThan(30.0);
expect($this->queue->size())->toBe(0);
});
});
Best Practices
1. Job Design
- Named Classes: Always use named classes instead of anonymous classes or closures
- Type Safety: Use JobPayload value objects for all queue operations
- Idempotency: Jobs should be safe to run multiple times without additional side effects
- Error Handling: Implement graceful degradation on errors
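Idempotency can be sketched with an idempotency key: the result of a job is remembered under its key, and a repeated delivery returns the stored result instead of repeating the side effect. `IdempotentRunner` and the key format are hypothetical; a real setup would need a persistent, shared store rather than an in-memory array.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: remembers results by idempotency key so a repeated
// delivery of the same job does not repeat its side effects.
final class IdempotentRunner
{
    /** @var array<string, array> results keyed by idempotency key */
    private array $completed = [];

    public function run(string $key, callable $work): array
    {
        if (array_key_exists($key, $this->completed)) {
            // Already done: return the stored result without running the work again.
            return ['replayed' => true] + $this->completed[$key];
        }

        $result = $work();
        $this->completed[$key] = $result;

        return ['replayed' => false] + $result;
    }
}

$sentMails = 0;
$runner = new IdempotentRunner();
$sendWelcomeMail = function () use (&$sentMails): array {
    $sentMails++; // the side effect that must not happen twice
    return ['status' => 'completed'];
};

$first = $runner->run('welcome-mail:user-42', $sendWelcomeMail);
$second = $runner->run('welcome-mail:user-42', $sendWelcomeMail);
```

After both calls `$sentMails` is still 1, because the second call is answered from the stored result.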
2. Scheduler Configuration
- Reasonable Intervals: Avoid overly aggressive scheduling intervals
- Resource Awareness: Respect CPU and memory limits when designing tasks
- Monitoring: Continuously monitor execution times
- Failover: Backup strategies for critical tasks
3. Queue Management
- Priority Classes: Meaningful prioritization for different job types
- Batch Processing: Split large data sets into batches
- Dead Letter Handling: Automatic recovery of failed jobs
- Metrics Collection: Collect performance data for optimization
4. Production Deployment
- Health Checks: Regular pipeline health verification
- Alerting: Automatic notifications when problems occur
- Scaling: Auto-scaling for worker processes
- Backup: Disaster recovery strategies for queue data
Framework Integration
The pipeline consistently uses framework patterns:
- Value Objects: Timestamp, Duration, JobPayload, Priority
- Readonly Classes: Immutable job definitions
- Event System: Integration with the framework's Event Dispatcher
- DI Container: Automatic service resolution
- Attribute Discovery: Convention over configuration
- MCP Integration: AI-assisted pipeline analysis
Performance Characteristics
Typical performance values:
- Job Dispatch Latency: < 50ms
- Queue Throughput: 1000+ Jobs/Minute (FileQueue)
- Memory Usage: < 50 MB for standard jobs
- Scheduler Precision: ±1 second for cron-based tasks
- End-to-End Latency: < 500ms for simple jobs
Scaling characteristics:
- Horizontal Scaling: Multi-Worker Support
- Queue Capacity: 100,000+ Jobs (Database-backed)
- Scheduler Load: 10,000+ concurrent scheduled tasks
- Memory Efficiency: Scales linearly with job complexity