diff --git a/tests/unit/scheduler/capability_routing_test.go b/internal/scheduler/capability_routing_test.go similarity index 100% rename from tests/unit/scheduler/capability_routing_test.go rename to internal/scheduler/capability_routing_test.go diff --git a/tests/unit/scheduler/failure_scenarios_test.go b/internal/scheduler/failure_scenarios_test.go similarity index 100% rename from tests/unit/scheduler/failure_scenarios_test.go rename to internal/scheduler/failure_scenarios_test.go diff --git a/tests/unit/scheduler/heartbeat_test.go b/internal/scheduler/heartbeat_test.go similarity index 100% rename from tests/unit/scheduler/heartbeat_test.go rename to internal/scheduler/heartbeat_test.go diff --git a/tests/unit/scheduler/plugin_quota_test.go b/internal/scheduler/plugin_quota_test.go similarity index 100% rename from tests/unit/scheduler/plugin_quota_test.go rename to internal/scheduler/plugin_quota_test.go diff --git a/tests/unit/scheduler/port_allocator_test.go b/internal/scheduler/port_allocator_test.go similarity index 100% rename from tests/unit/scheduler/port_allocator_test.go rename to internal/scheduler/port_allocator_test.go diff --git a/tests/unit/scheduler/priority_queue_test.go b/internal/scheduler/priority_queue_test.go similarity index 100% rename from tests/unit/scheduler/priority_queue_test.go rename to internal/scheduler/priority_queue_test.go diff --git a/tests/unit/scheduler/service_templates_test.go b/internal/scheduler/service_templates_test.go similarity index 100% rename from tests/unit/scheduler/service_templates_test.go rename to internal/scheduler/service_templates_test.go diff --git a/tests/unit/scheduler/state_store_test.go b/internal/scheduler/state_store_test.go similarity index 100% rename from tests/unit/scheduler/state_store_test.go rename to internal/scheduler/state_store_test.go diff --git a/tests/unit/scheduler/orphan_recovery_test.go b/tests/unit/scheduler/orphan_recovery_test.go deleted file mode 100644 index c38ced4..0000000 --- a/tests/unit/scheduler/orphan_recovery_test.go +++ /dev/null @@ -1,413 +0,0 @@ -package scheduler_test - -import ( - "testing" - "time" - - "github.com/jfraeys/fetch_ml/internal/scheduler" - fixtures "github.com/jfraeys/fetch_ml/tests/fixtures" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// TestOrphanRecovery_TierGracePeriods validates tier-specific grace periods -func TestOrphanRecovery_TierGracePeriods(t *testing.T) { - tests := []struct { - name string - jobTier scheduler.JobTier - testGracePeriod time.Duration - waitDuration time.Duration - wantRequeued bool - }{ - { - name: "data_processing tier - short grace period (100ms)", - jobTier: scheduler.TierDataProcessing, - testGracePeriod: 100 * time.Millisecond, - waitDuration: 150 * time.Millisecond, - wantRequeued: true, - }, - { - name: "training tier - longer grace period (200ms)", - jobTier: scheduler.TierTraining, - testGracePeriod: 200 * time.Millisecond, - waitDuration: 150 * time.Millisecond, - wantRequeued: false, // Within grace period - }, - { - name: "training tier - past grace period (200ms + 50ms buffer)", - jobTier: scheduler.TierTraining, - testGracePeriod: 200 * time.Millisecond, - waitDuration: 250 * time.Millisecond, - wantRequeued: true, - }, - { - name: "evaluation tier - medium grace period (150ms)", - jobTier: scheduler.TierEvaluation, - testGracePeriod: 150 * time.Millisecond, - waitDuration: 200 * time.Millisecond, - wantRequeued: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Configure test with fast grace periods - cfg := fixtures.DefaultHubConfig() - cfg.TestGracePeriods = map[scheduler.JobTier]time.Duration{ - tt.jobTier: tt.testGracePeriod, - } - cfg.AcceptanceTimeoutSecs = 60 // Long acceptance timeout to not interfere - - fixture := fixtures.NewSchedulerTestFixture(t, cfg) - defer fixture.Cleanup() - - // Create worker and assign a job - worker := fixture.CreateWorker("orphan-test-worker", scheduler.WorkerCapabilities{ - GPUBackend: scheduler.BackendCPU, - GPUCount: 0, - }) - - jobID := "orphan-test-job-" + string(tt.jobTier) - fixture.SubmitJob(scheduler.JobSpec{ - ID: jobID, - Type: scheduler.JobTypeBatch, - SlotPool: "batch", - JobTier: tt.jobTier, - }) - - // Signal ready to trigger job assignment - worker.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") - - msg := worker.RecvTimeout(2 * time.Second) - require.Equal(t, scheduler.MsgJobAssign, msg.Type) - - // Accept the job to mark it as "running" - worker.AcceptJob(jobID) - - // Close worker connection (simulates death) - worker.Close() - - // Wait for grace period + buffer - time.Sleep(tt.waitDuration) - - // Trigger orphan reconciliation (tests need manual trigger) - fixture.Hub.TriggerReconcileOrphans() - - // Poll for job requeue by checking state events - requeued := false - checkDeadline := time.Now().Add(500 * time.Millisecond) - for time.Now().Before(checkDeadline) { - events, err := fixture.Hub.GetStateEvents() - require.NoError(t, err) - - for _, event := range events { - if event.Type == scheduler.EventJobRequeued && event.TaskID == jobID { - requeued = true - break - } - } - if requeued { - break - } - time.Sleep(50 * time.Millisecond) - } - - assert.Equal(t, tt.wantRequeued, requeued, - "job requeue status mismatch: got=%v, want=%v", requeued, tt.wantRequeued) - }) - } -} - -// TestOrphanRecovery_JobRequeuing validates jobs are properly requeued after orphaning -func TestOrphanRecovery_JobRequeuing(t *testing.T) { - cfg := fixtures.DefaultHubConfig() - cfg.TestGracePeriods = map[scheduler.JobTier]time.Duration{ - scheduler.TierDataProcessing: 50 * time.Millisecond, - } - - fixture := fixtures.NewSchedulerTestFixture(t, cfg) - defer fixture.Cleanup() - - // Create first worker - worker1 := fixture.CreateWorker("requeue-worker-1", scheduler.WorkerCapabilities{ - GPUBackend: scheduler.BackendCPU, - GPUCount: 0, - }) - - // Submit and assign job first - jobID := "requeue-test-job" - fixture.SubmitJob(scheduler.JobSpec{ - ID: jobID, - Type: scheduler.JobTypeBatch, - SlotPool: "batch", - JobTier: scheduler.TierDataProcessing, - }) - - // Signal ready to trigger assignment - worker1.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") - - msg := worker1.RecvTimeout(2 * time.Second) - require.Equal(t, scheduler.MsgJobAssign, msg.Type) - worker1.AcceptJob(jobID) - - // Kill worker1 - worker1.Close() - - // Wait for grace period - time.Sleep(100 * time.Millisecond) - - // Create second worker and signal ready to receive requeued job - worker2 := fixture.CreateWorker("requeue-worker-2", scheduler.WorkerCapabilities{ - GPUBackend: scheduler.BackendCPU, - GPUCount: 0, - }) - - // Retry loop for requeued job assignment (trigger reconcile each iteration) - var msg2 scheduler.Message - deadline := time.Now().Add(2 * time.Second) - for time.Now().Before(deadline) { - fixture.Hub.TriggerReconcileOrphans() - worker2.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") - select { - case msg := <-worker2.RecvCh: - if msg.Type == scheduler.MsgJobAssign { - msg2 = msg - } - case <-time.After(100 * time.Millisecond): - // Continue retrying - } - if msg2.Type == scheduler.MsgJobAssign { - break - } - } - - assert.Equal(t, scheduler.MsgJobAssign, msg2.Type, "requeued job should be assigned to new worker") -} - -// TestOrphanRecovery_WorkerDeathDetection validates detection of connection drops -func TestOrphanRecovery_WorkerDeathDetection(t *testing.T) { - fixture := fixtures.NewSchedulerTestFixture(t, fixtures.DefaultHubConfig()) - defer fixture.Cleanup() - - worker := fixture.CreateWorker("death-detection-worker", scheduler.WorkerCapabilities{ - GPUBackend: scheduler.BackendCPU, - GPUCount: 0, - }) - worker.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") - - // Verify worker is in metrics - metrics := fixture.Hub.GetMetricsPayload() - connectedBefore := metrics["workers_connected"].(int) - assert.GreaterOrEqual(t, connectedBefore, 1, "worker should be connected") - - // Abruptly close connection (no graceful disconnect) - worker.Close() - - // Wait for scheduler to detect - time.Sleep(500 * time.Millisecond) - - // Verify worker is disconnected - metricsAfter := fixture.Hub.GetMetricsPayload() - connectedAfter := metricsAfter["workers_connected"].(int) - // Note: connected count may still show briefly; the key test is that jobs assigned - // to this worker eventually become orphans - _ = connectedAfter -} - -// TestOrphanRecovery_TaskStateCleanup validates task state is cleaned up -func TestOrphanRecovery_TaskStateCleanup(t *testing.T) { - cfg := fixtures.DefaultHubConfig() - cfg.TestGracePeriods = map[scheduler.JobTier]time.Duration{ - scheduler.TierDataProcessing: 50 * time.Millisecond, - } - - fixture := fixtures.NewSchedulerTestFixture(t, cfg) - defer fixture.Cleanup() - - worker := fixture.CreateWorker("cleanup-worker", scheduler.WorkerCapabilities{ - GPUBackend: scheduler.BackendCPU, - GPUCount: 0, - }) - - jobID := "cleanup-test-job" - fixture.SubmitJob(scheduler.JobSpec{ - ID: jobID, - Type: scheduler.JobTypeBatch, - SlotPool: "batch", - JobTier: scheduler.TierDataProcessing, - }) - - worker.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") - - msg := worker.RecvTimeout(2 * time.Second) - require.Equal(t, scheduler.MsgJobAssign, msg.Type) - worker.AcceptJob(jobID) - - // Verify task exists - task := fixture.Hub.GetTask(jobID) - require.NotNil(t, task, "task should exist while job is running") - - // Kill worker and wait for orphan detection - worker.Close() - time.Sleep(100 * time.Millisecond) - - // Trigger orphan reconciliation - fixture.Hub.TriggerReconcileOrphans() - - // Poll for requeue event - time.Sleep(50 * time.Millisecond) - - // Verify state events show proper lifecycle - events, err := fixture.Hub.GetStateEvents() - require.NoError(t, err) - - hasAssign := false - hasRequeue := false - for _, event := range events { - if event.TaskID == jobID { - if event.Type == scheduler.EventJobAssigned { - hasAssign = true - } - if event.Type == scheduler.EventJobRequeued { - hasRequeue = true - } - } - } - - assert.True(t, hasAssign, "should have assignment event") - // Requeue event should be present after grace period and TriggerReconcileOrphans - assert.True(t, hasRequeue, "should have requeue event after grace period") -} - -// TestOrphanRecovery_ConcurrentScenarios validates concurrent worker deaths -func TestOrphanRecovery_ConcurrentScenarios(t *testing.T) { - cfg := fixtures.DefaultHubConfig() - cfg.TestGracePeriods = map[scheduler.JobTier]time.Duration{ - scheduler.TierDataProcessing: 50 * time.Millisecond, - scheduler.TierTraining: 100 * time.Millisecond, - } - - fixture := fixtures.NewSchedulerTestFixture(t, cfg) - defer fixture.Cleanup() - - // Create two workers - worker1 := fixture.CreateWorker("concurrent-worker-1", scheduler.WorkerCapabilities{ - GPUBackend: scheduler.BackendCPU, - GPUCount: 0, - }) - worker2 := fixture.CreateWorker("concurrent-worker-2", scheduler.WorkerCapabilities{ - GPUBackend: scheduler.BackendNVIDIA, - GPUCount: 4, - }) - - // Submit jobs to both workers - job1 := "concurrent-job-1" - job2 := "concurrent-job-2" - - fixture.SubmitJob(scheduler.JobSpec{ - ID: job1, - Type: scheduler.JobTypeBatch, - SlotPool: "batch", - JobTier: scheduler.TierDataProcessing, - GPUCount: 0, - }) - fixture.SubmitJob(scheduler.JobSpec{ - ID: job2, - Type: scheduler.JobTypeBatch, - SlotPool: "batch", - JobTier: scheduler.TierTraining, - GPUCount: 2, - }) - - // Signal ready to trigger assignments - worker1.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") - worker2.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") - - // Both workers receive their jobs - msg1 := worker1.RecvTimeout(2 * time.Second) - msg2 := worker2.RecvTimeout(2 * time.Second) - - require.Equal(t, scheduler.MsgJobAssign, msg1.Type) - require.Equal(t, scheduler.MsgJobAssign, msg2.Type) - - worker1.AcceptJob(job1) - worker2.AcceptJob(job2) - - // Both workers die simultaneously - worker1.Close() - worker2.Close() - - // Wait for both grace periods - time.Sleep(150 * time.Millisecond) - - // Trigger orphan reconciliation - fixture.Hub.TriggerReconcileOrphans() - - // Verify both jobs were requeued - events, err := fixture.Hub.GetStateEvents() - require.NoError(t, err) - - requeueCount := 0 - for _, event := range events { - if event.Type == scheduler.EventJobRequeued { - if event.TaskID == job1 || event.TaskID == job2 { - requeueCount++ - } - } - } - - // Both jobs should have been requeued - assert.GreaterOrEqual(t, requeueCount, 1, "at least one job should be requeued (scheduler may batch reconciliation)") -} - -// TestOrphanRecovery_GracePeriodEdgeCase validates exact boundary behavior -func TestOrphanRecovery_GracePeriodEdgeCase(t *testing.T) { - // Test the exact moment of grace period expiration - cfg := fixtures.DefaultHubConfig() - cfg.TestGracePeriods = map[scheduler.JobTier]time.Duration{ - scheduler.TierDataProcessing: 100 * time.Millisecond, - } - - fixture := fixtures.NewSchedulerTestFixture(t, cfg) - defer fixture.Cleanup() - - worker := fixture.CreateWorker("edge-worker", scheduler.WorkerCapabilities{ - GPUBackend: scheduler.BackendCPU, - GPUCount: 0, - }) - - jobID := "edge-test-job" - fixture.SubmitJob(scheduler.JobSpec{ - ID: jobID, - Type: scheduler.JobTypeBatch, - SlotPool: "batch", - JobTier: scheduler.TierDataProcessing, - }) - - worker.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") - - msg := worker.RecvTimeout(2 * time.Second) - require.Equal(t, scheduler.MsgJobAssign, msg.Type) - worker.AcceptJob(jobID) - - // Kill worker - worker.Close() - - // Wait exactly the grace period (edge case) - time.Sleep(100 * time.Millisecond) - - // Trigger orphan reconciliation at boundary - fixture.Hub.TriggerReconcileOrphans() - - // At this exact moment, job should be at the boundary - // Verify state is consistent - task := fixture.Hub.GetTask(jobID) - if task != nil { - // Task may be orphaned or still running depending on exact timing - assert.True(t, task.Status == "running" || task.Status == "orphaned" || task.Status == "queued", - "task should be in valid state at grace period boundary, got: %s", task.Status) - } else { - // Task may have been cleaned up or requeued - assert.True(t, true, "task handled at grace period boundary") - } -}