// Package resources arbitrates shared CPU and GPU capacity between tasks,
// handing out Leases and blocking acquirers until capacity frees up.
package resources

import (
	"context"
	"errors"
	"fmt"
	"math"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/jfraeys/fetch_ml/internal/queue"
)

// Manager tracks free CPU cores and per-GPU slot availability. All mutable
// state (gpuFree, freeCPU) is guarded by mu; totalCPU and slotsPerGPU are
// fixed after construction. The atomic counters are metrics only and may
// be read without the lock.
type Manager struct {
	cond        *sync.Cond
	gpuFree     []int // free slots remaining on each GPU, indexed by device
	totalCPU    int
	freeCPU     int
	slotsPerGPU int

	acquireTotal        atomic.Int64
	acquireWaitTotal    atomic.Int64
	acquireTimeoutTotal atomic.Int64
	acquireWaitNanos    atomic.Int64

	mu sync.Mutex
}

// Snapshot is a point-in-time copy of the manager's availability and its
// cumulative acquire metrics.
type Snapshot struct {
	GPUFree             []int
	TotalCPU            int
	FreeCPU             int
	SlotsPerGPU         int
	AcquireTotal        int64
	AcquireWaitTotal    int64
	AcquireTimeoutTotal int64
	AcquireWaitSeconds  float64
}

// FormatCUDAVisibleDevices renders the lease's GPU indices as a
// CUDA_VISIBLE_DEVICES value, e.g. "0,2,3". A nil lease or one holding no
// GPUs yields "-1", which hides every device from CUDA.
func FormatCUDAVisibleDevices(lease *Lease) string {
	if lease == nil || len(lease.gpus) == 0 {
		return "-1"
	}
	idx := make([]int, 0, len(lease.gpus))
	for _, g := range lease.gpus {
		idx = append(idx, g.Index)
	}
	sort.Ints(idx)
	parts := make([]string, 0, len(idx))
	for _, i := range idx {
		parts = append(parts, strconv.Itoa(i))
	}
	return strings.Join(parts, ",")
}

// GPUAllocation records how many slots a lease holds on one GPU.
type GPUAllocation struct {
	Index int
	Slots int
}

// Lease is a grant of CPU cores and GPU slots. Call Release when the work
// is done; Release is safe on a nil lease and idempotent on repeated calls.
type Lease struct {
	m    *Manager
	gpus []GPUAllocation
	cpu  int
}

// CPU returns the number of CPU cores held by the lease.
func (l *Lease) CPU() int { return l.cpu }

// GPUs returns a copy of the per-GPU allocations held by the lease.
func (l *Lease) GPUs() []GPUAllocation {
	out := make([]GPUAllocation, len(l.gpus))
	copy(out, l.gpus)
	return out
}

// Release returns the lease's resources to the manager and wakes waiters.
// It is a no-op on a nil or already-released lease: the manager pointer is
// cleared before crediting so a double Release cannot over-credit capacity.
func (l *Lease) Release() {
	if l == nil || l.m == nil {
		return
	}
	m := l.m
	l.m = nil // detach first: makes repeated Release calls harmless
	m.mu.Lock()
	defer m.mu.Unlock()
	if l.cpu > 0 {
		m.freeCPU += l.cpu
		if m.freeCPU > m.totalCPU {
			m.freeCPU = m.totalCPU // clamp against accounting drift
		}
	}
	for _, g := range l.gpus {
		if g.Index >= 0 && g.Index < len(m.gpuFree) {
			m.gpuFree[g.Index] += g.Slots
			if m.gpuFree[g.Index] > m.slotsPerGPU {
				m.gpuFree[g.Index] = m.slotsPerGPU
			}
		}
	}
	m.cond.Broadcast()
}

// Options configures NewManager.
type Options struct {
	TotalCPU    int // total CPU cores to manage; must be >= 0
	GPUCount    int // number of GPUs to manage; must be >= 0
	SlotsPerGPU int // slots per GPU; values <= 0 default to 1
}

// NewManager builds a Manager with every resource initially free.
func NewManager(opts Options) (*Manager, error) {
	if opts.TotalCPU < 0 {
		return nil, errors.New("total cpu must be >= 0")
	}
	if opts.GPUCount < 0 {
		return nil, errors.New("gpu count must be >= 0")
	}
	if opts.SlotsPerGPU <= 0 {
		opts.SlotsPerGPU = 1
	}
	m := &Manager{
		totalCPU:    opts.TotalCPU,
		freeCPU:     opts.TotalCPU,
		slotsPerGPU: opts.SlotsPerGPU,
		gpuFree:     make([]int, opts.GPUCount),
	}
	for i := range m.gpuFree {
		m.gpuFree[i] = m.slotsPerGPU
	}
	m.cond = sync.NewCond(&m.mu)
	return m, nil
}

// Snapshot returns a consistent copy of current availability plus the
// cumulative acquire metrics. It is safe to call on a nil Manager.
func (m *Manager) Snapshot() Snapshot {
	if m == nil {
		return Snapshot{}
	}
	m.mu.Lock()
	gpuFree := make([]int, len(m.gpuFree))
	copy(gpuFree, m.gpuFree)
	totalCPU := m.totalCPU
	freeCPU := m.freeCPU
	slotsPerGPU := m.slotsPerGPU
	m.mu.Unlock()
	waitNanos := m.acquireWaitNanos.Load()
	return Snapshot{
		TotalCPU:            totalCPU,
		FreeCPU:             freeCPU,
		SlotsPerGPU:         slotsPerGPU,
		GPUFree:             gpuFree,
		AcquireTotal:        m.acquireTotal.Load(),
		AcquireWaitTotal:    m.acquireWaitTotal.Load(),
		AcquireTimeoutTotal: m.acquireTimeoutTotal.Load(),
		AcquireWaitSeconds:  float64(waitNanos) / float64(time.Second),
	}
}

// Acquire blocks until the task's CPU/GPU request can be satisfied or ctx
// is done. Requests that can never be satisfied (exceeding total capacity)
// fail immediately. On success the returned Lease must be Released by the
// caller. Context deadline expiry is counted in acquireTimeoutTotal.
func (m *Manager) Acquire(ctx context.Context, task *queue.Task) (*Lease, error) {
	if m == nil {
		return nil, errors.New("resource manager is nil")
	}
	if task == nil {
		return nil, errors.New("task is nil")
	}
	if ctx == nil {
		return nil, errors.New("context is nil")
	}
	m.acquireTotal.Add(1)
	start := time.Now()
	waited := false

	// Validate the request against fixed capacity (totalCPU, gpu count and
	// slotsPerGPU never change after NewManager, so no lock is needed yet).
	reqCPU := task.CPU
	if reqCPU < 0 {
		return nil, errors.New("cpu request must be >= 0")
	}
	if reqCPU > m.totalCPU {
		return nil, fmt.Errorf("cpu request %d exceeds total cpu %d", reqCPU, m.totalCPU)
	}
	reqGPU := task.GPU
	if reqGPU < 0 {
		return nil, errors.New("gpu request must be >= 0")
	}
	if reqGPU > len(m.gpuFree) {
		return nil, fmt.Errorf("gpu request %d exceeds available gpus %d", reqGPU, len(m.gpuFree))
	}
	slotsPerTaskGPU, err := m.gpuSlotsForTask(task.GPUMemory)
	if err != nil {
		return nil, err
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	for {
		if ctx.Err() != nil {
			if errors.Is(ctx.Err(), context.DeadlineExceeded) {
				m.acquireTimeoutTotal.Add(1)
			}
			return nil, ctx.Err()
		}
		// Plan GPUs first; commit both GPU and CPU only when both fit.
		gpuAlloc, ok := m.tryAllocateGPUsLocked(reqGPU, slotsPerTaskGPU)
		if ok && (reqCPU == 0 || m.freeCPU >= reqCPU) {
			if reqCPU > 0 {
				m.freeCPU -= reqCPU
			}
			for _, g := range gpuAlloc {
				m.gpuFree[g.Index] -= g.Slots
			}
			if waited {
				m.acquireWaitTotal.Add(1)
				m.acquireWaitNanos.Add(time.Since(start).Nanoseconds())
			}
			return &Lease{cpu: reqCPU, gpus: gpuAlloc, m: m}, nil
		}
		// Not enough capacity: wait for a Release. A helper goroutine turns
		// ctx cancellation into a Broadcast so cond.Wait cannot block past
		// the deadline; it is torn down via done after each wakeup.
		waited = true
		done := make(chan struct{})
		go func() {
			select {
			case <-ctx.Done():
				m.mu.Lock()
				m.cond.Broadcast()
				m.mu.Unlock()
			case <-done:
			}
		}()
		m.cond.Wait()
		close(done)
	}
}

// gpuSlotsForTask converts a task's GPU-memory hint into a slot count per
// GPU. An empty hint, or one that does not parse as a fraction/percentage,
// claims a whole GPU (all slots). Fractions are clamped to (0, 1] and
// rounded up to a whole number of slots, with a minimum of one slot.
func (m *Manager) gpuSlotsForTask(gpuMem string) (int, error) {
	if m.slotsPerGPU <= 0 {
		return 1, nil // defensive: NewManager guarantees slotsPerGPU >= 1
	}
	trimmed := strings.TrimSpace(gpuMem)
	if trimmed == "" {
		return m.slotsPerGPU, nil
	}
	frac, ok := parseFraction(trimmed)
	if !ok {
		return m.slotsPerGPU, nil
	}
	if frac <= 0 {
		return 1, nil // non-positive fraction: minimum grant
	}
	if frac > 1 {
		frac = 1
	}
	slots := int(math.Ceil(frac * float64(m.slotsPerGPU)))
	if slots < 1 {
		slots = 1
	}
	if slots > m.slotsPerGPU {
		slots = m.slotsPerGPU
	}
	return slots, nil
}

// tryAllocateGPUsLocked plans an allocation of reqGPU distinct GPUs with
// slotsPerTaskGPU slots on each, preferring the devices with the most free
// slots (spreads load across GPUs). It does not mutate state — the caller
// applies the returned plan. The caller must hold m.mu.
func (m *Manager) tryAllocateGPUsLocked(reqGPU int, slotsPerTaskGPU int) ([]GPUAllocation, bool) {
	if reqGPU == 0 {
		return nil, true
	}
	if slotsPerTaskGPU <= 0 {
		slotsPerTaskGPU = m.slotsPerGPU
	}
	alloc := make([]GPUAllocation, 0, reqGPU)
	used := make(map[int]struct{}, reqGPU)
	for len(alloc) < reqGPU {
		bestIdx := -1
		bestFree := -1
		for i := 0; i < len(m.gpuFree); i++ {
			if _, ok := used[i]; ok {
				continue
			}
			if free := m.gpuFree[i]; free >= slotsPerTaskGPU && free > bestFree {
				bestFree = free
				bestIdx = i
			}
		}
		if bestIdx < 0 {
			return nil, false // no remaining GPU can host this many slots
		}
		used[bestIdx] = struct{}{}
		alloc = append(alloc, GPUAllocation{Index: bestIdx, Slots: slotsPerTaskGPU})
	}
	return alloc, true
}

// parseFraction parses s as either a percentage ("50%") or a plain
// fraction ("0.5"). Plain values greater than 1 are rejected so that
// absolute sizes (e.g. "8") are not mistaken for fractions; percentage
// values are returned as-is and clamped by the caller.
func parseFraction(s string) (float64, bool) {
	if s == "" {
		return 0, false
	}
	if strings.HasSuffix(s, "%") {
		v := strings.TrimSuffix(s, "%")
		f, err := strconv.ParseFloat(strings.TrimSpace(v), 64)
		if err != nil {
			return 0, false
		}
		return f / 100.0, true
	}
	f, err := strconv.ParseFloat(s, 64)
	if err != nil || f > 1 {
		return 0, false
	}
	return f, true
}