fetch_ml/internal/worker/executor/runner.go
Jeremie Fraeys 0b5e99f720
refactor(scheduler,worker): improve service management and GPU detection
Scheduler enhancements:
- auth.go: Group membership validation in authentication
- hub.go: Task distribution with group affinity
- port_allocator.go: Dynamic port allocation with conflict resolution
- scheduler_conn.go: Connection pooling and retry logic
- service_manager.go: Lifecycle management for scheduler services
- service_templates.go: Template-based service configuration
- state.go: Persistent state management with recovery

Worker improvements:
- config.go: Extended configuration for task visibility rules
- execution/setup.go: Sandboxed execution environment setup
- executor/container.go: Container runtime integration
- executor/runner.go: Task runner with visibility enforcement
- gpu_detector.go: Robust GPU detection (NVIDIA, AMD, Apple Silicon, CPU fallback)
- integrity/validate.go: Data integrity validation
- lifecycle/runloop.go: Improved runloop with graceful shutdown
- lifecycle/service_manager.go: Service lifecycle coordination
- process/isolation.go + isolation_unix.go: Process isolation with namespaces/cgroups
- tenant/manager.go: Multi-tenant resource isolation
- tenant/middleware.go: Tenant context propagation
- worker.go: Core worker with group-scoped task execution
2026-03-08 13:03:15 -04:00

201 lines
5.3 KiB
Go

// Package executor provides job execution implementations
package executor
import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/jfraeys/fetch_ml/internal/errtypes"
	"github.com/jfraeys/fetch_ml/internal/logging"
	"github.com/jfraeys/fetch_ml/internal/manifest"
	"github.com/jfraeys/fetch_ml/internal/queue"
	"github.com/jfraeys/fetch_ml/internal/storage"
	"github.com/jfraeys/fetch_ml/internal/worker/execution"
	"github.com/jfraeys/fetch_ml/internal/worker/interfaces"
)
// JobRunner orchestrates job execution by delegating to specific executors.
// Each Run picks either the local or the container executor, and, when a
// manifest writer is configured, validates and updates the run manifest
// around execution.
type JobRunner struct {
// local is used for ModeLocal, and for ModeAuto when localMode is true.
local interfaces.JobExecutor
// container is used for ModeContainer, and for ModeAuto when localMode is false.
container interfaces.JobExecutor
// writer is optional: when nil, manifest validation and manifest updates are skipped.
writer interfaces.ManifestWriter
// logger receives manifest-validation errors and filesystem housekeeping warnings.
logger *logging.Logger
}
// NewJobRunner returns a JobRunner wired to the given local and container
// executors, manifest writer and logger. The arguments are stored as given;
// no validation is performed here.
func NewJobRunner(
	local interfaces.JobExecutor,
	container interfaces.JobExecutor,
	writer interfaces.ManifestWriter,
	logger *logging.Logger,
) *JobRunner {
	runner := new(JobRunner)
	runner.local = local
	runner.container = container
	runner.writer = writer
	runner.logger = logger
	return runner
}
// ExecutionMode tells JobRunner.Run which executor to dispatch a job to.
// The zero value is ModeAuto.
type ExecutionMode int

// Supported execution modes, with explicit, stable numeric values.
const (
	// ModeAuto lets the caller's localMode flag decide between local and
	// container execution.
	ModeAuto ExecutionMode = 0
	// ModeLocal always uses the local executor.
	ModeLocal ExecutionMode = 1
	// ModeContainer always uses the container executor.
	ModeContainer ExecutionMode = 2
)
// Run executes a job with the given mode.
//
// It selects an executor from mode/localMode, creates the job directories
// under basePath, validates any manifest already present in the output
// directory, records the start in the manifest, runs the task, and finally
// hands the outcome to finalize, which moves the output directory.
//
// Only the GPU fields of gpuEnv (GPUDevices, GPUEnvVar, GPUDevicesStr) are
// forwarded; the directory fields of the ExecutionEnv given to the executor
// always come from the freshly created job directories.
//
// It returns a *errtypes.TaskExecutionError when no executor is configured
// for the requested mode or when manifest validation fails, the error from
// directory setup if that fails, and otherwise whatever finalize returns.
func (r *JobRunner) Run(
	ctx context.Context,
	task *queue.Task,
	basePath string,
	mode ExecutionMode,
	localMode bool,
	gpuEnv interfaces.ExecutionEnv,
) error {
	// 1. Select executor. Done before touching the filesystem so that a
	// misconfigured runner (e.g. no container executor) neither leaves an
	// orphaned job directory behind nor panics on a nil-interface Execute call.
	executor := r.selectExecutor(mode, localMode)
	if executor == nil {
		return &errtypes.TaskExecutionError{
			TaskID:      task.ID,
			JobName:     task.JobName,
			Phase:       "setup",
			Message:     "no executor configured for execution mode",
			Err:         fmt.Errorf("nil executor for mode %d (localMode=%t)", mode, localMode),
			Recoverable: false, // retrying cannot supply a missing executor
		}
	}

	// 2. Setup directories
	jobDir, outputDir, logFile, err := execution.SetupJobDirectories(basePath, task.JobName, task.ID)
	if err != nil {
		return err
	}

	// 3. Create execution environment
	env := interfaces.ExecutionEnv{
		JobDir:        jobDir,
		OutputDir:     outputDir,
		LogFile:       logFile,
		GPUDevices:    gpuEnv.GPUDevices,
		GPUEnvVar:     gpuEnv.GPUEnvVar,
		GPUDevicesStr: gpuEnv.GPUDevicesStr,
	}

	// 3.5 Validate manifest completeness before execution. A manifest that
	// cannot be loaded is not treated as an error here: validation is simply
	// skipped. Only a manifest that loads but fails validation blocks the run.
	if r.writer != nil {
		if m, err := manifest.LoadFromDir(outputDir); err == nil {
			validator := manifest.NewValidator()
			if err := validator.Validate(m); err != nil {
				r.logger.Error("manifest validation failed - execution blocked",
					"task", task.ID,
					"error", err)
				return &errtypes.TaskExecutionError{
					TaskID:      task.ID,
					JobName:     task.JobName,
					Phase:       "validation",
					Message:     "manifest incomplete - execution blocked",
					Err:         err,
					Recoverable: false, // Can't retry - manifest is missing data
				}
			}
		}
	}

	// 4. Pre-execution manifest update
	if r.writer != nil {
		r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) {
			m.MarkStarted(time.Now().UTC())
			m.GPUDevices = gpuEnv.GPUDevices
			if gpuEnv.GPUEnvVar != "" {
				m.Metadata["gpu_visible_devices"] = gpuEnv.GPUDevicesStr
				m.Metadata["gpu_visible_env"] = gpuEnv.GPUEnvVar
			}
		})
	}

	// 5. Execute
	execErr := executor.Execute(ctx, task, env)

	// 6. Post-execution cleanup
	return r.finalize(task, outputDir, basePath, execErr)
}
// selectExecutor maps an ExecutionMode to one of the runner's executors.
// ModeLocal and ModeContainer are honoured unconditionally; any other mode
// (ModeAuto included) uses the local executor only when localMode is true.
// The returned executor may be nil if the chosen one was never configured.
func (r *JobRunner) selectExecutor(mode ExecutionMode, localMode bool) interfaces.JobExecutor {
	wantLocal := mode == ModeLocal || (mode != ModeContainer && localMode)
	if !wantLocal {
		return r.container
	}
	return r.local
}
// finalize records the job outcome in the run manifest and moves outputDir
// into its terminal location.
//
// On success (execErr == nil) outputDir is moved to
// <FinishedPath>/<JobName>; on failure it is moved to <FailedPath>/<JobName>.
// Any directory already at the destination is removed first so the rename
// can succeed. Directory housekeeping problems are logged as warnings.
//
// Returns:
//   - nil on success;
//   - an error wrapping the rename failure if a successful job's output could
//     not be moved to the finished directory;
//   - whenever execErr is non-nil, a *errtypes.TaskExecutionError: the one
//     found in execErr's chain if present, otherwise a new one with Phase
//     "execution" wrapping execErr.
func (r *JobRunner) finalize(
	task *queue.Task,
	outputDir string,
	basePath string,
	execErr error,
) error {
	jobPaths := storage.NewJobPaths(basePath)

	// relocate stamps the manifest as finished and moves outputDir to
	// destRoot/<JobName>, replacing whatever is there. label ("finished" or
	// "failed") only shapes log messages. Only the rename error is returned.
	relocate := func(destRoot, label string, exitCode int, runErr error) error {
		destDir := filepath.Join(destRoot, task.JobName)
		parentDir := filepath.Dir(destDir)
		if err := os.MkdirAll(parentDir, 0750); err != nil {
			r.logger.Warn("failed to create "+label+" directory", "path", parentDir, "error", err)
		}
		// Clear any earlier run with the same job name so os.Rename can succeed.
		if err := os.RemoveAll(destDir); err != nil {
			r.logger.Warn("failed to remove "+label+" directory", "path", destDir, "error", err)
		}
		if r.writer != nil {
			r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) {
				code := exitCode
				m.MarkFinished(time.Now().UTC(), &code, runErr)
			})
		}
		if err := os.Rename(outputDir, destDir); err != nil {
			r.logger.Warn("failed to move job to "+label+" dir", "job", task.JobName, "error", err)
			return err
		}
		return nil
	}

	// Handle success
	if execErr == nil {
		if err := relocate(jobPaths.FinishedPath(), "finished", 0, nil); err != nil {
			return fmt.Errorf("failed to move job to finished: %w", err)
		}
		return nil
	}

	// Handle failure. The move is best-effort (already logged inside
	// relocate); the execution error is what the caller needs to see.
	_ = relocate(jobPaths.FailedPath(), "failed", 1, execErr)

	// Pass an existing TaskExecutionError through, including one wrapped by
	// the executor via %w, so its Phase and Recoverable flag are preserved.
	var taskErr *errtypes.TaskExecutionError
	if errors.As(execErr, &taskErr) {
		return taskErr
	}
	return &errtypes.TaskExecutionError{
		TaskID:  task.ID,
		JobName: task.JobName,
		Phase:   "execution",
		Err:     execErr,
	}
}