fetch_ml/tests/benchmarks/scheduler_latency_bench_test.go
Jeremie Fraeys f827ee522a
test(tracking/plugins): add PodmanInterface and comprehensive plugin tests for 91% coverage
Refactor plugins to use interface for testability:
- Add PodmanInterface to container package (StartContainer, StopContainer, RemoveContainer)
- Update MLflow plugin to use container.PodmanInterface
- Update TensorBoard plugin to use container.PodmanInterface
- Add comprehensive mocked tests for all three plugins (wandb, mlflow, tensorboard)
- Coverage increased from 18% to 91.4%
2026-03-14 16:59:16 -04:00

264 lines
7 KiB
Go

// Package benchmarks_test provides performance benchmarks with latency histogram tracking
package benchmarks_test
import (
	"fmt"
	"sort"
	"sync/atomic"
	"testing"
	"time"

	"github.com/jfraeys/fetch_ml/internal/scheduler"
	fixtures "github.com/jfraeys/fetch_ml/tests/fixtures"
)
// latencyHistogram tracks scheduling latencies for percentile calculation
type latencyHistogram struct {
latencies []time.Duration
}
func newLatencyHistogram(capacity int) *latencyHistogram {
return &latencyHistogram{
latencies: make([]time.Duration, 0, capacity),
}
}
func (h *latencyHistogram) record(d time.Duration) {
h.latencies = append(h.latencies, d)
}
func (h *latencyHistogram) percentile(p float64) time.Duration {
if len(h.latencies) == 0 {
return 0
}
sort.Slice(h.latencies, func(i, j int) bool {
return h.latencies[i] < h.latencies[j]
})
idx := int(float64(len(h.latencies)-1) * p / 100.0)
return h.latencies[idx]
}
func (h *latencyHistogram) min() time.Duration {
if len(h.latencies) == 0 {
return 0
}
min := h.latencies[0]
for _, v := range h.latencies {
if v < min {
min = v
}
}
return min
}
func (h *latencyHistogram) max() time.Duration {
if len(h.latencies) == 0 {
return 0
}
max := h.latencies[0]
for _, v := range h.latencies {
if v > max {
max = v
}
}
return max
}
// BenchmarkSchedulingLatency measures job scheduling latency percentiles.
// Reports min/p50/p95/p99/max latency for a submit→assign round trip.
func BenchmarkSchedulingLatency(b *testing.B) {
	fx := fixtures.NewSchedulerTestFixture(b, fixtures.DefaultHubConfig())
	defer fx.Cleanup()

	// Single worker; every job flows through it sequentially.
	w := fx.CreateWorker("latency-worker", scheduler.WorkerCapabilities{GPUCount: 0})

	lat := newLatencyHistogram(b.N)

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		id := fmt.Sprintf("latency-job-%d", i)

		t0 := time.Now()

		// Submit the job, signal readiness to trigger assignment, then wait
		// for the assignment message to arrive.
		fx.SubmitJob(scheduler.JobSpec{
			ID:   id,
			Type: scheduler.JobTypeBatch,
		})
		w.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
		w.RecvTimeout(100 * time.Millisecond)

		lat.record(time.Since(t0))

		// Accept and complete the job so the slot is free next iteration.
		w.AcceptJob(id)
		w.CompleteJob(id, 0, "")
	}

	// Emit latency percentiles in microseconds per op.
	b.ReportMetric(float64(lat.min().Microseconds()), "min_us/op")
	b.ReportMetric(float64(lat.percentile(50).Microseconds()), "p50_us/op")
	b.ReportMetric(float64(lat.percentile(95).Microseconds()), "p95_us/op")
	b.ReportMetric(float64(lat.percentile(99).Microseconds()), "p99_us/op")
	b.ReportMetric(float64(lat.max().Microseconds()), "max_us/op")
}
// BenchmarkSchedulingLatencyParallel measures scheduling latency under
// concurrent load. Reports min/p50/p95/p99/max aggregated over all goroutines.
func BenchmarkSchedulingLatencyParallel(b *testing.B) {
	fixture := fixtures.NewSchedulerTestFixture(b, fixtures.DefaultHubConfig())
	defer fixture.Cleanup()

	// Create multiple workers so concurrent goroutines do not all funnel
	// through a single one.
	numWorkers := 10
	workers := make([]*fixtures.MockWorker, numWorkers)
	for i := 0; i < numWorkers; i++ {
		workers[i] = fixture.CreateWorker(
			fmt.Sprintf("parallel-latency-worker-%d", i),
			scheduler.WorkerCapabilities{GPUCount: 0},
		)
	}

	// Each goroutine tracks its own latencies and ships them here at the end.
	type result struct {
		latencies []time.Duration
	}
	results := make(chan result, b.N)

	// Fix: the job counter was previously a plain per-goroutine int starting
	// at 0, so every goroutine in RunParallel generated the same job IDs
	// ("parallel-latency-job-0", -1, ...), submitting colliding IDs to the
	// scheduler. A shared atomic counter guarantees globally unique IDs.
	var jobSeq atomic.Int64

	b.ReportAllocs()
	b.ResetTimer()

	b.RunParallel(func(pb *testing.PB) {
		localHist := newLatencyHistogram(1000)
		for pb.Next() {
			seq := jobSeq.Add(1) - 1
			jobID := fmt.Sprintf("parallel-latency-job-%d", seq)
			workerIdx := int(seq % int64(numWorkers))

			start := time.Now()

			fixture.SubmitJob(scheduler.JobSpec{
				ID:   jobID,
				Type: scheduler.JobTypeBatch,
			})
			workers[workerIdx].SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
			workers[workerIdx].RecvTimeout(100 * time.Millisecond)

			localHist.record(time.Since(start))

			// Accept and complete so the slot frees up for later iterations.
			workers[workerIdx].AcceptJob(jobID)
			workers[workerIdx].CompleteJob(jobID, 0, "")
		}
		results <- result{latencies: localHist.latencies}
	})

	// Aggregate per-goroutine samples into one global histogram.
	close(results)
	globalHist := newLatencyHistogram(b.N)
	for r := range results {
		for _, lat := range r.latencies {
			globalHist.record(lat)
		}
	}

	// Report latency percentiles in microseconds per op.
	b.ReportMetric(float64(globalHist.min().Microseconds()), "min_us/op")
	b.ReportMetric(float64(globalHist.percentile(50).Microseconds()), "p50_us/op")
	b.ReportMetric(float64(globalHist.percentile(95).Microseconds()), "p95_us/op")
	b.ReportMetric(float64(globalHist.percentile(99).Microseconds()), "p99_us/op")
	b.ReportMetric(float64(globalHist.max().Microseconds()), "max_us/op")
}
// BenchmarkQueueThroughput measures queue operations per second, split into
// an enqueue phase and a drain/assignment phase.
func BenchmarkQueueThroughput(b *testing.B) {
	fx := fixtures.NewSchedulerTestFixture(b, fixtures.DefaultHubConfig())
	defer fx.Cleanup()

	// Pool of workers that will drain the queue.
	numWorkers := 10
	pool := make([]*fixtures.MockWorker, numWorkers)
	for i := range pool {
		pool[i] = fx.CreateWorker(
			fmt.Sprintf("throughput-worker-%d", i),
			scheduler.WorkerCapabilities{GPUCount: 0},
		)
	}

	// Build all job specs up front so spec construction is not timed.
	specs := make([]scheduler.JobSpec, b.N)
	for i := range specs {
		specs[i] = scheduler.JobSpec{
			ID:   fmt.Sprintf("throughput-job-%d", i),
			Type: scheduler.JobTypeBatch,
		}
	}

	b.ReportAllocs()
	b.ResetTimer()

	// Phase 1: enqueue everything as fast as possible.
	enqueueStart := time.Now()
	for _, spec := range specs {
		fx.SubmitJob(spec)
	}
	enqueueTime := time.Since(enqueueStart)

	// Phase 2: poll workers round-robin until every job has been assigned.
	processStart := time.Now()
	done := 0
	for done < b.N {
		for _, w := range pool {
			w.SignalReady(scheduler.SlotStatus{BatchTotal: 4, BatchInUse: 0}, "polling")
			if w.RecvTimeout(10 * time.Millisecond).Type == scheduler.MsgJobAssign {
				done++
			}
		}
	}
	processTime := time.Since(processStart)

	// Report aggregate throughput plus per-phase cost per job.
	total := enqueueTime + processTime
	b.ReportMetric(float64(b.N)/total.Seconds(), "jobs/sec")
	b.ReportMetric(float64(enqueueTime.Microseconds())/float64(b.N), "enqueue_us/op")
	b.ReportMetric(float64(processTime.Microseconds())/float64(b.N), "process_us/op")
}
// BenchmarkWorkerRegistrationLatency measures worker registration time.
// Reports min/p50/p95/p99/max registration latency.
func BenchmarkWorkerRegistrationLatency(b *testing.B) {
	fx := fixtures.NewSchedulerTestFixture(b, fixtures.DefaultHubConfig())
	defer fx.Cleanup()

	lat := newLatencyHistogram(b.N)

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		id := fmt.Sprintf("reg-latency-worker-%d", i)

		// Time the create+register round trip only; Close happens after the
		// sample is recorded so teardown does not skew the latency.
		t0 := time.Now()
		w := fixtures.NewMockWorker(b, fx.Hub, id)
		w.Register(scheduler.WorkerCapabilities{GPUCount: 0})
		lat.record(time.Since(t0))

		w.Close()
	}

	// Report registration latency percentiles in microseconds.
	b.ReportMetric(float64(lat.min().Microseconds()), "min_us/op")
	b.ReportMetric(float64(lat.percentile(50).Microseconds()), "p50_us/op")
	b.ReportMetric(float64(lat.percentile(95).Microseconds()), "p95_us/op")
	b.ReportMetric(float64(lat.percentile(99).Microseconds()), "p99_us/op")
	b.ReportMetric(float64(lat.max().Microseconds()), "max_us/op")
}