feat: enhance task domain and scheduler protocol

- Update task domain model
- Improve scheduler hub and priority queue
- Enhance protocol definitions
- Update manifest schema and run handling
This commit is contained in:
Jeremie Fraeys 2026-03-04 13:23:38 -05:00
parent 1f495dfbb7
commit a4f2c36069
No known key found for this signature in database
6 changed files with 85 additions and 3 deletions

View file

@ -38,6 +38,18 @@ type Task struct {
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
Priority int64 `json:"priority"`
// FirstAssignedAt is set once when the task is first assigned to a worker.
// It never changes, even on re-queue after worker failure.
FirstAssignedAt time.Time `json:"first_assigned_at,omitempty"`
// MaxRuntime is the cached computed value from JobSpec.MaxRuntimeHours.
// 0 means use default (24h), capped at 168h (7d).
MaxRuntime time.Duration `json:"max_runtime,omitempty"`
// RemainingTime is the wall-clock budget left when assigned to a worker.
// Set by the scheduler on assignment.
RemainingTime time.Duration `json:"remaining_time,omitempty"`
}
// Attempt represents a single execution attempt of a task

View file

@ -160,7 +160,7 @@ type RunManifest struct {
ExperimentManifestSHA string `json:"experiment_manifest_sha,omitempty"`
DepsManifestName string `json:"deps_manifest_name,omitempty"`
DepsManifestSHA string `json:"deps_manifest_sha,omitempty"`
TrainScriptPath string `json:"train_script_path,omitempty"`
EntrypointPath string `json:"entrypoint,omitempty"`
WorkerVersion string `json:"worker_version,omitempty"`
RunID string `json:"run_id"`
ImageDigest string `json:"image_digest,omitempty"`

View file

@ -236,7 +236,7 @@
"deps_manifest_sha": {
"type": "string"
},
"train_script_path": {
"entrypoint": {
"type": "string"
},
"worker_version": {

View file

@ -447,11 +447,57 @@ func (h *SchedulerHub) canAdmit(candidate *Task, worker *WorkerConn) bool {
return worker.capabilities.GPUCount >= candidate.Spec.GPUCount
}
// canRequeue checks if a task can be re-queued based on wall-clock elapsed time.
// Returns false if the task has exceeded its MaxRuntime budget.
func (h *SchedulerHub) canRequeue(task *Task) bool {
if task.FirstAssignedAt.IsZero() {
return true // Never assigned, can always re-queue
}
elapsed := time.Since(task.FirstAssignedAt)
maxRuntime := task.MaxRuntime
if maxRuntime == 0 {
maxRuntime = 24 * time.Hour // Default 24h
}
if elapsed > maxRuntime {
// Task exceeded wall-clock budget - fail it
slog.Info("task exceeded max runtime, failing",
"task_id", task.ID,
"elapsed", elapsed,
"max_runtime", maxRuntime)
return false
}
return true
}
func (h *SchedulerHub) assignTask(task *Task, wc *WorkerConn) Message {
// Remove from queue first (prevent double-assignment)
h.batchQueue.Remove(task.ID)
h.serviceQueue.Remove(task.ID)
// Set FirstAssignedAt if this is the first assignment
if task.FirstAssignedAt.IsZero() {
task.FirstAssignedAt = time.Now()
}
// Cache MaxRuntime from spec
maxHours := task.Spec.MaxRuntimeHours
if maxHours <= 0 {
maxHours = 24 // Default 24h
}
if maxHours > 168 {
maxHours = 168 // Hard cap at 7d
}
task.MaxRuntime = time.Duration(maxHours) * time.Hour
// Calculate remaining time budget
elapsed := time.Since(task.FirstAssignedAt)
remaining := task.MaxRuntime - elapsed
if remaining < 0 {
remaining = 0
}
// Track pending acceptance with task reference
h.mu.Lock()
h.pendingAcceptance[task.ID] = &JobAssignment{
@ -471,9 +517,15 @@ func (h *SchedulerHub) assignTask(task *Task, wc *WorkerConn) Message {
WorkerID: wc.workerID,
})
// Send job assignment with remaining time budget
payload := JobAssignPayload{
Spec: task.Spec,
RemainingTime: remaining,
}
return Message{
Type: MsgJobAssign,
Payload: mustMarshal(task.Spec),
Payload: mustMarshal(payload),
}
}

View file

@ -16,6 +16,14 @@ type Task struct {
WorkerID string
Metadata map[string]string // Additional task metadata (snapshot SHA, etc.)
index int // for heap interface
// FirstAssignedAt is set once when the task is first assigned to a worker.
// It never changes, even on re-queue after worker failure.
FirstAssignedAt time.Time
// MaxRuntime is the cached computed value from JobSpec.MaxRuntimeHours.
// 0 means use default (24h), capped at 168h (7d).
MaxRuntime time.Duration
}
// EffectivePriority returns the priority with aging applied

View file

@ -106,6 +106,10 @@ type JobSpec struct {
GPUType string `json:"gpu_type,omitempty"`
NodeCount int `json:"node_count"`
// MaxRuntimeHours is the maximum wall-clock time for this job.
// 0 = default (24h), capped at 168h (7d) by the scheduler.
MaxRuntimeHours int `json:"max_runtime_hours,omitempty"`
Command []string `json:"command"`
Env map[string]string `json:"env"`
@ -136,3 +140,9 @@ type ServiceHealthPayload struct {
Healthy bool `json:"healthy"`
Message string `json:"message,omitempty"`
}
// JobAssignPayload is sent from scheduler to worker when assigning a task.
type JobAssignPayload struct {
Spec JobSpec `json:"spec"`
RemainingTime time.Duration `json:"remaining_time"` // Wall-clock budget left
}