fetch_ml/internal/scheduler/priority_queue.go
Jeremie Fraeys 43e6446587
feat(scheduler): implement multi-tenant job scheduler with gang scheduling
Add new scheduler component for distributed ML workload orchestration:
- Hub-based coordination for multi-worker clusters
- Pacing controller for rate limiting job submissions
- Priority queue with preemption support
- Port allocator for dynamic service discovery
- Protocol handlers for worker-scheduler communication
- Service manager with OS-specific implementations
- Connection management and state persistence
- Template system for service deployment

Includes comprehensive test suite:
- Unit tests for all core components
- Integration tests for distributed scenarios
- Benchmark tests for performance validation
- Mock fixtures for isolated testing

Refs: scheduler-architecture.md
2026-02-26 12:03:23 -05:00

175 lines
3.7 KiB
Go

package scheduler
import (
	"container/heap"
	"sort"
	"sync"
	"time"
)
// Task represents a job in the priority queue.
//
// A PriorityQueue orders tasks by Priority and SubmittedAt (see
// EffectivePriority). Nothing in this file re-fixes the heap after a task is
// queued, so those two fields should not be modified while the task is in a
// queue.
type Task struct {
	// ID identifies the task; a PriorityQueue holds at most one task per ID.
	ID string
	// Priority is the base priority; higher values are dequeued first.
	Priority int
	// SubmittedAt drives aging (the longer a task waits, the higher its
	// effective priority) and breaks exact ties (older first).
	SubmittedAt time.Time
	// Spec is the job to run (JobSpec is declared elsewhere in the package).
	Spec JobSpec
	// Status is a lifecycle state string; the queue itself never reads it.
	Status string
	// WorkerID is not read by the queue; presumably the scheduler records the
	// assigned worker here — verify against callers.
	WorkerID string
	Metadata map[string]string // Additional task metadata (snapshot SHA, etc.)
	// index is the task's position in taskHeap.items. The heap's Push and Swap
	// methods keep it current, and Pop sets it to -1 when the task leaves.
	index int // for heap interface
}
// EffectivePriority reports t's priority as of now: the static Priority plus
// agingRate for every minute (fractional minutes included) that has passed
// since SubmittedAt. A now earlier than SubmittedAt yields a negative bonus.
func (t *Task) EffectivePriority(agingRate float64, now time.Time) float64 {
	waited := now.Sub(t.SubmittedAt)
	bonus := waited.Minutes() * agingRate
	return bonus + float64(t.Priority)
}
// taskHeap is the internal heap implementation backing PriorityQueue. It
// satisfies container/heap's heap.Interface: Len, Less and Swap are defined on
// the value, Push and Pop on the pointer. It does no locking of its own;
// PriorityQueue guards every access with its mutex.
type taskHeap struct {
	// items is the heap-ordered backing slice; items[0] is the task that
	// heap.Pop would return next.
	items []*Task
	// agingRate is the per-minute priority bonus used by Less.
	agingRate float64
}
// Len reports how many tasks the heap holds; part of heap.Interface.
func (h taskHeap) Len() int {
	n := len(h.items)
	return n
}
// Less reports whether the task at index i should be dequeued before the task
// at index j; part of heap.Interface. Higher effective priority
// (Task.EffectivePriority) comes first, and exact ties go to the task that was
// submitted earlier.
//
// Every task ages at the same rate, so the difference between two effective
// priorities does not depend on the current time:
//
//	eff(a) - eff(b) = (a.Priority - b.Priority)
//	                + agingRate * (b.SubmittedAt - a.SubmittedAt) in minutes
//
// Less evaluates that difference directly. It does not compute both effective
// priorities against a fresh time.Now() on every call. As a result, a given
// pair of tasks always compares the same way, so the heap invariant built by
// earlier operations stays valid even when the float result is near a tie.
// It also avoids one clock read per comparison.
func (h taskHeap) Less(i, j int) bool {
	a, b := h.items[i], h.items[j]
	// Convert before subtracting so extreme Priority values cannot overflow int.
	gap := float64(a.Priority) - float64(b.Priority) +
		h.agingRate*b.SubmittedAt.Sub(a.SubmittedAt).Minutes()
	if gap != 0 {
		return gap > 0
	}
	// Equal effective priority: the older submission wins.
	return a.SubmittedAt.Before(b.SubmittedAt)
}
// Swap exchanges the tasks at positions i and j and records each task's new
// position in its index field; part of heap.Interface.
func (h taskHeap) Swap(i, j int) {
	ti, tj := h.items[i], h.items[j]
	h.items[i], h.items[j] = tj, ti
	tj.index, ti.index = i, j
}
// Push appends x, which must be a *Task, to the end of the heap slice and
// records its position; part of heap.Interface. Call it through heap.Push so
// the heap order is restored afterwards.
func (h *taskHeap) Push(x any) {
	t := x.(*Task)
	t.index = len(h.items)
	h.items = append(h.items, t)
}
// Pop removes and returns the last task in the heap slice, marking it as no
// longer queued (index -1); part of heap.Interface. Call it through heap.Pop,
// which first moves the highest-priority task into the last slot.
func (h *taskHeap) Pop() any {
	last := len(h.items) - 1
	t := h.items[last]
	h.items[last] = nil // drop the slot's reference so the task can be collected
	h.items = h.items[:last]
	t.index = -1
	return t
}
// PriorityQueue implements a thread-safe priority queue for tasks.
//
// Tasks come out highest effective priority first: the base Priority plus
// agingRate per minute waited, with exact ties going to the older task. Every
// method acquires mu before touching the other fields. Construct instances
// with NewPriorityQueue; in the zero value, byID is a nil map, so Add panics.
type PriorityQueue struct {
	// heap holds the queued tasks in heap order. Guarded by mu.
	heap *taskHeap
	// mu guards all other fields: read-only methods take RLock, mutating
	// methods take Lock.
	mu sync.RWMutex
	// byID maps Task.ID to the queued task for constant-time lookup and
	// removal. Add, Take and Remove keep it in step with heap. Guarded by mu.
	byID map[string]*Task
	// agingRate mirrors heap.agingRate; NewPriorityQueue sets both.
	agingRate float64
}
// NewPriorityQueue returns an empty, ready-to-use PriorityQueue. Its tasks
// gain agingRate priority points per minute spent waiting.
//
// An agingRate of exactly 0 selects the default of 0.1 points per minute, so
// aging cannot be switched off through this constructor. Any other value,
// including a negative one, is used as given.
func NewPriorityQueue(agingRate float64) *PriorityQueue {
	const defaultAgingRate = 0.1

	rate := agingRate
	if rate == 0 {
		rate = defaultAgingRate
	}

	return &PriorityQueue{
		heap:      &taskHeap{items: []*Task{}, agingRate: rate},
		byID:      map[string]*Task{},
		agingRate: rate,
	}
}
// Len reports how many tasks are currently queued. Safe for concurrent use.
func (pq *PriorityQueue) Len() int {
	pq.mu.RLock()
	defer pq.mu.RUnlock()

	queued := pq.heap.items
	return len(queued)
}
// Add enqueues task. If a task with the same ID is already queued, the call
// does nothing: the existing entry keeps its place and the new task is
// discarded. task must be non-nil.
func (pq *PriorityQueue) Add(task *Task) {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	id := task.ID
	if _, queued := pq.byID[id]; !queued {
		pq.byID[id] = task
		heap.Push(pq.heap, task)
	}
}
// Take removes and returns the task with the highest effective priority, or
// nil when the queue is empty.
func (pq *PriorityQueue) Take() *Task {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	var next *Task
	if len(pq.heap.items) > 0 {
		next = heap.Pop(pq.heap).(*Task)
		delete(pq.byID, next.ID)
	}
	return next
}
// Peek returns the task that Take would return next, without removing it, or
// nil when the queue is empty. The result is the queued *Task itself, not a
// copy; changing its Priority or SubmittedAt while it is queued invalidates
// the heap ordering.
func (pq *PriorityQueue) Peek() *Task {
	pq.mu.RLock()
	defer pq.mu.RUnlock()

	if items := pq.heap.items; len(items) > 0 {
		return items[0]
	}
	return nil
}
// Items returns a snapshot of every queued task, ordered from highest to
// lowest effective priority. This is the order successive Take calls would
// produce if nothing else changed the queue, ignoring exact ties.
//
// The slice is a fresh copy that the caller may modify freely. Its elements,
// however, are the queued *Task pointers themselves.
func (pq *PriorityQueue) Items() []*Task {
	pq.mu.RLock()
	defer pq.mu.RUnlock()

	result := make([]*Task, len(pq.heap.items))
	copy(result, pq.heap.items)

	// The heap's backing slice only guarantees that items[0] is the maximum,
	// so sort the copy to honour the documented priority order.
	// ordered.items shares result's backing array, so Less sees each swap.
	// sort.Slice permutes elements itself and never calls taskHeap.Swap, so
	// the queued tasks' heap indices stay untouched.
	ordered := taskHeap{items: result, agingRate: pq.heap.agingRate}
	sort.Slice(result, ordered.Less)
	return result
}
// Get returns the queued task with the given ID, or nil if no such task is
// queued.
func (pq *PriorityQueue) Get(taskID string) *Task {
	pq.mu.RLock()
	task := pq.byID[taskID]
	pq.mu.RUnlock()
	return task
}
// Remove takes the task with the given ID out of the queue, restoring heap
// order among the rest. It reports whether such a task was queued.
func (pq *PriorityQueue) Remove(taskID string) bool {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	task, queued := pq.byID[taskID]
	if queued {
		heap.Remove(pq.heap, task.index)
		// Un-index by the task's own ID, as Take does.
		delete(pq.byID, task.ID)
	}
	return queued
}
// Contains reports whether a task with the given ID is currently queued.
func (pq *PriorityQueue) Contains(taskID string) bool {
	pq.mu.RLock()
	_, found := pq.byID[taskID]
	pq.mu.RUnlock()
	return found
}