From 062b78cbe07af446e7b97be8c7a0ae245163b973 Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Tue, 17 Feb 2026 14:22:58 -0500 Subject: [PATCH] refactor: Phase 4 - Extract lifecycle types and interfaces Created lifecycle package with foundational types for future extraction: 1. internal/worker/lifecycle/runloop.go (195 lines) - TaskExecutor interface for task execution contract - RunLoopConfig for run loop configuration - RunLoop type with core orchestration logic - MetricsRecorder and Logger interfaces for dependencies - Start(), Stop() methods for loop control - executeTask() method for task lifecycle management 2. internal/worker/lifecycle/health.go (70 lines) - HealthMonitor type for health tracking - RecordHeartbeat(), IsHealthy(), MarkUnhealthy() methods - Heartbeater interface for heartbeat operations - HeartbeatLoop() function for background heartbeats Note: These are interface/type foundations for Phase 5. The actual Worker struct methods remain in runloop.go until Phase 5 when they'll migrate to use these abstractions. 
// HealthMonitor tracks the liveness of a worker based on heartbeats.
// The zero value is not usable; construct with NewHealthMonitor.
type HealthMonitor struct {
	lastHeartbeat time.Time // instant of the most recent heartbeat
	healthy       bool      // false after MarkUnhealthy, until the next heartbeat
	mu            sync.RWMutex
}

// NewHealthMonitor returns a monitor that starts out healthy, with the
// heartbeat clock initialized to now.
func NewHealthMonitor() *HealthMonitor {
	m := &HealthMonitor{healthy: true}
	m.lastHeartbeat = time.Now()
	return m
}

// RecordHeartbeat stamps the current time as the latest heartbeat and
// restores the healthy flag.
func (m *HealthMonitor) RecordHeartbeat() {
	m.mu.Lock()
	m.lastHeartbeat = time.Now()
	m.healthy = true
	m.mu.Unlock()
}

// IsHealthy reports whether the monitor is flagged healthy AND a heartbeat
// has been recorded within the given timeout window.
func (m *HealthMonitor) IsHealthy(timeout time.Duration) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	if !m.healthy {
		return false
	}
	return time.Since(m.lastHeartbeat) < timeout
}

// MarkUnhealthy flags the worker as unhealthy until the next heartbeat.
func (m *HealthMonitor) MarkUnhealthy() {
	m.mu.Lock()
	m.healthy = false
	m.mu.Unlock()
}

// Heartbeater defines the contract for heartbeat operations.
type Heartbeater interface {
	// Beat performs a single heartbeat.
	Beat(ctx context.Context) error
}

// HeartbeatLoop beats hb once per interval until ctx is cancelled.
// A failed beat is reported to onFailure (if non-nil); the loop keeps running
// regardless of beat errors.
func HeartbeatLoop(ctx context.Context, interval time.Duration, hb Heartbeater, onFailure func(error)) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			err := hb.Beat(ctx)
			if err == nil || onFailure == nil {
				continue
			}
			onFailure(err)
		case <-ctx.Done():
			return
		}
	}
}
hb.Beat(ctx); err != nil { + if onFailure != nil { + onFailure(err) + } + } + } + } +} diff --git a/internal/worker/lifecycle/runloop.go b/internal/worker/lifecycle/runloop.go new file mode 100644 index 0000000..60be571 --- /dev/null +++ b/internal/worker/lifecycle/runloop.go @@ -0,0 +1,195 @@ +// Package lifecycle provides worker lifecycle management types and interfaces +package lifecycle + +import ( + "context" + "sync" + "time" + + "github.com/jfraeys/fetch_ml/internal/queue" +) + +// TaskExecutor defines the contract for executing tasks +type TaskExecutor interface { + // Execute runs a task and returns any error + Execute(ctx context.Context, task *queue.Task) error +} + +// RunLoopConfig holds configuration for the run loop +type RunLoopConfig struct { + WorkerID string + MaxWorkers int + PollInterval time.Duration + TaskLeaseDuration time.Duration + HeartbeatInterval time.Duration + GracefulTimeout time.Duration + PrewarmEnabled bool +} + +// RunLoop manages the main worker processing loop +type RunLoop struct { + config RunLoopConfig + queue queue.Backend + executor TaskExecutor + metrics MetricsRecorder + logger Logger + + // State management + running map[string]context.CancelFunc + runningMu sync.RWMutex + ctx context.Context + cancel context.CancelFunc +} + +// MetricsRecorder defines the contract for recording metrics +type MetricsRecorder interface { + RecordTaskStart() + RecordTaskCompletion() + RecordTaskFailure() + RecordQueueLatency(duration time.Duration) +} + +// Logger defines the contract for logging +type Logger interface { + Info(msg string, keys ...any) + Warn(msg string, keys ...any) + Error(msg string, keys ...any) + Debug(msg string, keys ...any) +} + +// NewRunLoop creates a new run loop +func NewRunLoop( + config RunLoopConfig, + queue queue.Backend, + executor TaskExecutor, + metrics MetricsRecorder, + logger Logger, +) *RunLoop { + ctx, cancel := context.WithCancel(context.Background()) + return &RunLoop{ + config: config, + queue: 
queue, + executor: executor, + metrics: metrics, + logger: logger, + running: make(map[string]context.CancelFunc), + ctx: ctx, + cancel: cancel, + } +} + +// Start begins the main processing loop +func (r *RunLoop) Start() { + r.logger.Info("runloop started", + "worker_id", r.config.WorkerID, + "max_concurrent", r.config.MaxWorkers, + "poll_interval", r.config.PollInterval) + + for { + select { + case <-r.ctx.Done(): + r.logger.Info("shutdown signal received, waiting for tasks") + r.waitForTasks() + return + default: + } + + if r.runningCount() >= r.config.MaxWorkers { + time.Sleep(50 * time.Millisecond) + continue + } + + queueStart := time.Now() + blockTimeout := r.config.PollInterval + task, err := r.queue.GetNextTaskWithLeaseBlocking( + r.config.WorkerID, + r.config.TaskLeaseDuration, + blockTimeout, + ) + queueLatency := time.Since(queueStart) + r.metrics.RecordQueueLatency(queueLatency) + + if err != nil { + if err == context.DeadlineExceeded { + continue + } + r.logger.Error("error fetching task", "error", err) + continue + } + + if task == nil { + continue + } + + r.reserveRunningSlot(task.ID) + go r.executeTask(task) + } +} + +// Stop gracefully shuts down the run loop +func (r *RunLoop) Stop() { + r.cancel() + r.waitForTasks() + r.logger.Info("runloop stopped", "worker_id", r.config.WorkerID) +} + +func (r *RunLoop) reserveRunningSlot(taskID string) { + r.runningMu.Lock() + defer r.runningMu.Unlock() + _, cancel := context.WithCancel(r.ctx) + r.running[taskID] = cancel +} + +func (r *RunLoop) releaseRunningSlot(taskID string) { + r.runningMu.Lock() + defer r.runningMu.Unlock() + if cancel, ok := r.running[taskID]; ok { + cancel() + delete(r.running, taskID) + } +} + +func (r *RunLoop) runningCount() int { + r.runningMu.RLock() + defer r.runningMu.RUnlock() + return len(r.running) +} + +func (r *RunLoop) waitForTasks() { + timeout := time.After(5 * time.Minute) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case 
<-timeout: + r.logger.Warn("shutdown timeout, force stopping", "running_tasks", r.runningCount()) + return + case <-ticker.C: + if r.runningCount() == 0 { + r.logger.Info("all tasks completed, shutting down") + return + } + r.logger.Debug("waiting for tasks to complete", "remaining", r.runningCount()) + } + } +} + +func (r *RunLoop) executeTask(task *queue.Task) { + defer r.releaseRunningSlot(task.ID) + + r.metrics.RecordTaskStart() + defer r.metrics.RecordTaskCompletion() + + r.logger.Info("starting task", "task_id", task.ID, "job_name", task.JobName) + + taskCtx, cancel := context.WithTimeout(r.ctx, 24*time.Hour) + defer cancel() + + if err := r.executor.Execute(taskCtx, task); err != nil { + r.logger.Error("task execution failed", "task_id", task.ID, "error", err) + r.metrics.RecordTaskFailure() + } + + _ = r.queue.ReleaseLease(task.ID, r.config.WorkerID) +}