fetch_ml/internal/worker/execution/setup.go
Jeremie Fraeys c46be7f815
refactor: Phase 4 deferred - Extract GPU utilities and execution helpers
Extracted from execution.go to focused packages:

1. internal/worker/gpu.go (60 lines)
   - gpuVisibleDevicesString() - GPU device string formatting
   - filterExistingDevicePaths() - Device path filtering
   - gpuVisibleEnvVarName() - GPU env var selection
   - Reuses GPUType constants from gpu_detector.go

2. internal/worker/execution/setup.go (108 lines)
   - SetupJobDirectories() - Job directory creation
   - CopyDir() - Directory tree copying
   - copyFile() - Single file copy helper

3. internal/worker/execution/snapshot.go (52 lines)
   - StageSnapshot() - Snapshot staging for jobs
   - StageSnapshotFromPath() - Snapshot staging from path

Updated execution.go:
- Removed 64 lines of GPU utilities (now in gpu.go)
- Reduced from 1,082 to ~1,018 lines
- Still contains main execution flow (runJob, executeJob, etc.)

Build status: Compiles successfully
2026-02-17 14:03:11 -05:00

140 lines
3.1 KiB
Go

// Package execution provides job execution utilities for the worker.
package execution
import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/jfraeys/fetch_ml/internal/container"
"github.com/jfraeys/fetch_ml/internal/errtypes"
"github.com/jfraeys/fetch_ml/internal/storage"
)
// JobPaths groups the filesystem locations associated with a single job:
// the job's own directory, the directory its output goes to, and the path
// of its log file.
//
// NOTE(review): the field names mirror the values returned by
// SetupJobDirectories, but nothing in this file populates the struct —
// confirm the intended usage with callers.
type JobPaths struct {
	JobDir, OutputDir, LogFile string
}
// SetupJobDirectories creates the necessary directories for a job
func SetupJobDirectories(
basePath string,
jobName string,
taskID string,
) (jobDir, outputDir, logFile string, err error) {
jobPaths := storage.NewJobPaths(basePath)
pendingDir := jobPaths.PendingPath()
jobDir = filepath.Join(pendingDir, jobName)
outputDir = filepath.Join(jobPaths.RunningPath(), jobName)
logFile = filepath.Join(outputDir, "output.log")
// Create pending directory
if err := os.MkdirAll(pendingDir, 0750); err != nil {
return "", "", "", &errtypes.TaskExecutionError{
TaskID: taskID,
JobName: jobName,
Phase: "setup",
Err: fmt.Errorf("failed to create pending dir: %w", err),
}
}
// Create job directory in pending
if err := os.MkdirAll(jobDir, 0750); err != nil {
return "", "", "", &errtypes.TaskExecutionError{
TaskID: taskID,
JobName: jobName,
Phase: "setup",
Err: fmt.Errorf("failed to create job dir: %w", err),
}
}
// Sanitize paths
jobDir, err = container.SanitizePath(jobDir)
if err != nil {
return "", "", "", &errtypes.TaskExecutionError{
TaskID: taskID,
JobName: jobName,
Phase: "validation",
Err: err,
}
}
outputDir, err = container.SanitizePath(outputDir)
if err != nil {
return "", "", "", &errtypes.TaskExecutionError{
TaskID: taskID,
JobName: jobName,
Phase: "validation",
Err: err,
}
}
return jobDir, outputDir, logFile, nil
}
// CopyDir recursively copies the directory tree rooted at src into dst.
//
// Both paths are cleaned first. src must exist and be a directory; dst is
// created (including parents) if it does not exist. Every directory found
// under src is recreated under dst with mode 0750, and every other entry is
// handed to copyFile with the source's permission bits (mode & 0777).
//
// Entry names that merely begin with two dots (for example "..data") are
// valid and are copied; only a relative path that actually climbs out of src
// (".." itself or a path starting with "../") is rejected.
//
// Returns the first error encountered: from os.Stat on src, a
// "source is not a directory" error, a directory-creation error, an error
// reported by the walk, or an error from copying a file.
func CopyDir(src, dst string) error {
	src = filepath.Clean(src)
	dst = filepath.Clean(dst)

	srcInfo, err := os.Stat(src)
	if err != nil {
		return err
	}
	if !srcInfo.IsDir() {
		return fmt.Errorf("source is not a directory")
	}
	if err := os.MkdirAll(dst, 0750); err != nil {
		return err
	}

	// A path escapes src only when its first element is exactly "..".
	// Matching on the bare ".." prefix would wrongly reject names such as
	// "..data" or "...".
	parentPrefix := ".." + string(filepath.Separator)

	return filepath.WalkDir(src, func(path string, d os.DirEntry, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}

		rel, err := filepath.Rel(src, path)
		if err != nil {
			return err
		}
		rel = filepath.Clean(rel)
		if rel == "." {
			// The root itself was already created above.
			return nil
		}
		if rel == ".." || strings.HasPrefix(rel, parentPrefix) {
			return fmt.Errorf("invalid relative path")
		}

		outPath := filepath.Join(dst, rel)
		if d.IsDir() {
			return os.MkdirAll(outPath, 0750)
		}

		info, err := d.Info()
		if err != nil {
			return err
		}
		// Keep only the permission bits; type and special bits are dropped.
		mode := info.Mode() & 0777
		return copyFile(filepath.Clean(path), outPath, mode)
	})
}
// copyFile copies the contents of the file at src into dst.
//
// dst is created with the given mode if it does not exist and truncated if
// it does. The error from closing dst is returned, because a failed close
// on a written file can mean the data never reached storage; a deferred
// Close would silently drop that error.
//
// Returns the first error from opening src, opening dst, copying, or closing
// dst.
func copyFile(src, dst string, mode os.FileMode) error {
	srcFile, err := os.Open(src)
	if err != nil {
		return err
	}
	// The source is only read, so its close error carries no information.
	defer srcFile.Close()

	dstFile, err := os.OpenFile(dst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, mode)
	if err != nil {
		return err
	}

	if _, err := io.Copy(dstFile, srcFile); err != nil {
		// The copy error is the root cause; close only to release the handle.
		_ = dstFile.Close()
		return err
	}
	return dstFile.Close()
}