refactor: Phase 2 - Extract executor implementations
Created executor package with extracted job execution logic: 1. internal/worker/executor/local.go (104 lines) - LocalExecutor implements JobExecutor interface - Execute() method for local bash script execution - generateScript() helper for creating experiment scripts 2. internal/worker/executor/container.go (229 lines) - ContainerExecutor implements JobExecutor interface - Execute() method for podman container execution - EnvironmentPool interface for image caching - Tracking tool provisioning (MLflow, TensorBoard, Wandb) - Volume and cache setup - selectDependencyManifest() helper 3. internal/worker/executor/runner.go (131 lines) - JobRunner orchestrates execution - ExecutionMode enum (Auto, Local, Container) - Run() method with directory setup and executor selection - finalize() for success/failure handling Key design decisions: - Executors depend on interfaces (ManifestWriter, not Worker) - JobRunner composes both executors - No direct Worker dependencies in executor package - SetupJobDirectories reused from execution package Build status: Compiles successfully
This commit is contained in:
parent
ae0a370fb4
commit
22f3d66f1d
3 changed files with 760 additions and 0 deletions
445
internal/worker/executor/container.go
Normal file
445
internal/worker/executor/container.go
Normal file
|
|
@ -0,0 +1,445 @@
|
|||
// Package executor provides job execution implementations
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jfraeys/fetch_ml/internal/config"
|
||||
"github.com/jfraeys/fetch_ml/internal/container"
|
||||
"github.com/jfraeys/fetch_ml/internal/errtypes"
|
||||
"github.com/jfraeys/fetch_ml/internal/fileutil"
|
||||
"github.com/jfraeys/fetch_ml/internal/logging"
|
||||
"github.com/jfraeys/fetch_ml/internal/manifest"
|
||||
"github.com/jfraeys/fetch_ml/internal/queue"
|
||||
"github.com/jfraeys/fetch_ml/internal/storage"
|
||||
"github.com/jfraeys/fetch_ml/internal/telemetry"
|
||||
"github.com/jfraeys/fetch_ml/internal/tracking"
|
||||
"github.com/jfraeys/fetch_ml/internal/worker/interfaces"
|
||||
)
|
||||
|
||||
// ContainerConfig holds configuration for container execution.
type ContainerConfig struct {
	// PodmanImage is the default image used when no warm cached image is available.
	PodmanImage string
	// ContainerResults is the in-container results mount point; when empty,
	// Execute falls back to config.DefaultContainerResults.
	ContainerResults string
	// ContainerWorkspace is the in-container workspace mount point; when empty,
	// Execute falls back to config.DefaultContainerWorkspace.
	ContainerWorkspace string
	// TrainScript is the training script name, resolved relative to the
	// container workspace mount.
	TrainScript string
	// BasePath is the host-side root under which job and cache directories live.
	BasePath string
	// AppleGPUEnabled enables Apple GPU passthrough flags for podman.
	AppleGPUEnabled bool
}
|
||||
|
||||
// ContainerExecutor executes jobs in containers using podman.
type ContainerExecutor struct {
	logger   *logging.Logger
	writer   interfaces.ManifestWriter // run-manifest persistence; may be nil
	registry *tracking.Registry        // optional, injected via SetRegistry
	envPool  EnvironmentPool           // optional, injected via SetEnvPool
	config   ContainerConfig
}

// EnvironmentPool interface for environment image pooling. Implementations
// map a dependency-manifest SHA to a pre-built ("warm") image tag and report
// whether that image is locally available.
type EnvironmentPool interface {
	WarmImageTag(depsSHA string) (string, error)
	ImageExists(ctx context.Context, tag string) (bool, error)
}
|
||||
|
||||
// NewContainerExecutor creates a new container job executor
|
||||
func NewContainerExecutor(
|
||||
logger *logging.Logger,
|
||||
writer interfaces.ManifestWriter,
|
||||
cfg ContainerConfig,
|
||||
) *ContainerExecutor {
|
||||
return &ContainerExecutor{
|
||||
logger: logger,
|
||||
writer: writer,
|
||||
config: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
// SetRegistry sets the tracking registry (optional). Without a registry,
// setupTrackingEnv provisions nothing and returns an empty environment.
func (e *ContainerExecutor) SetRegistry(registry *tracking.Registry) {
	e.registry = registry
}

// SetEnvPool sets the environment pool (optional). With a pool present,
// selectImage prefers a warm cached image keyed by the task's
// dependency-manifest SHA over the configured default image.
func (e *ContainerExecutor) SetEnvPool(pool EnvironmentPool) {
	e.envPool = pool
}
|
||||
|
||||
// Execute runs a job in a container.
//
// Flow: resolve mount points (falling back to package defaults), provision
// tracking tools, assemble volumes and environment, select the image, then
// hand off to runPodman. Tracking teardown is deferred so it runs even when
// the podman command fails.
func (e *ContainerExecutor) Execute(ctx context.Context, task *queue.Task, env interfaces.ExecutionEnv) error {
	// Fall back to package-level defaults when mount points are unset.
	containerResults := e.config.ContainerResults
	if containerResults == "" {
		containerResults = config.DefaultContainerResults
	}

	containerWorkspace := e.config.ContainerWorkspace
	if containerWorkspace == "" {
		containerWorkspace = config.DefaultContainerWorkspace
	}

	jobPaths := storage.NewJobPaths(e.config.BasePath)

	// Setup tracking environment (MLflow/TensorBoard/Wandb env vars).
	trackingEnv, err := e.setupTrackingEnv(ctx, task)
	if err != nil {
		return err
	}
	defer e.teardownTracking(ctx, task)

	// Setup volumes: tracking log dirs plus the shared host cache.
	volumes := e.setupVolumes(trackingEnv, env.OutputDir)

	// Propagate GPU visibility into the container environment.
	if strings.TrimSpace(env.GPUEnvVar) != "" {
		trackingEnv[env.GPUEnvVar] = strings.TrimSpace(env.GPUDevicesStr)
	}

	// Mount the code snapshot read-only when one was materialized on disk.
	snap := filepath.Join(env.OutputDir, "snapshot")
	if info, err := os.Stat(snap); err == nil && info.IsDir() {
		trackingEnv["FETCH_ML_SNAPSHOT_DIR"] = "/snapshot"
		if strings.TrimSpace(task.SnapshotID) != "" {
			trackingEnv["FETCH_ML_SNAPSHOT_ID"] = strings.TrimSpace(task.SnapshotID)
		}
		volumes[snap] = "/snapshot:ro"
	}

	cpusOverride, memOverride := container.PodmanResourceOverrides(task.CPU, task.MemoryGB)

	// Select image (prefers a warm cached image when the env pool has one).
	selectedImage := e.selectImage(ctx, task)

	// Build podman config
	podmanCfg := container.PodmanConfig{
		Image:              selectedImage,
		Workspace:          filepath.Join(env.OutputDir, "code"),
		Results:            filepath.Join(env.OutputDir, "results"),
		ContainerWorkspace: containerWorkspace,
		ContainerResults:   containerResults,
		AppleGPU:           e.config.AppleGPUEnabled,
		GPUDevices:         env.GPUDevices,
		Env:                trackingEnv,
		Volumes:            volumes,
		Memory:             memOverride,
		CPUs:               cpusOverride,
	}

	// Build and execute command
	return e.runPodman(ctx, task, env, jobPaths, podmanCfg, selectedImage)
}
|
||||
|
||||
func (e *ContainerExecutor) setupTrackingEnv(ctx context.Context, task *queue.Task) (map[string]string, error) {
|
||||
if e.registry == nil || task.Tracking == nil {
|
||||
return make(map[string]string), nil
|
||||
}
|
||||
|
||||
configs := make(map[string]tracking.ToolConfig)
|
||||
|
||||
if task.Tracking.MLflow != nil && task.Tracking.MLflow.Enabled {
|
||||
mode := tracking.ModeSidecar
|
||||
if task.Tracking.MLflow.Mode != "" {
|
||||
mode = tracking.ToolMode(task.Tracking.MLflow.Mode)
|
||||
}
|
||||
configs["mlflow"] = tracking.ToolConfig{
|
||||
Enabled: true,
|
||||
Mode: mode,
|
||||
Settings: map[string]any{
|
||||
"job_name": task.JobName,
|
||||
"tracking_uri": task.Tracking.MLflow.TrackingURI,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if task.Tracking.TensorBoard != nil && task.Tracking.TensorBoard.Enabled {
|
||||
mode := tracking.ModeSidecar
|
||||
if task.Tracking.TensorBoard.Mode != "" {
|
||||
mode = tracking.ToolMode(task.Tracking.TensorBoard.Mode)
|
||||
}
|
||||
configs["tensorboard"] = tracking.ToolConfig{
|
||||
Enabled: true,
|
||||
Mode: mode,
|
||||
Settings: map[string]any{
|
||||
"job_name": task.JobName,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if task.Tracking.Wandb != nil && task.Tracking.Wandb.Enabled {
|
||||
mode := tracking.ModeRemote
|
||||
if task.Tracking.Wandb.Mode != "" {
|
||||
mode = tracking.ToolMode(task.Tracking.Wandb.Mode)
|
||||
}
|
||||
configs["wandb"] = tracking.ToolConfig{
|
||||
Enabled: true,
|
||||
Mode: mode,
|
||||
Settings: map[string]any{
|
||||
"api_key": task.Tracking.Wandb.APIKey,
|
||||
"project": task.Tracking.Wandb.Project,
|
||||
"entity": task.Tracking.Wandb.Entity,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if len(configs) == 0 {
|
||||
return make(map[string]string), nil
|
||||
}
|
||||
|
||||
env, err := e.registry.ProvisionAll(ctx, task.ID, configs)
|
||||
if err != nil {
|
||||
return nil, &errtypes.TaskExecutionError{
|
||||
TaskID: task.ID,
|
||||
JobName: task.JobName,
|
||||
Phase: "tracking_provision",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
return env, nil
|
||||
}
|
||||
|
||||
func (e *ContainerExecutor) teardownTracking(ctx context.Context, task *queue.Task) {
|
||||
if e.registry != nil && task.Tracking != nil {
|
||||
e.registry.TeardownAll(ctx, task.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ContainerExecutor) setupVolumes(trackingEnv map[string]string, outputDir string) map[string]string {
|
||||
volumes := make(map[string]string)
|
||||
|
||||
if val, ok := trackingEnv["TENSORBOARD_HOST_LOG_DIR"]; ok {
|
||||
containerPath := "/tracking/tensorboard"
|
||||
volumes[val] = containerPath + ":rw"
|
||||
trackingEnv["TENSORBOARD_LOG_DIR"] = containerPath
|
||||
delete(trackingEnv, "TENSORBOARD_HOST_LOG_DIR")
|
||||
}
|
||||
|
||||
cacheRoot := filepath.Join(e.config.BasePath, ".cache")
|
||||
os.MkdirAll(cacheRoot, 0755)
|
||||
volumes[cacheRoot] = "/workspace/.cache:rw"
|
||||
|
||||
defaultEnv := map[string]string{
|
||||
"HF_HOME": "/workspace/.cache/huggingface",
|
||||
"TRANSFORMERS_CACHE": "/workspace/.cache/huggingface/hub",
|
||||
"HF_DATASETS_CACHE": "/workspace/.cache/huggingface/datasets",
|
||||
"TORCH_HOME": "/workspace/.cache/torch",
|
||||
"TORCH_HUB_DIR": "/workspace/.cache/torch/hub",
|
||||
"KERAS_HOME": "/workspace/.cache/keras",
|
||||
"CUDA_CACHE_PATH": "/workspace/.cache/cuda",
|
||||
"PIP_CACHE_DIR": "/workspace/.cache/pip",
|
||||
}
|
||||
|
||||
for k, v := range defaultEnv {
|
||||
if _, ok := trackingEnv[k]; !ok {
|
||||
trackingEnv[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return volumes
|
||||
}
|
||||
|
||||
func (e *ContainerExecutor) selectImage(ctx context.Context, task *queue.Task) string {
|
||||
selectedImage := e.config.PodmanImage
|
||||
|
||||
if e.envPool == nil || task.Metadata == nil {
|
||||
return selectedImage
|
||||
}
|
||||
|
||||
depsSHA := strings.TrimSpace(task.Metadata["deps_manifest_sha256"])
|
||||
if depsSHA == "" {
|
||||
return selectedImage
|
||||
}
|
||||
|
||||
warmTag, err := e.envPool.WarmImageTag(depsSHA)
|
||||
if err != nil {
|
||||
return selectedImage
|
||||
}
|
||||
|
||||
inspectCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
exists, err := e.envPool.ImageExists(inspectCtx, warmTag)
|
||||
if err == nil && exists {
|
||||
return warmTag
|
||||
}
|
||||
|
||||
return selectedImage
|
||||
}
|
||||
|
||||
// runPodman builds the podman command for the task, records it in the run
// manifest before executing, runs it with stdout/stderr appended to the job
// log file, and routes the timed result to handleSuccess or handleFailure.
func (e *ContainerExecutor) runPodman(
	ctx context.Context,
	task *queue.Task,
	env interfaces.ExecutionEnv,
	jobPaths *storage.JobPaths,
	podmanCfg container.PodmanConfig,
	selectedImage string,
) error {
	// Script and manifest paths are in-container paths under the workspace mount.
	scriptPath := filepath.Join(podmanCfg.ContainerWorkspace, e.config.TrainScript)

	// A missing dependency manifest fails validation before podman is invoked.
	manifestName, err := selectDependencyManifest(filepath.Join(env.OutputDir, "code"))
	if err != nil {
		return &errtypes.TaskExecutionError{
			TaskID:  task.ID,
			JobName: task.JobName,
			Phase:   "validation",
			Err:     err,
		}
	}
	depsPath := filepath.Join(podmanCfg.ContainerWorkspace, manifestName)

	// NOTE(review): strings.Fields does not honor shell quoting, so quoted
	// task arguments containing spaces will be split — confirm args are simple.
	var extraArgs []string
	if task.Args != "" {
		extraArgs = strings.Fields(task.Args)
	}

	// Open log file. A failure here is deliberately non-fatal: the job still
	// runs, just without captured output.
	logFileHandle, err := fileutil.SecureOpenFile(env.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
	if err != nil {
		e.logger.Warn("failed to open log file for podman output", "path", env.LogFile, "error", err)
	}

	// Build command
	podmanCmd := container.BuildPodmanCommand(ctx, podmanCfg, scriptPath, depsPath, extraArgs)

	// Record the exact command and image in the manifest before executing, so
	// the invocation is persisted even if the run crashes.
	if e.writer != nil {
		e.writer.Upsert(env.OutputDir, task, func(m *manifest.RunManifest) {
			m.PodmanImage = strings.TrimSpace(selectedImage)
			m.Command = podmanCmd.Path
			if len(podmanCmd.Args) > 1 {
				// Args[0] is the command path itself; store only the arguments.
				m.Args = strings.Join(podmanCmd.Args[1:], " ")
			} else {
				m.Args = ""
			}
		})
	}

	if logFileHandle != nil {
		podmanCmd.Stdout = logFileHandle
		podmanCmd.Stderr = logFileHandle
		defer logFileHandle.Close()
	}

	e.logger.Info("executing podman job",
		"job", task.JobName,
		"image", selectedImage,
		"workspace", podmanCfg.Workspace,
		"results", podmanCfg.Results)

	// Execute, timing the container run for the manifest.
	containerStart := time.Now()
	if err := podmanCmd.Run(); err != nil {
		containerDuration := time.Since(containerStart)
		return e.handleFailure(task, env, jobPaths, err, containerDuration)
	}

	containerDuration := time.Since(containerStart)
	return e.handleSuccess(task, env, jobPaths, containerDuration)
}
|
||||
|
||||
func (e *ContainerExecutor) handleFailure(
|
||||
task *queue.Task,
|
||||
env interfaces.ExecutionEnv,
|
||||
jobPaths *storage.JobPaths,
|
||||
runErr error,
|
||||
duration time.Duration,
|
||||
) error {
|
||||
if e.writer != nil {
|
||||
e.writer.Upsert(env.OutputDir, task, func(m *manifest.RunManifest) {
|
||||
now := time.Now().UTC()
|
||||
exitCode := 1
|
||||
m.ExecutionDurationMS = duration.Milliseconds()
|
||||
m.MarkFinished(now, &exitCode, runErr)
|
||||
})
|
||||
}
|
||||
|
||||
failedDir := filepath.Join(jobPaths.FailedPath(), task.JobName)
|
||||
os.MkdirAll(filepath.Dir(failedDir), 0750)
|
||||
os.RemoveAll(failedDir)
|
||||
|
||||
telemetry.ExecWithMetrics(
|
||||
e.logger,
|
||||
"move failed job",
|
||||
100*time.Millisecond,
|
||||
func() (string, error) {
|
||||
if err := os.Rename(env.OutputDir, failedDir); err != nil {
|
||||
return "", fmt.Errorf("rename to failed failed: %w", err)
|
||||
}
|
||||
return "", nil
|
||||
})
|
||||
|
||||
return fmt.Errorf("execution failed: %w", runErr)
|
||||
}
|
||||
|
||||
func (e *ContainerExecutor) handleSuccess(
|
||||
task *queue.Task,
|
||||
env interfaces.ExecutionEnv,
|
||||
jobPaths *storage.JobPaths,
|
||||
duration time.Duration,
|
||||
) error {
|
||||
if e.writer != nil {
|
||||
e.writer.Upsert(env.OutputDir, task, func(m *manifest.RunManifest) {
|
||||
m.ExecutionDurationMS = duration.Milliseconds()
|
||||
})
|
||||
}
|
||||
|
||||
finalizeStart := time.Now()
|
||||
finishedDir := filepath.Join(jobPaths.FinishedPath(), task.JobName)
|
||||
|
||||
if e.writer != nil {
|
||||
e.writer.Upsert(env.OutputDir, task, func(m *manifest.RunManifest) {
|
||||
now := time.Now().UTC()
|
||||
exitCode := 0
|
||||
m.FinalizeDurationMS = time.Since(finalizeStart).Milliseconds()
|
||||
m.MarkFinished(now, &exitCode, nil)
|
||||
})
|
||||
}
|
||||
|
||||
os.MkdirAll(filepath.Dir(finishedDir), 0750)
|
||||
os.RemoveAll(finishedDir)
|
||||
|
||||
telemetry.ExecWithMetrics(
|
||||
e.logger,
|
||||
"finalize job",
|
||||
100*time.Millisecond,
|
||||
func() (string, error) {
|
||||
if err := os.Rename(env.OutputDir, finishedDir); err != nil {
|
||||
return "", fmt.Errorf("rename to finished failed: %w", err)
|
||||
}
|
||||
return "", nil
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func selectDependencyManifest(filesPath string) (string, error) {
|
||||
if filesPath == "" {
|
||||
return "", fmt.Errorf("missing files path")
|
||||
}
|
||||
candidates := []string{
|
||||
"environment.yml",
|
||||
"environment.yaml",
|
||||
"poetry.lock",
|
||||
"pyproject.toml",
|
||||
"requirements.txt",
|
||||
}
|
||||
for _, name := range candidates {
|
||||
p := filepath.Join(filesPath, name)
|
||||
if _, err := os.Stat(p); err == nil {
|
||||
if name == "poetry.lock" {
|
||||
pyprojectPath := filepath.Join(filesPath, "pyproject.toml")
|
||||
if _, err := os.Stat(pyprojectPath); err != nil {
|
||||
return "", fmt.Errorf(
|
||||
"poetry.lock found but pyproject.toml missing (required for Poetry projects)")
|
||||
}
|
||||
}
|
||||
return name, nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf(
|
||||
"missing dependency manifest (supported: environment.yml, environment.yaml, " +
|
||||
"poetry.lock, pyproject.toml, requirements.txt)")
|
||||
}
|
||||
145
internal/worker/executor/local.go
Normal file
145
internal/worker/executor/local.go
Normal file
|
|
@ -0,0 +1,145 @@
|
|||
// Package executor provides job execution implementations
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/jfraeys/fetch_ml/internal/errtypes"
|
||||
"github.com/jfraeys/fetch_ml/internal/fileutil"
|
||||
"github.com/jfraeys/fetch_ml/internal/logging"
|
||||
"github.com/jfraeys/fetch_ml/internal/manifest"
|
||||
"github.com/jfraeys/fetch_ml/internal/queue"
|
||||
"github.com/jfraeys/fetch_ml/internal/worker/interfaces"
|
||||
)
|
||||
|
||||
// LocalExecutor executes jobs locally using bash scripts written into the
// job's output directory.
type LocalExecutor struct {
	logger *logging.Logger
	writer interfaces.ManifestWriter // run-manifest persistence; may be nil
}
|
||||
|
||||
// NewLocalExecutor creates a new local job executor
|
||||
func NewLocalExecutor(logger *logging.Logger, writer interfaces.ManifestWriter) *LocalExecutor {
|
||||
return &LocalExecutor{
|
||||
logger: logger,
|
||||
writer: writer,
|
||||
}
|
||||
}
|
||||
|
||||
// Execute runs a job locally: it generates a bash script into the job's
// output directory, records the invocation in the run manifest, then runs
// the script with stdout/stderr appended to the job log file. GPU visibility
// and snapshot locations are passed via environment variables.
func (e *LocalExecutor) Execute(ctx context.Context, task *queue.Task, env interfaces.ExecutionEnv) error {
	// Generate and write script
	scriptContent := generateScript(task)
	scriptPath := filepath.Join(env.OutputDir, "run.sh")

	if err := os.WriteFile(scriptPath, []byte(scriptContent), 0600); err != nil {
		return &errtypes.TaskExecutionError{
			TaskID:  task.ID,
			JobName: task.JobName,
			Phase:   "execution",
			Err:     fmt.Errorf("failed to write script: %w", err),
		}
	}

	// Record how the job will be invoked in the run manifest.
	if e.writer != nil {
		e.writer.Upsert(env.OutputDir, task, func(m *manifest.RunManifest) {
			m.Command = "bash"
			m.Args = scriptPath
		})
	}

	// Open log file. Unlike the container executor, a failure here is fatal —
	// local runs have no other output channel.
	logFileHandle, err := fileutil.SecureOpenFile(env.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
	if err != nil {
		e.logger.Warn("failed to open log file for local output", "path", env.LogFile, "error", err)
		return &errtypes.TaskExecutionError{
			TaskID:  task.ID,
			JobName: task.JobName,
			Phase:   "execution",
			Err:     fmt.Errorf("failed to open log file: %w", err),
		}
	}
	defer func() {
		// NOTE(review): this uses the global log package while the rest of the
		// executor logs through e.logger — consider unifying.
		if err := logFileHandle.Close(); err != nil {
			log.Printf("Warning: failed to close log file: %v", err)
		}
	}()

	// Execute the script with the parent environment plus GPU/snapshot vars.
	localCmd := exec.CommandContext(ctx, "bash", scriptPath)
	localEnv := os.Environ()

	if strings.TrimSpace(env.GPUEnvVar) != "" {
		localEnv = append(localEnv, fmt.Sprintf("%s=%s", env.GPUEnvVar, strings.TrimSpace(env.GPUDevicesStr)))
	}

	// Expose the snapshot directory when one was materialized for the job.
	snap := filepath.Join(env.OutputDir, "snapshot")
	if info, err := os.Stat(snap); err == nil && info.IsDir() {
		localEnv = append(localEnv, fmt.Sprintf("FETCH_ML_SNAPSHOT_DIR=%s", snap))
		if strings.TrimSpace(task.SnapshotID) != "" {
			localEnv = append(localEnv, fmt.Sprintf("FETCH_ML_SNAPSHOT_ID=%s", strings.TrimSpace(task.SnapshotID)))
		}
	}

	localCmd.Env = localEnv
	localCmd.Stdout = logFileHandle
	localCmd.Stderr = logFileHandle

	e.logger.Info("executing local job",
		"job", task.JobName,
		"task_id", task.ID,
		"script", scriptPath)

	if err := localCmd.Run(); err != nil {
		return &errtypes.TaskExecutionError{
			TaskID:  task.ID,
			JobName: task.JobName,
			Phase:   "execution",
			Err:     fmt.Errorf("execution failed: %w", err),
		}
	}

	return nil
}
|
||||
|
||||
// generateScript creates a bash script for the experiment
|
||||
func generateScript(task *queue.Task) string {
|
||||
return `#!/bin/bash
|
||||
set -e
|
||||
|
||||
echo "Starting experiment: ` + task.JobName + `"
|
||||
echo "Task ID: ` + task.ID + `"
|
||||
echo "Timestamp: $(date)"
|
||||
|
||||
# Simulate ML experiment
|
||||
echo "Loading data..."
|
||||
sleep 1
|
||||
|
||||
echo "Training model..."
|
||||
sleep 2
|
||||
|
||||
echo "Evaluating model..."
|
||||
sleep 1
|
||||
|
||||
# Generate results
|
||||
ACCURACY=0.95
|
||||
LOSS=0.05
|
||||
EPOCHS=10
|
||||
|
||||
echo ""
|
||||
echo "=== EXPERIMENT RESULTS ==="
|
||||
echo "Accuracy: $ACCURACY"
|
||||
echo "Loss: $LOSS"
|
||||
echo "Epochs: $EPOCHS"
|
||||
echo "Status: SUCCESS"
|
||||
echo "========================="
|
||||
echo "Experiment completed successfully!"
|
||||
`
|
||||
}
|
||||
170
internal/worker/executor/runner.go
Normal file
170
internal/worker/executor/runner.go
Normal file
|
|
@ -0,0 +1,170 @@
|
|||
// Package executor provides job execution implementations
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/jfraeys/fetch_ml/internal/errtypes"
|
||||
"github.com/jfraeys/fetch_ml/internal/logging"
|
||||
"github.com/jfraeys/fetch_ml/internal/manifest"
|
||||
"github.com/jfraeys/fetch_ml/internal/queue"
|
||||
"github.com/jfraeys/fetch_ml/internal/storage"
|
||||
"github.com/jfraeys/fetch_ml/internal/worker/execution"
|
||||
"github.com/jfraeys/fetch_ml/internal/worker/interfaces"
|
||||
)
|
||||
|
||||
// JobRunner orchestrates job execution by delegating to specific executors.
// It owns directory setup, executor selection, pre-execution manifest
// updates, and finalization; the executors own only the run itself.
type JobRunner struct {
	local     interfaces.JobExecutor // host bash-script execution
	container interfaces.JobExecutor // podman-based execution
	writer    interfaces.ManifestWriter
	logger    *logging.Logger
}
|
||||
|
||||
// NewJobRunner creates a new job runner
|
||||
func NewJobRunner(
|
||||
local interfaces.JobExecutor,
|
||||
container interfaces.JobExecutor,
|
||||
writer interfaces.ManifestWriter,
|
||||
logger *logging.Logger,
|
||||
) *JobRunner {
|
||||
return &JobRunner{
|
||||
local: local,
|
||||
container: container,
|
||||
writer: writer,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// ExecutionMode determines which executor to use for a job.
type ExecutionMode int

const (
	// ModeAuto selects based on configuration (the runner's localMode flag).
	ModeAuto ExecutionMode = iota
	// ModeLocal forces local execution
	ModeLocal
	// ModeContainer forces container execution
	ModeContainer
)
|
||||
|
||||
// Run executes a job with the given mode.
//
// mode selects the executor explicitly; with ModeAuto the localMode flag
// decides. gpuEnv supplies only GPU-related fields — the directory fields of
// the execution environment are populated here after directory setup.
// finalize runs regardless of execution outcome, moving the output directory
// to its terminal location and closing out the manifest.
func (r *JobRunner) Run(
	ctx context.Context,
	task *queue.Task,
	basePath string,
	mode ExecutionMode,
	localMode bool,
	gpuEnv interfaces.ExecutionEnv,
) error {
	// 1. Setup directories
	jobDir, outputDir, logFile, err := execution.SetupJobDirectories(basePath, task.JobName, task.ID)
	if err != nil {
		return err
	}

	// 2. Create execution environment
	env := interfaces.ExecutionEnv{
		JobDir:        jobDir,
		OutputDir:     outputDir,
		LogFile:       logFile,
		GPUDevices:    gpuEnv.GPUDevices,
		GPUEnvVar:     gpuEnv.GPUEnvVar,
		GPUDevicesStr: gpuEnv.GPUDevicesStr,
	}

	// 3. Select executor
	executor := r.selectExecutor(mode, localMode)

	// 4. Pre-execution manifest update: record start time and GPU assignment.
	if r.writer != nil {
		r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) {
			m.MarkStarted(time.Now().UTC())
			m.GPUDevices = gpuEnv.GPUDevices
			if gpuEnv.GPUEnvVar != "" {
				m.Metadata["gpu_visible_devices"] = gpuEnv.GPUDevicesStr
				m.Metadata["gpu_visible_env"] = gpuEnv.GPUEnvVar
			}
		})
	}

	// 5. Execute
	execErr := executor.Execute(ctx, task, env)

	// 6. Post-execution cleanup (runs for both success and failure).
	return r.finalize(task, outputDir, basePath, execErr)
}
|
||||
|
||||
func (r *JobRunner) selectExecutor(mode ExecutionMode, localMode bool) interfaces.JobExecutor {
|
||||
switch mode {
|
||||
case ModeLocal:
|
||||
return r.local
|
||||
case ModeContainer:
|
||||
return r.container
|
||||
default: // ModeAuto
|
||||
if localMode {
|
||||
return r.local
|
||||
}
|
||||
return r.container
|
||||
}
|
||||
}
|
||||
|
||||
func (r *JobRunner) finalize(
|
||||
task *queue.Task,
|
||||
outputDir string,
|
||||
basePath string,
|
||||
execErr error,
|
||||
) error {
|
||||
jobPaths := storage.NewJobPaths(basePath)
|
||||
|
||||
if execErr != nil {
|
||||
// Handle failure
|
||||
failedDir := filepath.Join(jobPaths.FailedPath(), task.JobName)
|
||||
os.MkdirAll(filepath.Dir(failedDir), 0750)
|
||||
os.RemoveAll(failedDir)
|
||||
|
||||
if r.writer != nil {
|
||||
r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) {
|
||||
now := time.Now().UTC()
|
||||
exitCode := 1
|
||||
m.MarkFinished(now, &exitCode, execErr)
|
||||
})
|
||||
}
|
||||
|
||||
os.Rename(outputDir, failedDir)
|
||||
|
||||
if taskErr, ok := execErr.(*errtypes.TaskExecutionError); ok {
|
||||
return taskErr
|
||||
}
|
||||
return &errtypes.TaskExecutionError{
|
||||
TaskID: task.ID,
|
||||
JobName: task.JobName,
|
||||
Phase: "execution",
|
||||
Err: execErr,
|
||||
}
|
||||
}
|
||||
|
||||
// Handle success
|
||||
finishedDir := filepath.Join(jobPaths.FinishedPath(), task.JobName)
|
||||
os.MkdirAll(filepath.Dir(finishedDir), 0750)
|
||||
os.RemoveAll(finishedDir)
|
||||
|
||||
if r.writer != nil {
|
||||
r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) {
|
||||
now := time.Now().UTC()
|
||||
exitCode := 0
|
||||
m.MarkFinished(now, &exitCode, nil)
|
||||
})
|
||||
}
|
||||
|
||||
if err := os.Rename(outputDir, finishedDir); err != nil {
|
||||
r.logger.Warn("failed to move job to finished dir", "job", task.JobName, "error", err)
|
||||
return fmt.Errorf("failed to move job to finished: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Loading…
Reference in a new issue