Files
michaelschiemer/src/Framework/Queue/MachineLearning/QueueJobFeatureExtractor.php
Michael Schiemer 3b623e7afb feat(Deployment): Integrate Ansible deployment via PHP deployment pipeline
- Create AnsibleDeployStage using framework's Process module for secure command execution
- Integrate AnsibleDeployStage into DeploymentPipelineCommands for production deployments
- Add force_deploy flag support in Ansible playbook to override stale locks
- Use PHP deployment module as orchestrator (php console.php deploy:production)
- Fix ErrorAggregationInitializer to use Environment class instead of $_ENV superglobal

Architecture:
- BuildStage → AnsibleDeployStage → HealthCheckStage for production
- Process module provides timeout, error handling, and output capture
- Ansible playbook supports rollback via rollback-git-based.yml
- Zero-downtime deployments with health checks
2025-10-26 14:08:07 +01:00

253 lines
8.5 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Framework\Queue\MachineLearning;
use App\Framework\Queue\MachineLearning\ValueObjects\JobFeatures;
use App\Framework\Queue\Services\JobMetricsManager;
use App\Framework\Queue\ValueObjects\JobMetadata;
use App\Framework\Queue\ValueObjects\JobMetrics;
/**
* Queue Job Feature Extractor
*
* Extracts normalized features from queue job metrics for anomaly detection.
*
* Features extracted:
* - execution_time_variance: Variance from average execution time for job type
* - memory_usage_pattern: Deviation from typical memory usage
* - retry_frequency: Normalized retry count
* - failure_rate: Failure ratio for job type
* - queue_depth_correlation: Relationship between execution time and queue size
* - dependency_chain_complexity: Number of dependent jobs
* - payload_size_anomaly: Deviation from typical payload size
* - execution_timing_regularity: Consistency of execution intervals
*/
final readonly class QueueJobFeatureExtractor
{
    /** Time window passed to the metrics manager when loading the per-queue baseline. */
    private const HISTORY_WINDOW = '24 hours';

    /** Relative execution-time deviation at which the feature saturates at 1.0. */
    private const EXECUTION_TIME_DEVIATION_CAP = 10.0;

    /** Relative memory deviation at which the feature saturates at 1.0. */
    private const MEMORY_DEVIATION_CAP = 5.0;

    /** Queue depth at which the queue-depth feature saturates at 1.0. */
    private const QUEUE_DEPTH_SATURATION = 1000.0;

    /** Tag count at which the dependency-complexity heuristic saturates at 1.0. */
    private const TAG_COUNT_SATURATION = 10.0;

    /** Extra-field count at which the payload-size heuristic saturates at 1.0. */
    private const EXTRA_FIELD_SATURATION = 50.0;

    /** Regularity score assigned to jobs flagged as scheduled in their metrics metadata. */
    private const SCHEDULED_REGULARITY = 0.9;

    /** Regularity score assigned to every other job. */
    private const DEFAULT_REGULARITY = 0.3;

    public function __construct(
        private JobMetricsManager $metricsManager
    ) {}

    /**
     * Extract JobFeatures from job execution metrics.
     *
     * Loads the performance stats of the job's queue for the last 24 hours and
     * scores the current metrics against them.
     *
     * @param JobMetrics  $currentMetrics Current job metrics
     * @param JobMetadata $metadata       Job metadata
     * @param int         $queueDepth     Current queue size
     * @return JobFeatures Normalized feature vector (all values 0.0-1.0)
     */
    public function extractFeatures(
        JobMetrics $currentMetrics,
        JobMetadata $metadata,
        int $queueDepth = 0
    ): JobFeatures {
        return $this->buildFeatures(
            $currentMetrics,
            $metadata,
            $queueDepth,
            $this->loadHistoricalStats($currentMetrics->queueName),
        );
    }

    /**
     * Extract features from job metrics history for batch analysis.
     *
     * The historical baseline is loaded once per distinct queue name instead of
     * once per history entry, because it does not depend on the individual entry.
     *
     * @param string $jobId Job ID to analyze
     * @return JobFeatures[] Array of feature vectors over time
     */
    public function extractHistoricalFeatures(string $jobId): array
    {
        $statsByQueue = [];
        $features = [];

        foreach ($this->metricsManager->getJobMetricsHistory($jobId) as $metrics) {
            // NOTE(review): assumes queueName is a string (extractQueueFeatures passes
            // a string to the same manager call) - confirm against JobMetrics.
            $queueName = $metrics->queueName;
            $statsByQueue[$queueName] ??= $this->loadHistoricalStats($queueName);

            // Create minimal metadata from metrics. A fresh id and "now" timestamp are
            // generated per entry; neither is read by the feature calculations below.
            // NOTE(review): the queue name is passed as the job class name here -
            // confirm that ClassName::create() accepts arbitrary queue names.
            $metadata = new JobMetadata(
                id: new \App\Framework\Ulid\Ulid(new \App\Framework\DateTime\SystemClock()),
                class: \App\Framework\Core\ValueObjects\ClassName::create($queueName),
                type: 'job',
                queuedAt: \App\Framework\Core\ValueObjects\Timestamp::now(),
                tags: [],
                extra: $metrics->metadata ?? []
            );

            // Queue depth at the time of the historical run is not available, so 0 is used.
            $features[] = $this->buildFeatures($metrics, $metadata, 0, $statsByQueue[$queueName]);
        }

        return $features;
    }

    /**
     * Extract features for all recent jobs in a queue.
     *
     * Placeholder: there is no way yet to list the recent jobs of a queue, so this
     * always returns an empty array. No metrics are queried until that exists.
     *
     * @todo Query recent job metrics for the queue and map them through extractFeatures().
     *
     * @param string $queueName Queue to analyze
     * @param int    $limit     Maximum number of jobs to analyze
     * @return array Array of [JobMetrics, JobFeatures] tuples (currently always empty)
     */
    public function extractQueueFeatures(string $queueName, int $limit = 100): array
    {
        return [];
    }

    /**
     * Load the historical performance stats used as the baseline for one queue.
     *
     * @return array Stats as returned by the metrics manager for the last 24 hours
     */
    private function loadHistoricalStats(string $queueName): array
    {
        return $this->metricsManager->getPerformanceStats(
            queueName: $queueName,
            timeWindow: self::HISTORY_WINDOW
        );
    }

    /**
     * Assemble the feature vector from metrics, metadata and an already loaded baseline.
     *
     * @param array $historicalStats Baseline stats for the job's queue
     */
    private function buildFeatures(
        JobMetrics $metrics,
        JobMetadata $metadata,
        int $queueDepth,
        array $historicalStats
    ): JobFeatures {
        return new JobFeatures(
            executionTimeVariance: $this->calculateExecutionTimeVariance($metrics, $historicalStats),
            memoryUsagePattern: $this->calculateMemoryUsagePattern($metrics, $historicalStats),
            retryFrequency: $this->calculateRetryFrequency($metrics),
            failureRate: $this->calculateFailureRate($historicalStats),
            queueDepthCorrelation: $this->calculateQueueDepthCorrelation($queueDepth),
            dependencyChainComplexity: $this->calculateDependencyComplexity($metadata),
            payloadSizeAnomaly: $this->calculatePayloadSizeAnomaly($metadata),
            executionTimingRegularity: $this->calculateExecutionTimingRegularity($metrics)
        );
    }

    /**
     * Calculate execution time variance (0.0-1.0).
     *
     * Relative deviation of the current execution time from the historical average:
     * 0.0 = exactly average (or no baseline), 1.0 = deviation of 10x the average or more.
     */
    private function calculateExecutionTimeVariance(
        JobMetrics $metrics,
        array $historicalStats
    ): float {
        return $this->relativeDeviation(
            (float) $metrics->executionTimeMs,
            $this->statAsFloat($historicalStats, 'average_execution_time_ms'),
            self::EXECUTION_TIME_DEVIATION_CAP,
        );
    }

    /**
     * Calculate memory usage pattern (0.0-1.0).
     *
     * Relative deviation of the current memory usage from the historical average:
     * 0.0 = average usage (or no baseline), 1.0 = deviation of 5x the average or more.
     */
    private function calculateMemoryUsagePattern(
        JobMetrics $metrics,
        array $historicalStats
    ): float {
        return $this->relativeDeviation(
            (float) $metrics->memoryUsageBytes,
            $this->statAsFloat($historicalStats, 'average_memory_usage_bytes'),
            self::MEMORY_DEVIATION_CAP,
        );
    }

    /**
     * Calculate retry frequency (0.0-1.0).
     *
     * Ratio of attempts to the configured maximum; 1.0 = max attempts exhausted.
     * Jobs with a maximum of one attempt or less have no retry configuration and score 0.0.
     */
    private function calculateRetryFrequency(JobMetrics $metrics): float
    {
        if ($metrics->maxAttempts <= 1) {
            return 0.0;
        }

        return $this->clamp($metrics->attempts / $metrics->maxAttempts);
    }

    /**
     * Calculate failure rate (0.0-1.0).
     *
     * Share of failed jobs among all jobs in the baseline stats. Returns 0.0 when the
     * total is missing, non-numeric, zero or negative, so a zero delivered as a string
     * or float can never reach the division.
     */
    private function calculateFailureRate(array $historicalStats): float
    {
        $totalJobs = $this->statAsFloat($historicalStats, 'total_jobs');
        if ($totalJobs <= 0.0) {
            return 0.0;
        }

        return $this->clamp($this->statAsFloat($historicalStats, 'failed_jobs') / $totalJobs);
    }

    /**
     * Calculate queue depth correlation (0.0-1.0).
     *
     * Linear scaling of the queue depth: 0.0 = empty, 1.0 = 1000 or more jobs queued.
     * Negative depths are treated as an empty queue.
     */
    private function calculateQueueDepthCorrelation(int $queueDepth): float
    {
        return $this->clamp($queueDepth / self::QUEUE_DEPTH_SATURATION);
    }

    /**
     * Calculate dependency chain complexity (0.0-1.0).
     *
     * Placeholder heuristic: uses the number of job tags as a stand-in for
     * complexity, saturating at 10 tags. No dependency graph is analysed.
     */
    private function calculateDependencyComplexity(JobMetadata $metadata): float
    {
        return $this->clamp(count($metadata->tags) / self::TAG_COUNT_SATURATION);
    }

    /**
     * Calculate payload size anomaly (0.0-1.0).
     *
     * Placeholder heuristic: uses the number of extra metadata fields as a stand-in
     * for payload size, saturating at 50 fields. No historical baseline is consulted.
     */
    private function calculatePayloadSizeAnomaly(JobMetadata $metadata): float
    {
        return $this->clamp(count($metadata->extra) / self::EXTRA_FIELD_SATURATION);
    }

    /**
     * Calculate execution timing regularity (0.0-1.0).
     *
     * Placeholder: inter-arrival times are not analysed. Jobs whose metrics metadata
     * carries a truthy 'scheduled' entry score 0.9, all other jobs score 0.3.
     */
    private function calculateExecutionTimingRegularity(JobMetrics $metrics): float
    {
        $metadata = $metrics->metadata ?? [];

        return !empty($metadata['scheduled'])
            ? self::SCHEDULED_REGULARITY
            : self::DEFAULT_REGULARITY;
    }

    /**
     * Score how far a value deviates from its average, relative to that average.
     *
     * @param float $current Current measurement
     * @param float $average Historical average; values <= 0 mean "no baseline yet"
     * @param float $cap     Relative deviation that maps to 1.0
     * @return float 0.0 when there is no baseline, otherwise |current - average| / average / cap, clamped to 0.0-1.0
     */
    private function relativeDeviation(float $current, float $average, float $cap): float
    {
        if ($average <= 0.0) {
            return 0.0;
        }

        return $this->clamp(abs($current - $average) / $average / $cap);
    }

    /**
     * Read a numeric entry from the stats array as float.
     *
     * Missing, null or non-numeric entries are treated as 0.0; numeric strings are accepted.
     */
    private function statAsFloat(array $stats, string $key): float
    {
        $value = $stats[$key] ?? null;

        return is_numeric($value) ? (float) $value : 0.0;
    }

    /**
     * Clamp a value into the 0.0-1.0 range promised for every feature.
     */
    private function clamp(float $value): float
    {
        return max(0.0, min(1.0, $value));
    }
}