package scheduler import ( "context" "fmt" "io" "log/slog" "net/http" "os/exec" "syscall" "time" ) // ServiceManager handles the lifecycle of service-type jobs // Services transition: preparing → serving → stopping → completed/failed // Unlike batch jobs, services run indefinitely until explicitly cancelled // and have health checks for liveness and readiness type ServiceManager struct { task *Task spec *JobSpec port int cmd *exec.Cmd cancel context.CancelFunc healthy bool ready bool lastHealth time.Time stateMachine *StateMachine } // StateMachine manages service state transitions // It ensures valid transitions and notifies the scheduler of changes type StateMachine struct { current string onChange func(oldState, newState string) } // NewServiceManager creates a new service manager for a task func NewServiceManager(task *Task, spec *JobSpec, port int) *ServiceManager { return &ServiceManager{ task: task, spec: spec, port: port, healthy: false, ready: false, stateMachine: &StateMachine{ current: "preparing", onChange: nil, }, } } // SetStateChangeCallback sets a callback for state transitions func (sm *ServiceManager) SetStateChangeCallback(cb func(oldState, newState string)) { if sm.stateMachine != nil { sm.stateMachine.onChange = cb } } // Run starts and manages the service lifecycle // It runs prolog, starts the service, waits for readiness, then health checks func (sm *ServiceManager) Run(ctx context.Context) error { // Create cancellable context for the service svcCtx, cancel := context.WithCancel(ctx) sm.cancel = cancel defer cancel() // Run prolog if configured if len(sm.spec.Prolog) > 0 { sm.transition("preparing") if err := sm.runProlog(svcCtx); err != nil { sm.transition("failed") return fmt.Errorf("prolog failed: %w", err) } } // Start the service process if err := sm.startService(svcCtx); err != nil { sm.transition("failed") return fmt.Errorf("start service failed: %w", err) } // Wait for readiness (if health check configured) if sm.spec.HealthCheck != nil && sm.spec.HealthCheck.ReadinessEndpoint != "" { sm.transition("preparing") if err := sm.waitReady(svcCtx, 120*time.Second); err != nil { sm.stopService() sm.transition("failed") return fmt.Errorf("readiness check failed: %w", err) } } // Mark as serving sm.transition("serving") sm.ready = true // Run health check loop return sm.healthLoop(svcCtx) } // Stop gracefully stops the service // It runs epilog with a fresh context (ignores job cancellation) func (sm *ServiceManager) Stop() error { // Cancel service context if sm.cancel != nil { sm.cancel() } // Run epilog with fresh context - must complete even if job cancelled epilogCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() sm.transition("stopping") if len(sm.spec.Epilog) > 0 { if err := sm.runEpilog(epilogCtx); err != nil { slog.Warn("epilog failed", "task", sm.task.ID, "error", err) } } // Ensure service is stopped sm.stopService() sm.transition("completed") return nil } // runProlog executes prolog commands before starting the service func (sm *ServiceManager) runProlog(ctx context.Context) error { for _, cmdStr := range sm.spec.Prolog { cmd := sm.buildCommand(ctx, cmdStr) if err := cmd.Run(); err != nil { return fmt.Errorf("prolog command failed: %s, error: %w", cmdStr, err) } } return nil } // startService starts the main service process func (sm *ServiceManager) startService(ctx context.Context) error { if len(sm.spec.Command) == 0 { return fmt.Errorf("no command specified for service") } cmd := sm.buildCommand(ctx, sm.spec.Command[0], sm.spec.Command[1:]...) // Set up process group for clean termination (Unix-specific) setProcessGroup(cmd) if err := cmd.Start(); err != nil { return err } sm.cmd = cmd return nil } // stopService stops the service process func (sm *ServiceManager) stopService() { if sm.cmd == nil || sm.cmd.Process == nil { return } // Try graceful termination first sm.cmd.Process.Signal(syscall.SIGTERM) // Wait for graceful shutdown or timeout done := make(chan error, 1) go func() { done <- sm.cmd.Wait() }() select { case <-done: // Graceful shutdown succeeded case <-time.After(10 * time.Second): // Force kill process group (Unix-specific) killProcessGroup(sm.cmd) } } // runEpilog executes epilog commands after service stops func (sm *ServiceManager) runEpilog(ctx context.Context) error { for _, cmdStr := range sm.spec.Epilog { cmd := sm.buildCommand(ctx, cmdStr) if err := cmd.Run(); err != nil { slog.Warn("epilog command failed", "task", sm.task.ID, "cmd", cmdStr, "error", err) // Continue with other epilog commands even if one fails } } return nil } // healthLoop runs health checks periodically // Returns when context is cancelled or health check fails func (sm *ServiceManager) healthLoop(ctx context.Context) error { if sm.spec.HealthCheck == nil { // No health check configured - just wait for context cancellation <-ctx.Done() return nil } interval := time.Duration(sm.spec.HealthCheck.IntervalSecs) * time.Second if interval < 5*time.Second { interval = 15 * time.Second // Minimum 15s between checks } ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return nil case <-ticker.C: // Check liveness if !sm.checkLiveness() { sm.transition("failed") return fmt.Errorf("liveness check failed") } sm.healthy = true // Check readiness (if configured) if sm.spec.HealthCheck.ReadinessEndpoint != "" { sm.ready = sm.checkReadiness() } sm.lastHealth = time.Now() } } } // waitReady waits for the service to become ready func (sm *ServiceManager) waitReady(ctx context.Context, timeout time.Duration) error { if sm.spec.HealthCheck == nil || sm.spec.HealthCheck.ReadinessEndpoint == "" { return nil // No readiness check configured } deadline := time.Now().Add(timeout) checkInterval := 2 * time.Second for time.Now().Before(deadline) { if sm.checkReadiness() { return nil } select { case <-ctx.Done(): return ctx.Err() case <-time.After(checkInterval): // Continue checking } } return fmt.Errorf("readiness check timed out after %v", timeout) } // checkLiveness checks if the service process is running func (sm *ServiceManager) checkLiveness() bool { if sm.cmd == nil || sm.cmd.Process == nil { return false } // Check if process is still running if !isProcessRunning(sm.cmd) { return false } // If liveness endpoint configured, also check HTTP if sm.spec.HealthCheck != nil && sm.spec.HealthCheck.LivenessEndpoint != "" { return sm.checkHTTPEndpoint(sm.spec.HealthCheck.LivenessEndpoint, 2*time.Second) } return true } // checkReadiness checks if the service is ready to receive traffic func (sm *ServiceManager) checkReadiness() bool { if sm.spec.HealthCheck == nil || sm.spec.HealthCheck.ReadinessEndpoint == "" { return sm.healthy // Fall back to liveness } return sm.checkHTTPEndpoint(sm.spec.HealthCheck.ReadinessEndpoint, 5*time.Second) } // checkHTTPEndpoint makes an HTTP GET request to check endpoint health func (sm *ServiceManager) checkHTTPEndpoint(endpoint string, timeout time.Duration) bool { client := &http.Client{ Timeout: timeout, } resp, err := client.Get(endpoint) if err != nil { return false } defer resp.Body.Close() // Drain body to allow connection reuse io.Copy(io.Discard, resp.Body) // 2xx status codes indicate success return resp.StatusCode >= 200 && resp.StatusCode < 300 } // transition changes the service state func (sm *ServiceManager) transition(newState string) { if sm.stateMachine == nil { return } oldState := sm.stateMachine.current if oldState == newState { return } sm.stateMachine.current = newState // Update task status sm.task.Status = newState // Notify callback if sm.stateMachine.onChange != nil { sm.stateMachine.onChange(oldState, newState) } slog.Info("service state transition", "task", sm.task.ID, "from", oldState, "to", newState) } // buildCommand creates an exec.Cmd with environment variables func (sm *ServiceManager) buildCommand(ctx context.Context, name string, args ...string) *exec.Cmd { cmd := exec.CommandContext(ctx, name, args...) // Set environment variables env := make([]string, 0, len(sm.spec.Env)+4) for k, v := range sm.spec.Env { env = append(env, fmt.Sprintf("%s=%s", k, v)) } // Add service-specific variables env = append(env, fmt.Sprintf("SERVICE_PORT=%d", sm.port), fmt.Sprintf("TASK_ID=%s", sm.task.ID), ) cmd.Env = env return cmd } // IsHealthy returns true if the service is healthy (process running) func (sm *ServiceManager) IsHealthy() bool { return sm.healthy } // IsReady returns true if the service is ready to receive traffic func (sm *ServiceManager) IsReady() bool { return sm.ready } // GetState returns the current service state func (sm *ServiceManager) GetState() string { if sm.stateMachine == nil { return "unknown" } return sm.stateMachine.current }