package queue import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) var ( // QueueDepth tracks the number of tasks in the queue. QueueDepth = promauto.NewGauge(prometheus.GaugeOpts{ Name: "fetch_ml_queue_depth", Help: "Number of tasks in the queue", }) // TasksQueued tracks the total number of tasks queued. TasksQueued = promauto.NewCounter(prometheus.CounterOpts{ Name: "fetch_ml_tasks_queued_total", Help: "Total number of tasks queued", }) // TaskDuration tracks task execution duration in seconds. TaskDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "fetch_ml_task_duration_seconds", Help: "Task execution duration in seconds", Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600, 1800, 3600}, // 1s to 1h }, []string{"job_name", "status"}) // TasksCompleted tracks the total number of completed tasks. TasksCompleted = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "fetch_ml_tasks_completed_total", Help: "Total number of completed tasks", }, []string{"job_name", "status"}) // TaskFailures tracks failed tasks by error category. TaskFailures = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "fetch_ml_task_failures_total", Help: "Total number of failed tasks by error category", }, []string{"job_name", "error_category"}) // TaskRetries tracks the total number of task retries. TaskRetries = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "fetch_ml_task_retries_total", Help: "Total number of task retries", }, []string{"job_name", "error_category"}) // LeaseExpirations tracks expired leases that were reclaimed. LeaseExpirations = promauto.NewCounter(prometheus.CounterOpts{ Name: "fetch_ml_lease_expirations_total", Help: "Total number of expired leases reclaimed", }) // LeaseRenewals tracks successful lease renewals. LeaseRenewals = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "fetch_ml_lease_renewals_total", Help: "Total number of successful lease renewals", }, []string{"worker_id"}) // DLQSize tracks the number of tasks in dead letter queue. DLQSize = promauto.NewGauge(prometheus.GaugeOpts{ Name: "fetch_ml_dlq_size", Help: "Number of tasks in dead letter queue", }) // DLQAdditions tracks tasks moved to dead letter queue. DLQAdditions = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "fetch_ml_dlq_additions_total", Help: "Total number of tasks moved to DLQ", }, []string{"reason"}) // ActiveTasks tracks currently executing tasks. ActiveTasks = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "fetch_ml_active_tasks", Help: "Number of currently executing tasks", }, []string{"worker_id"}) // WorkerHeartbeats tracks worker heartbeat events. WorkerHeartbeats = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "fetch_ml_worker_heartbeats_total", Help: "Total number of worker heartbeats", }, []string{"worker_id"}) ) // RecordTaskStart records when a task starts func RecordTaskStart(_, workerID string) { ActiveTasks.WithLabelValues(workerID).Inc() } // RecordTaskEnd records when a task completes func RecordTaskEnd(jobName, workerID, status string, durationSeconds float64) { ActiveTasks.WithLabelValues(workerID).Dec() TaskDuration.WithLabelValues(jobName, status).Observe(durationSeconds) TasksCompleted.WithLabelValues(jobName, status).Inc() } // RecordTaskFailure records a task failure with error category func RecordTaskFailure(jobName string, errorCategory ErrorCategory) { TaskFailures.WithLabelValues(jobName, string(errorCategory)).Inc() } // RecordTaskRetry records a task retry func RecordTaskRetry(jobName string, errorCategory ErrorCategory) { TaskRetries.WithLabelValues(jobName, string(errorCategory)).Inc() } // RecordLeaseExpiration records a lease expiration func RecordLeaseExpiration() { LeaseExpirations.Inc() } // RecordLeaseRenewal records a successful lease renewal func RecordLeaseRenewal(workerID string) { LeaseRenewals.WithLabelValues(workerID).Inc() } // RecordDLQAddition records a task being moved to DLQ func RecordDLQAddition(reason string) { DLQAdditions.WithLabelValues(reason).Inc() DLQSize.Inc() } // UpdateQueueDepth updates the current queue depth gauge func UpdateQueueDepth(depth int64) { QueueDepth.Set(float64(depth)) }