diff --git a/internal/domain/task.go b/internal/domain/task.go
index 798560c..6395eb8 100644
--- a/internal/domain/task.go
+++ b/internal/domain/task.go
@@ -38,6 +38,21 @@ type Task struct {
 	RetryCount int   `json:"retry_count"`
 	MaxRetries int   `json:"max_retries"`
 	Priority   int64 `json:"priority"`
+
+	// FirstAssignedAt is set once when the task is first assigned to a worker.
+	// It never changes, even on re-queue after worker failure.
+	// Tagged omitzero rather than omitempty: encoding/json never treats a
+	// struct such as time.Time as empty, so omitempty would always emit the
+	// zero time. omitzero takes effect on Go 1.24+.
+	FirstAssignedAt time.Time `json:"first_assigned_at,omitzero"`
+
+	// MaxRuntime is the cached computed value from JobSpec.MaxRuntimeHours.
+	// 0 means use default (24h), capped at 168h (7d).
+	MaxRuntime time.Duration `json:"max_runtime,omitempty"`
+
+	// RemainingTime is the wall-clock budget left when assigned to a worker.
+	// Set by the scheduler on assignment.
+	RemainingTime time.Duration `json:"remaining_time,omitempty"`
 }
 
 // Attempt represents a single execution attempt of a task
diff --git a/internal/manifest/run_manifest.go b/internal/manifest/run_manifest.go
index 0b8583f..fdbfa9b 100644
--- a/internal/manifest/run_manifest.go
+++ b/internal/manifest/run_manifest.go
@@ -160,7 +160,7 @@ type RunManifest struct {
 	ExperimentManifestSHA string `json:"experiment_manifest_sha,omitempty"`
 	DepsManifestName      string `json:"deps_manifest_name,omitempty"`
 	DepsManifestSHA       string `json:"deps_manifest_sha,omitempty"`
-	TrainScriptPath       string `json:"train_script_path,omitempty"`
+	EntrypointPath        string `json:"entrypoint,omitempty"`
 	WorkerVersion         string `json:"worker_version,omitempty"`
 	RunID                 string `json:"run_id"`
 	ImageDigest           string `json:"image_digest,omitempty"`
diff --git a/internal/manifest/schema.json b/internal/manifest/schema.json
index 8415ebe..839a9f3 100644
--- a/internal/manifest/schema.json
+++ b/internal/manifest/schema.json
@@ -236,7 +236,7 @@
     "deps_manifest_sha": {
       "type": "string"
     },
-    "train_script_path": {
+    "entrypoint": {
       "type": "string"
     },
     "worker_version": {
diff --git a/internal/scheduler/hub.go b/internal/scheduler/hub.go
index cc77ac6..0fb0498 100644
--- a/internal/scheduler/hub.go
+++ b/internal/scheduler/hub.go
@@ -447,11 +447,61 @@ func (h *SchedulerHub) canAdmit(candidate *Task, worker *WorkerConn) bool {
 	return worker.capabilities.GPUCount >= candidate.Spec.GPUCount
 }
 
+// canRequeue reports whether a task may be re-queued, based on the wall-clock
+// time elapsed since its first assignment. It returns false once the task has
+// used up its MaxRuntime budget (24h when MaxRuntime is unset). It only logs
+// and reports the decision; it does not change the task.
+func (h *SchedulerHub) canRequeue(task *Task) bool {
+	if task.FirstAssignedAt.IsZero() {
+		return true // Never assigned, can always re-queue
+	}
+
+	elapsed := time.Since(task.FirstAssignedAt)
+	maxRuntime := task.MaxRuntime
+	if maxRuntime <= 0 {
+		maxRuntime = 24 * time.Hour // Default 24h
+	}
+
+	// >= rather than >: at elapsed == maxRuntime assignTask would compute a
+	// remaining budget of 0, so there is nothing left to run with.
+	if elapsed >= maxRuntime {
+		// Task exceeded wall-clock budget - fail it
+		slog.Info("task exceeded max runtime, failing",
+			"task_id", task.ID,
+			"elapsed", elapsed,
+			"max_runtime", maxRuntime)
+		return false
+	}
+	return true
+}
+
 func (h *SchedulerHub) assignTask(task *Task, wc *WorkerConn) Message {
 	// Remove from queue first (prevent double-assignment)
 	h.batchQueue.Remove(task.ID)
 	h.serviceQueue.Remove(task.ID)
 
+	// Set FirstAssignedAt if this is the first assignment
+	if task.FirstAssignedAt.IsZero() {
+		task.FirstAssignedAt = time.Now()
+	}
+
+	// Cache MaxRuntime from spec
+	maxHours := task.Spec.MaxRuntimeHours
+	if maxHours <= 0 {
+		maxHours = 24 // Default 24h
+	}
+	if maxHours > 168 {
+		maxHours = 168 // Hard cap at 7d
+	}
+	task.MaxRuntime = time.Duration(maxHours) * time.Hour
+
+	// Calculate remaining time budget
+	elapsed := time.Since(task.FirstAssignedAt)
+	remaining := task.MaxRuntime - elapsed
+	if remaining < 0 {
+		remaining = 0
+	}
+
 	// Track pending acceptance with task reference
 	h.mu.Lock()
 	h.pendingAcceptance[task.ID] = &JobAssignment{
@@ -471,9 +521,15 @@ func (h *SchedulerHub) assignTask(task *Task, wc *WorkerConn) Message {
 		WorkerID: wc.workerID,
 	})
 
+	// Send job assignment with remaining time budget
+	payload := JobAssignPayload{
+		Spec:          task.Spec,
+		RemainingTime: remaining,
+	}
+
 	return Message{
 		Type:    MsgJobAssign,
-		Payload: mustMarshal(task.Spec),
+		Payload: mustMarshal(payload),
 	}
 }
 
diff --git a/internal/scheduler/priority_queue.go b/internal/scheduler/priority_queue.go
index 4fcfd48..4fd35d8 100644
--- a/internal/scheduler/priority_queue.go
+++ b/internal/scheduler/priority_queue.go
@@ -16,6 +16,14 @@ type Task struct {
 	WorkerID string
 	Metadata map[string]string // Additional task metadata (snapshot SHA, etc.)
 	index    int               // for heap interface
+
+	// FirstAssignedAt is set once when the task is first assigned to a worker.
+	// It never changes, even on re-queue after worker failure.
+	FirstAssignedAt time.Time
+
+	// MaxRuntime is the cached computed value from JobSpec.MaxRuntimeHours.
+	// 0 means use default (24h), capped at 168h (7d).
+	MaxRuntime time.Duration
 }
 
 // EffectivePriority returns the priority with aging applied
diff --git a/internal/scheduler/protocol.go b/internal/scheduler/protocol.go
index efa7c18..aa517c7 100644
--- a/internal/scheduler/protocol.go
+++ b/internal/scheduler/protocol.go
@@ -106,6 +106,10 @@ type JobSpec struct {
 	GPUType   string `json:"gpu_type,omitempty"`
 	NodeCount int    `json:"node_count"`
 
+	// MaxRuntimeHours is the maximum wall-clock time for this job.
+	// 0 = default (24h), capped at 168h (7d) by the scheduler.
+	MaxRuntimeHours int `json:"max_runtime_hours,omitempty"`
+
 	Command []string          `json:"command"`
 	Env     map[string]string `json:"env"`
 
@@ -136,3 +140,11 @@ type ServiceHealthPayload struct {
 	Healthy bool   `json:"healthy"`
 	Message string `json:"message,omitempty"`
 }
+
+// JobAssignPayload is sent from scheduler to worker when assigning a task.
+// RemainingTime is a time.Duration, so it is encoded in JSON as an integer
+// number of nanoseconds.
+type JobAssignPayload struct {
+	Spec          JobSpec       `json:"spec"`
+	RemainingTime time.Duration `json:"remaining_time"` // Wall-clock budget left
+}