From 22f3d66f1d68f629bf7a86c2878e425cf3566224 Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Tue, 17 Feb 2026 14:14:04 -0500 Subject: [PATCH] refactor: Phase 2 - Extract executor implementations Created executor package with extracted job execution logic: 1. internal/worker/executor/local.go (145 lines) - LocalExecutor implements JobExecutor interface - Execute() method for local bash script execution - generateScript() helper for creating experiment scripts 2. internal/worker/executor/container.go (445 lines) - ContainerExecutor implements JobExecutor interface - Execute() method for podman container execution - EnvironmentPool interface for image caching - Tracking tool provisioning (MLflow, TensorBoard, Wandb) - Volume and cache setup - selectDependencyManifest() helper 3. internal/worker/executor/runner.go (170 lines) - JobRunner orchestrates execution - ExecutionMode enum (Auto, Local, Container) - Run() method with directory setup and executor selection - finalize() for success/failure handling Key design decisions: - Executors depend on interfaces (ManifestWriter, not Worker) - JobRunner composes both executors - No direct Worker dependencies in executor package - SetupJobDirectories reused from execution package Build status: Compiles successfully --- internal/worker/executor/container.go | 445 ++++++++++++++++++++++++++ internal/worker/executor/local.go | 145 +++++++++ internal/worker/executor/runner.go | 170 ++++++++++ 3 files changed, 760 insertions(+) create mode 100644 internal/worker/executor/container.go create mode 100644 internal/worker/executor/local.go create mode 100644 internal/worker/executor/runner.go diff --git a/internal/worker/executor/container.go b/internal/worker/executor/container.go new file mode 100644 index 0000000..aaf4c93 --- /dev/null +++ b/internal/worker/executor/container.go @@ -0,0 +1,445 @@ +// Package executor provides job execution implementations +package executor + +import ( + "context" + "fmt" + "os" + 
"path/filepath" + "strings" + "time" + + "github.com/jfraeys/fetch_ml/internal/config" + "github.com/jfraeys/fetch_ml/internal/container" + "github.com/jfraeys/fetch_ml/internal/errtypes" + "github.com/jfraeys/fetch_ml/internal/fileutil" + "github.com/jfraeys/fetch_ml/internal/logging" + "github.com/jfraeys/fetch_ml/internal/manifest" + "github.com/jfraeys/fetch_ml/internal/queue" + "github.com/jfraeys/fetch_ml/internal/storage" + "github.com/jfraeys/fetch_ml/internal/telemetry" + "github.com/jfraeys/fetch_ml/internal/tracking" + "github.com/jfraeys/fetch_ml/internal/worker/interfaces" +) + +// ContainerConfig holds configuration for container execution +type ContainerConfig struct { + PodmanImage string + ContainerResults string + ContainerWorkspace string + TrainScript string + BasePath string + AppleGPUEnabled bool +} + +// ContainerExecutor executes jobs in containers using podman +type ContainerExecutor struct { + logger *logging.Logger + writer interfaces.ManifestWriter + registry *tracking.Registry + envPool EnvironmentPool + config ContainerConfig +} + +// EnvironmentPool interface for environment image pooling +type EnvironmentPool interface { + WarmImageTag(depsSHA string) (string, error) + ImageExists(ctx context.Context, tag string) (bool, error) +} + +// NewContainerExecutor creates a new container job executor +func NewContainerExecutor( + logger *logging.Logger, + writer interfaces.ManifestWriter, + cfg ContainerConfig, +) *ContainerExecutor { + return &ContainerExecutor{ + logger: logger, + writer: writer, + config: cfg, + } +} + +// SetRegistry sets the tracking registry (optional) +func (e *ContainerExecutor) SetRegistry(registry *tracking.Registry) { + e.registry = registry +} + +// SetEnvPool sets the environment pool (optional) +func (e *ContainerExecutor) SetEnvPool(pool EnvironmentPool) { + e.envPool = pool +} + +// Execute runs a job in a container +func (e *ContainerExecutor) Execute(ctx context.Context, task *queue.Task, env 
interfaces.ExecutionEnv) error { + containerResults := e.config.ContainerResults + if containerResults == "" { + containerResults = config.DefaultContainerResults + } + + containerWorkspace := e.config.ContainerWorkspace + if containerWorkspace == "" { + containerWorkspace = config.DefaultContainerWorkspace + } + + jobPaths := storage.NewJobPaths(e.config.BasePath) + + // Setup tracking environment + trackingEnv, err := e.setupTrackingEnv(ctx, task) + if err != nil { + return err + } + defer e.teardownTracking(ctx, task) + + // Setup volumes + volumes := e.setupVolumes(trackingEnv, env.OutputDir) + + // Setup container environment + if strings.TrimSpace(env.GPUEnvVar) != "" { + trackingEnv[env.GPUEnvVar] = strings.TrimSpace(env.GPUDevicesStr) + } + + snap := filepath.Join(env.OutputDir, "snapshot") + if info, err := os.Stat(snap); err == nil && info.IsDir() { + trackingEnv["FETCH_ML_SNAPSHOT_DIR"] = "/snapshot" + if strings.TrimSpace(task.SnapshotID) != "" { + trackingEnv["FETCH_ML_SNAPSHOT_ID"] = strings.TrimSpace(task.SnapshotID) + } + volumes[snap] = "/snapshot:ro" + } + + cpusOverride, memOverride := container.PodmanResourceOverrides(task.CPU, task.MemoryGB) + + // Select image (with warm cache check) + selectedImage := e.selectImage(ctx, task) + + // Build podman config + podmanCfg := container.PodmanConfig{ + Image: selectedImage, + Workspace: filepath.Join(env.OutputDir, "code"), + Results: filepath.Join(env.OutputDir, "results"), + ContainerWorkspace: containerWorkspace, + ContainerResults: containerResults, + AppleGPU: e.config.AppleGPUEnabled, + GPUDevices: env.GPUDevices, + Env: trackingEnv, + Volumes: volumes, + Memory: memOverride, + CPUs: cpusOverride, + } + + // Build and execute command + return e.runPodman(ctx, task, env, jobPaths, podmanCfg, selectedImage) +} + +func (e *ContainerExecutor) setupTrackingEnv(ctx context.Context, task *queue.Task) (map[string]string, error) { + if e.registry == nil || task.Tracking == nil { + return 
make(map[string]string), nil + } + + configs := make(map[string]tracking.ToolConfig) + + if task.Tracking.MLflow != nil && task.Tracking.MLflow.Enabled { + mode := tracking.ModeSidecar + if task.Tracking.MLflow.Mode != "" { + mode = tracking.ToolMode(task.Tracking.MLflow.Mode) + } + configs["mlflow"] = tracking.ToolConfig{ + Enabled: true, + Mode: mode, + Settings: map[string]any{ + "job_name": task.JobName, + "tracking_uri": task.Tracking.MLflow.TrackingURI, + }, + } + } + + if task.Tracking.TensorBoard != nil && task.Tracking.TensorBoard.Enabled { + mode := tracking.ModeSidecar + if task.Tracking.TensorBoard.Mode != "" { + mode = tracking.ToolMode(task.Tracking.TensorBoard.Mode) + } + configs["tensorboard"] = tracking.ToolConfig{ + Enabled: true, + Mode: mode, + Settings: map[string]any{ + "job_name": task.JobName, + }, + } + } + + if task.Tracking.Wandb != nil && task.Tracking.Wandb.Enabled { + mode := tracking.ModeRemote + if task.Tracking.Wandb.Mode != "" { + mode = tracking.ToolMode(task.Tracking.Wandb.Mode) + } + configs["wandb"] = tracking.ToolConfig{ + Enabled: true, + Mode: mode, + Settings: map[string]any{ + "api_key": task.Tracking.Wandb.APIKey, + "project": task.Tracking.Wandb.Project, + "entity": task.Tracking.Wandb.Entity, + }, + } + } + + if len(configs) == 0 { + return make(map[string]string), nil + } + + env, err := e.registry.ProvisionAll(ctx, task.ID, configs) + if err != nil { + return nil, &errtypes.TaskExecutionError{ + TaskID: task.ID, + JobName: task.JobName, + Phase: "tracking_provision", + Err: err, + } + } + + return env, nil +} + +func (e *ContainerExecutor) teardownTracking(ctx context.Context, task *queue.Task) { + if e.registry != nil && task.Tracking != nil { + e.registry.TeardownAll(ctx, task.ID) + } +} + +func (e *ContainerExecutor) setupVolumes(trackingEnv map[string]string, outputDir string) map[string]string { + volumes := make(map[string]string) + + if val, ok := trackingEnv["TENSORBOARD_HOST_LOG_DIR"]; ok { + containerPath := 
"/tracking/tensorboard" + volumes[val] = containerPath + ":rw" + trackingEnv["TENSORBOARD_LOG_DIR"] = containerPath + delete(trackingEnv, "TENSORBOARD_HOST_LOG_DIR") + } + + cacheRoot := filepath.Join(e.config.BasePath, ".cache") + os.MkdirAll(cacheRoot, 0755) + volumes[cacheRoot] = "/workspace/.cache:rw" + + defaultEnv := map[string]string{ + "HF_HOME": "/workspace/.cache/huggingface", + "TRANSFORMERS_CACHE": "/workspace/.cache/huggingface/hub", + "HF_DATASETS_CACHE": "/workspace/.cache/huggingface/datasets", + "TORCH_HOME": "/workspace/.cache/torch", + "TORCH_HUB_DIR": "/workspace/.cache/torch/hub", + "KERAS_HOME": "/workspace/.cache/keras", + "CUDA_CACHE_PATH": "/workspace/.cache/cuda", + "PIP_CACHE_DIR": "/workspace/.cache/pip", + } + + for k, v := range defaultEnv { + if _, ok := trackingEnv[k]; !ok { + trackingEnv[k] = v + } + } + + return volumes +} + +func (e *ContainerExecutor) selectImage(ctx context.Context, task *queue.Task) string { + selectedImage := e.config.PodmanImage + + if e.envPool == nil || task.Metadata == nil { + return selectedImage + } + + depsSHA := strings.TrimSpace(task.Metadata["deps_manifest_sha256"]) + if depsSHA == "" { + return selectedImage + } + + warmTag, err := e.envPool.WarmImageTag(depsSHA) + if err != nil { + return selectedImage + } + + inspectCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + exists, err := e.envPool.ImageExists(inspectCtx, warmTag) + if err == nil && exists { + return warmTag + } + + return selectedImage +} + +func (e *ContainerExecutor) runPodman( + ctx context.Context, + task *queue.Task, + env interfaces.ExecutionEnv, + jobPaths *storage.JobPaths, + podmanCfg container.PodmanConfig, + selectedImage string, +) error { + scriptPath := filepath.Join(podmanCfg.ContainerWorkspace, e.config.TrainScript) + + manifestName, err := selectDependencyManifest(filepath.Join(env.OutputDir, "code")) + if err != nil { + return &errtypes.TaskExecutionError{ + TaskID: task.ID, + JobName: 
task.JobName, + Phase: "validation", + Err: err, + } + } + depsPath := filepath.Join(podmanCfg.ContainerWorkspace, manifestName) + + var extraArgs []string + if task.Args != "" { + extraArgs = strings.Fields(task.Args) + } + + // Open log file + logFileHandle, err := fileutil.SecureOpenFile(env.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + e.logger.Warn("failed to open log file for podman output", "path", env.LogFile, "error", err) + } + + // Build command + podmanCmd := container.BuildPodmanCommand(ctx, podmanCfg, scriptPath, depsPath, extraArgs) + + // Update manifest + if e.writer != nil { + e.writer.Upsert(env.OutputDir, task, func(m *manifest.RunManifest) { + m.PodmanImage = strings.TrimSpace(selectedImage) + m.Command = podmanCmd.Path + if len(podmanCmd.Args) > 1 { + m.Args = strings.Join(podmanCmd.Args[1:], " ") + } else { + m.Args = "" + } + }) + } + + if logFileHandle != nil { + podmanCmd.Stdout = logFileHandle + podmanCmd.Stderr = logFileHandle + defer logFileHandle.Close() + } + + e.logger.Info("executing podman job", + "job", task.JobName, + "image", selectedImage, + "workspace", podmanCfg.Workspace, + "results", podmanCfg.Results) + + // Execute + containerStart := time.Now() + if err := podmanCmd.Run(); err != nil { + containerDuration := time.Since(containerStart) + return e.handleFailure(task, env, jobPaths, err, containerDuration) + } + + containerDuration := time.Since(containerStart) + return e.handleSuccess(task, env, jobPaths, containerDuration) +} + +func (e *ContainerExecutor) handleFailure( + task *queue.Task, + env interfaces.ExecutionEnv, + jobPaths *storage.JobPaths, + runErr error, + duration time.Duration, +) error { + if e.writer != nil { + e.writer.Upsert(env.OutputDir, task, func(m *manifest.RunManifest) { + now := time.Now().UTC() + exitCode := 1 + m.ExecutionDurationMS = duration.Milliseconds() + m.MarkFinished(now, &exitCode, runErr) + }) + } + + failedDir := filepath.Join(jobPaths.FailedPath(), 
task.JobName) + os.MkdirAll(filepath.Dir(failedDir), 0750) + os.RemoveAll(failedDir) + + telemetry.ExecWithMetrics( + e.logger, + "move failed job", + 100*time.Millisecond, + func() (string, error) { + if err := os.Rename(env.OutputDir, failedDir); err != nil { + return "", fmt.Errorf("rename to failed failed: %w", err) + } + return "", nil + }) + + return fmt.Errorf("execution failed: %w", runErr) +} + +func (e *ContainerExecutor) handleSuccess( + task *queue.Task, + env interfaces.ExecutionEnv, + jobPaths *storage.JobPaths, + duration time.Duration, +) error { + if e.writer != nil { + e.writer.Upsert(env.OutputDir, task, func(m *manifest.RunManifest) { + m.ExecutionDurationMS = duration.Milliseconds() + }) + } + + finalizeStart := time.Now() + finishedDir := filepath.Join(jobPaths.FinishedPath(), task.JobName) + + if e.writer != nil { + e.writer.Upsert(env.OutputDir, task, func(m *manifest.RunManifest) { + now := time.Now().UTC() + exitCode := 0 + m.FinalizeDurationMS = time.Since(finalizeStart).Milliseconds() + m.MarkFinished(now, &exitCode, nil) + }) + } + + os.MkdirAll(filepath.Dir(finishedDir), 0750) + os.RemoveAll(finishedDir) + + telemetry.ExecWithMetrics( + e.logger, + "finalize job", + 100*time.Millisecond, + func() (string, error) { + if err := os.Rename(env.OutputDir, finishedDir); err != nil { + return "", fmt.Errorf("rename to finished failed: %w", err) + } + return "", nil + }) + + return nil +} + +func selectDependencyManifest(filesPath string) (string, error) { + if filesPath == "" { + return "", fmt.Errorf("missing files path") + } + candidates := []string{ + "environment.yml", + "environment.yaml", + "poetry.lock", + "pyproject.toml", + "requirements.txt", + } + for _, name := range candidates { + p := filepath.Join(filesPath, name) + if _, err := os.Stat(p); err == nil { + if name == "poetry.lock" { + pyprojectPath := filepath.Join(filesPath, "pyproject.toml") + if _, err := os.Stat(pyprojectPath); err != nil { + return "", fmt.Errorf( + 
"poetry.lock found but pyproject.toml missing (required for Poetry projects)") + } + } + return name, nil + } + } + return "", fmt.Errorf( + "missing dependency manifest (supported: environment.yml, environment.yaml, " + + "poetry.lock, pyproject.toml, requirements.txt)") +} diff --git a/internal/worker/executor/local.go b/internal/worker/executor/local.go new file mode 100644 index 0000000..a3b0c47 --- /dev/null +++ b/internal/worker/executor/local.go @@ -0,0 +1,145 @@ +// Package executor provides job execution implementations +package executor + +import ( + "context" + "fmt" + "log" + "os" + "os/exec" + "path/filepath" + "strings" + + "github.com/jfraeys/fetch_ml/internal/errtypes" + "github.com/jfraeys/fetch_ml/internal/fileutil" + "github.com/jfraeys/fetch_ml/internal/logging" + "github.com/jfraeys/fetch_ml/internal/manifest" + "github.com/jfraeys/fetch_ml/internal/queue" + "github.com/jfraeys/fetch_ml/internal/worker/interfaces" +) + +// LocalExecutor executes jobs locally using bash scripts +type LocalExecutor struct { + logger *logging.Logger + writer interfaces.ManifestWriter +} + +// NewLocalExecutor creates a new local job executor +func NewLocalExecutor(logger *logging.Logger, writer interfaces.ManifestWriter) *LocalExecutor { + return &LocalExecutor{ + logger: logger, + writer: writer, + } +} + +// Execute runs a job locally +func (e *LocalExecutor) Execute(ctx context.Context, task *queue.Task, env interfaces.ExecutionEnv) error { + // Generate and write script + scriptContent := generateScript(task) + scriptPath := filepath.Join(env.OutputDir, "run.sh") + + if err := os.WriteFile(scriptPath, []byte(scriptContent), 0600); err != nil { + return &errtypes.TaskExecutionError{ + TaskID: task.ID, + JobName: task.JobName, + Phase: "execution", + Err: fmt.Errorf("failed to write script: %w", err), + } + } + + // Update manifest + if e.writer != nil { + e.writer.Upsert(env.OutputDir, task, func(m *manifest.RunManifest) { + m.Command = "bash" + m.Args = 
scriptPath + }) + } + + // Open log file + logFileHandle, err := fileutil.SecureOpenFile(env.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + e.logger.Warn("failed to open log file for local output", "path", env.LogFile, "error", err) + return &errtypes.TaskExecutionError{ + TaskID: task.ID, + JobName: task.JobName, + Phase: "execution", + Err: fmt.Errorf("failed to open log file: %w", err), + } + } + defer func() { + if err := logFileHandle.Close(); err != nil { + log.Printf("Warning: failed to close log file: %v", err) + } + }() + + // Execute the script + localCmd := exec.CommandContext(ctx, "bash", scriptPath) + localEnv := os.Environ() + + if strings.TrimSpace(env.GPUEnvVar) != "" { + localEnv = append(localEnv, fmt.Sprintf("%s=%s", env.GPUEnvVar, strings.TrimSpace(env.GPUDevicesStr))) + } + + snap := filepath.Join(env.OutputDir, "snapshot") + if info, err := os.Stat(snap); err == nil && info.IsDir() { + localEnv = append(localEnv, fmt.Sprintf("FETCH_ML_SNAPSHOT_DIR=%s", snap)) + if strings.TrimSpace(task.SnapshotID) != "" { + localEnv = append(localEnv, fmt.Sprintf("FETCH_ML_SNAPSHOT_ID=%s", strings.TrimSpace(task.SnapshotID))) + } + } + + localCmd.Env = localEnv + localCmd.Stdout = logFileHandle + localCmd.Stderr = logFileHandle + + e.logger.Info("executing local job", + "job", task.JobName, + "task_id", task.ID, + "script", scriptPath) + + if err := localCmd.Run(); err != nil { + return &errtypes.TaskExecutionError{ + TaskID: task.ID, + JobName: task.JobName, + Phase: "execution", + Err: fmt.Errorf("execution failed: %w", err), + } + } + + return nil +} + +// generateScript creates a bash script for the experiment +func generateScript(task *queue.Task) string { + return `#!/bin/bash +set -e + +echo "Starting experiment: ` + task.JobName + `" +echo "Task ID: ` + task.ID + `" +echo "Timestamp: $(date)" + +# Simulate ML experiment +echo "Loading data..." +sleep 1 + +echo "Training model..." +sleep 2 + +echo "Evaluating model..." 
+sleep 1 + +# Generate results +ACCURACY=0.95 +LOSS=0.05 +EPOCHS=10 + +echo "" +echo "=== EXPERIMENT RESULTS ===" +echo "Accuracy: $ACCURACY" +echo "Loss: $LOSS" +echo "Epochs: $EPOCHS" +echo "Status: SUCCESS" +echo "=========================" +echo "Experiment completed successfully!" +` +} diff --git a/internal/worker/executor/runner.go b/internal/worker/executor/runner.go new file mode 100644 index 0000000..9ed2caf --- /dev/null +++ b/internal/worker/executor/runner.go @@ -0,0 +1,170 @@ +// Package executor provides job execution implementations +package executor + +import ( + "context" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/jfraeys/fetch_ml/internal/errtypes" + "github.com/jfraeys/fetch_ml/internal/logging" + "github.com/jfraeys/fetch_ml/internal/manifest" + "github.com/jfraeys/fetch_ml/internal/queue" + "github.com/jfraeys/fetch_ml/internal/storage" + "github.com/jfraeys/fetch_ml/internal/worker/execution" + "github.com/jfraeys/fetch_ml/internal/worker/interfaces" +) + +// JobRunner orchestrates job execution by delegating to specific executors +type JobRunner struct { + local interfaces.JobExecutor + container interfaces.JobExecutor + writer interfaces.ManifestWriter + logger *logging.Logger +} + +// NewJobRunner creates a new job runner +func NewJobRunner( + local interfaces.JobExecutor, + container interfaces.JobExecutor, + writer interfaces.ManifestWriter, + logger *logging.Logger, +) *JobRunner { + return &JobRunner{ + local: local, + container: container, + writer: writer, + logger: logger, + } +} + +// ExecutionMode determines which executor to use +type ExecutionMode int + +const ( + // ModeAuto selects based on configuration + ModeAuto ExecutionMode = iota + // ModeLocal forces local execution + ModeLocal + // ModeContainer forces container execution + ModeContainer +) + +// Run executes a job with the given mode +func (r *JobRunner) Run( + ctx context.Context, + task *queue.Task, + basePath string, + mode ExecutionMode, + localMode 
bool, + gpuEnv interfaces.ExecutionEnv, +) error { + // 1. Setup directories + jobDir, outputDir, logFile, err := execution.SetupJobDirectories(basePath, task.JobName, task.ID) + if err != nil { + return err + } + + // 2. Create execution environment + env := interfaces.ExecutionEnv{ + JobDir: jobDir, + OutputDir: outputDir, + LogFile: logFile, + GPUDevices: gpuEnv.GPUDevices, + GPUEnvVar: gpuEnv.GPUEnvVar, + GPUDevicesStr: gpuEnv.GPUDevicesStr, + } + + // 3. Select executor + executor := r.selectExecutor(mode, localMode) + + // 4. Pre-execution manifest update + if r.writer != nil { + r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) { + m.MarkStarted(time.Now().UTC()) + m.GPUDevices = gpuEnv.GPUDevices + if gpuEnv.GPUEnvVar != "" { + m.Metadata["gpu_visible_devices"] = gpuEnv.GPUDevicesStr + m.Metadata["gpu_visible_env"] = gpuEnv.GPUEnvVar + } + }) + } + + // 5. Execute + execErr := executor.Execute(ctx, task, env) + + // 6. Post-execution cleanup + return r.finalize(task, outputDir, basePath, execErr) +} + +func (r *JobRunner) selectExecutor(mode ExecutionMode, localMode bool) interfaces.JobExecutor { + switch mode { + case ModeLocal: + return r.local + case ModeContainer: + return r.container + default: // ModeAuto + if localMode { + return r.local + } + return r.container + } +} + +func (r *JobRunner) finalize( + task *queue.Task, + outputDir string, + basePath string, + execErr error, +) error { + jobPaths := storage.NewJobPaths(basePath) + + if execErr != nil { + // Handle failure + failedDir := filepath.Join(jobPaths.FailedPath(), task.JobName) + os.MkdirAll(filepath.Dir(failedDir), 0750) + os.RemoveAll(failedDir) + + if r.writer != nil { + r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) { + now := time.Now().UTC() + exitCode := 1 + m.MarkFinished(now, &exitCode, execErr) + }) + } + + os.Rename(outputDir, failedDir) + + if taskErr, ok := execErr.(*errtypes.TaskExecutionError); ok { + return taskErr + } + return 
&errtypes.TaskExecutionError{ + TaskID: task.ID, + JobName: task.JobName, + Phase: "execution", + Err: execErr, + } + } + + // Handle success + finishedDir := filepath.Join(jobPaths.FinishedPath(), task.JobName) + os.MkdirAll(filepath.Dir(finishedDir), 0750) + os.RemoveAll(finishedDir) + + if r.writer != nil { + r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) { + now := time.Now().UTC() + exitCode := 0 + m.MarkFinished(now, &exitCode, nil) + }) + } + + if err := os.Rename(outputDir, finishedDir); err != nil { + r.logger.Warn("failed to move job to finished dir", "job", task.JobName, "error", err) + return fmt.Errorf("failed to move job to finished: %w", err) + } + + return nil +}