diff --git a/internal/scheduler/service_templates.go b/internal/scheduler/service_templates.go index 40b1e83..3647bb3 100644 --- a/internal/scheduler/service_templates.go +++ b/internal/scheduler/service_templates.go @@ -59,7 +59,7 @@ type ServiceMount struct { // {{SECRET:xxx}} - Secret value from scheduler's secret store // JupyterLabTemplate is the default JupyterLab service configuration. -// Sysadmins can disable Jupyter by setting service_slots: 0 in worker config, +// Users can disable Jupyter by setting service_slots: 0 in worker config, // or by not registering this template with the scheduler. var JupyterLabTemplate = ServiceTemplate{ JobType: "service", diff --git a/internal/worker/config.go b/internal/worker/config.go index f42c905..ad9006f 100644 --- a/internal/worker/config.go +++ b/internal/worker/config.go @@ -44,7 +44,7 @@ type Config struct { SSHKey string `yaml:"ssh_key"` Port int `yaml:"port"` BasePath string `yaml:"base_path"` - TrainScript string `yaml:"train_script"` + Entrypoint string `yaml:"entrypoint"` RedisURL string `yaml:"redis_url"` RedisAddr string `yaml:"redis_addr"` RedisPassword string `yaml:"redis_password"` @@ -107,11 +107,7 @@ type Config struct { Mode string `yaml:"mode"` // Scheduler configuration for distributed mode - Scheduler struct { - Address string `yaml:"address"` - Cert string `yaml:"cert"` - Token string `yaml:"token"` - } `yaml:"scheduler"` + Scheduler SchedulerConfig `yaml:"scheduler"` // Plugins configuration Plugins map[string]factory.PluginConfig `yaml:"plugins"` @@ -140,7 +136,56 @@ type SnapshotStoreConfig struct { MaxRetries int `yaml:"max_retries"` } -// AppleGPUConfig holds configuration for Apple M-series GPU support +// SchedulerConfig holds configurable heartbeat and lease settings for distributed mode. +type SchedulerConfig struct { + Address string `yaml:"address"` + Cert string `yaml:"cert"` + Token string `yaml:"token"` + HeartbeatIntervalSecs int `yaml:"heartbeat_interval_secs"` // default: 30 + TaskLeaseDurationSecs int `yaml:"task_lease_duration_secs"` // default: 90 (3x heartbeat) +} + +// Validate checks that lease and heartbeat settings are valid. +// Enforces 2-10x ratio between lease duration and heartbeat interval. +func (sc *SchedulerConfig) Validate() error { + // Apply defaults if zero + if sc.HeartbeatIntervalSecs == 0 { + sc.HeartbeatIntervalSecs = 30 + } + if sc.TaskLeaseDurationSecs == 0 { + sc.TaskLeaseDurationSecs = 90 + } + + heartbeat := time.Duration(sc.HeartbeatIntervalSecs) * time.Second + lease := time.Duration(sc.TaskLeaseDurationSecs) * time.Second + + if lease <= heartbeat { + return fmt.Errorf( + "task_lease_duration_secs (%s) must be greater than heartbeat_interval_secs (%s)", + lease, heartbeat, + ) + } + + ratio := lease.Seconds() / heartbeat.Seconds() + if ratio < 2.0 { + return fmt.Errorf( + "task_lease_duration_secs must be at least 2× heartbeat_interval_secs "+ + "(got %.1f×) — too small a margin for transient network issues", + ratio, + ) + } + + if ratio > 10.0 { + return fmt.Errorf( + "task_lease_duration_secs is %.1f× heartbeat_interval_secs — "+ + "dead workers won't be detected for %s, consider reducing lease duration", + ratio, lease, + ) + } + + return nil +} + type AppleGPUConfig struct { Enabled bool `yaml:"enabled"` MetalDevice string `yaml:"metal_device"` diff --git a/internal/worker/executor/container.go b/internal/worker/executor/container.go index 433e0ea..ac79f4e 100644 --- a/internal/worker/executor/container.go +++ b/internal/worker/executor/container.go @@ -28,7 +28,7 @@ type ContainerConfig struct { PodmanImage string ContainerResults string ContainerWorkspace string - TrainScript string + Entrypoint string BasePath string AppleGPUEnabled bool } @@ -298,7 +298,7 @@ func (e *ContainerExecutor) runPodman( podmanCfg container.PodmanConfig, selectedImage string, ) error { - scriptPath := filepath.Join(podmanCfg.ContainerWorkspace, e.config.TrainScript) + scriptPath := filepath.Join(podmanCfg.ContainerWorkspace, e.config.Entrypoint) manifestName, err := SelectDependencyManifest(filepath.Join(env.OutputDir, "code")) if err != nil { diff --git a/internal/worker/lifecycle/runloop.go b/internal/worker/lifecycle/runloop.go index c9e0546..1133a9c 100644 --- a/internal/worker/lifecycle/runloop.go +++ b/internal/worker/lifecycle/runloop.go @@ -205,7 +205,7 @@ func (r *RunLoop) executeTask(task *queue.Task) { } } - taskCtx, cancel := context.WithTimeout(r.ctx, 24*time.Hour) + taskCtx, cancel := context.WithTimeout(r.ctx, task.RemainingTime) defer cancel() if err := r.executor.Execute(taskCtx, task); err != nil { diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 382b62b..0a8c820 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "log/slog" + "math/rand" "net/http" "os" "path/filepath" @@ -65,7 +66,17 @@ func (w *Worker) Start() { // heartbeatLoop sends periodic heartbeats with slot status to scheduler func (w *Worker) heartbeatLoop() { - ticker := time.NewTicker(10 * time.Second) + // Use configured interval or default to 10s + intervalSecs := w.Config.Scheduler.HeartbeatIntervalSecs + if intervalSecs == 0 { + intervalSecs = 10 + } + + // Add jitter (0-5s) to prevent thundering herd + jitter := time.Duration(rand.Intn(5)) * time.Second + interval := time.Duration(intervalSecs)*time.Second + jitter + + ticker := time.NewTicker(interval) defer ticker.Stop() for { diff --git a/scheduler b/scheduler deleted file mode 100755 index e866776..0000000 Binary files a/scheduler and /dev/null differ