BREAKING CHANGE: Legacy worker files removed, Worker struct simplified Changes: 1. worker.go - Simplified to 8 fields using composed dependencies: - runLoop, runner, metrics, health (from new packages) - Removed: server, queue, running, datasetCache, ctx, cancel, etc. 2. factory.go - Updated NewWorker to use new structure - Uses lifecycle.NewRunLoop - Integrates jupyter.Manager properly 3. Removed legacy files: - execution.go (1,016 lines) - data_integrity.go (929 lines) - runloop.go (555 lines) - jupyter_task.go (144 lines) - simplified.go (demonstration no longer needed) 4. Fixed references to use new packages: - hash_selector.go -> integrity.DirOverallSHA256Hex - snapshot_store.go -> integrity.NormalizeSHA256ChecksumHex - metrics.go - Removed resource-dependent metrics temporarily 5. Added RecordQueueLatency to metrics.Metrics for lifecycle.MetricsRecorder Worker struct: 27 fields -> 8 fields (70% reduction) Build status: Compiles successfully
160 lines
5.7 KiB
Go
160 lines
5.7 KiB
Go
package worker
|
|
|
|
import (
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/collectors"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
)
|
|
|
|
// setupMetricsExporter initializes the Prometheus metrics exporter
|
|
func (w *Worker) setupMetricsExporter() {
|
|
if !w.config.Metrics.Enabled {
|
|
return
|
|
}
|
|
|
|
reg := prometheus.NewRegistry()
|
|
reg.MustRegister(
|
|
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
|
|
collectors.NewGoCollector(),
|
|
)
|
|
|
|
labels := prometheus.Labels{"worker_id": w.id}
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_tasks_processed_total",
|
|
Help: "Total tasks processed successfully by this worker.",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.TasksProcessed.Load())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_tasks_failed_total",
|
|
Help: "Total tasks failed by this worker.",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.TasksFailed.Load())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_tasks_active",
|
|
Help: "Number of tasks currently running on this worker.",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.runningCount())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_tasks_queued",
|
|
Help: "Latest observed queue depth from Redis.",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.QueuedTasks.Load())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_data_transferred_bytes_total",
|
|
Help: "Total bytes transferred while fetching datasets.",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.DataTransferred.Load())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_data_fetch_time_seconds_total",
|
|
Help: "Total time spent fetching datasets (seconds).",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.DataFetchTime.Load()) / float64(time.Second)
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_execution_time_seconds_total",
|
|
Help: "Total execution time for completed tasks (seconds).",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.ExecutionTime.Load()) / float64(time.Second)
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_prewarm_env_hit_total",
|
|
Help: "Total environment prewarm hits (warmed image already existed).",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.PrewarmEnvHit.Load())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_prewarm_env_miss_total",
|
|
Help: "Total environment prewarm misses (warmed image did not exist yet).",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.PrewarmEnvMiss.Load())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_prewarm_env_built_total",
|
|
Help: "Total environment prewarm images built.",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.PrewarmEnvBuilt.Load())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_prewarm_env_time_seconds_total",
|
|
Help: "Total time spent building prewarm images (seconds).",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.PrewarmEnvTime.Load()) / float64(time.Second)
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_prewarm_snapshot_hit_total",
|
|
Help: "Total prewarmed snapshot hits (snapshots found in .prewarm/).",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.PrewarmSnapshotHit.Load())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_prewarm_snapshot_miss_total",
|
|
Help: "Total prewarmed snapshot misses (snapshots not found in .prewarm/).",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.PrewarmSnapshotMiss.Load())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_prewarm_snapshot_built_total",
|
|
Help: "Total snapshots prewarmed into .prewarm/.",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.PrewarmSnapshotBuilt.Load())
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_prewarm_snapshot_time_seconds_total",
|
|
Help: "Total time spent prewarming snapshots (seconds).",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.metrics.PrewarmSnapshotTime.Load()) / float64(time.Second)
|
|
}))
|
|
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "fetchml_worker_max_concurrency",
|
|
Help: "Configured maximum concurrent tasks for this worker.",
|
|
ConstLabels: labels,
|
|
}, func() float64 {
|
|
return float64(w.config.MaxWorkers)
|
|
}))
|
|
|
|
// Note: Resource metrics temporarily disabled during migration
|
|
// These will be re-enabled once resource manager is integrated
|
|
|
|
mux := http.NewServeMux()
|
|
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
|
|
|
|
srv := &http.Server{
|
|
Addr: w.config.Metrics.ListenAddr,
|
|
Handler: mux,
|
|
ReadHeaderTimeout: 5 * time.Second,
|
|
}
|
|
|
|
w.metricsSrv = srv
|
|
go func() {
|
|
w.logger.Info("metrics exporter listening",
|
|
"addr", w.config.Metrics.ListenAddr,
|
|
"enabled", w.config.Metrics.Enabled)
|
|
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
w.logger.Warn("metrics exporter stopped",
|
|
"error", err)
|
|
}
|
|
}()
|
|
}
|