// Package execution provides job execution utilities for the worker package execution import ( "fmt" "os" "path/filepath" "strings" "github.com/jfraeys/fetch_ml/internal/container" ) // StageSnapshot stages a snapshot for a job func StageSnapshot(basePath, dataDir, taskID, snapshotID, jobDir string) error { sid := strings.TrimSpace(snapshotID) if sid == "" { return nil } if err := container.ValidateJobName(sid); err != nil { return err } if strings.TrimSpace(taskID) == "" { return fmt.Errorf("missing task id") } if strings.TrimSpace(jobDir) == "" { return fmt.Errorf("missing job dir") } src := filepath.Join(dataDir, "snapshots", sid) return StageSnapshotFromPath(basePath, taskID, src, jobDir) } // StageSnapshotFromPath stages a snapshot from a specific source path func StageSnapshotFromPath(basePath, taskID, srcPath, jobDir string) error { if strings.TrimSpace(basePath) == "" { return fmt.Errorf("missing base path") } if strings.TrimSpace(taskID) == "" { return fmt.Errorf("missing task id") } if strings.TrimSpace(jobDir) == "" { return fmt.Errorf("missing job dir") } dst := filepath.Join(jobDir, "snapshot") _ = os.RemoveAll(dst) prewarmSrc := filepath.Join(basePath, ".prewarm", "snapshots", taskID) if info, err := os.Stat(prewarmSrc); err == nil && info.IsDir() { // Use prewarmed snapshot if available return os.Rename(prewarmSrc, dst) } return CopyDir(srcPath, dst) }