diff --git a/internal/scheduler/hub.go b/internal/scheduler/hub.go index 36ad5d3..6b88183 100644 --- a/internal/scheduler/hub.go +++ b/internal/scheduler/hub.go @@ -729,7 +729,14 @@ func (h *SchedulerHub) getTask(taskID string) *Task { if t != nil { return t } - return h.runningTasks[taskID] + if t, ok := h.runningTasks[taskID]; ok { + return t + } + // Check pending acceptance (assigned but not yet accepted) + if assignment, ok := h.pendingAcceptance[taskID]; ok { + return assignment.Task + } + return nil } func (h *SchedulerHub) restoreJob(ev StateEvent) { diff --git a/tests/e2e/scheduler/restart_recovery_test.go b/tests/e2e/scheduler/restart_recovery_test.go index 327b55c..4fbdad7 100644 --- a/tests/e2e/scheduler/restart_recovery_test.go +++ b/tests/e2e/scheduler/restart_recovery_test.go @@ -233,19 +233,34 @@ func TestEndToEndJobLifecycle(t *testing.T) { worker1.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") worker2.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") - // One worker should receive the job + // One worker should receive the job (with retry for race condition) var assignedWorker *fixtures.MockWorker - select { - case msg := <-worker1.RecvCh: - if msg.Type == scheduler.MsgJobAssign { - assignedWorker = worker1 + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) && assignedWorker == nil { + select { + case msg, ok := <-worker1.RecvCh: + if !ok { + continue + } + if msg.Type == scheduler.MsgJobAssign { + assignedWorker = worker1 + } else { + time.Sleep(10 * time.Millisecond) + } + case msg, ok := <-worker2.RecvCh: + if !ok { + continue + } + if msg.Type == scheduler.MsgJobAssign { + assignedWorker = worker2 + } else { + time.Sleep(10 * time.Millisecond) + } + case <-time.After(100 * time.Millisecond): + // Timeout: signal ready again to trigger job assignment + worker1.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") + worker2.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") } - case msg := <-worker2.RecvCh: - if msg.Type == scheduler.MsgJobAssign { - assignedWorker = worker2 - } - case <-time.After(2 * time.Second): - t.Fatal("timeout waiting for job assignment") } require.NotNil(t, assignedWorker, "one worker should receive the job")