// Package lifecycle provides worker lifecycle management types and interfaces package lifecycle import ( "context" "sync" "time" "github.com/jfraeys/fetch_ml/internal/queue" ) // TaskExecutor defines the contract for executing tasks type TaskExecutor interface { // Execute runs a task and returns any error Execute(ctx context.Context, task *queue.Task) error } // RunLoopConfig holds configuration for the run loop type RunLoopConfig struct { WorkerID string MaxWorkers int PollInterval time.Duration TaskLeaseDuration time.Duration HeartbeatInterval time.Duration GracefulTimeout time.Duration PrewarmEnabled bool } // RunLoop manages the main worker processing loop type RunLoop struct { queue queue.Backend executor TaskExecutor metrics MetricsRecorder logger Logger ctx context.Context stateMgr *StateManager running map[string]context.CancelFunc cancel context.CancelFunc config RunLoopConfig runningMu sync.RWMutex } // MetricsRecorder defines the contract for recording metrics type MetricsRecorder interface { RecordTaskStart() RecordTaskCompletion() RecordTaskFailure() RecordQueueLatency(duration time.Duration) } // Logger defines the contract for logging type Logger interface { Info(msg string, keys ...any) Warn(msg string, keys ...any) Error(msg string, keys ...any) Debug(msg string, keys ...any) } // NewRunLoop creates a new run loop func NewRunLoop( config RunLoopConfig, queue queue.Backend, executor TaskExecutor, metrics MetricsRecorder, logger Logger, stateMgr *StateManager, ) *RunLoop { ctx, cancel := context.WithCancel(context.Background()) return &RunLoop{ config: config, queue: queue, executor: executor, metrics: metrics, logger: logger, stateMgr: stateMgr, running: make(map[string]context.CancelFunc), ctx: ctx, cancel: cancel, } } // Start begins the main processing loop func (r *RunLoop) Start() { r.logger.Info("runloop started", "worker_id", r.config.WorkerID, "max_concurrent", r.config.MaxWorkers, "poll_interval", r.config.PollInterval) for { select { case <-r.ctx.Done(): r.logger.Info("shutdown signal received, waiting for tasks") r.waitForTasks() return default: } if r.runningCount() >= r.config.MaxWorkers { time.Sleep(50 * time.Millisecond) continue } queueStart := time.Now() blockTimeout := r.config.PollInterval task, err := r.queue.GetNextTaskWithLeaseBlocking( r.config.WorkerID, r.config.TaskLeaseDuration, blockTimeout, ) queueLatency := time.Since(queueStart) r.metrics.RecordQueueLatency(queueLatency) if err != nil { if err == context.DeadlineExceeded { continue } r.logger.Error("error fetching task", "error", err) continue } if task == nil { continue } r.reserveRunningSlot(task.ID) go r.executeTask(task) } } // Stop gracefully shuts down the run loop func (r *RunLoop) Stop() { r.cancel() r.waitForTasks() r.logger.Info("runloop stopped", "worker_id", r.config.WorkerID) } func (r *RunLoop) reserveRunningSlot(taskID string) { r.runningMu.Lock() defer r.runningMu.Unlock() _, cancel := context.WithCancel(r.ctx) r.running[taskID] = cancel } func (r *RunLoop) releaseRunningSlot(taskID string) { r.runningMu.Lock() defer r.runningMu.Unlock() if cancel, ok := r.running[taskID]; ok { cancel() delete(r.running, taskID) } } // RunningCount returns the number of currently running tasks func (r *RunLoop) RunningCount() int { r.runningMu.RLock() defer r.runningMu.RUnlock() return len(r.running) } func (r *RunLoop) runningCount() int { r.runningMu.RLock() defer r.runningMu.RUnlock() return len(r.running) } func (r *RunLoop) waitForTasks() { timeout := time.After(5 * time.Minute) ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-timeout: r.logger.Warn("shutdown timeout, force stopping", "running_tasks", r.runningCount()) return case <-ticker.C: if r.runningCount() == 0 { r.logger.Info("all tasks completed, shutting down") return } r.logger.Debug("waiting for tasks to complete", "remaining", r.runningCount()) } } } func (r *RunLoop) executeTask(task *queue.Task) { defer r.releaseRunningSlot(task.ID) // Transition to preparing state if r.stateMgr != nil { if err := r.stateMgr.Transition(task, StatePreparing); err != nil { r.logger.Error("failed to transition task state", "task_id", task.ID, "error", err) } } r.metrics.RecordTaskStart() defer r.metrics.RecordTaskCompletion() r.logger.Info("starting task", "task_id", task.ID, "job_name", task.JobName) // Transition to running state if r.stateMgr != nil { if err := r.stateMgr.Transition(task, StateRunning); err != nil { r.logger.Error("failed to transition task state", "task_id", task.ID, "error", err) } } taskCtx, cancel := context.WithTimeout(r.ctx, task.RemainingTime) defer cancel() if err := r.executor.Execute(taskCtx, task); err != nil { r.logger.Error("task execution failed", "task_id", task.ID, "error", err) r.metrics.RecordTaskFailure() // Transition to failed state if r.stateMgr != nil { if stateErr := r.stateMgr.Transition(task, StateFailed); stateErr != nil { r.logger.Error("failed to transition task to failed state", "task_id", task.ID, "error", stateErr) } } } else { // Transition to completed state if r.stateMgr != nil { if err := r.stateMgr.Transition(task, StateCompleted); err != nil { r.logger.Error("failed to transition task to completed state", "task_id", task.ID, "error", err) } } } _ = r.queue.ReleaseLease(task.ID, r.config.WorkerID) }