diff --git a/internal/worker/simplified.go b/internal/worker/simplified.go new file mode 100644 index 0000000..e543eda --- /dev/null +++ b/internal/worker/simplified.go @@ -0,0 +1,108 @@ +// Package worker provides the ML task worker implementation +package worker + +import ( + "time" + + "github.com/jfraeys/fetch_ml/internal/logging" + "github.com/jfraeys/fetch_ml/internal/queue" + "github.com/jfraeys/fetch_ml/internal/worker/executor" + "github.com/jfraeys/fetch_ml/internal/worker/lifecycle" +) + +// SimplifiedWorker demonstrates the target architecture for the worker refactor. +// This is Phase 5 of the architectural refactoring plan. +// +// Instead of 27 fields in the monolithic Worker struct, this uses composed +// dependencies that implement clear interfaces. +// +// Key improvements: +// - Dependencies are injected via constructor +// - Clear separation of concerns (execution, lifecycle, metrics) +// - Each component can be mocked for testing +// - No direct access to low-level resources +type SimplifiedWorker struct { + id string + config *Config + logger *logging.Logger + + // Composed dependencies from previous phases + runLoop *lifecycle.RunLoop + runner *executor.JobRunner + metrics lifecycle.MetricsRecorder + health *lifecycle.HealthMonitor +} + +// SimplifiedWorkerConfig holds configuration for the simplified worker +type SimplifiedWorkerConfig struct { + ID string + Config *Config + Logger *logging.Logger + Queue queue.Backend + JobRunner *executor.JobRunner + Metrics lifecycle.MetricsRecorder + Executor lifecycle.TaskExecutor +} + +// NewSimplifiedWorker creates a new simplified worker +func NewSimplifiedWorker(cfg SimplifiedWorkerConfig) *SimplifiedWorker { + // Build run loop configuration from worker config + runLoopConfig := lifecycle.RunLoopConfig{ + WorkerID: cfg.ID, + MaxWorkers: cfg.Config.MaxWorkers, + PollInterval: time.Duration(cfg.Config.PollInterval) * time.Second, + TaskLeaseDuration: cfg.Config.TaskLeaseDuration, + HeartbeatInterval: cfg.Config.HeartbeatInterval, + GracefulTimeout: cfg.Config.GracefulTimeout, + PrewarmEnabled: cfg.Config.PrewarmEnabled, + } + + // Create run loop + runLoop := lifecycle.NewRunLoop( + runLoopConfig, + cfg.Queue, + cfg.Executor, + cfg.Metrics, + cfg.Logger, + ) + + return &SimplifiedWorker{ + id: cfg.ID, + config: cfg.Config, + logger: cfg.Logger, + runLoop: runLoop, + runner: cfg.JobRunner, + metrics: cfg.Metrics, + health: lifecycle.NewHealthMonitor(), + } +} + +// Start begins the worker's main processing loop +func (w *SimplifiedWorker) Start() { + w.logger.Info("simplified worker starting", + "worker_id", w.id, + "max_concurrent", w.config.MaxWorkers) + + w.runLoop.Start() +} + +// Stop gracefully shuts down the worker +func (w *SimplifiedWorker) Stop() { + w.logger.Info("simplified worker stopping", "worker_id", w.id) + w.runLoop.Stop() +} + +// IsHealthy returns true if the worker is healthy +func (w *SimplifiedWorker) IsHealthy() bool { + return w.health.IsHealthy(5 * time.Minute) +} + +// GetMetrics returns current worker metrics +func (w *SimplifiedWorker) GetMetrics() map[string]any { + // Simplified metrics - real implementation would aggregate from components + return map[string]any{ + "worker_id": w.id, + "max_workers": w.config.MaxWorkers, + "healthy": w.IsHealthy(), + } +}