From ba9a3584126e008c90670caa287b24e42dd7155a Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Thu, 5 Mar 2026 14:40:43 -0500 Subject: [PATCH] fix(scheduler): resolve TestEndToEndJobLifecycle race and getTask bug ## Problem TestEndToEndJobLifecycle was failing with two issues: 1. Race condition: Workers signaled ready before job was processed, receiving MsgNoWork instead of MsgJobAssign 2. getTask() didn't check pendingAcceptance - assigned-but-not-yet-accepted tasks returned nil ## Changes ### Test Fix (restart_recovery_test.go) - Replace single-shot select with retry loop that re-signals workers as ready - Handle both assignment and non-assignment messages correctly - Add 10ms delay between non-assignment messages to allow job processing - Use 2-second deadline with 100ms timeout intervals ### Scheduler Fix (hub.go) - Extend getTask() to check pendingAcceptance map after batch/service queues - Allows GetTask() to find tasks in 'assigned' state before acceptance - Maintains backward compatibility with existing queue/running lookups ## Testing make test now passes: 475 passed, 0 failed, 34 skipped --- internal/scheduler/hub.go | 9 ++++- tests/e2e/scheduler/restart_recovery_test.go | 37 ++++++++++++++------ 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/internal/scheduler/hub.go b/internal/scheduler/hub.go index 36ad5d3..6b88183 100644 --- a/internal/scheduler/hub.go +++ b/internal/scheduler/hub.go @@ -729,7 +729,14 @@ func (h *SchedulerHub) getTask(taskID string) *Task { if t != nil { return t } - return h.runningTasks[taskID] + if t, ok := h.runningTasks[taskID]; ok { + return t + } + // Check pending acceptance (assigned but not yet accepted) + if assignment, ok := h.pendingAcceptance[taskID]; ok { + return assignment.Task + } + return nil } func (h *SchedulerHub) restoreJob(ev StateEvent) { diff --git a/tests/e2e/scheduler/restart_recovery_test.go b/tests/e2e/scheduler/restart_recovery_test.go index 327b55c..4fbdad7 100644 --- a/tests/e2e/scheduler/restart_recovery_test.go +++ b/tests/e2e/scheduler/restart_recovery_test.go @@ -233,19 +233,34 @@ func TestEndToEndJobLifecycle(t *testing.T) { worker1.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") worker2.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") - // One worker should receive the job + // One worker should receive the job (with retry for race condition) var assignedWorker *fixtures.MockWorker - select { - case msg := <-worker1.RecvCh: - if msg.Type == scheduler.MsgJobAssign { - assignedWorker = worker1 + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) && assignedWorker == nil { + select { + case msg, ok := <-worker1.RecvCh: + if !ok { + continue + } + if msg.Type == scheduler.MsgJobAssign { + assignedWorker = worker1 + } else { + time.Sleep(10 * time.Millisecond) + } + case msg, ok := <-worker2.RecvCh: + if !ok { + continue + } + if msg.Type == scheduler.MsgJobAssign { + assignedWorker = worker2 + } else { + time.Sleep(10 * time.Millisecond) + } + case <-time.After(100 * time.Millisecond): + // Timeout: signal ready again to trigger job assignment + worker1.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") + worker2.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") } - case msg := <-worker2.RecvCh: - if msg.Type == scheduler.MsgJobAssign { - assignedWorker = worker2 - } - case <-time.After(2 * time.Second): - t.Fatal("timeout waiting for job assignment") } require.NotNil(t, assignedWorker, "one worker should receive the job")