Files
michaelschiemer/src/Framework/Queue/DistributedJobCoordinator.php
Michael Schiemer fc3d7e6357 feat(Production): Complete production deployment infrastructure
- Add comprehensive health check system with multiple endpoints
- Add Prometheus metrics endpoint
- Add production logging configurations (5 strategies)
- Add complete deployment documentation suite:
  * QUICKSTART.md - 30-minute deployment guide
  * DEPLOYMENT_CHECKLIST.md - Printable verification checklist
  * DEPLOYMENT_WORKFLOW.md - Complete deployment lifecycle
  * PRODUCTION_DEPLOYMENT.md - Comprehensive technical reference
  * production-logging.md - Logging configuration guide
  * ANSIBLE_DEPLOYMENT.md - Infrastructure as Code automation
  * README.md - Navigation hub
  * DEPLOYMENT_SUMMARY.md - Executive summary
- Add deployment scripts and automation
- Add DEPLOYMENT_PLAN.md - Concrete plan for immediate deployment
- Update README with production-ready features

All production infrastructure is now complete and ready for deployment.
2025-10-25 19:18:37 +02:00

228 lines
6.1 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Framework\Queue;
use App\Framework\Core\ValueObjects\Duration;
use App\Framework\Core\ValueObjects\Timestamp;
use App\Framework\DateTime\Clock;
use App\Framework\Redis\RedisConnectionInterface;
/**
 * Distributed job coordinator for multi-server job processing.
 *
 * Intended to keep the same job from being executed on several servers at the
 * same time by storing one lock entry per job in Redis. Each lock entry is a
 * JSON document (owner_id, acquired_at, expires_at, server_id) stored under
 * "job:lock:<jobId>" with a TTL; a companion "job:heartbeat:<jobId>" key holds
 * the time of the most recent heartbeat.
 */
final readonly class DistributedJobCoordinator
{
    private const LOCK_PREFIX = 'job:lock:';
    private const HEARTBEAT_PREFIX = 'job:heartbeat:';

    /** TTL in seconds used by heartbeat() when the lock object carries no TTL. */
    private const DEFAULT_TTL_SECONDS = 300;

    public function __construct(
        private RedisConnectionInterface $redis,
        private Clock $clock,
        private string $serverId = 'unknown'
    ) {
    }

    /**
     * Tries to acquire the lock for a job.
     *
     * @param string   $jobId Job identifier; appended to the lock key prefix.
     * @param Duration $ttl   Lifetime of the lock. Redis receives it as whole
     *                        seconds, never less than one.
     *
     * @return DistributedLock|null The lock on success, null if the job is
     *                              already locked or the write failed.
     *
     * @throws \JsonException If the lock payload cannot be JSON-encoded.
     */
    public function acquireLock(string $jobId, Duration $ttl): ?DistributedLock
    {
        $lockKey = self::LOCK_PREFIX . $jobId;
        $ownerId = $this->generateOwnerId();
        $now = $this->clock->time();
        $expiresAt = Timestamp::fromFloat($now->toFloat() + $ttl->toSeconds());

        // Fail loudly on an encoding error instead of handing `false` to set().
        $lockData = json_encode([
            'owner_id' => $ownerId,
            'acquired_at' => $now->toFloat(),
            'expires_at' => $expiresAt->toFloat(),
            'server_id' => $this->serverId,
        ], JSON_THROW_ON_ERROR);

        // NOTE(review): exists() followed by set() is NOT atomic. Two servers
        // can both see "no lock" and both write, so both believe they own the
        // job. A real fix needs a single `SET key value NX EX ttl` round trip;
        // whether RedisConnectionInterface can express that is not visible
        // from this file - TODO confirm and replace this check.
        if ($this->redis->exists($lockKey) > 0) {
            return null; // Lock is already held by another process.
        }

        $success = $this->redis->set(
            $lockKey,
            $lockData,
            $this->toWholeSeconds($ttl->toSeconds())
        );

        if (!$success) {
            return null; // Writing the lock failed.
        }

        return new DistributedLock(
            lockId: uniqid('lock_', true),
            jobId: $jobId,
            ownerId: $ownerId,
            acquiredAt: $now,
            expiresAt: $expiresAt,
            ttl: $ttl
        );
    }

    /**
     * Releases a lock, but only if the stored entry still belongs to the
     * owner recorded in the given lock object.
     *
     * @return bool True if the lock and heartbeat keys were deleted; false if
     *              the lock is gone, unreadable, or owned by someone else.
     */
    public function releaseLock(DistributedLock $lock): bool
    {
        $lockKey = self::LOCK_PREFIX . $lock->jobId;

        // NOTE(review): the ownership check and the delete are two separate
        // round trips, so the lock could expire and be re-acquired in between.
        if (!$this->isOwnedBy($lockKey, $lock->ownerId)) {
            return false;
        }

        $this->redis->delete($lockKey);
        $this->redis->delete(self::HEARTBEAT_PREFIX . $lock->jobId);

        return true;
    }

    /**
     * Renews a lock (heartbeat) so it does not expire while a long-running
     * job is still executing.
     *
     * Resets the TTL of the lock key and stores the current time under the
     * heartbeat key with the same TTL.
     *
     * @return bool True if the lock was renewed; false if it expired, is
     *              unreadable, or was taken over by another owner.
     */
    public function heartbeat(DistributedLock $lock): bool
    {
        $lockKey = self::LOCK_PREFIX . $lock->jobId;
        $heartbeatKey = self::HEARTBEAT_PREFIX . $lock->jobId;

        if (!$this->isOwnedBy($lockKey, $lock->ownerId)) {
            return false;
        }

        $ttlSeconds = $this->toWholeSeconds(
            $lock->ttl?->toSeconds() ?? self::DEFAULT_TTL_SECONDS
        );

        // Renew the lock TTL.
        $this->redis->expire($lockKey, $ttlSeconds);

        // Record the time of this heartbeat.
        $this->redis->set(
            $heartbeatKey,
            (string) $this->clock->time()->toFloat(),
            $ttlSeconds
        );

        return true;
    }

    /**
     * Checks whether a lock entry currently exists for the job.
     */
    public function isLocked(string $jobId): bool
    {
        return $this->redis->exists(self::LOCK_PREFIX . $jobId) > 0;
    }

    /**
     * Fetches the stored lock information for a job.
     *
     * Fields missing from the stored payload (or an undecodable payload) are
     * reported as 'unknown' / null rather than treated as an error.
     *
     * @return array{
     *     job_id: string,
     *     owner_id: mixed,
     *     server_id: mixed,
     *     acquired_at: mixed,
     *     expires_at: mixed,
     *     ttl_remaining: mixed
     * }|null Null if no lock entry exists.
     */
    public function getLockInfo(string $jobId): ?array
    {
        $lockKey = self::LOCK_PREFIX . $jobId;
        $raw = $this->redis->get($lockKey);

        if (!is_string($raw)) {
            return null;
        }

        $data = $this->decodeLockData($raw) ?? [];
        $ttl = $this->redis->ttl($lockKey);

        return [
            'job_id' => $jobId,
            'owner_id' => $data['owner_id'] ?? 'unknown',
            'server_id' => $data['server_id'] ?? 'unknown',
            'acquired_at' => $data['acquired_at'] ?? null,
            'expires_at' => $data['expires_at'] ?? null,
            // Non-positive TTL replies are reported as 0 remaining.
            'ttl_remaining' => $ttl > 0 ? $ttl : 0,
        ];
    }

    /**
     * Fetches the timestamp of the last heartbeat recorded for a job.
     *
     * @return Timestamp|null Null if no heartbeat entry exists.
     */
    public function getLastHeartbeat(string $jobId): ?Timestamp
    {
        $timestamp = $this->redis->get(self::HEARTBEAT_PREFIX . $jobId);

        // Treat both `false` and `null` as "no entry" so a missing key is
        // never turned into timestamp 0.
        if ($timestamp === false || $timestamp === null) {
            return null;
        }

        return Timestamp::fromFloat((float) $timestamp);
    }

    /**
     * Forces a lock release without an ownership check (e.g. for dead lock
     * detection).
     *
     * @return bool True if the lock key or the heartbeat key was deleted.
     */
    public function forceRelease(string $jobId): bool
    {
        // Delete the keys one at a time, matching how releaseLock() calls
        // delete(); a multi-key call is only correct if delete() is variadic,
        // which is not visible from this file.
        $removedLock = $this->redis->delete(self::LOCK_PREFIX . $jobId);
        $removedHeartbeat = $this->redis->delete(self::HEARTBEAT_PREFIX . $jobId);

        return $removedLock > 0 || $removedHeartbeat > 0;
    }

    /**
     * Returns lock information for every lock key currently present.
     *
     * NOTE(review): this issues the Redis KEYS command, which walks the whole
     * keyspace and blocks the server while doing so; consider a SCAN-based
     * iteration once the reply shape of command() is confirmed.
     *
     * @return list<array<string, mixed>> One getLockInfo() result per lock.
     */
    public function getActiveLocks(): array
    {
        $keys = $this->redis->command('KEYS', self::LOCK_PREFIX . '*');

        if (!is_array($keys)) {
            return []; // Unexpected or failed reply: nothing to report.
        }

        $locks = [];

        foreach ($keys as $key) {
            $lockInfo = $this->getLockInfo($this->jobIdFromKey((string) $key));

            // The lock may have expired between KEYS and the lookup.
            if ($lockInfo !== null) {
                $locks[] = $lockInfo;
            }
        }

        return $locks;
    }

    /**
     * Builds the owner identifier "<serverId>:<hostname>:<pid>".
     */
    private function generateOwnerId(): string
    {
        $hostname = gethostname();

        if ($hostname === false) {
            $hostname = 'unknown';
        }

        return sprintf('%s:%s:%d', $this->serverId, $hostname, getmypid());
    }

    /**
     * Tells whether the lock stored under $lockKey names $ownerId as owner.
     *
     * A missing key, a non-string reply or an undecodable payload all count
     * as "not owned".
     */
    private function isOwnedBy(string $lockKey, string $ownerId): bool
    {
        $data = $this->decodeLockData($this->redis->get($lockKey));

        return $data !== null && ($data['owner_id'] ?? null) === $ownerId;
    }

    /**
     * Decodes a raw lock payload into an array.
     *
     * @return array<string, mixed>|null Null if $raw is not a string or does
     *                                   not decode to a JSON object/array.
     */
    private function decodeLockData(mixed $raw): ?array
    {
        if (!is_string($raw)) {
            return null;
        }

        $decoded = json_decode($raw, true);

        return is_array($decoded) ? $decoded : null;
    }

    /**
     * Converts a TTL to whole seconds, never returning less than one so a
     * sub-second duration is not truncated to a zero expiry.
     */
    private function toWholeSeconds(int|float $seconds): int
    {
        return max(1, (int) $seconds);
    }

    /**
     * Extracts the job ID from a lock key by removing only the first
     * occurrence of the lock prefix, so job IDs that themselves contain the
     * prefix text are left intact. Keys without the prefix are returned as is.
     */
    private function jobIdFromKey(string $key): string
    {
        $offset = strpos($key, self::LOCK_PREFIX);

        if ($offset === false) {
            return $key;
        }

        return substr($key, $offset + strlen(self::LOCK_PREFIX));
    }
}