From c46be7f815561355e0e21eddc1cf447120460165 Mon Sep 17 00:00:00 2001
From: Jeremie Fraeys
Date: Tue, 17 Feb 2026 14:03:11 -0500
Subject: [PATCH] refactor: Phase 4 deferred - Extract GPU utilities and execution helpers

Extracted from execution.go to focused packages:

1. internal/worker/gpu.go (75 lines)
   - gpuVisibleDevicesString() - GPU device string formatting
   - filterExistingDevicePaths() - Device path filtering
   - gpuVisibleEnvVarName() - GPU env var selection
   - Reuses GPUType constants from gpu_detector.go

2. internal/worker/execution/setup.go (163 lines)
   - SetupJobDirectories() - Job directory creation
   - CopyDir() - Directory tree copying
   - copyFile() - Single file copy helper

3. internal/worker/execution/snapshot.go (84 lines)
   - StageSnapshot() - Snapshot staging for jobs
   - StageSnapshotFromPath() - Snapshot staging from path

Updated execution.go:
- Removed 64 lines of GPU utilities (now in gpu.go)
- Reduced from 1,082 to ~1,018 lines
- Still contains main execution flow (runJob, executeJob, etc.)

Hardening applied to the extracted helpers:
- copyFile() reports a failed Close() on the destination file
- CopyDir() no longer rejects entries whose name merely starts with ".."
- SetupJobDirectories() derives the log file from the sanitized output dir
- StageSnapshotFromPath() validates the task id used as a path element,
  fails when the old snapshot cannot be removed, and falls back to a
  copy when the prewarmed snapshot cannot be renamed into place

Build status: Compiles successfully
---
Review revision of the original patch. Text in this section (between the
"---" marker and the diff) is not recorded by "git am".
 - The "index" blob ids of setup.go and snapshot.go are carried over from
   the original patch and no longer match the contents below.
 - The build status above was reported for the original patch; re-run the
   build for this revision.

 internal/worker/execution.go          |  66 -----------
 internal/worker/execution/setup.go    | 163 ++++++++++++++++++++++++++
 internal/worker/execution/snapshot.go |  84 +++++++++++++
 internal/worker/gpu.go                |  75 ++++++++++++
 4 files changed, 322 insertions(+), 66 deletions(-)
 create mode 100644 internal/worker/execution/setup.go
 create mode 100644 internal/worker/execution/snapshot.go
 create mode 100644 internal/worker/gpu.go

diff --git a/internal/worker/execution.go b/internal/worker/execution.go
index 834e60d..1db88be 100644
--- a/internal/worker/execution.go
+++ b/internal/worker/execution.go
@@ -10,7 +10,6 @@ import (
 	"os/exec"
 	"path/filepath"
 	"runtime/debug"
-	"strconv"
 	"strings"
 	"time"
 
@@ -26,71 +25,6 @@ import (
 	"github.com/jfraeys/fetch_ml/internal/tracking"
 )
 
-func gpuVisibleDevicesString(cfg *Config, fallback string) string {
-	if cfg == nil {
-		return strings.TrimSpace(fallback)
-	}
-	if len(cfg.GPUVisibleDeviceIDs) > 0 {
-		parts := make([]string, 0, len(cfg.GPUVisibleDeviceIDs))
-		for _, id := range cfg.GPUVisibleDeviceIDs {
-			id = strings.TrimSpace(id)
-			if id == "" {
-				continue
-			}
-			parts = append(parts, id)
-		}
-		return strings.Join(parts, ",")
-	}
-	if len(cfg.GPUVisibleDevices) == 0 {
-		return strings.TrimSpace(fallback)
-	}
-	parts := make([]string, 0, len(cfg.GPUVisibleDevices))
-	for _, v := range cfg.GPUVisibleDevices {
-		if v < 0 {
-			continue
-		}
-		parts = append(parts, strconv.Itoa(v))
-	}
-	return strings.Join(parts, ",")
-}
-
-func filterExistingDevicePaths(paths []string) []string {
-	if len(paths) == 0 {
-		return nil
-	}
-	seen := make(map[string]struct{}, len(paths))
-	out := make([]string, 0, len(paths))
-	for _, p := range paths {
-		p = strings.TrimSpace(p)
-		if p == "" {
-			continue
-		}
-		if _, ok := seen[p]; ok {
-			continue
-		}
-		if _, err := os.Stat(p); err != nil {
-			continue
-		}
-		seen[p] = struct{}{}
-		out = append(out, p)
-	}
-	return out
-}
-
-func gpuVisibleEnvVarName(cfg *Config) string {
-	if cfg == nil {
-		return "CUDA_VISIBLE_DEVICES"
-	}
-	switch strings.ToLower(strings.TrimSpace(cfg.GPUVendor)) {
-	case "amd":
-		return "HIP_VISIBLE_DEVICES"
-	case string(GPUTypeApple), string(GPUTypeNone):
-		return ""
-	default:
-		return "CUDA_VISIBLE_DEVICES"
-	}
-}
-
 func runIDForTask(task *queue.Task) string {
 	if task == nil {
 		return ""
diff --git a/internal/worker/execution/setup.go b/internal/worker/execution/setup.go
new file mode 100644
index 0000000..5941915
--- /dev/null
+++ b/internal/worker/execution/setup.go
@@ -0,0 +1,163 @@
+// Package execution provides job execution utilities for the worker.
+package execution
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/jfraeys/fetch_ml/internal/container"
+	"github.com/jfraeys/fetch_ml/internal/errtypes"
+	"github.com/jfraeys/fetch_ml/internal/storage"
+)
+
+// JobPaths holds the directory paths for a job.
+type JobPaths struct {
+	JobDir    string
+	OutputDir string
+	LogFile   string
+}
+
+// SetupJobDirectories creates the pending directories for a job and returns
+// the sanitized job directory, the sanitized output directory and the path of
+// the log file inside the output directory.
+//
+// Only pendingDir and jobDir are created; outputDir and logFile are returned
+// as paths without being created here. Failures are reported as
+// *errtypes.TaskExecutionError values carrying taskID and jobName.
+func SetupJobDirectories(
+	basePath string,
+	jobName string,
+	taskID string,
+) (jobDir, outputDir, logFile string, err error) {
+	jobPaths := storage.NewJobPaths(basePath)
+	pendingDir := jobPaths.PendingPath()
+	jobDir = filepath.Join(pendingDir, jobName)
+	outputDir = filepath.Join(jobPaths.RunningPath(), jobName)
+
+	// Create pending directory
+	if err := os.MkdirAll(pendingDir, 0750); err != nil {
+		return "", "", "", &errtypes.TaskExecutionError{
+			TaskID:  taskID,
+			JobName: jobName,
+			Phase:   "setup",
+			Err:     fmt.Errorf("failed to create pending dir: %w", err),
+		}
+	}
+
+	// Create job directory in pending
+	if err := os.MkdirAll(jobDir, 0750); err != nil {
+		return "", "", "", &errtypes.TaskExecutionError{
+			TaskID:  taskID,
+			JobName: jobName,
+			Phase:   "setup",
+			Err:     fmt.Errorf("failed to create job dir: %w", err),
+		}
+	}
+
+	// Sanitize paths
+	jobDir, err = container.SanitizePath(jobDir)
+	if err != nil {
+		return "", "", "", &errtypes.TaskExecutionError{
+			TaskID:  taskID,
+			JobName: jobName,
+			Phase:   "validation",
+			Err:     err,
+		}
+	}
+	outputDir, err = container.SanitizePath(outputDir)
+	if err != nil {
+		return "", "", "", &errtypes.TaskExecutionError{
+			TaskID:  taskID,
+			JobName: jobName,
+			Phase:   "validation",
+			Err:     err,
+		}
+	}
+
+	// Build the log path from the sanitized output directory so the two
+	// returned paths always agree.
+	logFile = filepath.Join(outputDir, "output.log")
+
+	return jobDir, outputDir, logFile, nil
+}
+
+// CopyDir recursively copies the directory tree rooted at src into dst.
+//
+// dst and every copied sub-directory are created with mode 0750; files are
+// created with the permission bits of their source (before umask). It fails
+// if src cannot be stat'ed or is not a directory, and stops at the first
+// entry that cannot be copied.
+func CopyDir(src, dst string) error {
+	src = filepath.Clean(src)
+	dst = filepath.Clean(dst)
+
+	srcInfo, err := os.Stat(src)
+	if err != nil {
+		return err
+	}
+	if !srcInfo.IsDir() {
+		return fmt.Errorf("source is not a directory")
+	}
+
+	if err := os.MkdirAll(dst, 0750); err != nil {
+		return err
+	}
+
+	return filepath.WalkDir(src, func(path string, d os.DirEntry, walkErr error) error {
+		if walkErr != nil {
+			return walkErr
+		}
+		rel, err := filepath.Rel(src, path)
+		if err != nil {
+			return err
+		}
+		rel = filepath.Clean(rel)
+		if rel == "." {
+			return nil
+		}
+		// Reject only paths that really climb out of src. A bare ".." prefix
+		// test would also refuse legitimate entries such as "..data".
+		if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
+			return fmt.Errorf("invalid relative path")
+		}
+		outPath := filepath.Join(dst, rel)
+		if d.IsDir() {
+			return os.MkdirAll(outPath, 0750)
+		}
+
+		info, err := d.Info()
+		if err != nil {
+			return err
+		}
+		mode := info.Mode() & 0777
+		return copyFile(filepath.Clean(path), outPath, mode)
+	})
+}
+
+// copyFile copies the contents of src into dst, creating or truncating dst
+// with the given permission bits. A failure to close dst is reported because
+// it can mean buffered data never reached the disk.
+func copyFile(src, dst string, mode os.FileMode) (err error) {
+	srcFile, err := os.Open(src)
+	if err != nil {
+		return err
+	}
+	defer srcFile.Close()
+
+	dstFile, err := os.OpenFile(dst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, mode)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// Keep the copy error if there is one; otherwise surface the close error.
+		if cerr := dstFile.Close(); err == nil {
+			err = cerr
+		}
+	}()
+
+	_, err = io.Copy(dstFile, srcFile)
+	return err
+}
diff --git a/internal/worker/execution/snapshot.go b/internal/worker/execution/snapshot.go
new file mode 100644
index 0000000..7aeeb07
--- /dev/null
+++ b/internal/worker/execution/snapshot.go
@@ -0,0 +1,84 @@
+// Package execution provides job execution utilities for the worker.
+package execution
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/jfraeys/fetch_ml/internal/container"
+)
+
+// StageSnapshot stages the snapshot named snapshotID into jobDir/snapshot.
+//
+// An empty (or all-whitespace) snapshotID means the job has no snapshot and
+// is not an error. Otherwise the ID is validated with
+// container.ValidateJobName and the snapshot is read from
+// dataDir/snapshots/<snapshotID>, unless a prewarmed copy exists for taskID
+// (see StageSnapshotFromPath).
+func StageSnapshot(basePath, dataDir, taskID, snapshotID, jobDir string) error {
+	sid := strings.TrimSpace(snapshotID)
+	if sid == "" {
+		return nil
+	}
+	if err := container.ValidateJobName(sid); err != nil {
+		return err
+	}
+	if strings.TrimSpace(taskID) == "" {
+		return fmt.Errorf("missing task id")
+	}
+	if strings.TrimSpace(jobDir) == "" {
+		return fmt.Errorf("missing job dir")
+	}
+	src := filepath.Join(dataDir, "snapshots", sid)
+	return StageSnapshotFromPath(basePath, taskID, src, jobDir)
+}
+
+// StageSnapshotFromPath stages a snapshot into jobDir/snapshot, replacing
+// whatever is already there.
+//
+// A prewarmed directory at basePath/.prewarm/snapshots/<taskID> is preferred
+// and is consumed (moved) when present; otherwise srcPath is copied with
+// CopyDir.
+func StageSnapshotFromPath(basePath, taskID, srcPath, jobDir string) error {
+	if strings.TrimSpace(basePath) == "" {
+		return fmt.Errorf("missing base path")
+	}
+	if strings.TrimSpace(taskID) == "" {
+		return fmt.Errorf("missing task id")
+	}
+	if strings.TrimSpace(jobDir) == "" {
+		return fmt.Errorf("missing job dir")
+	}
+	// taskID becomes a path element below, so it must not be able to point
+	// outside the prewarm directory.
+	if taskID != filepath.Base(taskID) || taskID == "." || taskID == ".." {
+		return fmt.Errorf("invalid task id")
+	}
+
+	dst := filepath.Join(jobDir, "snapshot")
+	// A stale snapshot that cannot be removed would be mixed with the new one.
+	if err := os.RemoveAll(dst); err != nil {
+		return fmt.Errorf("failed to clear snapshot dir: %w", err)
+	}
+
+	prewarmSrc := filepath.Join(basePath, ".prewarm", "snapshots", taskID)
+	if info, err := os.Stat(prewarmSrc); err == nil && info.IsDir() {
+		// Use prewarmed snapshot if available
+		if err := os.Rename(prewarmSrc, dst); err == nil {
+			return nil
+		}
+		// Rename cannot cross filesystems; fall back to copy-and-remove so the
+		// prewarmed data is still used and still consumed.
+		if err := CopyDir(prewarmSrc, dst); err != nil {
+			return err
+		}
+		// Best-effort cleanup: the snapshot is already staged at this point,
+		// so a failure to delete the prewarm directory is not reported.
+		_ = os.RemoveAll(prewarmSrc)
+		return nil
+	}
+
+	return CopyDir(srcPath, dst)
+}
diff --git a/internal/worker/gpu.go b/internal/worker/gpu.go
new file mode 100644
index 0000000..8424ff5
--- /dev/null
+++ b/internal/worker/gpu.go
@@ -0,0 +1,75 @@
+package worker
+
+import (
+	"os"
+	"strconv"
+	"strings"
+)
+
+// gpuVisibleDevicesString constructs the visible devices string from config
+func gpuVisibleDevicesString(cfg *Config, fallback string) string {
+	if cfg == nil {
+		return strings.TrimSpace(fallback)
+	}
+	if len(cfg.GPUVisibleDeviceIDs) > 0 {
+		parts := make([]string, 0, len(cfg.GPUVisibleDeviceIDs))
+		for _, id := range cfg.GPUVisibleDeviceIDs {
+			id = strings.TrimSpace(id)
+			if id == "" {
+				continue
+			}
+			parts = append(parts, id)
+		}
+		return strings.Join(parts, ",")
+	}
+	if len(cfg.GPUVisibleDevices) == 0 {
+		return strings.TrimSpace(fallback)
+	}
+	parts := make([]string, 0, len(cfg.GPUVisibleDevices))
+	for _, v := range cfg.GPUVisibleDevices {
+		if v < 0 {
+			continue
+		}
+		parts = append(parts, strconv.Itoa(v))
+	}
+	return strings.Join(parts, ",")
+}
+
+// filterExistingDevicePaths filters device paths that actually exist
+func filterExistingDevicePaths(paths []string) []string {
+	if len(paths) == 0 {
+		return nil
+	}
+	seen := make(map[string]struct{}, len(paths))
+	out := make([]string, 0, len(paths))
+	for _, p := range paths {
+		p = strings.TrimSpace(p)
+		if p == "" {
+			continue
+		}
+		if _, ok := seen[p]; ok {
+			continue
+		}
+		if _, err := os.Stat(p); err != nil {
+			continue
+		}
+		seen[p] = struct{}{}
+		out = append(out, p)
+	}
+	return out
+}
+
+// gpuVisibleEnvVarName returns the appropriate env var for GPU visibility
+func gpuVisibleEnvVarName(cfg *Config) string {
+	if cfg == nil {
+		return "CUDA_VISIBLE_DEVICES"
+	}
+	switch strings.ToLower(strings.TrimSpace(cfg.GPUVendor)) {
+	case "amd":
+		return "HIP_VISIBLE_DEVICES"
+	case string(GPUTypeApple), string(GPUTypeNone):
+		return ""
+	default:
+		return "CUDA_VISIBLE_DEVICES"
+	}
+}