package benchmarks_test import ( "fmt" "testing" "time" "github.com/jfraeys/fetch_ml/internal/scheduler" fixtures "github.com/jfraeys/fetch_ml/tests/fixtures" ) // BenchmarkPriorityQueueAdd measures job enqueue performance func BenchmarkPriorityQueueAdd(b *testing.B) { pq := scheduler.NewPriorityQueue(0.1) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { task := &scheduler.Task{ ID: fmt.Sprintf("task-%d", i), Priority: i % 100, } pq.Add(task) } } // BenchmarkPriorityQueueTake measures job dequeue performance func BenchmarkPriorityQueueTake(b *testing.B) { pq := scheduler.NewPriorityQueue(0.1) // Pre-populate queue for i := 0; i < b.N; i++ { task := &scheduler.Task{ ID: fmt.Sprintf("task-%d", i), Priority: i % 100, } pq.Add(task) } b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { pq.Take() } } // BenchmarkPortAllocator measures port allocation performance func BenchmarkPortAllocator(b *testing.B) { pa := scheduler.NewPortAllocator(10000, 20000) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { port, _ := pa.Allocate(fmt.Sprintf("service-%d", i)) _ = port } } // BenchmarkStateStoreAppend measures state persistence performance func BenchmarkStateStoreAppend(b *testing.B) { dir := b.TempDir() store, _ := scheduler.NewStateStore(dir + "/bench.state") event := scheduler.StateEvent{ Type: scheduler.EventJobEnqueued, TaskID: "bench-task", Timestamp: time.Now(), } b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { event.TaskID = fmt.Sprintf("bench-task-%d", i) store.Append(event) } } // BenchmarkSchedulerSubmitJob measures job submission throughput func BenchmarkSchedulerSubmitJob(b *testing.B) { // Create isolated state directory stateDir := b.TempDir() // Create scheduler directly for benchmark cfg := scheduler.HubConfig{ BindAddr: "localhost:0", StateDir: stateDir, DefaultBatchSlots: 4, StarvationThresholdMins: 5, AcceptanceTimeoutSecs: 5, } hub, err := scheduler.NewHub(cfg, nil) if err != nil { b.Fatal(err) } defer hub.Stop() if err := hub.Start(); err != nil { b.Fatal(err) } b.ResetTimer() for i := 0; i < b.N; i++ { hub.SubmitJob(scheduler.JobSpec{ ID: fmt.Sprintf("bench-job-%d", i), Type: scheduler.JobTypeBatch, }) } } // BenchmarkWorkerRegistration measures worker registration throughput func BenchmarkWorkerRegistration(b *testing.B) { fixture := fixtures.NewSchedulerTestFixture(b, fixtures.DefaultHubConfig()) defer fixture.Cleanup() b.ResetTimer() for i := 0; i < b.N; i++ { workerID := fmt.Sprintf("bench-worker-%d", i) worker := fixtures.NewMockWorker(b, fixture.Hub, workerID) worker.Register(scheduler.WorkerCapabilities{GPUCount: 0}) worker.Close() } } // BenchmarkHeartbeatProcessing measures heartbeat handling throughput func BenchmarkHeartbeatProcessing(b *testing.B) { fixture := fixtures.NewSchedulerTestFixture(b, fixtures.DefaultHubConfig()) defer fixture.Cleanup() worker := fixture.CreateWorker("bench-hb-worker", scheduler.WorkerCapabilities{GPUCount: 0}) slots := scheduler.SlotStatus{ BatchTotal: 4, BatchInUse: 0, } b.ResetTimer() for i := 0; i < b.N; i++ { worker.SendHeartbeat(slots) } } // BenchmarkJobAssignment measures job scheduling latency func BenchmarkJobAssignment(b *testing.B) { fixture := fixtures.NewSchedulerTestFixture(b, fixtures.DefaultHubConfig()) defer fixture.Cleanup() // Create worker worker := fixture.CreateWorker("bench-assign-worker", scheduler.WorkerCapabilities{GPUCount: 0}) b.ResetTimer() for i := 0; i < b.N; i++ { // Submit job jobID := fmt.Sprintf("bench-assign-%d", i) fixture.SubmitJob(scheduler.JobSpec{ ID: jobID, Type: scheduler.JobTypeBatch, }) // Signal ready to trigger assignment worker.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") // Wait for assignment worker.RecvTimeout(100 * time.Millisecond) } } // BenchmarkMultiWorkerScheduling measures scheduling with multiple workers func BenchmarkMultiWorkerScheduling(b *testing.B) { fixture := fixtures.NewSchedulerTestFixture(b, fixtures.DefaultHubConfig()) defer fixture.Cleanup() // Create multiple workers workers := make([]*fixtures.MockWorker, 10) for i := 0; i < 10; i++ { workers[i] = fixture.CreateWorker( fmt.Sprintf("bench-multi-worker-%d", i), scheduler.WorkerCapabilities{GPUCount: 0}, ) } b.ResetTimer() for i := 0; i < b.N; i++ { // Submit job jobID := fmt.Sprintf("bench-multi-%d", i) fixture.SubmitJob(scheduler.JobSpec{ ID: jobID, Type: scheduler.JobTypeBatch, }) // All workers signal ready for _, w := range workers { w.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling") } // One worker gets the job workers[i%10].RecvTimeout(100 * time.Millisecond) } }