chore: remove orphaned unused functions from worker package
Deleted files: - internal/worker/gpu.go (76 lines) - all 3 functions unused: _gpuVisibleDevicesString, _filterExistingDevicePaths, _gpuVisibleEnvVarName - internal/worker/metrics.go (229 lines) - _setupMetricsExporter method unused Modified: - internal/worker/worker.go - removed the unused _isValidName and _getGPUDetector functions. All tests pass and the build compiles successfully.
This commit is contained in:
parent
3694d4e56f
commit
bd2b99b09c
3 changed files with 0 additions and 313 deletions
|
|
@ -1,75 +0,0 @@
|
|||
package worker
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// _gpuVisibleDevicesString constructs the visible devices string from config
|
||||
func _gpuVisibleDevicesString(cfg *Config, fallback string) string {
|
||||
if cfg == nil {
|
||||
return strings.TrimSpace(fallback)
|
||||
}
|
||||
if len(cfg.GPUVisibleDeviceIDs) > 0 {
|
||||
parts := make([]string, 0, len(cfg.GPUVisibleDeviceIDs))
|
||||
for _, id := range cfg.GPUVisibleDeviceIDs {
|
||||
id = strings.TrimSpace(id)
|
||||
if id == "" {
|
||||
continue
|
||||
}
|
||||
parts = append(parts, id)
|
||||
}
|
||||
return strings.Join(parts, ",")
|
||||
}
|
||||
if len(cfg.GPUVisibleDevices) == 0 {
|
||||
return strings.TrimSpace(fallback)
|
||||
}
|
||||
parts := make([]string, 0, len(cfg.GPUVisibleDevices))
|
||||
for _, v := range cfg.GPUVisibleDevices {
|
||||
if v < 0 {
|
||||
continue
|
||||
}
|
||||
parts = append(parts, strconv.Itoa(v))
|
||||
}
|
||||
return strings.Join(parts, ",")
|
||||
}
|
||||
|
||||
// _filterExistingDevicePaths filters device paths that actually exist.
//
// Input order is preserved and duplicates are dropped (first occurrence
// wins). Entries that are blank after trimming, or for which os.Stat
// reports an error, are skipped. Returns nil for empty input.
func _filterExistingDevicePaths(paths []string) []string {
	if len(paths) == 0 {
		return nil
	}
	kept := make([]string, 0, len(paths))
	visited := make(map[string]struct{}, len(paths))
	for _, raw := range paths {
		path := strings.TrimSpace(raw)
		if path == "" {
			continue
		}
		if _, dup := visited[path]; dup {
			continue
		}
		// Keep only paths that are actually present on this host.
		if _, err := os.Stat(path); err != nil {
			continue
		}
		visited[path] = struct{}{}
		kept = append(kept, path)
	}
	return kept
}
|
||||
|
||||
// _gpuVisibleEnvVarName returns the appropriate env var for GPU visibility
|
||||
func _gpuVisibleEnvVarName(cfg *Config) string {
|
||||
if cfg == nil {
|
||||
return "CUDA_VISIBLE_DEVICES"
|
||||
}
|
||||
switch strings.ToLower(strings.TrimSpace(cfg.GPUVendor)) {
|
||||
case "amd":
|
||||
return "HIP_VISIBLE_DEVICES"
|
||||
case string(GPUTypeApple), string(GPUTypeNone):
|
||||
return ""
|
||||
default:
|
||||
return "CUDA_VISIBLE_DEVICES"
|
||||
}
|
||||
}
|
||||
|
|
@ -1,228 +0,0 @@
|
|||
package worker
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/collectors"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
// _setupMetricsExporter initializes the Prometheus metrics exporter
|
||||
func (w *Worker) _setupMetricsExporter() {
|
||||
if !w.config.Metrics.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
reg := prometheus.NewRegistry()
|
||||
reg.MustRegister(
|
||||
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
|
||||
collectors.NewGoCollector(),
|
||||
)
|
||||
|
||||
labels := prometheus.Labels{"worker_id": w.id}
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_tasks_processed_total",
|
||||
Help: "Total tasks processed successfully by this worker.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.TasksProcessed.Load())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_tasks_failed_total",
|
||||
Help: "Total tasks failed by this worker.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.TasksFailed.Load())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_tasks_active",
|
||||
Help: "Number of tasks currently running on this worker.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.runningCount())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_tasks_queued",
|
||||
Help: "Latest observed queue depth from Redis.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.QueuedTasks.Load())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_data_transferred_bytes_total",
|
||||
Help: "Total bytes transferred while fetching datasets.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.DataTransferred.Load())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_data_fetch_time_seconds_total",
|
||||
Help: "Total time spent fetching datasets (seconds).",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.DataFetchTime.Load()) / float64(time.Second)
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_execution_time_seconds_total",
|
||||
Help: "Total execution time for completed tasks (seconds).",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.ExecutionTime.Load()) / float64(time.Second)
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_prewarm_env_hit_total",
|
||||
Help: "Total environment prewarm hits (warmed image already existed).",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.PrewarmEnvHit.Load())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_prewarm_env_miss_total",
|
||||
Help: "Total environment prewarm misses (warmed image did not exist yet).",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.PrewarmEnvMiss.Load())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_prewarm_env_built_total",
|
||||
Help: "Total environment prewarm images built.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.PrewarmEnvBuilt.Load())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_prewarm_env_time_seconds_total",
|
||||
Help: "Total time spent building prewarm images (seconds).",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.PrewarmEnvTime.Load()) / float64(time.Second)
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_prewarm_snapshot_hit_total",
|
||||
Help: "Total prewarmed snapshot hits (snapshots found in .prewarm/).",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.PrewarmSnapshotHit.Load())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_prewarm_snapshot_miss_total",
|
||||
Help: "Total prewarmed snapshot misses (snapshots not found in .prewarm/).",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.PrewarmSnapshotMiss.Load())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_prewarm_snapshot_built_total",
|
||||
Help: "Total snapshots prewarmed into .prewarm/.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.PrewarmSnapshotBuilt.Load())
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_prewarm_snapshot_time_seconds_total",
|
||||
Help: "Total time spent prewarming snapshots (seconds).",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.metrics.PrewarmSnapshotTime.Load()) / float64(time.Second)
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_worker_max_concurrency",
|
||||
Help: "Configured maximum concurrent tasks for this worker.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.config.MaxWorkers)
|
||||
}))
|
||||
|
||||
// Resource metrics
|
||||
if w.resources != nil {
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_resources_cpu_total",
|
||||
Help: "Total CPU tokens managed by the worker resource manager.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.resources.Snapshot().TotalCPU)
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_resources_cpu_free",
|
||||
Help: "Free CPU tokens currently available in the worker resource manager.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.resources.Snapshot().FreeCPU)
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_resources_acquire_total",
|
||||
Help: "Total resource acquisition attempts.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.resources.Snapshot().AcquireTotal)
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_resources_acquire_wait_total",
|
||||
Help: "Total resource acquisitions that had to wait for resources.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.resources.Snapshot().AcquireWaitTotal)
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_resources_acquire_timeout_total",
|
||||
Help: "Total resource acquisition attempts that timed out.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return float64(w.resources.Snapshot().AcquireTimeoutTotal)
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_resources_acquire_wait_seconds_total",
|
||||
Help: "Total seconds spent waiting for resources across all acquisitions.",
|
||||
ConstLabels: labels,
|
||||
}, func() float64 {
|
||||
return w.resources.Snapshot().AcquireWaitSeconds
|
||||
}))
|
||||
|
||||
snap := w.resources.Snapshot()
|
||||
for i := range snap.GPUFree {
|
||||
gpuLabels := prometheus.Labels{"worker_id": w.id, "gpu_index": strconv.Itoa(i)}
|
||||
idx := i
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_resources_gpu_slots_total",
|
||||
Help: "Total GPU slots per GPU index.",
|
||||
ConstLabels: gpuLabels,
|
||||
}, func() float64 {
|
||||
return float64(w.resources.Snapshot().SlotsPerGPU)
|
||||
}))
|
||||
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "fetchml_resources_gpu_slots_free",
|
||||
Help: "Free GPU slots per GPU index.",
|
||||
ConstLabels: gpuLabels,
|
||||
}, func() float64 {
|
||||
s := w.resources.Snapshot()
|
||||
if idx < 0 || idx >= len(s.GPUFree) {
|
||||
return 0
|
||||
}
|
||||
return float64(s.GPUFree[idx])
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: w.config.Metrics.ListenAddr,
|
||||
Handler: mux,
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
w.metricsSrv = srv
|
||||
go func() {
|
||||
w.logger.Info("metrics exporter listening",
|
||||
"addr", w.config.Metrics.ListenAddr,
|
||||
"enabled", w.config.Metrics.Enabled)
|
||||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
w.logger.Warn("metrics exporter stopped",
|
||||
"error", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
@ -33,11 +33,6 @@ type JupyterManager interface {
|
|||
ListInstalledPackages(ctx context.Context, serviceName string) ([]jupyter.InstalledPackage, error)
|
||||
}
|
||||
|
||||
// _isValidName reports whether input is a usable name: non-empty and at
// most 255 bytes long.
//
// NOTE(review): only the length is checked here — no character-class
// validation is performed, despite what the function name suggests.
func _isValidName(input string) bool {
	n := len(input)
	return n >= 1 && n <= 255
}
|
||||
|
||||
// MLServer is an alias for network.MLServer for backward compatibility.
|
||||
type MLServer = network.MLServer
|
||||
|
||||
|
|
@ -136,11 +131,6 @@ func (w *Worker) runningCount() int {
|
|||
return w.runLoop.RunningCount()
|
||||
}
|
||||
|
||||
func (w *Worker) _getGPUDetector() GPUDetector {
|
||||
factory := &GPUDetectorFactory{}
|
||||
return factory.CreateDetector(w.config)
|
||||
}
|
||||
|
||||
// SelectDependencyManifest re-exports the executor function for API helpers.
|
||||
// It detects the dependency manifest file in the given directory.
|
||||
func SelectDependencyManifest(filesPath string) (string, error) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue