refactor: Phase 4 deferred - Extract GPU utilities and execution helpers
Extracted from execution.go to focused packages: 1. internal/worker/gpu.go (75 lines) - gpuVisibleDevicesString() - GPU device string formatting - filterExistingDevicePaths() - Device path filtering - gpuVisibleEnvVarName() - GPU env var selection - Reuses GPUType constants from gpu_detector.go 2. internal/worker/execution/setup.go (140 lines) - SetupJobDirectories() - Job directory creation - CopyDir() - Directory tree copying - copyFile() - Single file copy helper 3. internal/worker/execution/snapshot.go (54 lines) - StageSnapshot() - Snapshot staging for jobs - StageSnapshotFromPath() - Snapshot staging from a given source path Updated execution.go: - Removed 64 lines of GPU utilities (now in gpu.go) - Reduced from 1,082 to ~1,018 lines - Still contains the main execution flow (runJob, executeJob, etc.) Build status: compiles successfully
This commit is contained in:
parent
d8cc2a4efa
commit
c46be7f815
4 changed files with 269 additions and 66 deletions
|
|
@ -10,7 +10,6 @@ import (
|
|||
"os/exec"
|
||||
"path/filepath"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
|
@ -26,71 +25,6 @@ import (
|
|||
"github.com/jfraeys/fetch_ml/internal/tracking"
|
||||
)
|
||||
|
||||
func gpuVisibleDevicesString(cfg *Config, fallback string) string {
|
||||
if cfg == nil {
|
||||
return strings.TrimSpace(fallback)
|
||||
}
|
||||
if len(cfg.GPUVisibleDeviceIDs) > 0 {
|
||||
parts := make([]string, 0, len(cfg.GPUVisibleDeviceIDs))
|
||||
for _, id := range cfg.GPUVisibleDeviceIDs {
|
||||
id = strings.TrimSpace(id)
|
||||
if id == "" {
|
||||
continue
|
||||
}
|
||||
parts = append(parts, id)
|
||||
}
|
||||
return strings.Join(parts, ",")
|
||||
}
|
||||
if len(cfg.GPUVisibleDevices) == 0 {
|
||||
return strings.TrimSpace(fallback)
|
||||
}
|
||||
parts := make([]string, 0, len(cfg.GPUVisibleDevices))
|
||||
for _, v := range cfg.GPUVisibleDevices {
|
||||
if v < 0 {
|
||||
continue
|
||||
}
|
||||
parts = append(parts, strconv.Itoa(v))
|
||||
}
|
||||
return strings.Join(parts, ",")
|
||||
}
|
||||
|
||||
func filterExistingDevicePaths(paths []string) []string {
|
||||
if len(paths) == 0 {
|
||||
return nil
|
||||
}
|
||||
seen := make(map[string]struct{}, len(paths))
|
||||
out := make([]string, 0, len(paths))
|
||||
for _, p := range paths {
|
||||
p = strings.TrimSpace(p)
|
||||
if p == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[p]; ok {
|
||||
continue
|
||||
}
|
||||
if _, err := os.Stat(p); err != nil {
|
||||
continue
|
||||
}
|
||||
seen[p] = struct{}{}
|
||||
out = append(out, p)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func gpuVisibleEnvVarName(cfg *Config) string {
|
||||
if cfg == nil {
|
||||
return "CUDA_VISIBLE_DEVICES"
|
||||
}
|
||||
switch strings.ToLower(strings.TrimSpace(cfg.GPUVendor)) {
|
||||
case "amd":
|
||||
return "HIP_VISIBLE_DEVICES"
|
||||
case string(GPUTypeApple), string(GPUTypeNone):
|
||||
return ""
|
||||
default:
|
||||
return "CUDA_VISIBLE_DEVICES"
|
||||
}
|
||||
}
|
||||
|
||||
func runIDForTask(task *queue.Task) string {
|
||||
if task == nil {
|
||||
return ""
|
||||
|
|
|
|||
140
internal/worker/execution/setup.go
Normal file
140
internal/worker/execution/setup.go
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
// Package execution provides job execution utilities for the worker
|
||||
package execution
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/jfraeys/fetch_ml/internal/container"
|
||||
"github.com/jfraeys/fetch_ml/internal/errtypes"
|
||||
"github.com/jfraeys/fetch_ml/internal/storage"
|
||||
)
|
||||
|
||||
// JobPaths holds the directory paths for a job.
type JobPaths struct {
	// JobDir is the staging directory under pending/ where the job is assembled.
	JobDir string
	// OutputDir is the directory under running/ where job output is collected.
	OutputDir string
	// LogFile is the path to output.log inside OutputDir.
	LogFile string
}
|
||||
|
||||
// SetupJobDirectories creates the necessary directories for a job
|
||||
func SetupJobDirectories(
|
||||
basePath string,
|
||||
jobName string,
|
||||
taskID string,
|
||||
) (jobDir, outputDir, logFile string, err error) {
|
||||
jobPaths := storage.NewJobPaths(basePath)
|
||||
pendingDir := jobPaths.PendingPath()
|
||||
jobDir = filepath.Join(pendingDir, jobName)
|
||||
outputDir = filepath.Join(jobPaths.RunningPath(), jobName)
|
||||
logFile = filepath.Join(outputDir, "output.log")
|
||||
|
||||
// Create pending directory
|
||||
if err := os.MkdirAll(pendingDir, 0750); err != nil {
|
||||
return "", "", "", &errtypes.TaskExecutionError{
|
||||
TaskID: taskID,
|
||||
JobName: jobName,
|
||||
Phase: "setup",
|
||||
Err: fmt.Errorf("failed to create pending dir: %w", err),
|
||||
}
|
||||
}
|
||||
|
||||
// Create job directory in pending
|
||||
if err := os.MkdirAll(jobDir, 0750); err != nil {
|
||||
return "", "", "", &errtypes.TaskExecutionError{
|
||||
TaskID: taskID,
|
||||
JobName: jobName,
|
||||
Phase: "setup",
|
||||
Err: fmt.Errorf("failed to create job dir: %w", err),
|
||||
}
|
||||
}
|
||||
|
||||
// Sanitize paths
|
||||
jobDir, err = container.SanitizePath(jobDir)
|
||||
if err != nil {
|
||||
return "", "", "", &errtypes.TaskExecutionError{
|
||||
TaskID: taskID,
|
||||
JobName: jobName,
|
||||
Phase: "validation",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
outputDir, err = container.SanitizePath(outputDir)
|
||||
if err != nil {
|
||||
return "", "", "", &errtypes.TaskExecutionError{
|
||||
TaskID: taskID,
|
||||
JobName: jobName,
|
||||
Phase: "validation",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
return jobDir, outputDir, logFile, nil
|
||||
}
|
||||
|
||||
// CopyDir copies a directory tree from src to dst
|
||||
func CopyDir(src, dst string) error {
|
||||
src = filepath.Clean(src)
|
||||
dst = filepath.Clean(dst)
|
||||
|
||||
srcInfo, err := os.Stat(src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !srcInfo.IsDir() {
|
||||
return fmt.Errorf("source is not a directory")
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(dst, 0750); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return filepath.WalkDir(src, func(path string, d os.DirEntry, walkErr error) error {
|
||||
if walkErr != nil {
|
||||
return walkErr
|
||||
}
|
||||
rel, err := filepath.Rel(src, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rel = filepath.Clean(rel)
|
||||
if rel == "." {
|
||||
return nil
|
||||
}
|
||||
if rel == ".." || strings.HasPrefix(rel, "..") {
|
||||
return fmt.Errorf("invalid relative path")
|
||||
}
|
||||
outPath := filepath.Join(dst, rel)
|
||||
if d.IsDir() {
|
||||
return os.MkdirAll(outPath, 0750)
|
||||
}
|
||||
|
||||
info, err := d.Info()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mode := info.Mode() & 0777
|
||||
return copyFile(filepath.Clean(path), outPath, mode)
|
||||
})
|
||||
}
|
||||
|
||||
// copyFile copies a single file
|
||||
func copyFile(src, dst string, mode os.FileMode) error {
|
||||
srcFile, err := os.Open(src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer srcFile.Close()
|
||||
|
||||
dstFile, err := os.OpenFile(dst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, mode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer dstFile.Close()
|
||||
|
||||
_, err = io.Copy(dstFile, srcFile)
|
||||
return err
|
||||
}
|
||||
54
internal/worker/execution/snapshot.go
Normal file
54
internal/worker/execution/snapshot.go
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
// Package execution provides job execution utilities for the worker
|
||||
package execution
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/jfraeys/fetch_ml/internal/container"
|
||||
)
|
||||
|
||||
// StageSnapshot stages a snapshot for a job
|
||||
func StageSnapshot(basePath, dataDir, taskID, snapshotID, jobDir string) error {
|
||||
sid := strings.TrimSpace(snapshotID)
|
||||
if sid == "" {
|
||||
return nil
|
||||
}
|
||||
if err := container.ValidateJobName(sid); err != nil {
|
||||
return err
|
||||
}
|
||||
if strings.TrimSpace(taskID) == "" {
|
||||
return fmt.Errorf("missing task id")
|
||||
}
|
||||
if strings.TrimSpace(jobDir) == "" {
|
||||
return fmt.Errorf("missing job dir")
|
||||
}
|
||||
src := filepath.Join(dataDir, "snapshots", sid)
|
||||
return StageSnapshotFromPath(basePath, taskID, src, jobDir)
|
||||
}
|
||||
|
||||
// StageSnapshotFromPath stages a snapshot from a specific source path
|
||||
func StageSnapshotFromPath(basePath, taskID, srcPath, jobDir string) error {
|
||||
if strings.TrimSpace(basePath) == "" {
|
||||
return fmt.Errorf("missing base path")
|
||||
}
|
||||
if strings.TrimSpace(taskID) == "" {
|
||||
return fmt.Errorf("missing task id")
|
||||
}
|
||||
if strings.TrimSpace(jobDir) == "" {
|
||||
return fmt.Errorf("missing job dir")
|
||||
}
|
||||
|
||||
dst := filepath.Join(jobDir, "snapshot")
|
||||
_ = os.RemoveAll(dst)
|
||||
|
||||
prewarmSrc := filepath.Join(basePath, ".prewarm", "snapshots", taskID)
|
||||
if info, err := os.Stat(prewarmSrc); err == nil && info.IsDir() {
|
||||
// Use prewarmed snapshot if available
|
||||
return os.Rename(prewarmSrc, dst)
|
||||
}
|
||||
|
||||
return CopyDir(srcPath, dst)
|
||||
}
|
||||
75
internal/worker/gpu.go
Normal file
75
internal/worker/gpu.go
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
package worker
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// gpuVisibleDevicesString constructs the visible devices string from config
|
||||
func gpuVisibleDevicesString(cfg *Config, fallback string) string {
|
||||
if cfg == nil {
|
||||
return strings.TrimSpace(fallback)
|
||||
}
|
||||
if len(cfg.GPUVisibleDeviceIDs) > 0 {
|
||||
parts := make([]string, 0, len(cfg.GPUVisibleDeviceIDs))
|
||||
for _, id := range cfg.GPUVisibleDeviceIDs {
|
||||
id = strings.TrimSpace(id)
|
||||
if id == "" {
|
||||
continue
|
||||
}
|
||||
parts = append(parts, id)
|
||||
}
|
||||
return strings.Join(parts, ",")
|
||||
}
|
||||
if len(cfg.GPUVisibleDevices) == 0 {
|
||||
return strings.TrimSpace(fallback)
|
||||
}
|
||||
parts := make([]string, 0, len(cfg.GPUVisibleDevices))
|
||||
for _, v := range cfg.GPUVisibleDevices {
|
||||
if v < 0 {
|
||||
continue
|
||||
}
|
||||
parts = append(parts, strconv.Itoa(v))
|
||||
}
|
||||
return strings.Join(parts, ",")
|
||||
}
|
||||
|
||||
// filterExistingDevicePaths filters device paths that actually exist
|
||||
func filterExistingDevicePaths(paths []string) []string {
|
||||
if len(paths) == 0 {
|
||||
return nil
|
||||
}
|
||||
seen := make(map[string]struct{}, len(paths))
|
||||
out := make([]string, 0, len(paths))
|
||||
for _, p := range paths {
|
||||
p = strings.TrimSpace(p)
|
||||
if p == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[p]; ok {
|
||||
continue
|
||||
}
|
||||
if _, err := os.Stat(p); err != nil {
|
||||
continue
|
||||
}
|
||||
seen[p] = struct{}{}
|
||||
out = append(out, p)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// gpuVisibleEnvVarName returns the appropriate env var for GPU visibility
|
||||
func gpuVisibleEnvVarName(cfg *Config) string {
|
||||
if cfg == nil {
|
||||
return "CUDA_VISIBLE_DEVICES"
|
||||
}
|
||||
switch strings.ToLower(strings.TrimSpace(cfg.GPUVendor)) {
|
||||
case "amd":
|
||||
return "HIP_VISIBLE_DEVICES"
|
||||
case string(GPUTypeApple), string(GPUTypeNone):
|
||||
return ""
|
||||
default:
|
||||
return "CUDA_VISIBLE_DEVICES"
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue