fix(scheduler): resolve TestEndToEndJobLifecycle race and getTask bug
## Problem

TestEndToEndJobLifecycle was failing with two issues:

1. Race condition: workers signaled ready before the job was processed, so they received MsgNoWork instead of MsgJobAssign.
2. getTask() didn't check pendingAcceptance, so assigned-but-not-yet-accepted tasks returned nil.

## Changes

### Test Fix (restart_recovery_test.go)

- Replace the single-shot select with a retry loop that re-signals workers as ready
- Handle both assignment and non-assignment messages correctly
- Add a 10ms delay after each non-assignment message to allow job processing
- Use a 2-second deadline with 100ms timeout intervals

### Scheduler Fix (hub.go)

- Extend getTask() to check the pendingAcceptance map after the batch/service queues
- Allows GetTask() to find tasks in the 'assigned' state before acceptance
- Maintains backward compatibility with the existing queue/running lookups

## Testing

`make test` now passes: 475 passed, 0 failed, 34 skipped
This commit is contained in:
parent
8ee98eaf7f
commit
ba9a358412
2 changed files with 34 additions and 12 deletions
|
|
@@ -729,7 +729,14 @@ func (h *SchedulerHub) getTask(taskID string) *Task {
|
|||
if t != nil {
|
||||
return t
|
||||
}
|
||||
return h.runningTasks[taskID]
|
||||
if t, ok := h.runningTasks[taskID]; ok {
|
||||
return t
|
||||
}
|
||||
// Check pending acceptance (assigned but not yet accepted)
|
||||
if assignment, ok := h.pendingAcceptance[taskID]; ok {
|
||||
return assignment.Task
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *SchedulerHub) restoreJob(ev StateEvent) {
|
||||
|
|
|
|||
|
|
@@ -233,19 +233,34 @@ func TestEndToEndJobLifecycle(t *testing.T) {
|
|||
worker1.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
|
||||
worker2.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
|
||||
|
||||
// One worker should receive the job
|
||||
// One worker should receive the job (with retry for race condition)
|
||||
var assignedWorker *fixtures.MockWorker
|
||||
select {
|
||||
case msg := <-worker1.RecvCh:
|
||||
if msg.Type == scheduler.MsgJobAssign {
|
||||
assignedWorker = worker1
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) && assignedWorker == nil {
|
||||
select {
|
||||
case msg, ok := <-worker1.RecvCh:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if msg.Type == scheduler.MsgJobAssign {
|
||||
assignedWorker = worker1
|
||||
} else {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
case msg, ok := <-worker2.RecvCh:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if msg.Type == scheduler.MsgJobAssign {
|
||||
assignedWorker = worker2
|
||||
} else {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// Timeout: signal ready again to trigger job assignment
|
||||
worker1.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
|
||||
worker2.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
|
||||
}
|
||||
case msg := <-worker2.RecvCh:
|
||||
if msg.Type == scheduler.MsgJobAssign {
|
||||
assignedWorker = worker2
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timeout waiting for job assignment")
|
||||
}
|
||||
|
||||
require.NotNil(t, assignedWorker, "one worker should receive the job")
|
||||
|
|
|
|||
Loading…
Reference in a new issue