// Package executor provides job execution implementations package executor import ( "context" "fmt" "os" "path/filepath" "time" "github.com/jfraeys/fetch_ml/internal/errtypes" "github.com/jfraeys/fetch_ml/internal/logging" "github.com/jfraeys/fetch_ml/internal/manifest" "github.com/jfraeys/fetch_ml/internal/queue" "github.com/jfraeys/fetch_ml/internal/storage" "github.com/jfraeys/fetch_ml/internal/worker/execution" "github.com/jfraeys/fetch_ml/internal/worker/interfaces" ) // JobRunner orchestrates job execution by delegating to specific executors type JobRunner struct { local interfaces.JobExecutor container interfaces.JobExecutor writer interfaces.ManifestWriter logger *logging.Logger } // NewJobRunner creates a new job runner func NewJobRunner( local interfaces.JobExecutor, container interfaces.JobExecutor, writer interfaces.ManifestWriter, logger *logging.Logger, ) *JobRunner { return &JobRunner{ local: local, container: container, writer: writer, logger: logger, } } // ExecutionMode determines which executor to use type ExecutionMode int const ( // ModeAuto selects based on configuration ModeAuto ExecutionMode = iota // ModeLocal forces local execution ModeLocal // ModeContainer forces container execution ModeContainer ) // Run executes a job with the given mode func (r *JobRunner) Run( ctx context.Context, task *queue.Task, basePath string, mode ExecutionMode, localMode bool, gpuEnv interfaces.ExecutionEnv, ) error { // 1. Setup directories jobDir, outputDir, logFile, err := execution.SetupJobDirectories(basePath, task.JobName, task.ID) if err != nil { return err } // 2. Create execution environment env := interfaces.ExecutionEnv{ JobDir: jobDir, OutputDir: outputDir, LogFile: logFile, GPUDevices: gpuEnv.GPUDevices, GPUEnvVar: gpuEnv.GPUEnvVar, GPUDevicesStr: gpuEnv.GPUDevicesStr, } // 3. Select executor executor := r.selectExecutor(mode, localMode) // 3.5 Validate manifest completeness before execution if r.writer != nil { // Load current manifest and validate if m, err := manifest.LoadFromDir(outputDir); err == nil { validator := manifest.NewValidator() if err := validator.Validate(m); err != nil { r.logger.Error("manifest validation failed - execution blocked", "task", task.ID, "error", err) return &errtypes.TaskExecutionError{ TaskID: task.ID, JobName: task.JobName, Phase: "validation", Message: "manifest incomplete - execution blocked", Err: err, Recoverable: false, // Can't retry - manifest is missing data } } } } // 4. Pre-execution manifest update if r.writer != nil { r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) { m.MarkStarted(time.Now().UTC()) m.GPUDevices = gpuEnv.GPUDevices if gpuEnv.GPUEnvVar != "" { m.Metadata["gpu_visible_devices"] = gpuEnv.GPUDevicesStr m.Metadata["gpu_visible_env"] = gpuEnv.GPUEnvVar } }) } // 5. Execute execErr := executor.Execute(ctx, task, env) // 6. Post-execution cleanup return r.finalize(task, outputDir, basePath, execErr) } func (r *JobRunner) selectExecutor(mode ExecutionMode, localMode bool) interfaces.JobExecutor { switch mode { case ModeLocal: return r.local case ModeContainer: return r.container default: // ModeAuto if localMode { return r.local } return r.container } } func (r *JobRunner) finalize( task *queue.Task, outputDir string, basePath string, execErr error, ) error { jobPaths := storage.NewJobPaths(basePath) if execErr != nil { // Handle failure failedDir := filepath.Join(jobPaths.FailedPath(), task.JobName) os.MkdirAll(filepath.Dir(failedDir), 0750) os.RemoveAll(failedDir) if r.writer != nil { r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) { now := time.Now().UTC() exitCode := 1 m.MarkFinished(now, &exitCode, execErr) }) } os.Rename(outputDir, failedDir) if taskErr, ok := execErr.(*errtypes.TaskExecutionError); ok { return taskErr } return &errtypes.TaskExecutionError{ TaskID: task.ID, JobName: task.JobName, Phase: "execution", Err: execErr, } } // Handle success finishedDir := filepath.Join(jobPaths.FinishedPath(), task.JobName) os.MkdirAll(filepath.Dir(finishedDir), 0750) os.RemoveAll(finishedDir) if r.writer != nil { r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) { now := time.Now().UTC() exitCode := 0 m.MarkFinished(now, &exitCode, nil) }) } if err := os.Rename(outputDir, finishedDir); err != nil { r.logger.Warn("failed to move job to finished dir", "job", task.JobName, "error", err) return fmt.Errorf("failed to move job to finished: %w", err) } return nil }