fetch_ml/internal/scheduler/service_manager.go
Jeremie Fraeys 43e6446587
feat(scheduler): implement multi-tenant job scheduler with gang scheduling
Add new scheduler component for distributed ML workload orchestration:
- Hub-based coordination for multi-worker clusters
- Pacing controller for rate limiting job submissions
- Priority queue with preemption support
- Port allocator for dynamic service discovery
- Protocol handlers for worker-scheduler communication
- Service manager with OS-specific implementations
- Connection management and state persistence
- Template system for service deployment

Includes comprehensive test suite:
- Unit tests for all core components
- Integration tests for distributed scenarios
- Benchmark tests for performance validation
- Mock fixtures for isolated testing

Refs: scheduler-architecture.md
2026-02-26 12:03:23 -05:00

367 lines
9 KiB
Go

package scheduler
import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"os/exec"
"syscall"
"time"
)
// ServiceManager handles the lifecycle of service-type jobs
// Services transition: preparing → serving → stopping → completed/failed
// Unlike batch jobs, services run indefinitely until explicitly cancelled
// and have health checks for liveness and readiness
//
// NOTE(review): healthy/ready/lastHealth are written inside Run's health
// loop and read via IsHealthy/IsReady without synchronization — confirm
// callers tolerate this or guard the fields with a mutex.
type ServiceManager struct {
	task         *Task              // scheduler task this service belongs to; Status is mirrored on every transition
	spec         *JobSpec           // job definition: command, prolog/epilog, env, health check config
	port         int                // port handed to the process via SERVICE_PORT
	cmd          *exec.Cmd          // running service process; nil until startService succeeds
	cancel       context.CancelFunc // cancels the service context; set by Run, invoked by Stop
	healthy      bool               // result of the most recent liveness check
	ready        bool               // result of the most recent readiness check; set true on entering "serving"
	lastHealth   time.Time          // timestamp of the last successful health-loop tick
	stateMachine *StateMachine      // tracks lifecycle state and fires change callbacks
}
// StateMachine manages service state transitions
// It ensures valid transitions and notifies the scheduler of changes
//
// NOTE(review): no transition-validity table is enforced here — the only
// guard in transition() is skipping self-transitions. Confirm whether a
// real validity check is intended.
type StateMachine struct {
	current  string                          // current lifecycle state, e.g. "preparing", "serving", "failed"
	onChange func(oldState, newState string) // optional callback invoked after each state change
}
// NewServiceManager constructs a manager for one service-type task.
// The manager begins in the "preparing" state with the health and
// readiness flags cleared; nothing runs until Run is called.
func NewServiceManager(task *Task, spec *JobSpec, port int) *ServiceManager {
	sm := &ServiceManager{
		task: task,
		spec: spec,
		port: port,
	}
	sm.stateMachine = &StateMachine{current: "preparing"}
	return sm
}
// SetStateChangeCallback registers cb to be invoked on every state
// transition. It is a no-op when no state machine is attached.
func (sm *ServiceManager) SetStateChangeCallback(cb func(oldState, newState string)) {
	if sm.stateMachine == nil {
		return
	}
	sm.stateMachine.onChange = cb
}
// Run starts and manages the service lifecycle
// It runs prolog, starts the service, waits for readiness, then health checks
//
// Sequence: prolog (optional) → start process → readiness wait (optional,
// bounded at 120s) → "serving" → periodic health loop. On any failure the
// state is set to "failed" and a wrapped error is returned; on success Run
// blocks in the health loop until ctx is cancelled or liveness fails.
func (sm *ServiceManager) Run(ctx context.Context) error {
	// Create cancellable context for the service
	// Stop() later invokes sm.cancel to tear down everything started here.
	svcCtx, cancel := context.WithCancel(ctx)
	sm.cancel = cancel
	defer cancel()
	// Run prolog if configured
	if len(sm.spec.Prolog) > 0 {
		sm.transition("preparing")
		if err := sm.runProlog(svcCtx); err != nil {
			sm.transition("failed")
			return fmt.Errorf("prolog failed: %w", err)
		}
	}
	// Start the service process
	if err := sm.startService(svcCtx); err != nil {
		sm.transition("failed")
		return fmt.Errorf("start service failed: %w", err)
	}
	// Wait for readiness (if health check configured)
	// A service that never becomes ready within 120s is stopped and failed.
	if sm.spec.HealthCheck != nil && sm.spec.HealthCheck.ReadinessEndpoint != "" {
		sm.transition("preparing")
		if err := sm.waitReady(svcCtx, 120*time.Second); err != nil {
			sm.stopService()
			sm.transition("failed")
			return fmt.Errorf("readiness check failed: %w", err)
		}
	}
	// Mark as serving
	sm.transition("serving")
	sm.ready = true
	// Run health check loop
	return sm.healthLoop(svcCtx)
}
// Stop gracefully stops the service
// It runs epilog with a fresh context (ignores job cancellation)
//
// Sequence: cancel the service context, transition to "stopping", run the
// epilog under a fresh 60-second context (so cleanup completes even when
// the job's own context is already cancelled), force-stop the process,
// then mark "completed". Epilog failures are logged, never returned.
func (sm *ServiceManager) Stop() error {
	// Cancel service context
	if sm.cancel != nil {
		sm.cancel()
	}
	// Run epilog with fresh context - must complete even if job cancelled
	epilogCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()
	sm.transition("stopping")
	if len(sm.spec.Epilog) > 0 {
		if err := sm.runEpilog(epilogCtx); err != nil {
			slog.Warn("epilog failed", "task", sm.task.ID, "error", err)
		}
	}
	// Ensure service is stopped
	sm.stopService()
	// NOTE(review): the final state is always "completed", even when Stop
	// runs after a failure — confirm callers do not expect "failed" to
	// survive a Stop.
	sm.transition("completed")
	return nil
}
// runProlog executes each configured prolog command in order, stopping
// at the first failure so the service never starts on a broken setup.
func (sm *ServiceManager) runProlog(ctx context.Context) error {
	for _, line := range sm.spec.Prolog {
		if err := sm.buildCommand(ctx, line).Run(); err != nil {
			return fmt.Errorf("prolog command failed: %s, error: %w", line, err)
		}
	}
	return nil
}
// startService launches the main service process and records it in sm.cmd.
// Returns an error when the spec has no command or the process fails to start.
func (sm *ServiceManager) startService(ctx context.Context) error {
	argv := sm.spec.Command
	if len(argv) == 0 {
		return fmt.Errorf("no command specified for service")
	}
	proc := sm.buildCommand(ctx, argv[0], argv[1:]...)
	// Place the service in its own process group so the whole tree can be
	// terminated later (Unix-specific).
	setProcessGroup(proc)
	if err := proc.Start(); err != nil {
		return err
	}
	sm.cmd = proc
	return nil
}
// stopService terminates the service process: SIGTERM first, then a forced
// kill of the whole process group if it has not exited within 10 seconds.
//
// Fixes over the previous version: the Signal error is no longer silently
// discarded (it is logged at debug level, since a failure usually just
// means the process already exited), and after a force kill we wait for
// cmd.Wait to return so the child is reaped instead of lingering as a
// zombie.
func (sm *ServiceManager) stopService() {
	if sm.cmd == nil || sm.cmd.Process == nil {
		return
	}
	// Try graceful termination first. Signal can fail if the process has
	// already exited; Wait below reaps it either way.
	if err := sm.cmd.Process.Signal(syscall.SIGTERM); err != nil {
		slog.Debug("SIGTERM delivery failed (process may have exited)",
			"task", sm.task.ID, "error", err)
	}
	// Wait for graceful shutdown or timeout
	done := make(chan error, 1)
	go func() {
		done <- sm.cmd.Wait()
	}()
	select {
	case <-done:
		// Graceful shutdown succeeded
	case <-time.After(10 * time.Second):
		// Force kill the process group (Unix-specific), then wait for the
		// Wait goroutine so the child is actually reaped.
		killProcessGroup(sm.cmd)
		<-done
	}
}
// runEpilog executes the configured epilog commands after the service
// stops. Each command is best-effort: a failure is logged and the
// remaining commands still run. Always returns nil.
func (sm *ServiceManager) runEpilog(ctx context.Context) error {
	for _, line := range sm.spec.Epilog {
		err := sm.buildCommand(ctx, line).Run()
		if err == nil {
			continue
		}
		// Keep going so later cleanup steps still get a chance to run.
		slog.Warn("epilog command failed", "task", sm.task.ID, "cmd", line, "error", err)
	}
	return nil
}
// healthLoop runs health checks periodically
// Returns when context is cancelled or health check fails
//
// With no health check configured it simply blocks until ctx is done.
// Otherwise it ticks at the configured interval, failing the service on
// the first liveness miss and refreshing the readiness flag when a
// readiness endpoint exists.
func (sm *ServiceManager) healthLoop(ctx context.Context) error {
	if sm.spec.HealthCheck == nil {
		// No health check configured - just wait for context cancellation
		<-ctx.Done()
		return nil
	}
	interval := time.Duration(sm.spec.HealthCheck.IntervalSecs) * time.Second
	if interval < 5*time.Second {
		// NOTE(review): intervals below 5s (including an unset 0) are bumped
		// to 15s, yet configured intervals of 5-14s pass through unchanged.
		// The original comment claimed a 15s minimum — confirm whether the
		// intended floor is 5s or 15s.
		interval = 15 * time.Second
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			// Check liveness
			if !sm.checkLiveness() {
				sm.transition("failed")
				return fmt.Errorf("liveness check failed")
			}
			sm.healthy = true
			// Check readiness (if configured)
			if sm.spec.HealthCheck.ReadinessEndpoint != "" {
				sm.ready = sm.checkReadiness()
			}
			sm.lastHealth = time.Now()
		}
	}
}
// waitReady polls the readiness endpoint every 2 seconds until it
// succeeds, ctx is cancelled, or timeout elapses. Returns nil right away
// when no readiness endpoint is configured.
func (sm *ServiceManager) waitReady(ctx context.Context, timeout time.Duration) error {
	hc := sm.spec.HealthCheck
	if hc == nil || hc.ReadinessEndpoint == "" {
		return nil // nothing to wait for
	}
	const pollEvery = 2 * time.Second
	deadline := time.Now().Add(timeout)
	for {
		if !time.Now().Before(deadline) {
			return fmt.Errorf("readiness check timed out after %v", timeout)
		}
		if sm.checkReadiness() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollEvery):
			// Poll again.
		}
	}
}
// checkLiveness reports whether the service process is alive and, when a
// liveness endpoint is configured, whether that endpoint responds with 2xx.
func (sm *ServiceManager) checkLiveness() bool {
	switch {
	case sm.cmd == nil, sm.cmd.Process == nil:
		return false
	case !isProcessRunning(sm.cmd):
		return false
	}
	hc := sm.spec.HealthCheck
	if hc == nil || hc.LivenessEndpoint == "" {
		return true // process alive and no HTTP probe configured
	}
	return sm.checkHTTPEndpoint(hc.LivenessEndpoint, 2*time.Second)
}
// checkReadiness reports whether the service can accept traffic. Without
// a configured readiness endpoint it falls back to the last liveness result.
func (sm *ServiceManager) checkReadiness() bool {
	hc := sm.spec.HealthCheck
	if hc != nil && hc.ReadinessEndpoint != "" {
		return sm.checkHTTPEndpoint(hc.ReadinessEndpoint, 5*time.Second)
	}
	return sm.healthy
}
// checkHTTPEndpoint probes endpoint with an HTTP GET and treats any 2xx
// response received within timeout as healthy. The body is drained so
// the underlying connection can be reused.
func (sm *ServiceManager) checkHTTPEndpoint(endpoint string, timeout time.Duration) bool {
	probe := &http.Client{Timeout: timeout}
	resp, err := probe.Get(endpoint)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	_, _ = io.Copy(io.Discard, resp.Body) // drain to allow connection reuse
	return resp.StatusCode/100 == 2
}
// transition moves the service to newState, mirrors it onto the task
// status, fires the registered callback, and logs the change.
// Self-transitions and calls without a state machine are ignored.
func (sm *ServiceManager) transition(newState string) {
	machine := sm.stateMachine
	if machine == nil {
		return
	}
	prev := machine.current
	if prev == newState {
		return
	}
	machine.current = newState
	sm.task.Status = newState // keep the task's visible status in sync
	if cb := machine.onChange; cb != nil {
		cb(prev, newState)
	}
	slog.Info("service state transition",
		"task", sm.task.ID,
		"from", prev,
		"to", newState)
}
// buildCommand constructs an exec.Cmd for name/args with the job's
// environment plus SERVICE_PORT and TASK_ID injected.
// Note the spec environment is used as-is; the parent process
// environment is not inherited.
func (sm *ServiceManager) buildCommand(ctx context.Context, name string, args ...string) *exec.Cmd {
	cmd := exec.CommandContext(ctx, name, args...)
	env := make([]string, 0, len(sm.spec.Env)+4)
	for key, val := range sm.spec.Env {
		env = append(env, fmt.Sprintf("%s=%s", key, val))
	}
	// Service-specific variables appended last.
	env = append(env, fmt.Sprintf("SERVICE_PORT=%d", sm.port))
	env = append(env, fmt.Sprintf("TASK_ID=%s", sm.task.ID))
	cmd.Env = env
	return cmd
}
// IsHealthy reports the result of the most recent liveness check.
func (sm *ServiceManager) IsHealthy() bool { return sm.healthy }
// IsReady reports whether the service is currently ready to receive traffic.
func (sm *ServiceManager) IsReady() bool { return sm.ready }
// GetState returns the current lifecycle state, or "unknown" when no
// state machine is attached.
func (sm *ServiceManager) GetState() string {
	if machine := sm.stateMachine; machine != nil {
		return machine.current
	}
	return "unknown"
}