diff --git a/internal/worker/gpu.go b/internal/worker/gpu.go deleted file mode 100644 index bd9d4b7..0000000 --- a/internal/worker/gpu.go +++ /dev/null @@ -1,75 +0,0 @@ -package worker - -import ( - "os" - "strconv" - "strings" -) - -// _gpuVisibleDevicesString constructs the visible devices string from config -func _gpuVisibleDevicesString(cfg *Config, fallback string) string { - if cfg == nil { - return strings.TrimSpace(fallback) - } - if len(cfg.GPUVisibleDeviceIDs) > 0 { - parts := make([]string, 0, len(cfg.GPUVisibleDeviceIDs)) - for _, id := range cfg.GPUVisibleDeviceIDs { - id = strings.TrimSpace(id) - if id == "" { - continue - } - parts = append(parts, id) - } - return strings.Join(parts, ",") - } - if len(cfg.GPUVisibleDevices) == 0 { - return strings.TrimSpace(fallback) - } - parts := make([]string, 0, len(cfg.GPUVisibleDevices)) - for _, v := range cfg.GPUVisibleDevices { - if v < 0 { - continue - } - parts = append(parts, strconv.Itoa(v)) - } - return strings.Join(parts, ",") -} - -// _filterExistingDevicePaths filters device paths that actually exist -func _filterExistingDevicePaths(paths []string) []string { - if len(paths) == 0 { - return nil - } - seen := make(map[string]struct{}, len(paths)) - out := make([]string, 0, len(paths)) - for _, p := range paths { - p = strings.TrimSpace(p) - if p == "" { - continue - } - if _, ok := seen[p]; ok { - continue - } - if _, err := os.Stat(p); err != nil { - continue - } - seen[p] = struct{}{} - out = append(out, p) - } - return out -} - -// _gpuVisibleEnvVarName returns the appropriate env var for GPU visibility -func _gpuVisibleEnvVarName(cfg *Config) string { - if cfg == nil { - return "CUDA_VISIBLE_DEVICES" - } - switch strings.ToLower(strings.TrimSpace(cfg.GPUVendor)) { - case "amd": - return "HIP_VISIBLE_DEVICES" - case string(GPUTypeApple), string(GPUTypeNone): - return "" - default: - return "CUDA_VISIBLE_DEVICES" - } -} diff --git a/internal/worker/metrics.go b/internal/worker/metrics.go deleted file mode 100644 index ca9f530..0000000 --- a/internal/worker/metrics.go +++ /dev/null @@ -1,228 +0,0 @@ -package worker - -import ( - "net/http" - "strconv" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promhttp" -) - -// _setupMetricsExporter initializes the Prometheus metrics exporter -func (w *Worker) _setupMetricsExporter() { - if !w.config.Metrics.Enabled { - return - } - - reg := prometheus.NewRegistry() - reg.MustRegister( - collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), - collectors.NewGoCollector(), - ) - - labels := prometheus.Labels{"worker_id": w.id} - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_tasks_processed_total", - Help: "Total tasks processed successfully by this worker.", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.TasksProcessed.Load()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_tasks_failed_total", - Help: "Total tasks failed by this worker.", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.TasksFailed.Load()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_tasks_active", - Help: "Number of tasks currently running on this worker.", - ConstLabels: labels, - }, func() float64 { - return float64(w.runningCount()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_tasks_queued", - Help: "Latest observed queue depth from Redis.", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.QueuedTasks.Load()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_data_transferred_bytes_total", - Help: "Total bytes transferred while fetching datasets.", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.DataTransferred.Load()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_data_fetch_time_seconds_total", - Help: "Total time spent fetching datasets (seconds).", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.DataFetchTime.Load()) / float64(time.Second) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_execution_time_seconds_total", - Help: "Total execution time for completed tasks (seconds).", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.ExecutionTime.Load()) / float64(time.Second) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_prewarm_env_hit_total", - Help: "Total environment prewarm hits (warmed image already existed).", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.PrewarmEnvHit.Load()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_prewarm_env_miss_total", - Help: "Total environment prewarm misses (warmed image did not exist yet).", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.PrewarmEnvMiss.Load()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_prewarm_env_built_total", - Help: "Total environment prewarm images built.", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.PrewarmEnvBuilt.Load()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_prewarm_env_time_seconds_total", - Help: "Total time spent building prewarm images (seconds).", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.PrewarmEnvTime.Load()) / float64(time.Second) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_prewarm_snapshot_hit_total", - Help: "Total prewarmed snapshot hits (snapshots found in .prewarm/).", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.PrewarmSnapshotHit.Load()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_prewarm_snapshot_miss_total", - Help: "Total prewarmed snapshot misses (snapshots not found in .prewarm/).", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.PrewarmSnapshotMiss.Load()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_prewarm_snapshot_built_total", - Help: "Total snapshots prewarmed into .prewarm/.", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.PrewarmSnapshotBuilt.Load()) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_prewarm_snapshot_time_seconds_total", - Help: "Total time spent prewarming snapshots (seconds).", - ConstLabels: labels, - }, func() float64 { - return float64(w.metrics.PrewarmSnapshotTime.Load()) / float64(time.Second) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_worker_max_concurrency", - Help: "Configured maximum concurrent tasks for this worker.", - ConstLabels: labels, - }, func() float64 { - return float64(w.config.MaxWorkers) - })) - - // Resource metrics - if w.resources != nil { - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_resources_cpu_total", - Help: "Total CPU tokens managed by the worker resource manager.", - ConstLabels: labels, - }, func() float64 { - return float64(w.resources.Snapshot().TotalCPU) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_resources_cpu_free", - Help: "Free CPU tokens currently available in the worker resource manager.", - ConstLabels: labels, - }, func() float64 { - return float64(w.resources.Snapshot().FreeCPU) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_resources_acquire_total", - Help: "Total resource acquisition attempts.", - ConstLabels: labels, - }, func() float64 { - return float64(w.resources.Snapshot().AcquireTotal) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_resources_acquire_wait_total", - Help: "Total resource acquisitions that had to wait for resources.", - ConstLabels: labels, - }, func() float64 { - return float64(w.resources.Snapshot().AcquireWaitTotal) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_resources_acquire_timeout_total", - Help: "Total resource acquisition attempts that timed out.", - ConstLabels: labels, - }, func() float64 { - return float64(w.resources.Snapshot().AcquireTimeoutTotal) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_resources_acquire_wait_seconds_total", - Help: "Total seconds spent waiting for resources across all acquisitions.", - ConstLabels: labels, - }, func() float64 { - return w.resources.Snapshot().AcquireWaitSeconds - })) - - snap := w.resources.Snapshot() - for i := range snap.GPUFree { - gpuLabels := prometheus.Labels{"worker_id": w.id, "gpu_index": strconv.Itoa(i)} - idx := i - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_resources_gpu_slots_total", - Help: "Total GPU slots per GPU index.", - ConstLabels: gpuLabels, - }, func() float64 { - return float64(w.resources.Snapshot().SlotsPerGPU) - })) - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "fetchml_resources_gpu_slots_free", - Help: "Free GPU slots per GPU index.", - ConstLabels: gpuLabels, - }, func() float64 { - s := w.resources.Snapshot() - if idx < 0 || idx >= len(s.GPUFree) { - return 0 - } - return float64(s.GPUFree[idx]) - })) - } - } - - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) - - srv := &http.Server{ - Addr: w.config.Metrics.ListenAddr, - Handler: mux, - ReadHeaderTimeout: 5 * time.Second, - } - - w.metricsSrv = srv - go func() { - w.logger.Info("metrics exporter listening", - "addr", w.config.Metrics.ListenAddr, - "enabled", w.config.Metrics.Enabled) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - w.logger.Warn("metrics exporter stopped", - "error", err) - } - }() -} diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 6b04dcd..dd5e957 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -33,11 +33,6 @@ type JupyterManager interface { ListInstalledPackages(ctx context.Context, serviceName string) ([]jupyter.InstalledPackage, error) } -// _isValidName validates that input strings contain only safe characters. -func _isValidName(input string) bool { - return len(input) > 0 && len(input) < 256 -} - // MLServer is an alias for network.MLServer for backward compatibility. type MLServer = network.MLServer @@ -136,11 +131,6 @@ func (w *Worker) runningCount() int { return w.runLoop.RunningCount() } -func (w *Worker) _getGPUDetector() GPUDetector { - factory := &GPUDetectorFactory{} - return factory.CreateDetector(w.config) -} - // SelectDependencyManifest re-exports the executor function for API helpers. // It detects the dependency manifest file in the given directory. func SelectDependencyManifest(filesPath string) (string, error) {