clock = new SystemClock();

        // In-memory stand-in for Redis so the coordinator can be exercised
        // without a running server. Values and TTLs live in plain arrays;
        // TTLs are only recorded, never counted down or enforced.
        $this->redis = new class implements RedisConnectionInterface {
            /** @var array<string, string> Stored values, keyed by Redis key. */
            private array $data = [];

            /** @var array<string, int> Last TTL (in seconds) recorded for each key. */
            private array $ttls = [];

            /**
             * The mock has no real client behind it.
             *
             * @throws \RuntimeException always
             */
            public function getClient(): \Redis
            {
                throw new \RuntimeException('Not implemented in mock');
            }

            public function getDatabase(): int
            {
                return 0;
            }

            public function getName(): string
            {
                return 'mock';
            }

            public function isConnected(): bool
            {
                return true;
            }

            public function reconnect(): void
            {
                // Nothing to reconnect to.
            }

            /** Drops every stored value and TTL. */
            public function flush(): void
            {
                $this->data = [];
                $this->ttls = [];
            }

            /** Returns the stored value, or false when the key is absent. */
            public function get(string $key): string|false
            {
                return $this->data[$key] ?? false;
            }

            /**
             * Stores a value. A TTL is recorded when given; otherwise any TTL
             * left over from a previous write is discarded, as Redis SET does.
             */
            public function set(string $key, string $value, ?int $ttl = null): bool
            {
                $this->data[$key] = $value;

                if ($ttl === null) {
                    unset($this->ttls[$key]);
                } else {
                    $this->ttls[$key] = $ttl;
                }

                return true;
            }

            /** Removes the given keys and returns how many of them existed. */
            public function delete(string ...$keys): int
            {
                $deleted = 0;

                foreach ($keys as $key) {
                    if (isset($this->data[$key])) {
                        unset($this->data[$key], $this->ttls[$key]);
                        $deleted++;
                    }
                }

                return $deleted;
            }

            /** Counts how many of the given keys are currently stored. */
            public function exists(string ...$keys): int
            {
                $count = 0;

                foreach ($keys as $key) {
                    if (isset($this->data[$key])) {
                        $count++;
                    }
                }

                return $count;
            }

            /**
             * Returns the TTL recorded for the key, or -1 when none is recorded.
             *
             * NOTE(review): a real Redis server answers -2 for a missing key;
             * this mock answers -1 for both "no TTL" and "no key".
             */
            public function ttl(string $key): int
            {
                return $this->ttls[$key] ?? -1;
            }

            /** Records a TTL for an existing key; returns false when the key is absent. */
            public function expire(string $key, int $seconds): bool
            {
                if (!isset($this->data[$key])) {
                    return false;
                }

                $this->ttls[$key] = $seconds;

                return true;
            }

            /** Adds $by to the integer value at $key (absent keys count as 0) and returns the result. */
            public function incr(string $key, int $by = 1): int
            {
                $new = (int) ($this->data[$key] ?? 0) + $by;
                $this->data[$key] = (string) $new;

                return $new;
            }

            /** Subtracts $by from the integer value at $key and returns the result. */
            public function decr(string $key, int $by = 1): int
            {
                return $this->incr($key, -$by);
            }

            /** Looks up several keys at once; absent keys yield false. */
            public function mGet(array $keys): array
            {
                return array_map(
                    fn (string|int $key): string|false => $this->data[$key] ?? false,
                    $keys,
                );
            }

            /** Stores several key/value pairs; like set() without a TTL, stale TTLs are discarded. */
            public function mSet(array $keyValuePairs): bool
            {
                foreach ($keyValuePairs as $key => $value) {
                    $this->data[$key] = $value;
                    unset($this->ttls[$key]);
                }

                return true;
            }

            /**
             * Dispatches a raw command. Only KEYS is emulated.
             *
             * @throws \RuntimeException for any other command
             */
            public function command(string $command, string|int ...$arguments): mixed
            {
                return match (strtoupper($command)) {
                    'KEYS' => $this->keysCommand((string) ($arguments[0] ?? '*')),
                    default => throw new \RuntimeException("Command $command not implemented in mock")
                };
            }

            /**
             * Returns every stored key matching a glob-style pattern.
             *
             * Matching is delegated to fnmatch() instead of a hand-built regex,
             * so characters such as "/", "." or "+" in a pattern are treated
             * literally rather than as regex syntax.
             *
             * @return list<string>
             */
            private function keysCommand(string $pattern): array
            {
                $matches = [];

                foreach (array_keys($this->data) as $key) {
                    // PHP turns numeric-string array keys into ints; normalise back.
                    $key = (string) $key;

                    if (fnmatch($pattern, $key)) {
                        $matches[] = $key;
                    }
                }

                return $matches;
            }
        };

        $this->coordinator = new DistributedJobCoordinator(
            $this->redis,
            $this->clock,
            'test-server',
        );
    });

    it('acquires lock for job', function () {
        $lock = $this->coordinator->acquireLock(
            'test-job-123',
            Duration::fromMinutes(5),
        );

        expect($lock)->not->toBeNull();
        expect($lock)->toBeInstanceOf(DistributedLock::class);
        expect($lock->jobId)->toBe('test-job-123');
    });

    it('prevents concurrent lock acquisition', function () {
        // First acquisition succeeds.
        $lock1 = $this->coordinator->acquireLock(
            'test-job-456',
            Duration::fromMinutes(5),
        );
        expect($lock1)->not->toBeNull();

        // A second acquisition of the same job is refused while the first is held.
        $lock2 = $this->coordinator->acquireLock(
            'test-job-456',
            Duration::fromMinutes(5),
        );
        expect($lock2)->toBeNull();
    });

    it('releases lock successfully', function () {
        $lock = $this->coordinator->acquireLock(
            'test-job-789',
            Duration::fromMinutes(5),
        );
        // Fail with a clear message here rather than passing null to releaseLock().
        expect($lock)->not->toBeNull();

        $released = $this->coordinator->releaseLock($lock);
        expect($released)->toBeTrue();

        // The job can be locked again once released.
        $lock2 = $this->coordinator->acquireLock(
            'test-job-789',
            Duration::fromMinutes(5),
        );
        expect($lock2)->not->toBeNull();
    });

    it('checks if job is locked', function () {
        expect($this->coordinator->isLocked('test-job-999'))->toBeFalse();

        $this->coordinator->acquireLock('test-job-999', Duration::fromMinutes(5));

        expect($this->coordinator->isLocked('test-job-999'))->toBeTrue();
    });
// --- Heartbeat, lock inspection and forced release ---

    it('sends heartbeat to renew lock', function () {
        $lock = $this->coordinator->acquireLock(
            'test-job-heartbeat',
            Duration::fromMinutes(5),
        );
        // Fail with a clear message here rather than passing null to heartbeat().
        expect($lock)->not->toBeNull();

        $success = $this->coordinator->heartbeat($lock);

        expect($success)->toBeTrue();
    });

    it('gets lock information', function () {
        $this->coordinator->acquireLock('test-job-info', Duration::fromMinutes(5));

        $info = $this->coordinator->getLockInfo('test-job-info');

        expect($info)->not->toBeNull();
        expect($info['job_id'])->toBe('test-job-info');
        expect($info['server_id'])->toBe('test-server');
        expect($info['ttl_remaining'])->toBeGreaterThan(0);
    });

    it('force releases stuck locks', function () {
        $this->coordinator->acquireLock('stuck-job', Duration::fromMinutes(5));

        $forced = $this->coordinator->forceRelease('stuck-job');

        expect($forced)->toBeTrue();
        expect($this->coordinator->isLocked('stuck-job'))->toBeFalse();
    });

    it('lists all active locks', function () {
        $this->coordinator->acquireLock('job-1', Duration::fromMinutes(5));
        $this->coordinator->acquireLock('job-2', Duration::fromMinutes(5));
        $this->coordinator->acquireLock('job-3', Duration::fromMinutes(5));

        $locks = $this->coordinator->getActiveLocks();

        expect($locks)->toHaveCount(3);
        expect($locks[0]['job_id'])->toContain('job-');
    });

    it('returns null for non-existent lock info', function () {
        $info = $this->coordinator->getLockInfo('non-existent-job');

        expect($info)->toBeNull();
    });

    it('handles heartbeat for expired lock', function () {
        $lock = $this->coordinator->acquireLock('expired-job', Duration::fromSeconds(1));
        expect($lock)->not->toBeNull();

        // The in-memory Redis mock never expires keys, so expiry is simulated
        // by releasing the lock before the heartbeat is sent.
        $this->coordinator->releaseLock($lock);

        // A heartbeat for a lock that is no longer held must be rejected.
        $success = $this->coordinator->heartbeat($lock);

        expect($success)->toBeFalse();
    });
});