package resources_test

import (
	"context"
	"testing"
	"time"

	"github.com/jfraeys/fetch_ml/internal/queue"
	"github.com/jfraeys/fetch_ml/internal/resources"
	"github.com/stretchr/testify/require"
)

// TestManager_CPUAcquireBlocksUntilRelease verifies that a CPU request larger
// than the remaining capacity blocks (times out) and then succeeds once an
// earlier lease is released.
func TestManager_CPUAcquireBlocksUntilRelease(t *testing.T) {
	m, err := resources.NewManager(resources.Options{TotalCPU: 4, GPUCount: 0, SlotsPerGPU: 1})
	require.NoError(t, err)

	// Hold 3 of the 4 CPUs so only a single CPU remains free.
	first, err := m.Acquire(context.Background(), &queue.Task{CPU: 3})
	require.NoError(t, err)
	require.NotNil(t, first)

	// A 2-CPU request cannot be satisfied while the first lease is held.
	blockedCtx, cancelBlocked := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancelBlocked()
	_, err = m.Acquire(blockedCtx, &queue.Task{CPU: 2})
	require.Error(t, err)

	// After releasing the first lease the identical request must succeed.
	first.Release()
	retryCtx, cancelRetry := context.WithTimeout(context.Background(), time.Second)
	defer cancelRetry()
	second, err := m.Acquire(retryCtx, &queue.Task{CPU: 2})
	require.NoError(t, err)
	require.NotNil(t, second)
	second.Release()
}

// TestManager_GPUSlotsAllowSharing verifies that one GPU with 4 slots can be
// shared by four fractional-memory tasks, and that a fifth request is refused
// while all slots are occupied.
func TestManager_GPUSlotsAllowSharing(t *testing.T) {
	m, err := resources.NewManager(resources.Options{TotalCPU: 0, GPUCount: 1, SlotsPerGPU: 4})
	require.NoError(t, err)

	held := make([]*resources.Lease, 0, 4)
	for i := 0; i < 4; i++ {
		lease, acqErr := m.Acquire(context.Background(), &queue.Task{GPU: 1, GPUMemory: "0.25"})
		require.NoError(t, acqErr)
		held = append(held, lease)
	}

	// Every slot is taken, so one more fractional request must time out.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	_, err = m.Acquire(ctx, &queue.Task{GPU: 1, GPUMemory: "0.25"})
	require.Error(t, err)

	for _, lease := range held {
		lease.Release()
	}
}

// TestManager_MultiGPUExclusiveAllocation verifies that a 2-GPU task is handed
// both devices and that no further GPU request can proceed until it releases.
func TestManager_MultiGPUExclusiveAllocation(t *testing.T) {
	m, err := resources.NewManager(resources.Options{TotalCPU: 0, GPUCount: 2, SlotsPerGPU: 1})
	require.NoError(t, err)

	lease, err := m.Acquire(context.Background(), &queue.Task{GPU: 2})
	require.NoError(t, err)
	require.Len(t, lease.GPUs(), 2)

	// Both GPUs are claimed, so even a single-GPU request must time out.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	_, err = m.Acquire(ctx, &queue.Task{GPU: 1})
	require.Error(t, err)

	lease.Release()
}

func
TestFormatCUDAVisibleDevices_NoLeaseDisablesGPU(t *testing.T) {
	// A nil lease must disable GPUs entirely: CUDA_VISIBLE_DEVICES=-1.
	require.Equal(t, "-1", resources.FormatCUDAVisibleDevices(nil))
}

// TestManager_GPUSlotsAllowSharing_Concurrent verifies that four goroutines can
// simultaneously hold fractional slots on a single GPU, and that a fifth
// request blocks while all four are held.
func TestManager_GPUSlotsAllowSharing_Concurrent(t *testing.T) {
	m, err := resources.NewManager(resources.Options{TotalCPU: 0, GPUCount: 1, SlotsPerGPU: 4})
	require.NoError(t, err)

	started := make(chan struct{})
	release := make(chan struct{})
	errCh := make(chan error, 4)
	leases := make(chan *resources.Lease, 4)
	for i := 0; i < 4; i++ {
		go func() {
			<-started
			l, err := m.Acquire(context.Background(), &queue.Task{GPU: 1, GPUMemory: "0.25"})
			if err != nil {
				errCh <- err
				return
			}
			leases <- l
			<-release
			l.Release()
			errCh <- nil
		}()
	}
	close(started)

	deadline := time.After(500 * time.Millisecond)
	acquired := make([]*resources.Lease, 0, 4)
	for len(acquired) < 4 {
		select {
		case l := <-leases:
			acquired = append(acquired, l)
		case err := <-errCh:
			// FIX: previously an Acquire failure sat unread in errCh and the
			// test reported a generic timeout instead of the real error.
			// Before close(release), errCh can only carry non-nil errors, so
			// fail fast with the actual cause.
			t.Fatalf("acquire failed: %v", err)
		case <-deadline:
			t.Fatalf("timed out waiting for leases; got %d", len(acquired))
		}
	}

	// All four slots are held; a fifth fractional request must time out.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	_, err = m.Acquire(ctx, &queue.Task{GPU: 1, GPUMemory: "0.25"})
	require.Error(t, err)

	// Let every goroutine release its lease and report success.
	close(release)
	for i := 0; i < 4; i++ {
		require.NoError(t, <-errCh)
	}
}

// TestManager_CPUOnlyNotBlockedWhenGPUSaturated verifies that CPU-only work is
// still admitted while the only GPU is fully leased.
func TestManager_CPUOnlyNotBlockedWhenGPUSaturated(t *testing.T) {
	m, err := resources.NewManager(resources.Options{TotalCPU: 4, GPUCount: 1, SlotsPerGPU: 1})
	require.NoError(t, err)

	gpuLease, err := m.Acquire(context.Background(), &queue.Task{GPU: 1})
	require.NoError(t, err)
	defer gpuLease.Release()

	done := make(chan error, 1)
	go func() {
		lease, err := m.Acquire(context.Background(), &queue.Task{CPU: 1})
		if err == nil {
			lease.Release()
		}
		done <- err
	}()
	select {
	case err := <-done:
		require.NoError(t, err)
	case <-time.After(200 * time.Millisecond):
		t.Fatal("cpu-only acquire unexpectedly blocked by gpu saturation")
	}
}

// TestManager_AcquireMetrics_RecordWaitAndTimeout verifies that the manager's
// snapshot counts both successful acquires and timed-out attempts.
func TestManager_AcquireMetrics_RecordWaitAndTimeout(t *testing.T) {
	m, err := resources.NewManager(resources.Options{TotalCPU: 1, GPUCount: 0, SlotsPerGPU: 1})
	require.NoError(t, err)
lease, err := m.Acquire(context.Background(), &queue.Task{CPU: 1})
	require.NoError(t, err)
	defer lease.Release()

	// The only CPU is held, so this second request must time out.
	timeoutCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	_, err = m.Acquire(timeoutCtx, &queue.Task{CPU: 1})
	require.Error(t, err)

	// Both attempts were counted; at least one of them was a timeout.
	snap := m.Snapshot()
	require.GreaterOrEqual(t, snap.AcquireTotal, int64(2))
	require.GreaterOrEqual(t, snap.AcquireTimeoutTotal, int64(1))
}