// Package lifecycle provides service job lifecycle management for long-running services. package lifecycle import ( "context" "fmt" "net/http" "os/exec" "syscall" "time" "github.com/jfraeys/fetch_ml/internal/domain" "github.com/jfraeys/fetch_ml/internal/scheduler" ) // ServiceManager handles the lifecycle of long-running service jobs (Jupyter, vLLM) type ServiceManager struct { task *domain.Task spec ServiceSpec cmd *exec.Cmd port int stateMgr *StateManager logger Logger healthCheck *scheduler.HealthCheck httpClient *http.Client // Reusable HTTP client for health checks } // ServiceSpec defines the specification for a service job type ServiceSpec struct { Command []string Env map[string]string Port int HealthCheck *scheduler.HealthCheck } // NewServiceManager creates a new service manager func NewServiceManager(task *domain.Task, spec ServiceSpec, port int, stateMgr *StateManager, logger Logger) *ServiceManager { return &ServiceManager{ task: task, spec: spec, port: port, stateMgr: stateMgr, logger: logger, healthCheck: spec.HealthCheck, httpClient: &http.Client{Timeout: 5 * time.Second}, // Shared client with connection reuse } } // Run executes the service lifecycle: start, wait for ready, health loop func (sm *ServiceManager) Run(ctx context.Context) error { if err := sm.start(); err != nil { sm.logger.Error("service start failed", "task_id", sm.task.ID, "error", err) if sm.stateMgr != nil { if err := sm.stateMgr.Transition(sm.task, StateFailed); err != nil { sm.logger.Error("failed to transition to failed", "task_id", sm.task.ID, "error", err) } } return fmt.Errorf("start service: %w", err) } // Wait for readiness readyCtx, cancel := context.WithTimeout(ctx, 120*time.Second) defer cancel() if err := sm.waitReady(readyCtx); err != nil { sm.logger.Error("service readiness check failed", "task_id", sm.task.ID, "error", err) if sm.stateMgr != nil { if err := sm.stateMgr.Transition(sm.task, StateFailed); err != nil { sm.logger.Error("failed to transition to failed", "task_id", sm.task.ID, "error", err) } } _ = sm.stop() return fmt.Errorf("wait ready: %w", err) } // Transition to serving state if sm.stateMgr != nil { if err := sm.stateMgr.Transition(sm.task, StateServing); err != nil { sm.logger.Error("failed to transition to serving", "task_id", sm.task.ID, "error", err) } } sm.logger.Info("service is serving", "task_id", sm.task.ID, "port", sm.port) // Run health loop return sm.healthLoop(ctx) } // start launches the service process func (sm *ServiceManager) start() error { if len(sm.spec.Command) == 0 { return fmt.Errorf("no command specified") } sm.cmd = exec.Command(sm.spec.Command[0], sm.spec.Command[1:]...) // Set environment for k, v := range sm.spec.Env { sm.cmd.Env = append(sm.cmd.Env, fmt.Sprintf("%s=%s", k, v)) } // Start process if err := sm.cmd.Start(); err != nil { return fmt.Errorf("start process: %w", err) } return nil } // waitReady blocks until the service passes readiness check or timeout func (sm *ServiceManager) waitReady(ctx context.Context) error { if sm.healthCheck == nil || sm.healthCheck.ReadinessEndpoint == "" { // No readiness check - assume ready immediately return nil } ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: if sm.checkReadiness() { return nil } } } } // healthLoop monitors service health until context cancellation func (sm *ServiceManager) healthLoop(ctx context.Context) error { if sm.healthCheck == nil { // No health check - just wait for context cancellation <-ctx.Done() return sm.gracefulStop() } interval := time.Duration(sm.healthCheck.IntervalSecs) * time.Second if interval == 0 { interval = 15 * time.Second } ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return sm.gracefulStop() case <-ticker.C: if !sm.checkLiveness() { sm.logger.Error("service liveness check failed", "task_id", sm.task.ID) if sm.stateMgr != nil { if err := sm.stateMgr.Transition(sm.task, StateFailed); err != nil { sm.logger.Error("failed to transition to failed", "task_id", sm.task.ID, "error", err) return fmt.Errorf("transition to failed: %w", err) } } return fmt.Errorf("liveness check failed") } } } } // checkLiveness returns true if the service process is alive func (sm *ServiceManager) checkLiveness() bool { if sm.cmd == nil || sm.cmd.Process == nil { return false } // Check process state if sm.healthCheck != nil && sm.healthCheck.LivenessEndpoint != "" { // HTTP liveness check using shared client resp, err := sm.httpClient.Get(sm.healthCheck.LivenessEndpoint) if err != nil { return false } defer resp.Body.Close() return resp.StatusCode == 200 } // Process existence check return sm.cmd.Process.Signal(syscall.Signal(0)) == nil } // checkReadiness returns true if the service is ready to accept traffic func (sm *ServiceManager) checkReadiness() bool { if sm.healthCheck == nil || sm.healthCheck.ReadinessEndpoint == "" { return true } // Use shared HTTP client for connection reuse resp, err := sm.httpClient.Get(sm.healthCheck.ReadinessEndpoint) if err != nil { return false } defer resp.Body.Close() return resp.StatusCode == 200 } // gracefulStop stops the service gracefully with a timeout func (sm *ServiceManager) gracefulStop() error { sm.logger.Info("gracefully stopping service", "task_id", sm.task.ID) if sm.stateMgr != nil { if err := sm.stateMgr.Transition(sm.task, StateStopping); err != nil { sm.logger.Error("failed to transition to stopping", "task_id", sm.task.ID, "error", err) } } if sm.cmd == nil || sm.cmd.Process == nil { if sm.stateMgr != nil { if err := sm.stateMgr.Transition(sm.task, StateCompleted); err != nil { sm.logger.Error("failed to transition to completed", "task_id", sm.task.ID, "error", err) } } return nil } // Send SIGTERM for graceful shutdown if err := sm.cmd.Process.Signal(syscall.SIGTERM); err != nil { sm.logger.Warn("SIGTERM failed, using SIGKILL", "task_id", sm.task.ID, "error", err) if err := sm.cmd.Process.Kill(); err != nil { sm.logger.Error("failed to kill process", "task_id", sm.task.ID, "error", err) } } else { // Wait for graceful shutdown done := make(chan error, 1) go func() { done <- sm.cmd.Wait() }() select { case <-done: // Graceful shutdown completed case <-time.After(30 * time.Second): // Timeout - force kill sm.logger.Warn("graceful shutdown timeout, forcing kill", "task_id", sm.task.ID) if err := sm.cmd.Process.Kill(); err != nil { sm.logger.Error("failed to kill process", "task_id", sm.task.ID, "error", err) } } } if sm.stateMgr != nil { if err := sm.stateMgr.Transition(sm.task, StateCompleted); err != nil { sm.logger.Error("failed to transition to completed", "task_id", sm.task.ID, "error", err) } } return nil } // stop forcefully stops the service func (sm *ServiceManager) stop() error { if sm.cmd == nil || sm.cmd.Process == nil { return nil } return sm.cmd.Process.Kill() } // GetPort returns the assigned port for the service func (sm *ServiceManager) GetPort() int { return sm.port }