fetch_ml/internal/worker/metrics.go
Jeremie Fraeys 51698d60de
refactor(phase7): Restore resource metrics in metrics.go
- Re-enabled all resource metrics (CPU, GPU, acquisition stats)
- Metrics are conditionally registered only when w.resources != nil
- Added nil check to prevent panics if resource manager not initialized

Build status: Compiles successfully
2026-02-17 16:38:47 -05:00

228 lines
8.2 KiB
Go

package worker
import (
	"errors"
	"net/http"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
// setupMetricsExporter initializes and starts the Prometheus metrics exporter
// for this worker. It registers the process/Go runtime collectors plus task,
// prewarm, and (when the resource manager is present) resource metrics, then
// serves them on w.config.Metrics.ListenAddr. It is a no-op when metrics are
// disabled in config.
//
// Monotonically increasing *_total metrics are exposed as counters so that
// PromQL rate()/increase() work on them; point-in-time values (active tasks,
// queue depth, free capacity) are exposed as gauges.
func (w *Worker) setupMetricsExporter() {
	if !w.config.Metrics.Enabled {
		return
	}

	reg := prometheus.NewRegistry()
	reg.MustRegister(
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
		collectors.NewGoCollector(),
	)

	labels := prometheus.Labels{"worker_id": w.id}

	// counter registers a cumulative, monotonically increasing metric backed
	// by fn. CounterFunc (not GaugeFunc) is the correct type for *_total
	// series so rate()/increase() behave as expected.
	counter := func(name, help string, fn func() float64) {
		reg.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{
			Name:        name,
			Help:        help,
			ConstLabels: labels,
		}, fn))
	}
	// gauge registers a point-in-time metric backed by fn with the given
	// constant labels (per-GPU metrics add a gpu_index label).
	gauge := func(name, help string, lbls prometheus.Labels, fn func() float64) {
		reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Name:        name,
			Help:        help,
			ConstLabels: lbls,
		}, fn))
	}

	counter("fetchml_tasks_processed_total",
		"Total tasks processed successfully by this worker.",
		func() float64 { return float64(w.metrics.TasksProcessed.Load()) })
	counter("fetchml_tasks_failed_total",
		"Total tasks failed by this worker.",
		func() float64 { return float64(w.metrics.TasksFailed.Load()) })
	gauge("fetchml_tasks_active",
		"Number of tasks currently running on this worker.", labels,
		func() float64 { return float64(w.runningCount()) })
	gauge("fetchml_tasks_queued",
		"Latest observed queue depth from Redis.", labels,
		func() float64 { return float64(w.metrics.QueuedTasks.Load()) })
	counter("fetchml_data_transferred_bytes_total",
		"Total bytes transferred while fetching datasets.",
		func() float64 { return float64(w.metrics.DataTransferred.Load()) })
	// Time totals are stored in time.Duration units (nanoseconds); divide by
	// time.Second to export base-unit seconds per Prometheus convention.
	counter("fetchml_data_fetch_time_seconds_total",
		"Total time spent fetching datasets (seconds).",
		func() float64 { return float64(w.metrics.DataFetchTime.Load()) / float64(time.Second) })
	counter("fetchml_execution_time_seconds_total",
		"Total execution time for completed tasks (seconds).",
		func() float64 { return float64(w.metrics.ExecutionTime.Load()) / float64(time.Second) })
	counter("fetchml_prewarm_env_hit_total",
		"Total environment prewarm hits (warmed image already existed).",
		func() float64 { return float64(w.metrics.PrewarmEnvHit.Load()) })
	counter("fetchml_prewarm_env_miss_total",
		"Total environment prewarm misses (warmed image did not exist yet).",
		func() float64 { return float64(w.metrics.PrewarmEnvMiss.Load()) })
	counter("fetchml_prewarm_env_built_total",
		"Total environment prewarm images built.",
		func() float64 { return float64(w.metrics.PrewarmEnvBuilt.Load()) })
	counter("fetchml_prewarm_env_time_seconds_total",
		"Total time spent building prewarm images (seconds).",
		func() float64 { return float64(w.metrics.PrewarmEnvTime.Load()) / float64(time.Second) })
	counter("fetchml_prewarm_snapshot_hit_total",
		"Total prewarmed snapshot hits (snapshots found in .prewarm/).",
		func() float64 { return float64(w.metrics.PrewarmSnapshotHit.Load()) })
	counter("fetchml_prewarm_snapshot_miss_total",
		"Total prewarmed snapshot misses (snapshots not found in .prewarm/).",
		func() float64 { return float64(w.metrics.PrewarmSnapshotMiss.Load()) })
	counter("fetchml_prewarm_snapshot_built_total",
		"Total snapshots prewarmed into .prewarm/.",
		func() float64 { return float64(w.metrics.PrewarmSnapshotBuilt.Load()) })
	counter("fetchml_prewarm_snapshot_time_seconds_total",
		"Total time spent prewarming snapshots (seconds).",
		func() float64 { return float64(w.metrics.PrewarmSnapshotTime.Load()) / float64(time.Second) })
	gauge("fetchml_worker_max_concurrency",
		"Configured maximum concurrent tasks for this worker.", labels,
		func() float64 { return float64(w.config.MaxWorkers) })

	// Resource metrics — only registered when the resource manager was
	// initialized, so the callbacks can never dereference a nil w.resources.
	if w.resources != nil {
		// cpu_total / gpu_slots_total are capacities (point-in-time values
		// despite the _total suffix), so they remain gauges.
		gauge("fetchml_resources_cpu_total",
			"Total CPU tokens managed by the worker resource manager.", labels,
			func() float64 { return float64(w.resources.Snapshot().TotalCPU) })
		gauge("fetchml_resources_cpu_free",
			"Free CPU tokens currently available in the worker resource manager.", labels,
			func() float64 { return float64(w.resources.Snapshot().FreeCPU) })
		counter("fetchml_resources_acquire_total",
			"Total resource acquisition attempts.",
			func() float64 { return float64(w.resources.Snapshot().AcquireTotal) })
		counter("fetchml_resources_acquire_wait_total",
			"Total resource acquisitions that had to wait for resources.",
			func() float64 { return float64(w.resources.Snapshot().AcquireWaitTotal) })
		counter("fetchml_resources_acquire_timeout_total",
			"Total resource acquisition attempts that timed out.",
			func() float64 { return float64(w.resources.Snapshot().AcquireTimeoutTotal) })
		counter("fetchml_resources_acquire_wait_seconds_total",
			"Total seconds spent waiting for resources across all acquisitions.",
			func() float64 { return w.resources.Snapshot().AcquireWaitSeconds })

		// Per-GPU gauges. The set of gpu_index series is fixed at setup time
		// from this snapshot; the callbacks re-snapshot on every scrape and
		// guard against the GPU list shrinking afterwards.
		snap := w.resources.Snapshot()
		for i := range snap.GPUFree {
			idx := i // per-iteration copy so each closure sees its own index
			gpuLabels := prometheus.Labels{"worker_id": w.id, "gpu_index": strconv.Itoa(idx)}
			gauge("fetchml_resources_gpu_slots_total",
				"Total GPU slots per GPU index.", gpuLabels,
				func() float64 { return float64(w.resources.Snapshot().SlotsPerGPU) })
			gauge("fetchml_resources_gpu_slots_free",
				"Free GPU slots per GPU index.", gpuLabels,
				func() float64 {
					s := w.resources.Snapshot()
					if idx >= len(s.GPUFree) {
						return 0
					}
					return float64(s.GPUFree[idx])
				})
		}
	}

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	srv := &http.Server{
		Addr:    w.config.Metrics.ListenAddr,
		Handler: mux,
		// Bound header reads so a slow client cannot pin the exporter.
		ReadHeaderTimeout: 5 * time.Second,
	}
	w.metricsSrv = srv // retained so shutdown can stop the exporter
	go func() {
		w.logger.Info("metrics exporter listening",
			"addr", w.config.Metrics.ListenAddr,
			"enabled", w.config.Metrics.Enabled)
		// ErrServerClosed is the normal shutdown path, not a failure.
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			w.logger.Warn("metrics exporter stopped",
				"error", err)
		}
	}()
}