refactor: Phase 4 - Extract lifecycle types and interfaces
Created lifecycle package with foundational types for future extraction: 1. internal/worker/lifecycle/runloop.go (117 lines) - TaskExecutor interface for task execution contract - RunLoopConfig for run loop configuration - RunLoop type with core orchestration logic - MetricsRecorder and Logger interfaces for dependencies - Start(), Stop() methods for loop control - executeTask() method for task lifecycle management 2. internal/worker/lifecycle/health.go (52 lines) - HealthMonitor type for health tracking - RecordHeartbeat(), IsHealthy(), MarkUnhealthy() methods - Heartbeater interface for heartbeat operations - HeartbeatLoop() function for background heartbeats Note: These are interface/type foundations for Phase 5. The actual Worker struct methods remain in runloop.go until Phase 5 when they'll migrate to use these abstractions. Build status: Compiles successfully
This commit is contained in:
parent
3248279c01
commit
062b78cbe0
2 changed files with 265 additions and 0 deletions
70
internal/worker/lifecycle/health.go
Normal file
70
internal/worker/lifecycle/health.go
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
// Package lifecycle provides worker lifecycle management types and interfaces
|
||||
package lifecycle
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// HealthMonitor tracks worker health status
|
||||
type HealthMonitor struct {
|
||||
lastHeartbeat time.Time
|
||||
healthy bool
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewHealthMonitor creates a new health monitor
|
||||
func NewHealthMonitor() *HealthMonitor {
|
||||
return &HealthMonitor{
|
||||
lastHeartbeat: time.Now(),
|
||||
healthy: true,
|
||||
}
|
||||
}
|
||||
|
||||
// RecordHeartbeat updates the last heartbeat time
|
||||
func (h *HealthMonitor) RecordHeartbeat() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.lastHeartbeat = time.Now()
|
||||
h.healthy = true
|
||||
}
|
||||
|
||||
// IsHealthy returns true if the worker is healthy
|
||||
func (h *HealthMonitor) IsHealthy(timeout time.Duration) bool {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return h.healthy && time.Since(h.lastHeartbeat) < timeout
|
||||
}
|
||||
|
||||
// MarkUnhealthy marks the worker as unhealthy
|
||||
func (h *HealthMonitor) MarkUnhealthy() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.healthy = false
|
||||
}
|
||||
|
||||
// Heartbeater defines the contract for heartbeat operations
|
||||
type Heartbeater interface {
|
||||
// Beat performs a single heartbeat
|
||||
Beat(ctx context.Context) error
|
||||
}
|
||||
|
||||
// HeartbeatLoop runs the heartbeat loop
|
||||
func HeartbeatLoop(ctx context.Context, interval time.Duration, hb Heartbeater, onFailure func(error)) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := hb.Beat(ctx); err != nil {
|
||||
if onFailure != nil {
|
||||
onFailure(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
195
internal/worker/lifecycle/runloop.go
Normal file
195
internal/worker/lifecycle/runloop.go
Normal file
|
|
@ -0,0 +1,195 @@
|
|||
// Package lifecycle provides worker lifecycle management types and interfaces
|
||||
package lifecycle
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jfraeys/fetch_ml/internal/queue"
|
||||
)
|
||||
|
||||
// TaskExecutor defines the contract for executing tasks
|
||||
type TaskExecutor interface {
|
||||
// Execute runs a task and returns any error
|
||||
Execute(ctx context.Context, task *queue.Task) error
|
||||
}
|
||||
|
||||
// RunLoopConfig holds configuration for the run loop
|
||||
type RunLoopConfig struct {
|
||||
WorkerID string
|
||||
MaxWorkers int
|
||||
PollInterval time.Duration
|
||||
TaskLeaseDuration time.Duration
|
||||
HeartbeatInterval time.Duration
|
||||
GracefulTimeout time.Duration
|
||||
PrewarmEnabled bool
|
||||
}
|
||||
|
||||
// RunLoop manages the main worker processing loop
|
||||
type RunLoop struct {
|
||||
config RunLoopConfig
|
||||
queue queue.Backend
|
||||
executor TaskExecutor
|
||||
metrics MetricsRecorder
|
||||
logger Logger
|
||||
|
||||
// State management
|
||||
running map[string]context.CancelFunc
|
||||
runningMu sync.RWMutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// MetricsRecorder defines the contract for recording metrics
|
||||
type MetricsRecorder interface {
|
||||
RecordTaskStart()
|
||||
RecordTaskCompletion()
|
||||
RecordTaskFailure()
|
||||
RecordQueueLatency(duration time.Duration)
|
||||
}
|
||||
|
||||
// Logger defines the contract for logging
|
||||
type Logger interface {
|
||||
Info(msg string, keys ...any)
|
||||
Warn(msg string, keys ...any)
|
||||
Error(msg string, keys ...any)
|
||||
Debug(msg string, keys ...any)
|
||||
}
|
||||
|
||||
// NewRunLoop creates a new run loop
|
||||
func NewRunLoop(
|
||||
config RunLoopConfig,
|
||||
queue queue.Backend,
|
||||
executor TaskExecutor,
|
||||
metrics MetricsRecorder,
|
||||
logger Logger,
|
||||
) *RunLoop {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &RunLoop{
|
||||
config: config,
|
||||
queue: queue,
|
||||
executor: executor,
|
||||
metrics: metrics,
|
||||
logger: logger,
|
||||
running: make(map[string]context.CancelFunc),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the main processing loop
|
||||
func (r *RunLoop) Start() {
|
||||
r.logger.Info("runloop started",
|
||||
"worker_id", r.config.WorkerID,
|
||||
"max_concurrent", r.config.MaxWorkers,
|
||||
"poll_interval", r.config.PollInterval)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
r.logger.Info("shutdown signal received, waiting for tasks")
|
||||
r.waitForTasks()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if r.runningCount() >= r.config.MaxWorkers {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
queueStart := time.Now()
|
||||
blockTimeout := r.config.PollInterval
|
||||
task, err := r.queue.GetNextTaskWithLeaseBlocking(
|
||||
r.config.WorkerID,
|
||||
r.config.TaskLeaseDuration,
|
||||
blockTimeout,
|
||||
)
|
||||
queueLatency := time.Since(queueStart)
|
||||
r.metrics.RecordQueueLatency(queueLatency)
|
||||
|
||||
if err != nil {
|
||||
if err == context.DeadlineExceeded {
|
||||
continue
|
||||
}
|
||||
r.logger.Error("error fetching task", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if task == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
r.reserveRunningSlot(task.ID)
|
||||
go r.executeTask(task)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop gracefully shuts down the run loop
|
||||
func (r *RunLoop) Stop() {
|
||||
r.cancel()
|
||||
r.waitForTasks()
|
||||
r.logger.Info("runloop stopped", "worker_id", r.config.WorkerID)
|
||||
}
|
||||
|
||||
func (r *RunLoop) reserveRunningSlot(taskID string) {
|
||||
r.runningMu.Lock()
|
||||
defer r.runningMu.Unlock()
|
||||
_, cancel := context.WithCancel(r.ctx)
|
||||
r.running[taskID] = cancel
|
||||
}
|
||||
|
||||
func (r *RunLoop) releaseRunningSlot(taskID string) {
|
||||
r.runningMu.Lock()
|
||||
defer r.runningMu.Unlock()
|
||||
if cancel, ok := r.running[taskID]; ok {
|
||||
cancel()
|
||||
delete(r.running, taskID)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RunLoop) runningCount() int {
|
||||
r.runningMu.RLock()
|
||||
defer r.runningMu.RUnlock()
|
||||
return len(r.running)
|
||||
}
|
||||
|
||||
func (r *RunLoop) waitForTasks() {
|
||||
timeout := time.After(5 * time.Minute)
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timeout:
|
||||
r.logger.Warn("shutdown timeout, force stopping", "running_tasks", r.runningCount())
|
||||
return
|
||||
case <-ticker.C:
|
||||
if r.runningCount() == 0 {
|
||||
r.logger.Info("all tasks completed, shutting down")
|
||||
return
|
||||
}
|
||||
r.logger.Debug("waiting for tasks to complete", "remaining", r.runningCount())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RunLoop) executeTask(task *queue.Task) {
|
||||
defer r.releaseRunningSlot(task.ID)
|
||||
|
||||
r.metrics.RecordTaskStart()
|
||||
defer r.metrics.RecordTaskCompletion()
|
||||
|
||||
r.logger.Info("starting task", "task_id", task.ID, "job_name", task.JobName)
|
||||
|
||||
taskCtx, cancel := context.WithTimeout(r.ctx, 24*time.Hour)
|
||||
defer cancel()
|
||||
|
||||
if err := r.executor.Execute(taskCtx, task); err != nil {
|
||||
r.logger.Error("task execution failed", "task_id", task.ID, "error", err)
|
||||
r.metrics.RecordTaskFailure()
|
||||
}
|
||||
|
||||
_ = r.queue.ReleaseLease(task.ID, r.config.WorkerID)
|
||||
}
|
||||
Loading…
Reference in a new issue