- Update task domain model - Improve scheduler hub and priority queue - Enhance protocol definitions - Update manifest schema and run handling
183 lines
4 KiB
Go
183 lines
4 KiB
Go
package scheduler
|
|
|
|
import (
	"container/heap"
	"sort"
	"sync"
	"time"
)
|
|
|
|
// Task represents a job in the priority queue
|
|
type Task struct {
|
|
ID string
|
|
Priority int
|
|
SubmittedAt time.Time
|
|
Spec JobSpec
|
|
Status string
|
|
WorkerID string
|
|
Metadata map[string]string // Additional task metadata (snapshot SHA, etc.)
|
|
index int // for heap interface
|
|
|
|
// FirstAssignedAt is set once when the task is first assigned to a worker.
|
|
// It never changes, even on re-queue after worker failure.
|
|
FirstAssignedAt time.Time
|
|
|
|
// MaxRuntime is the cached computed value from JobSpec.MaxRuntimeHours.
|
|
// 0 means use default (24h), capped at 168h (7d).
|
|
MaxRuntime time.Duration
|
|
}
|
|
|
|
// EffectivePriority returns the priority with aging applied
|
|
func (t *Task) EffectivePriority(agingRate float64, now time.Time) float64 {
|
|
age := now.Sub(t.SubmittedAt).Minutes()
|
|
return float64(t.Priority) + age*agingRate
|
|
}
|
|
|
|
// taskHeap is the internal heap implementation
|
|
type taskHeap struct {
|
|
items []*Task
|
|
agingRate float64
|
|
}
|
|
|
|
// Len reports how many tasks the heap currently holds.
func (h taskHeap) Len() int {
	return len(h.items)
}
|
|
|
|
func (h taskHeap) Less(i, j int) bool {
|
|
// Higher priority first, then older first on ties
|
|
now := time.Now()
|
|
pi := h.items[i].EffectivePriority(h.agingRate, now)
|
|
pj := h.items[j].EffectivePriority(h.agingRate, now)
|
|
if pi != pj {
|
|
return pi > pj
|
|
}
|
|
return h.items[i].SubmittedAt.Before(h.items[j].SubmittedAt)
|
|
}
|
|
|
|
func (h taskHeap) Swap(i, j int) {
|
|
h.items[i], h.items[j] = h.items[j], h.items[i]
|
|
h.items[i].index = i
|
|
h.items[j].index = j
|
|
}
|
|
|
|
func (h *taskHeap) Push(x any) {
|
|
n := len(h.items)
|
|
task := x.(*Task)
|
|
task.index = n
|
|
h.items = append(h.items, task)
|
|
}
|
|
|
|
func (h *taskHeap) Pop() any {
|
|
old := h.items
|
|
n := len(old)
|
|
task := old[n-1]
|
|
old[n-1] = nil // avoid memory leak
|
|
task.index = -1
|
|
h.items = old[:n-1]
|
|
return task
|
|
}
|
|
|
|
// PriorityQueue implements a thread-safe priority queue for tasks
|
|
type PriorityQueue struct {
|
|
heap *taskHeap
|
|
mu sync.RWMutex
|
|
byID map[string]*Task
|
|
agingRate float64
|
|
}
|
|
|
|
// NewPriorityQueue creates a new priority queue
|
|
func NewPriorityQueue(agingRate float64) *PriorityQueue {
|
|
if agingRate == 0 {
|
|
agingRate = 0.1 // default: 0.1 per minute
|
|
}
|
|
return &PriorityQueue{
|
|
heap: &taskHeap{
|
|
items: make([]*Task, 0),
|
|
agingRate: agingRate,
|
|
},
|
|
byID: make(map[string]*Task),
|
|
agingRate: agingRate,
|
|
}
|
|
}
|
|
|
|
// Len returns the number of items in the queue
|
|
func (pq *PriorityQueue) Len() int {
|
|
pq.mu.RLock()
|
|
defer pq.mu.RUnlock()
|
|
return len(pq.heap.items)
|
|
}
|
|
|
|
// Add adds a task to the queue
|
|
func (pq *PriorityQueue) Add(task *Task) {
|
|
pq.mu.Lock()
|
|
defer pq.mu.Unlock()
|
|
|
|
if _, exists := pq.byID[task.ID]; exists {
|
|
return // already in queue
|
|
}
|
|
|
|
pq.byID[task.ID] = task
|
|
heap.Push(pq.heap, task)
|
|
}
|
|
|
|
// Take removes and returns the highest priority task
|
|
func (pq *PriorityQueue) Take() *Task {
|
|
pq.mu.Lock()
|
|
defer pq.mu.Unlock()
|
|
|
|
if len(pq.heap.items) == 0 {
|
|
return nil
|
|
}
|
|
|
|
task := heap.Pop(pq.heap).(*Task)
|
|
delete(pq.byID, task.ID)
|
|
return task
|
|
}
|
|
|
|
// Peek returns the highest priority task without removing it
|
|
func (pq *PriorityQueue) Peek() *Task {
|
|
pq.mu.RLock()
|
|
defer pq.mu.RUnlock()
|
|
|
|
if len(pq.heap.items) == 0 {
|
|
return nil
|
|
}
|
|
return pq.heap.items[0]
|
|
}
|
|
|
|
// Items returns a copy of all items in priority order
|
|
func (pq *PriorityQueue) Items() []*Task {
|
|
pq.mu.RLock()
|
|
defer pq.mu.RUnlock()
|
|
|
|
result := make([]*Task, len(pq.heap.items))
|
|
copy(result, pq.heap.items)
|
|
return result
|
|
}
|
|
|
|
// Get returns a task by ID
|
|
func (pq *PriorityQueue) Get(taskID string) *Task {
|
|
pq.mu.RLock()
|
|
defer pq.mu.RUnlock()
|
|
return pq.byID[taskID]
|
|
}
|
|
|
|
// Remove removes a task from the queue
|
|
func (pq *PriorityQueue) Remove(taskID string) bool {
|
|
pq.mu.Lock()
|
|
defer pq.mu.Unlock()
|
|
|
|
task, exists := pq.byID[taskID]
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
heap.Remove(pq.heap, task.index)
|
|
delete(pq.byID, task.ID)
|
|
return true
|
|
}
|
|
|
|
// Contains checks if a task is in the queue
|
|
func (pq *PriorityQueue) Contains(taskID string) bool {
|
|
pq.mu.RLock()
|
|
defer pq.mu.RUnlock()
|
|
_, exists := pq.byID[taskID]
|
|
return exists
|
|
}
|