fetch_ml/internal/worker/lifecycle/runloop.go
Jeremie Fraeys 61081655d2
feat: enhance worker execution and scheduler service templates
- Refactor worker configuration management
- Improve container executor lifecycle handling
- Update runloop and worker core logic
- Enhance scheduler service template generation
- Remove obsolete 'scheduler' symlink/directory
2026-03-04 13:24:20 -05:00

231 lines
5.6 KiB
Go

// Package lifecycle provides worker lifecycle management types and interfaces
package lifecycle
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/jfraeys/fetch_ml/internal/queue"
)
// TaskExecutor defines the contract for executing tasks claimed by the
// run loop.
type TaskExecutor interface {
	// Execute runs a task and returns any error. The context carries the
	// task's execution deadline and the worker's shutdown signal (see
	// executeTask); implementations should honor cancellation.
	Execute(ctx context.Context, task *queue.Task) error
}
// RunLoopConfig holds configuration for the run loop.
type RunLoopConfig struct {
	// WorkerID identifies this worker when claiming and releasing leases.
	WorkerID string
	// MaxWorkers caps the number of concurrently executing tasks.
	MaxWorkers int
	// PollInterval is how long a blocking queue poll waits for a task.
	PollInterval time.Duration
	// TaskLeaseDuration is the lease length requested when claiming a task.
	TaskLeaseDuration time.Duration
	// HeartbeatInterval — NOTE(review): not referenced in this file;
	// presumably consumed by a lease-heartbeat component elsewhere.
	HeartbeatInterval time.Duration
	// GracefulTimeout is presumably the intended bound on the shutdown
	// wait for in-flight tasks — NOTE(review): confirm it is wired up;
	// waitForTasks may use its own hard-coded default instead.
	GracefulTimeout time.Duration
	// PrewarmEnabled — NOTE(review): not referenced in this file.
	PrewarmEnabled bool
}
// RunLoop manages the main worker processing loop: it pulls leased tasks
// from the queue backend and executes them concurrently, bounded by
// RunLoopConfig.MaxWorkers.
type RunLoop struct {
	// queue is the backend tasks are leased from and released to.
	queue queue.Backend
	// executor runs each claimed task.
	executor TaskExecutor
	// metrics records task lifecycle and queue-latency measurements.
	metrics MetricsRecorder
	logger Logger
	// ctx is the loop's lifetime context; cancel (below) ends it.
	// NOTE(review): storing a context in a struct is discouraged by Go
	// convention — consider passing one into Start instead.
	ctx context.Context
	// stateMgr drives task state transitions; may be nil (executeTask
	// nil-checks it before every transition).
	stateMgr *StateManager
	// running maps in-flight task IDs to their cancel functions;
	// guarded by runningMu.
	running map[string]context.CancelFunc
	cancel context.CancelFunc
	config RunLoopConfig
	// runningMu guards the running map.
	runningMu sync.RWMutex
}
// MetricsRecorder defines the contract for recording metrics emitted by
// the run loop.
type MetricsRecorder interface {
	// RecordTaskStart is called once when a task begins executing.
	RecordTaskStart()
	// RecordTaskCompletion is called once when a task finishes.
	// NOTE(review): executeTask invokes this via defer even for failed
	// tasks (alongside RecordTaskFailure) — confirm "completion" means
	// "finished" rather than "succeeded".
	RecordTaskCompletion()
	// RecordTaskFailure is called when task execution returns an error.
	RecordTaskFailure()
	// RecordQueueLatency records how long a single queue poll took.
	RecordQueueLatency(duration time.Duration)
}
// Logger defines the contract for leveled, structured logging. Call
// sites in this file pass alternating key/value pairs (slog-style) via
// the variadic keys argument.
type Logger interface {
	Info(msg string, keys ...any)
	Warn(msg string, keys ...any)
	Error(msg string, keys ...any)
	Debug(msg string, keys ...any)
}
// NewRunLoop constructs a RunLoop wired to the given queue backend,
// executor, metrics recorder, logger, and optional state manager. The
// returned loop owns an internal context that Stop cancels to end Start.
func NewRunLoop(
	config RunLoopConfig,
	queue queue.Backend,
	executor TaskExecutor,
	metrics MetricsRecorder,
	logger Logger,
	stateMgr *StateManager,
) *RunLoop {
	loopCtx, stop := context.WithCancel(context.Background())
	rl := &RunLoop{
		ctx:      loopCtx,
		cancel:   stop,
		config:   config,
		queue:    queue,
		executor: executor,
		metrics:  metrics,
		logger:   logger,
		stateMgr: stateMgr,
	}
	rl.running = make(map[string]context.CancelFunc)
	return rl
}
// Start begins the main processing loop. It blocks until the loop's
// context is cancelled (via Stop), repeatedly polling the queue for a
// leased task and dispatching each one to its own goroutine, bounded by
// MaxWorkers.
func (r *RunLoop) Start() {
	r.logger.Info("runloop started",
		"worker_id", r.config.WorkerID,
		"max_concurrent", r.config.MaxWorkers,
		"poll_interval", r.config.PollInterval)
	for {
		select {
		case <-r.ctx.Done():
			r.logger.Info("shutdown signal received, waiting for tasks")
			r.waitForTasks()
			return
		default:
		}
		// At capacity: back off briefly before re-checking. Shutdown is
		// still noticed within ~50ms via the select above.
		if r.runningCount() >= r.config.MaxWorkers {
			time.Sleep(50 * time.Millisecond)
			continue
		}
		queueStart := time.Now()
		blockTimeout := r.config.PollInterval
		task, err := r.queue.GetNextTaskWithLeaseBlocking(
			r.config.WorkerID,
			r.config.TaskLeaseDuration,
			blockTimeout,
		)
		queueLatency := time.Since(queueStart)
		r.metrics.RecordQueueLatency(queueLatency)
		if err != nil {
			// Use errors.Is so wrapped context errors from the backend are
			// recognized (a plain == misses fmt.Errorf("...: %w", ...)).
			// DeadlineExceeded just means the blocking poll timed out;
			// Canceled means shutdown, which the select at the top of the
			// loop handles on the next iteration. Neither deserves an
			// error log.
			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
				continue
			}
			r.logger.Error("error fetching task", "error", err)
			continue
		}
		if task == nil {
			continue
		}
		// Reserve the slot synchronously so the capacity check above
		// cannot over-admit before the goroutine gets scheduled.
		r.reserveRunningSlot(task.ID)
		go r.executeTask(task)
	}
}
// Stop gracefully shuts down the run loop: it cancels the loop context
// (which stops new task pickup in Start) and then blocks until in-flight
// tasks finish or the shutdown wait in waitForTasks times out.
func (r *RunLoop) Stop() {
	r.cancel()
	// Start's shutdown path also calls waitForTasks when it observes the
	// cancelled context, so this wait may run concurrently with that one;
	// both simply poll the running count until it reaches zero.
	r.waitForTasks()
	r.logger.Info("runloop stopped", "worker_id", r.config.WorkerID)
}
// reserveRunningSlot records a task as in-flight so the capacity check
// in Start counts it; the stored cancel func is invoked on release.
// NOTE(review): the context derived from r.ctx here is discarded —
// nothing ever selects on it, so the stored cancel has no observable
// effect beyond releasing the derived context's resources. Confirm
// whether executeTask was meant to run under this per-task context
// instead of deriving its own from r.ctx.
func (r *RunLoop) reserveRunningSlot(taskID string) {
	r.runningMu.Lock()
	defer r.runningMu.Unlock()
	_, cancel := context.WithCancel(r.ctx)
	r.running[taskID] = cancel
}
// releaseRunningSlot cancels and removes the bookkeeping entry for the
// given task ID; releasing an unknown ID is a no-op.
func (r *RunLoop) releaseRunningSlot(taskID string) {
	r.runningMu.Lock()
	cancel, found := r.running[taskID]
	if found {
		delete(r.running, taskID)
		cancel()
	}
	r.runningMu.Unlock()
}
// RunningCount returns the number of currently running tasks.
func (r *RunLoop) RunningCount() int {
	r.runningMu.RLock()
	n := len(r.running)
	r.runningMu.RUnlock()
	return n
}
// runningCount is the unexported alias for RunningCount used inside the
// loop; delegating keeps the locking logic in a single place instead of
// duplicating it.
func (r *RunLoop) runningCount() int {
	return r.RunningCount()
}
// waitForTasks blocks until every in-flight task finishes or the
// graceful shutdown window elapses, polling the running count once per
// second.
func (r *RunLoop) waitForTasks() {
	// Honor the configured graceful timeout; fall back to the previously
	// hard-coded 5 minutes when it is unset (zero or negative) to stay
	// backward compatible.
	gracePeriod := r.config.GracefulTimeout
	if gracePeriod <= 0 {
		gracePeriod = 5 * time.Minute
	}
	timeout := time.After(gracePeriod)
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-timeout:
			r.logger.Warn("shutdown timeout, force stopping", "running_tasks", r.runningCount())
			return
		case <-ticker.C:
			if r.runningCount() == 0 {
				r.logger.Info("all tasks completed, shutting down")
				return
			}
			r.logger.Debug("waiting for tasks to complete", "remaining", r.runningCount())
		}
	}
}
// executeTask runs a single task to completion: it records metrics,
// drives the state machine (preparing -> running -> completed/failed),
// executes the task under a deadline, and finally releases the queue
// lease and this worker's running slot.
func (r *RunLoop) executeTask(task *queue.Task) {
	defer r.releaseRunningSlot(task.ID)
	// Transition to preparing state; a state error is logged but does not
	// abort execution.
	if r.stateMgr != nil {
		if err := r.stateMgr.Transition(task, StatePreparing); err != nil {
			r.logger.Error("failed to transition task state", "task_id", task.ID, "error", err)
		}
	}
	r.metrics.RecordTaskStart()
	// NOTE(review): this fires via defer even when the task fails
	// (alongside RecordTaskFailure below) — confirm the metrics contract
	// treats "completion" as "finished" rather than "succeeded".
	defer r.metrics.RecordTaskCompletion()
	r.logger.Info("starting task", "task_id", task.ID, "job_name", task.JobName)
	// Transition to running state.
	if r.stateMgr != nil {
		if err := r.stateMgr.Transition(task, StateRunning); err != nil {
			r.logger.Error("failed to transition task state", "task_id", task.ID, "error", err)
		}
	}
	// Bound execution by the task's remaining time; cancellation of r.ctx
	// (worker shutdown) also cancels the task.
	// NOTE(review): a zero task.RemainingTime yields an already-expired
	// context — confirm upstream guarantees a positive value.
	taskCtx, cancel := context.WithTimeout(r.ctx, task.RemainingTime)
	defer cancel()
	if err := r.executor.Execute(taskCtx, task); err != nil {
		r.logger.Error("task execution failed", "task_id", task.ID, "error", err)
		r.metrics.RecordTaskFailure()
		// Transition to failed state.
		if r.stateMgr != nil {
			if stateErr := r.stateMgr.Transition(task, StateFailed); stateErr != nil {
				r.logger.Error("failed to transition task to failed state", "task_id", task.ID, "error", stateErr)
			}
		}
	} else {
		// Transition to completed state.
		if r.stateMgr != nil {
			if err := r.stateMgr.Transition(task, StateCompleted); err != nil {
				r.logger.Error("failed to transition task to completed state", "task_id", task.ID, "error", err)
			}
		}
	}
	// Surface lease-release failures instead of silently discarding them;
	// a stuck lease can delay redelivery of the task.
	if err := r.queue.ReleaseLease(task.ID, r.config.WorkerID); err != nil {
		r.logger.Warn("failed to release task lease", "task_id", task.ID, "error", err)
	}
}