// Fix bug in scheduler hub orphan reconciliation:
//   - Move delete(h.pendingAcceptance, taskID) inside the requeue success block
//   - Prevents premature cleanup when requeue fails
//
// Add comprehensive test infrastructure:
//   - hub_test_helpers.go: new test helper utilities
//   - Mock scheduler components for isolated testing
//   - Test fixture setup and teardown helpers
//
// Refactor and enhance hub capabilities tests:
//   - Restructured hub_capabilities_test.go
//   - Improved test coverage for worker capability matching
//
// Add comprehensive orphan recovery tests:
//   - internal/scheduler/orphan_recovery_test.go
//   - Tests orphaned job detection and recovery
//   - Covers requeue logic, timeout handling, state cleanup
package scheduler_test
import (
"testing"
"time"
"github.com/jfraeys/fetch_ml/internal/scheduler"
)
func TestCanAdmit_BackendMatching(t *testing.T) {
|
|
h := scheduler.NewTestSchedulerHub(scheduler.HubConfig{})
|
|
h.SetReservationsForTest(make(map[string]*scheduler.Reservation))
|
|
|
|
tests := []struct {
|
|
name string
|
|
workerCaps scheduler.WorkerCapabilities
|
|
jobSpec scheduler.JobSpec
|
|
want bool
|
|
}{
|
|
{
|
|
name: "backend matches nvidia",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendNVIDIA,
|
|
GPUCount: 4,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
GPUBackend: "nvidia",
|
|
GPUCount: 2,
|
|
},
|
|
want: true,
|
|
},
|
|
{
|
|
name: "backend matches metal",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendMetal,
|
|
GPUCount: 2,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
GPUBackend: "metal",
|
|
GPUCount: 1,
|
|
},
|
|
want: true,
|
|
},
|
|
{
|
|
name: "backend mismatch nvidia vs metal",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendNVIDIA,
|
|
GPUCount: 4,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
GPUBackend: "metal",
|
|
GPUCount: 1,
|
|
},
|
|
want: false,
|
|
},
|
|
{
|
|
name: "no backend required - any matches",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendVulkan,
|
|
GPUCount: 2,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
GPUBackend: "",
|
|
GPUCount: 1,
|
|
},
|
|
want: true,
|
|
},
|
|
{
|
|
name: "cpu job on cpu worker",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendCPU,
|
|
GPUCount: 0,
|
|
CPUCount: 8,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
GPUBackend: "cpu",
|
|
GPUCount: 0,
|
|
},
|
|
want: true,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
wc := scheduler.NewWorkerConnForTest(tt.workerCaps, scheduler.SlotStatus{BatchTotal: 4})
|
|
task := &scheduler.Task{
|
|
ID: "test-task",
|
|
Spec: tt.jobSpec,
|
|
}
|
|
got := h.CanAdmitForTest(task, wc)
|
|
if got != tt.want {
|
|
t.Errorf("CanAdmitForTest() = %v, want %v", got, tt.want)
|
|
}
|
|
})
|
|
}
|
|
}
func TestCanAdmit_VRAMRequirements(t *testing.T) {
|
|
h := scheduler.NewTestSchedulerHub(scheduler.HubConfig{})
|
|
h.SetReservationsForTest(make(map[string]*scheduler.Reservation))
|
|
|
|
tests := []struct {
|
|
name string
|
|
workerCaps scheduler.WorkerCapabilities
|
|
jobSpec scheduler.JobSpec
|
|
want bool
|
|
}{
|
|
{
|
|
name: "sufficient VRAM",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendNVIDIA,
|
|
GPUCount: 2,
|
|
VRAMGB: 32.0,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
MinVRAMGB: 16.0,
|
|
GPUCount: 1,
|
|
},
|
|
want: true,
|
|
},
|
|
{
|
|
name: "insufficient VRAM",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendNVIDIA,
|
|
GPUCount: 2,
|
|
VRAMGB: 8.0,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
MinVRAMGB: 16.0,
|
|
GPUCount: 1,
|
|
},
|
|
want: false,
|
|
},
|
|
{
|
|
name: "no VRAM required",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendNVIDIA,
|
|
GPUCount: 2,
|
|
VRAMGB: 8.0,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
MinVRAMGB: 0,
|
|
GPUCount: 1,
|
|
},
|
|
want: true,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
wc := scheduler.NewWorkerConnForTest(tt.workerCaps, scheduler.SlotStatus{BatchTotal: 4})
|
|
task := &scheduler.Task{
|
|
ID: "test-task",
|
|
Spec: tt.jobSpec,
|
|
}
|
|
got := h.CanAdmitForTest(task, wc)
|
|
if got != tt.want {
|
|
t.Errorf("CanAdmitForTest() = %v, want %v", got, tt.want)
|
|
}
|
|
})
|
|
}
|
|
}
func TestCanAdmit_CPUCoresRequirements(t *testing.T) {
|
|
h := scheduler.NewTestSchedulerHub(scheduler.HubConfig{})
|
|
h.SetReservationsForTest(make(map[string]*scheduler.Reservation))
|
|
|
|
tests := []struct {
|
|
name string
|
|
workerCaps scheduler.WorkerCapabilities
|
|
jobSpec scheduler.JobSpec
|
|
want bool
|
|
}{
|
|
{
|
|
name: "sufficient CPU cores",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendCPU,
|
|
CPUCount: 16,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
MinCPUCores: 8,
|
|
GPUCount: 0,
|
|
},
|
|
want: true,
|
|
},
|
|
{
|
|
name: "insufficient CPU cores",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendCPU,
|
|
CPUCount: 4,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
MinCPUCores: 8,
|
|
GPUCount: 0,
|
|
},
|
|
want: false,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
wc := scheduler.NewWorkerConnForTest(tt.workerCaps, scheduler.SlotStatus{BatchTotal: 4})
|
|
task := &scheduler.Task{
|
|
ID: "test-task",
|
|
Spec: tt.jobSpec,
|
|
}
|
|
got := h.CanAdmitForTest(task, wc)
|
|
if got != tt.want {
|
|
t.Errorf("CanAdmitForTest() = %v, want %v", got, tt.want)
|
|
}
|
|
})
|
|
}
|
|
}
func TestCanAdmit_ReservedGPUs(t *testing.T) {
|
|
h := scheduler.NewTestSchedulerHub(scheduler.HubConfig{})
|
|
h.SetReservationsForTest(map[string]*scheduler.Reservation{
|
|
"res-1": {TaskID: "task-1", GPUCount: 2},
|
|
"res-2": {TaskID: "task-2", GPUCount: 2},
|
|
})
|
|
|
|
tests := []struct {
|
|
name string
|
|
workerCaps scheduler.WorkerCapabilities
|
|
jobSpec scheduler.JobSpec
|
|
want bool
|
|
}{
|
|
{
|
|
name: "enough GPUs after reservations",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendNVIDIA,
|
|
GPUCount: 8,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
GPUCount: 4,
|
|
},
|
|
want: true,
|
|
},
|
|
{
|
|
name: "not enough GPUs after reservations",
|
|
workerCaps: scheduler.WorkerCapabilities{
|
|
GPUBackend: scheduler.BackendNVIDIA,
|
|
GPUCount: 4,
|
|
},
|
|
jobSpec: scheduler.JobSpec{
|
|
GPUCount: 2,
|
|
},
|
|
want: false,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
wc := scheduler.NewWorkerConnForTest(tt.workerCaps, scheduler.SlotStatus{BatchTotal: 4})
|
|
task := &scheduler.Task{
|
|
ID: "test-task",
|
|
Spec: tt.jobSpec,
|
|
}
|
|
got := h.CanAdmitForTest(task, wc)
|
|
if got != tt.want {
|
|
t.Errorf("CanAdmitForTest() = %v, want %v", got, tt.want)
|
|
}
|
|
})
|
|
}
|
|
}
func TestReconcileOrphans_TierGracePeriods(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
jobTier scheduler.JobTier
|
|
accepted bool
|
|
assignedAt time.Time
|
|
wantRequeued bool
|
|
}{
|
|
{
|
|
name: "data_processing tier - short grace period",
|
|
jobTier: scheduler.TierDataProcessing,
|
|
accepted: true,
|
|
assignedAt: time.Now().Add(-35 * time.Second),
|
|
wantRequeued: true,
|
|
},
|
|
{
|
|
name: "training tier - long grace period",
|
|
jobTier: scheduler.TierTraining,
|
|
accepted: true,
|
|
assignedAt: time.Now().Add(-5 * time.Minute),
|
|
wantRequeued: false,
|
|
},
|
|
{
|
|
name: "training tier - past grace period",
|
|
jobTier: scheduler.TierTraining,
|
|
accepted: true,
|
|
assignedAt: time.Now().Add(-11 * time.Minute),
|
|
wantRequeued: true,
|
|
},
|
|
{
|
|
name: "evaluation tier - 2min grace",
|
|
jobTier: scheduler.TierEvaluation,
|
|
accepted: true,
|
|
assignedAt: time.Now().Add(-3 * time.Minute),
|
|
wantRequeued: true,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
h := scheduler.NewTestSchedulerHub(scheduler.HubConfig{
|
|
AcceptanceTimeoutSecs: 60,
|
|
TestGracePeriods: map[scheduler.JobTier]time.Duration{
|
|
scheduler.TierDataProcessing: 30 * time.Second,
|
|
scheduler.TierTraining: 10 * time.Minute,
|
|
scheduler.TierEvaluation: 2 * time.Minute,
|
|
},
|
|
})
|
|
|
|
task := &scheduler.Task{
|
|
ID: "test-task",
|
|
Status: "assigned",
|
|
Spec: scheduler.JobSpec{
|
|
JobTier: tt.jobTier,
|
|
},
|
|
}
|
|
|
|
h.SetPendingAcceptanceForTest("test-task", &scheduler.JobAssignment{
|
|
TaskID: "test-task",
|
|
WorkerID: "disconnected-worker",
|
|
AssignedAt: tt.assignedAt,
|
|
Accepted: tt.accepted,
|
|
Task: task,
|
|
})
|
|
|
|
h.ReconcileOrphansForTest()
|
|
|
|
_, stillPending := h.GetPendingAcceptanceForTest("test-task")
|
|
wasRequeued := !stillPending
|
|
|
|
if wasRequeued != tt.wantRequeued {
|
|
t.Errorf("ReconcileOrphansForTest() requeued=%v, want=%v", wasRequeued, tt.wantRequeued)
|
|
}
|
|
})
|
|
}
|
|
}