fetch_ml/tests/unit/scheduler/capability_routing_test.go
Jeremie Fraeys 57787e1e7b
feat(scheduler): implement capability-based routing and hub v2
Add comprehensive capability routing system to scheduler hub:
- Capability-aware worker matching with requirement/offer negotiation
- Hub v2 protocol with structured message types and heartbeat management
- Worker capability advertisement and dynamic routing decisions
- Orphan recovery for disconnected workers with state reconciliation
- Template-based job scheduling with capability constraints

Add extensive test coverage:
- Unit tests for capability routing logic and heartbeat mechanics
- Unit tests for orphan recovery scenarios
- E2E tests for capability routing across multiple workers
- Hub capabilities integration tests
- Scheduler fixture helpers for test setup

Protocol improvements:
- Define structured protocol messages for hub-worker communication
- Add capability matching algorithm with scoring
- Implement graceful worker disconnection handling
2026-03-12 12:00:05 -04:00

527 lines
15 KiB
Go

package scheduler_test
import (
"testing"
"time"
"github.com/jfraeys/fetch_ml/internal/scheduler"
fixtures "github.com/jfraeys/fetch_ml/tests/fixtures"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestCapabilityRouting_BackendMatching validates that the hub only assigns a
// job to a worker whose advertised GPU backend is compatible with the job's
// requested backend: exact matches admit, mismatches reject, and a job with an
// empty GPUBackend accepts any worker.
//
// NOTE(review): only the first case sets SlotPool; the others rely on the
// hub's behavior for an empty slot pool — confirm that is intentional.
func TestCapabilityRouting_BackendMatching(t *testing.T) {
	tests := []struct {
		name       string
		workerCaps scheduler.WorkerCapabilities
		jobSpec    scheduler.JobSpec
		wantAdmit  bool
	}{
		{
			name: "nvidia backend matches nvidia job",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendNVIDIA,
				GPUCount:   4,
			},
			jobSpec: scheduler.JobSpec{
				ID:         "nvidia-match-job",
				Type:       scheduler.JobTypeBatch,
				SlotPool:   "batch",
				GPUBackend: "nvidia",
				GPUCount:   2,
			},
			wantAdmit: true,
		},
		{
			name: "metal backend matches metal job",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendMetal,
				GPUCount:   2,
			},
			jobSpec: scheduler.JobSpec{
				ID:         "metal-match-job",
				Type:       scheduler.JobTypeBatch,
				GPUBackend: "metal",
				GPUCount:   1,
			},
			wantAdmit: true,
		},
		{
			name: "nvidia worker rejects metal job",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendNVIDIA,
				GPUCount:   4,
			},
			jobSpec: scheduler.JobSpec{
				ID:         "metal-reject-job",
				Type:       scheduler.JobTypeBatch,
				GPUBackend: "metal",
				GPUCount:   1,
			},
			wantAdmit: false,
		},
		{
			name: "any backend accepted when job has no preference",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendVulkan,
				GPUCount:   2,
			},
			jobSpec: scheduler.JobSpec{
				ID:         "any-backend-job",
				Type:       scheduler.JobTypeBatch,
				GPUBackend: "",
				GPUCount:   1,
			},
			wantAdmit: true,
		},
		{
			name: "cpu worker accepts cpu job",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendCPU,
				GPUCount:   0,
				CPUCount:   8,
			},
			jobSpec: scheduler.JobSpec{
				ID:         "cpu-job",
				Type:       scheduler.JobTypeBatch,
				GPUBackend: "cpu",
				GPUCount:   0,
			},
			wantAdmit: true,
		},
		{
			name: "cpu worker rejects gpu job",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendCPU,
				GPUCount:   0,
			},
			jobSpec: scheduler.JobSpec{
				ID:         "gpu-reject-job",
				Type:       scheduler.JobTypeBatch,
				GPUBackend: "nvidia",
				GPUCount:   1,
			},
			wantAdmit: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			fixture := fixtures.NewSchedulerTestFixture(t, fixtures.DefaultHubConfig())
			defer fixture.Cleanup()

			worker := fixture.CreateWorker("backend-test-worker", tt.workerCaps)

			// Queue the job first, then signal ready so the hub runs an
			// assignment pass with this worker as a candidate.
			fixture.SubmitJob(tt.jobSpec)
			worker.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")

			// Admission is observed as a MsgJobAssign arriving within the
			// timeout; any other (or no) message counts as a rejection.
			msg := worker.RecvTimeout(2 * time.Second)
			gotAdmit := msg.Type == scheduler.MsgJobAssign
			// Use the testify assertion already imported by this file instead
			// of a hand-rolled if/t.Errorf, matching the sibling tests below.
			assert.Equal(t, tt.wantAdmit, gotAdmit,
				"backend matching: got admit=%v, want=%v", gotAdmit, tt.wantAdmit)
		})
	}
}
// TestCapabilityRouting_VRAMRequirements validates VRAM filtering: a job with
// MinVRAMGB set is only assigned to workers advertising at least that much
// VRAM, while MinVRAMGB == 0 places no constraint.
func TestCapabilityRouting_VRAMRequirements(t *testing.T) {
	cases := []struct {
		name       string
		workerCaps scheduler.WorkerCapabilities
		jobSpec    scheduler.JobSpec
		wantAdmit  bool
	}{
		{
			name: "sufficient VRAM - job needs 16GB, worker has 32GB",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendNVIDIA,
				GPUCount:   2,
				VRAMGB:     32.0,
			},
			jobSpec: scheduler.JobSpec{
				ID:        "vram-sufficient-job",
				Type:      scheduler.JobTypeBatch,
				SlotPool:  "batch",
				MinVRAMGB: 16.0,
				GPUCount:  1,
			},
			wantAdmit: true,
		},
		{
			name: "insufficient VRAM - job needs 16GB, worker has 8GB",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendNVIDIA,
				GPUCount:   2,
				VRAMGB:     8.0,
			},
			jobSpec: scheduler.JobSpec{
				ID:        "vram-insufficient-job",
				Type:      scheduler.JobTypeBatch,
				SlotPool:  "batch",
				MinVRAMGB: 16.0,
				GPUCount:  1,
			},
			wantAdmit: false,
		},
		{
			name: "no VRAM requirement - any VRAM accepted",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendNVIDIA,
				GPUCount:   2,
				VRAMGB:     4.0,
			},
			jobSpec: scheduler.JobSpec{
				ID:        "no-vram-job",
				Type:      scheduler.JobTypeBatch,
				SlotPool:  "batch",
				MinVRAMGB: 0,
				GPUCount:  1,
			},
			wantAdmit: true,
		},
		{
			name: "multi-GPU VRAM - job needs 48GB, worker has 48GB total (2x24GB)",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendNVIDIA,
				GPUCount:   2,
				VRAMGB:     48.0, // Total VRAM across all GPUs
			},
			jobSpec: scheduler.JobSpec{
				ID:        "multi-vram-job",
				Type:      scheduler.JobTypeBatch,
				SlotPool:  "batch",
				MinVRAMGB: 48.0,
				GPUCount:  2,
			},
			wantAdmit: true,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			fx := fixtures.NewSchedulerTestFixture(t, fixtures.DefaultHubConfig())
			defer fx.Cleanup()

			w := fx.CreateWorker("vram-test-worker", tc.workerCaps)

			// Queue the job, then report slots available so the hub attempts
			// an assignment against this worker's capabilities.
			fx.SubmitJob(tc.jobSpec)
			w.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")

			// A MsgJobAssign within the window means the job was admitted.
			reply := w.RecvTimeout(2 * time.Second)
			admitted := reply.Type == scheduler.MsgJobAssign
			if admitted != tc.wantAdmit {
				t.Errorf("VRAM filtering: got admit=%v, want=%v", admitted, tc.wantAdmit)
			}
		})
	}
}
// TestCapabilityRouting_CPURequirements validates CPU-core filtering: a job
// with MinCPUCores set is only assigned to workers with at least that many
// cores, while MinCPUCores == 0 places no constraint.
func TestCapabilityRouting_CPURequirements(t *testing.T) {
	cases := []struct {
		name       string
		workerCaps scheduler.WorkerCapabilities
		jobSpec    scheduler.JobSpec
		wantAdmit  bool
	}{
		{
			name: "sufficient CPU cores - job needs 8, worker has 16",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendCPU,
				CPUCount:   16,
			},
			jobSpec: scheduler.JobSpec{
				ID:          "cpu-sufficient-job",
				Type:        scheduler.JobTypeBatch,
				SlotPool:    "batch",
				MinCPUCores: 8,
				GPUCount:    0,
			},
			wantAdmit: true,
		},
		{
			name: "insufficient CPU cores - job needs 8, worker has 4",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendCPU,
				CPUCount:   4,
			},
			jobSpec: scheduler.JobSpec{
				ID:          "cpu-insufficient-job",
				Type:        scheduler.JobTypeBatch,
				SlotPool:    "batch",
				MinCPUCores: 8,
				GPUCount:    0,
			},
			wantAdmit: false,
		},
		{
			name: "no CPU requirement - any CPU count accepted",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendCPU,
				CPUCount:   2,
			},
			jobSpec: scheduler.JobSpec{
				ID:          "no-cpu-req-job",
				Type:        scheduler.JobTypeBatch,
				SlotPool:    "batch",
				MinCPUCores: 0,
				GPUCount:    0,
			},
			wantAdmit: true,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			fx := fixtures.NewSchedulerTestFixture(t, fixtures.DefaultHubConfig())
			defer fx.Cleanup()

			w := fx.CreateWorker("cpu-test-worker", tc.workerCaps)

			// Queue the job, then report slots available so the hub attempts
			// an assignment against this worker's capabilities.
			fx.SubmitJob(tc.jobSpec)
			w.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")

			// A MsgJobAssign within the window means the job was admitted.
			reply := w.RecvTimeout(2 * time.Second)
			admitted := reply.Type == scheduler.MsgJobAssign
			if admitted != tc.wantAdmit {
				t.Errorf("CPU filtering: got admit=%v, want=%v", admitted, tc.wantAdmit)
			}
		})
	}
}
// TestCapabilityRouting_MultiGPUPlacement validates that a job requesting N
// GPUs is only placed on a worker advertising at least N GPUs, including the
// exact-fit boundary and the GPUCount == 0 (CPU-only) case.
func TestCapabilityRouting_MultiGPUPlacement(t *testing.T) {
	cases := []struct {
		name       string
		workerCaps scheduler.WorkerCapabilities
		jobGPUs    int
		wantAdmit  bool
	}{
		{
			name: "job needs 4 GPUs, worker has 8 - admitted",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendNVIDIA,
				GPUCount:   8,
			},
			jobGPUs:   4,
			wantAdmit: true,
		},
		{
			name: "job needs 4 GPUs, worker has 2 - rejected",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendNVIDIA,
				GPUCount:   2,
			},
			jobGPUs:   4,
			wantAdmit: false,
		},
		{
			name: "job needs 4 GPUs, worker has exactly 4 - admitted",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendNVIDIA,
				GPUCount:   4,
			},
			jobGPUs:   4,
			wantAdmit: true,
		},
		{
			name: "job needs 0 GPUs (CPU), worker has GPUs - admitted",
			workerCaps: scheduler.WorkerCapabilities{
				GPUBackend: scheduler.BackendNVIDIA,
				GPUCount:   4,
				CPUCount:   8,
			},
			jobGPUs:   0,
			wantAdmit: true,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			fx := fixtures.NewSchedulerTestFixture(t, fixtures.DefaultHubConfig())
			defer fx.Cleanup()

			w := fx.CreateWorker("multi-gpu-test-worker", tc.workerCaps)

			// Only the GPU count varies per case; everything else about the
			// job is fixed. Queue it, then signal ready to trigger placement.
			fx.SubmitJob(scheduler.JobSpec{
				ID:       "multi-gpu-job",
				Type:     scheduler.JobTypeBatch,
				SlotPool: "batch",
				GPUCount: tc.jobGPUs,
			})
			w.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")

			// A MsgJobAssign within the window means the job was admitted.
			reply := w.RecvTimeout(2 * time.Second)
			admitted := reply.Type == scheduler.MsgJobAssign
			if admitted != tc.wantAdmit {
				t.Errorf("multi-GPU placement: got admit=%v, want=%v", admitted, tc.wantAdmit)
			}
		})
	}
}
// TestCapabilityRouting_ReservedGPUAccounting validates the GPU reservation
// system: an 8-GPU worker that accepts one 4-GPU job still has exactly enough
// unreserved capacity to be assigned a second 4-GPU job.
func TestCapabilityRouting_ReservedGPUAccounting(t *testing.T) {
	fx := fixtures.NewSchedulerTestFixture(t, fixtures.DefaultHubConfig())
	defer fx.Cleanup()

	// Worker advertising 8 GPUs total.
	w := fx.CreateWorker("reservation-test-worker", scheduler.WorkerCapabilities{
		GPUBackend: scheduler.BackendNVIDIA,
		GPUCount:   8,
	})

	// First 4-GPU job: queue it, then signal ready to trigger assignment.
	fx.SubmitJob(scheduler.JobSpec{
		ID:       "job-1",
		Type:     scheduler.JobTypeBatch,
		SlotPool: "batch",
		GPUCount: 4,
	})
	w.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")

	first := w.RecvTimeout(2 * time.Second)
	require.Equal(t, scheduler.MsgJobAssign, first.Type, "first job should be assigned")

	// Accepting the job is what commits the 4-GPU reservation on this worker.
	w.AcceptJob("job-1")

	// Second 4-GPU job, with one batch slot now in use.
	fx.SubmitJob(scheduler.JobSpec{
		ID:       "job-2",
		Type:     scheduler.JobTypeBatch,
		SlotPool: "batch",
		GPUCount: 4,
	})
	w.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 1}, "polling")

	// 8 total - 4 reserved = 4 free, which is exactly what job-2 requires.
	second := w.RecvTimeout(2 * time.Second)
	assert.Equal(t, scheduler.MsgJobAssign, second.Type, "second job should be assigned - 4 GPUs still available")
}
// TestCapabilityRouting_JobTierPriority validates job tier interactions with capabilities:
// with one GPU worker and one CPU worker ready, a GPU-requiring training-tier
// job and a CPU-only data-processing-tier job should each land on the worker
// that can actually run them.
//
// NOTE(review): the assertions only check that each worker receives *some*
// MsgJobAssign, not which job ID it was assigned. The data job (0 GPUs) could
// in principle run on either worker; whether the routing is deterministic here
// depends on the hub's matching order — confirm, and consider asserting the
// assigned job ID if the protocol message exposes it.
func TestCapabilityRouting_JobTierPriority(t *testing.T) {
	fixture := fixtures.NewSchedulerTestFixture(t, fixtures.DefaultHubConfig())
	defer fixture.Cleanup()
	// Create workers with different capabilities: one NVIDIA GPU worker,
	// one CPU-only worker.
	gpuWorker := fixture.CreateWorker("tier-gpu-worker", scheduler.WorkerCapabilities{
		GPUBackend: scheduler.BackendNVIDIA,
		GPUCount:   4,
	})
	cpuWorker := fixture.CreateWorker("tier-cpu-worker", scheduler.WorkerCapabilities{
		GPUBackend: scheduler.BackendCPU,
		CPUCount:   8,
	})
	// Submit training job (high priority tier, needs GPU) — only the GPU
	// worker can satisfy GPUCount: 2.
	fixture.SubmitJob(scheduler.JobSpec{
		ID:       "training-job",
		Type:     scheduler.JobTypeBatch,
		SlotPool: "batch",
		JobTier:  scheduler.TierTraining,
		GPUCount: 2,
	})
	// Submit data processing job (lower priority tier, CPU only) — no GPU
	// requirement, so any worker is capability-compatible.
	fixture.SubmitJob(scheduler.JobSpec{
		ID:       "data-job",
		Type:     scheduler.JobTypeBatch,
		SlotPool: "batch",
		JobTier:  scheduler.TierDataProcessing,
		GPUCount: 0,
	})
	// Signal both workers ready to trigger job assignment.
	gpuWorker.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
	cpuWorker.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
	// GPU worker should get training job (it requires GPUs).
	msg1 := gpuWorker.RecvTimeout(2 * time.Second)
	require.Equal(t, scheduler.MsgJobAssign, msg1.Type, "GPU worker should get training job")
	// CPU worker should get data job — the only job it is capable of running.
	msg2 := cpuWorker.RecvTimeout(2 * time.Second)
	require.Equal(t, scheduler.MsgJobAssign, msg2.Type, "CPU worker should get data job")
}
// TestCapabilityRouting_MixedCapabilitiesRace validates race-free capability
// matching: with a 2-GPU and an 8-GPU worker both ready, a job that needs 4
// GPUs must be assigned to the 8-GPU worker and never to the 2-GPU one.
func TestCapabilityRouting_MixedCapabilitiesRace(t *testing.T) {
	// This test verifies that when multiple workers with different capabilities
	// are ready, jobs are routed to the correct workers based on requirements.
	fixture := fixtures.NewSchedulerTestFixture(t, fixtures.DefaultHubConfig())
	defer fixture.Cleanup()
	// Create workers with different GPU counts: one under-provisioned (2 GPUs)
	// and one that can satisfy the job (8 GPUs).
	worker2GPU := fixture.CreateWorker("race-2gpu", scheduler.WorkerCapabilities{
		GPUBackend: scheduler.BackendNVIDIA,
		GPUCount:   2,
	})
	worker8GPU := fixture.CreateWorker("race-8gpu", scheduler.WorkerCapabilities{
		GPUBackend: scheduler.BackendNVIDIA,
		GPUCount:   8,
	})
	// Both signal ready before the job exists, so the hub has both workers
	// registered as candidates ahead of submission.
	worker2GPU.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
	worker8GPU.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
	// Submit job needing 4 GPUs — only worker8GPU can satisfy this.
	fixture.SubmitJob(scheduler.JobSpec{
		ID:       "race-job-4gpu",
		Type:     scheduler.JobTypeBatch,
		SlotPool: "batch",
		GPUCount: 4,
	})
	// Signal ready after job submission to trigger assignment.
	worker2GPU.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
	worker8GPU.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
	// Should go to 8GPU worker (2GPU can't handle it). Poll both workers'
	// receive channels; whichever sees a MsgJobAssign first is recorded as the
	// assignee, and the final assertion fails if it was the 2-GPU worker.
	var assignedWorker *fixtures.MockWorker
	// Overall deadline for the whole wait; checkTimeout is a shorter retry
	// interval used to re-signal readiness if no assignment has arrived yet.
	deadline := time.After(2 * time.Second)
	checkTimeout := time.After(100 * time.Millisecond)
	for assignedWorker == nil {
		select {
		case msg := <-worker2GPU.RecvCh:
			if msg.Type == scheduler.MsgJobAssign {
				assignedWorker = worker2GPU
			}
		case msg := <-worker8GPU.RecvCh:
			if msg.Type == scheduler.MsgJobAssign {
				assignedWorker = worker8GPU
			}
		case <-checkTimeout:
			// No assignment yet, signal ready again to trigger another
			// scheduling pass, then re-arm the retry timer.
			worker2GPU.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
			worker8GPU.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
			checkTimeout = time.After(100 * time.Millisecond)
		case <-deadline:
			t.Fatal("timeout waiting for job assignment")
		}
	}
	assert.Equal(t, worker8GPU, assignedWorker, "4-GPU job should go to 8-GPU worker")
}