diff --git a/internal/worker/metrics.go b/internal/worker/metrics.go
index b3f0c75..d2f5e90 100644
--- a/internal/worker/metrics.go
+++ b/internal/worker/metrics.go
@@ -2,6 +2,7 @@ package worker
 
 import (
 	"net/http"
+	"strconv"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -135,8 +136,75 @@ func (w *Worker) setupMetricsExporter() {
 		return float64(w.config.MaxWorkers)
 	}))
 
-	// Note: Resource metrics temporarily disabled during migration
-	// These will be re-enabled once resource manager is integrated
+	// Resource metrics
+	if w.resources != nil {
+		reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+			Name:        "fetchml_resources_cpu_total",
+			Help:        "Total CPU tokens managed by the worker resource manager.",
+			ConstLabels: labels,
+		}, func() float64 {
+			return float64(w.resources.Snapshot().TotalCPU)
+		}))
+		reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+			Name:        "fetchml_resources_cpu_free",
+			Help:        "Free CPU tokens currently available in the worker resource manager.",
+			ConstLabels: labels,
+		}, func() float64 {
+			return float64(w.resources.Snapshot().FreeCPU)
+		}))
+		reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+			Name:        "fetchml_resources_acquire_total",
+			Help:        "Total resource acquisition attempts.",
+			ConstLabels: labels,
+		}, func() float64 {
+			return float64(w.resources.Snapshot().AcquireTotal)
+		}))
+		reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+			Name:        "fetchml_resources_acquire_wait_total",
+			Help:        "Total resource acquisitions that had to wait for resources.",
+			ConstLabels: labels,
+		}, func() float64 {
+			return float64(w.resources.Snapshot().AcquireWaitTotal)
+		}))
+		reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+			Name:        "fetchml_resources_acquire_timeout_total",
+			Help:        "Total resource acquisition attempts that timed out.",
+			ConstLabels: labels,
+		}, func() float64 {
+			return float64(w.resources.Snapshot().AcquireTimeoutTotal)
+		}))
+		reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+			Name:        "fetchml_resources_acquire_wait_seconds_total",
+			Help:        "Total seconds spent waiting for resources across all acquisitions.",
+			ConstLabels: labels,
+		}, func() float64 {
+			return w.resources.Snapshot().AcquireWaitSeconds
+		}))
+
+		snap := w.resources.Snapshot()
+		for i := range snap.GPUFree {
+			gpuLabels := prometheus.Labels{"worker_id": w.id, "gpu_index": strconv.Itoa(i)}
+			idx := i
+			reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+				Name:        "fetchml_resources_gpu_slots_total",
+				Help:        "Total GPU slots per GPU index.",
+				ConstLabels: gpuLabels,
+			}, func() float64 {
+				return float64(w.resources.Snapshot().SlotsPerGPU)
+			}))
+			reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+				Name:        "fetchml_resources_gpu_slots_free",
+				Help:        "Free GPU slots per GPU index.",
+				ConstLabels: gpuLabels,
+			}, func() float64 {
+				s := w.resources.Snapshot()
+				if idx < 0 || idx >= len(s.GPUFree) {
+					return 0
+				}
+				return float64(s.GPUFree[idx])
+			}))
+		}
+	}
 
 	mux := http.NewServeMux()
 	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))