fetch_ml/internal/worker/executor/runner.go
Jeremie Fraeys 22f3d66f1d
refactor: Phase 2 - Extract executor implementations
Created executor package with extracted job execution logic:

1. internal/worker/executor/local.go (104 lines)
   - LocalExecutor implements JobExecutor interface
   - Execute() method for local bash script execution
   - generateScript() helper for creating experiment scripts

2. internal/worker/executor/container.go (229 lines)
   - ContainerExecutor implements JobExecutor interface
   - Execute() method for podman container execution
   - EnvironmentPool interface for image caching
   - Tracking tool provisioning (MLflow, TensorBoard, Wandb)
   - Volume and cache setup
   - selectDependencyManifest() helper

3. internal/worker/executor/runner.go (131 lines)
   - JobRunner orchestrates execution
   - ExecutionMode enum (Auto, Local, Container)
   - Run() method with directory setup and executor selection
   - finalize() for success/failure handling

Key design decisions:
- Executors depend on interfaces (ManifestWriter, not Worker)
- JobRunner composes both executors
- No direct Worker dependencies in executor package
- SetupJobDirectories reused from execution package

Build status: Compiles successfully
2026-02-17 14:14:04 -05:00

170 lines
4.1 KiB
Go

// Package executor provides job execution implementations
package executor
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"github.com/jfraeys/fetch_ml/internal/errtypes"
"github.com/jfraeys/fetch_ml/internal/logging"
"github.com/jfraeys/fetch_ml/internal/manifest"
"github.com/jfraeys/fetch_ml/internal/queue"
"github.com/jfraeys/fetch_ml/internal/storage"
"github.com/jfraeys/fetch_ml/internal/worker/execution"
"github.com/jfraeys/fetch_ml/internal/worker/interfaces"
)
// JobRunner runs tasks by handing them to one of two executors and recording
// the outcome through an optional manifest writer.
type JobRunner struct {
	// local and container are the two executors a run can be dispatched to.
	local, container interfaces.JobExecutor
	// writer updates the run manifest; manifest updates are skipped when it is nil.
	writer interfaces.ManifestWriter
	// logger receives warnings raised while finalizing a job.
	logger *logging.Logger
}
// NewJobRunner builds a JobRunner from its collaborators.
//
// local and container are the executors used for local and container
// execution respectively. writer may be nil, in which case manifest updates
// are skipped. logger is used for warnings emitted while finalizing a job.
func NewJobRunner(
	local interfaces.JobExecutor,
	container interfaces.JobExecutor,
	writer interfaces.ManifestWriter,
	logger *logging.Logger,
) *JobRunner {
	runner := new(JobRunner)
	runner.local, runner.container = local, container
	runner.writer = writer
	runner.logger = logger
	return runner
}
// ExecutionMode selects which executor a JobRunner uses for a task.
type ExecutionMode int

// The numeric values are written out so that the zero value of ExecutionMode
// is visibly ModeAuto.
const (
	// ModeAuto leaves the choice to the localMode flag passed to Run.
	ModeAuto ExecutionMode = 0
	// ModeLocal always uses the local executor.
	ModeLocal ExecutionMode = 1
	// ModeContainer always uses the container executor.
	ModeContainer ExecutionMode = 2
)
// Run prepares the job directories for task, records the start of the run in
// the manifest, executes the task, and finalizes the job.
//
// Parameters:
//   - ctx is passed through to the executor's Execute call.
//   - task is the task to run; its JobName and ID are handed to
//     execution.SetupJobDirectories together with basePath.
//   - basePath is the root used for directory setup and for finalization.
//   - mode picks the executor; under ModeAuto, localMode decides
//     (true selects the local executor, false the container executor).
//   - gpuEnv supplies the GPU fields copied into the execution environment
//     and into the manifest.
//
// If directory setup fails, that error is returned unchanged and nothing is
// executed. Otherwise the result of finalize is returned.
func (r *JobRunner) Run(
	ctx context.Context,
	task *queue.Task,
	basePath string,
	mode ExecutionMode,
	localMode bool,
	gpuEnv interfaces.ExecutionEnv,
) error {
	jobDir, outputDir, logFile, setupErr := execution.SetupJobDirectories(basePath, task.JobName, task.ID)
	if setupErr != nil {
		return setupErr
	}

	// Only the GPU fields of gpuEnv are carried over; the paths come from the
	// directories that were just set up.
	runEnv := interfaces.ExecutionEnv{
		JobDir:        jobDir,
		OutputDir:     outputDir,
		LogFile:       logFile,
		GPUDevices:    gpuEnv.GPUDevices,
		GPUEnvVar:     gpuEnv.GPUEnvVar,
		GPUDevicesStr: gpuEnv.GPUDevicesStr,
	}

	// Named "chosen" rather than "executor" to avoid shadowing the package name.
	chosen := r.selectExecutor(mode, localMode)

	// Record the start of the run before executing; skipped without a writer.
	if r.writer != nil {
		markStarted := func(m *manifest.RunManifest) {
			m.MarkStarted(time.Now().UTC())
			m.GPUDevices = gpuEnv.GPUDevices
			if gpuEnv.GPUEnvVar == "" {
				return
			}
			// NOTE(review): assumes m.Metadata is already a non-nil map at
			// this point — confirm against the manifest package.
			m.Metadata["gpu_visible_devices"] = gpuEnv.GPUDevicesStr
			m.Metadata["gpu_visible_env"] = gpuEnv.GPUEnvVar
		}
		r.writer.Upsert(outputDir, task, markStarted)
	}

	runErr := chosen.Execute(ctx, task, runEnv)

	// finalize runs for both outcomes: it records the result and moves the
	// output directory.
	return r.finalize(task, outputDir, basePath, runErr)
}
// selectExecutor returns the executor to use for the given mode.
//
// ModeLocal always yields the local executor and ModeContainer always yields
// the container executor. Any other mode (ModeAuto) falls back to localMode:
// true selects the local executor, false the container executor.
func (r *JobRunner) selectExecutor(mode ExecutionMode, localMode bool) interfaces.JobExecutor {
	forcedLocal := mode == ModeLocal
	autoLocal := mode != ModeContainer && localMode
	if forcedLocal || autoLocal {
		return r.local
	}
	return r.container
}
// finalize records the outcome of a run in the manifest and moves the job's
// output directory to its terminal location.
//
// When execErr is non-nil the destination is <FailedPath>/<JobName> and the
// manifest is marked finished with exit code 1 and execErr; otherwise the
// destination is <FinishedPath>/<JobName> and the manifest is marked finished
// with exit code 0. Any existing directory at the destination is removed
// before the move.
//
// Return value:
//   - On failure, always the execution error: execErr itself when it is
//     already a *errtypes.TaskExecutionError, otherwise a new
//     TaskExecutionError with phase "execution" wrapping it. Filesystem
//     problems on this path are logged but never replace the execution error.
//   - On success, nil, or an error wrapping the rename failure if the output
//     directory could not be moved.
func (r *JobRunner) finalize(
	task *queue.Task,
	outputDir string,
	basePath string,
	execErr error,
) error {
	// The logger is treated as optional, like the manifest writer, so a
	// runner built without one cannot panic while reporting a problem.
	warn := func(msg string, args ...any) {
		if r.logger != nil {
			r.logger.Warn(msg, args...)
		}
	}

	jobPaths := storage.NewJobPaths(basePath)

	var destRoot string
	exitCode := 0
	if execErr != nil {
		destRoot = jobPaths.FailedPath()
		exitCode = 1
	} else {
		destRoot = jobPaths.FinishedPath()
	}
	destDir := filepath.Join(destRoot, task.JobName)

	// Preparation is best-effort: a problem here is logged and the flow
	// continues, so the manifest is still updated and the rename below
	// surfaces the real failure.
	if err := os.MkdirAll(filepath.Dir(destDir), 0750); err != nil {
		warn("failed to create destination parent dir", "job", task.JobName, "dir", filepath.Dir(destDir), "error", err)
	}
	if err := os.RemoveAll(destDir); err != nil {
		warn("failed to remove existing destination dir", "job", task.JobName, "dir", destDir, "error", err)
	}

	// The manifest is updated before the move, while it is still reachable
	// through outputDir.
	if r.writer != nil {
		r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) {
			code := exitCode
			m.MarkFinished(time.Now().UTC(), &code, execErr)
		})
	}

	moveErr := os.Rename(outputDir, destDir)

	if execErr != nil {
		// Never let a failed move mask the execution error; just make it visible.
		if moveErr != nil {
			warn("failed to move job to failed dir", "job", task.JobName, "error", moveErr)
		}
		if taskErr, ok := execErr.(*errtypes.TaskExecutionError); ok {
			return taskErr
		}
		return &errtypes.TaskExecutionError{
			TaskID:  task.ID,
			JobName: task.JobName,
			Phase:   "execution",
			Err:     execErr,
		}
	}

	if moveErr != nil {
		warn("failed to move job to finished dir", "job", task.JobName, "error", moveErr)
		return fmt.Errorf("failed to move job to finished: %w", moveErr)
	}
	return nil
}