test(benchmarks): update benchmark tests with job cleanup and improvements

**Payload Performance Test:**
- Add job cleanup after each iteration using DeleteJob()
- Ensure isolated memory measurements between test runs

**All Benchmark Tests:**
- Migrate benchmark loops from `for i := 0; i < b.N; i++` to `b.Loop()` (Go 1.24)
- Replace `interface{}` with `any` and index loops with `range n` where applicable
- Use miniredis instead of a live Redis instance for Redis-backed benchmarks
- Skip native-library benchmarks gracefully when built without `-tags native_libs`
- Upgrade the `/ws` test handler to a real WebSocket echo server
This commit is contained in:
Jeremie Fraeys 2026-02-23 18:03:54 -05:00
parent 54ddab887e
commit be67cb77d3
No known key found for this signature in database
18 changed files with 188 additions and 157 deletions

View file

@ -21,19 +21,20 @@ func newBenchmarkHTTPClient() *http.Client {
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 256,
MaxIdleConnsPerHost: 256,
ForceAttemptHTTP2: false, // Disable HTTP/2 for benchmark stability
MaxIdleConns: 500,
MaxIdleConnsPerHost: 500,
MaxConnsPerHost: 500,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
return &http.Client{
Timeout: 30 * time.Second,
Timeout: 10 * time.Second,
Transport: transport,
}
}
@ -77,13 +78,12 @@ func BenchmarkAPIServerCreateJobSimple(b *testing.B) {
client := &http.Client{Timeout: 30 * time.Second}
baseURL := "http://" + addr
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
jobData := map[string]interface{}{
for i := 0; b.Loop(); i++ {
jobData := map[string]any{
"job_name": fmt.Sprintf("benchmark-job-%d", i),
"args": map[string]interface{}{
"args": map[string]any{
"model": "test-model",
"data": generateTestPayload(1024),
},
@ -136,10 +136,9 @@ func BenchmarkMetricsCollection(b *testing.B) {
registry.MustRegister(counter, histogram)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
counter.Inc()
histogram.Observe(float64(i) * 0.001)
}
@ -205,8 +204,23 @@ func setupTestAPIServer(_ *testing.B) *httptest.Server {
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
})
mux.HandleFunc("/ws", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(_ *http.Request) bool { return true },
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
// Echo back any messages
for {
mt, message, err := conn.ReadMessage()
if err != nil {
break
}
_ = conn.WriteMessage(mt, message)
}
})
return httptest.NewServer(mux)
@ -214,12 +228,18 @@ func setupTestAPIServer(_ *testing.B) *httptest.Server {
// benchmarkCreateJob tests job creation performance
func benchmarkCreateJob(b *testing.B, baseURL string, client *http.Client) {
for i := 0; i < b.N; i++ {
jobData := map[string]interface{}{
"job_name": fmt.Sprintf("benchmark-job-%d", i),
"args": map[string]interface{}{
// Pre-generate test payload to avoid allocation overhead in hot path
testPayload := generateTestPayload(1024)
// Use static job name to avoid fmt.Sprintf overhead in benchmark loop
jobName := "benchmark-job"
for i := 0; b.Loop(); i++ {
_ = i // Avoid unused variable warning
jobData := map[string]any{
"job_name": jobName,
"args": map[string]any{
"model": "test-model",
"data": generateTestPayload(1024), // 1KB payload
"data": testPayload,
},
"priority": 0,
}
@ -244,7 +264,7 @@ func benchmarkCreateJob(b *testing.B, baseURL string, client *http.Client) {
// benchmarkListJobs tests job listing performance
func benchmarkListJobs(b *testing.B, baseURL string, client *http.Client) {
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
req, err := http.NewRequestWithContext(context.Background(), "GET", baseURL+"/api/v1/jobs", nil)
if err != nil {
b.Fatalf("Failed to create request: %v", err)
@ -274,20 +294,21 @@ func BenchmarkAPIServerListJobs(b *testing.B) {
func BenchmarkWebSocketConnection(b *testing.B) {
server := setupTestAPIServer(b)
defer server.Close()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
// Convert HTTP URL to WebSocket URL
wsURL := strings.Replace(server.URL, "http://", "ws://", 1)
wsURL += "/ws"
// Convert HTTP URL to WebSocket URL once
wsURL := strings.Replace(server.URL, "http://", "ws://", 1) + "/ws"
for b.Loop() {
conn, resp, err := websocket.DefaultDialer.Dial(wsURL, nil)
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
if err != nil {
// Skip iteration if WebSocket server isn't available
continue
b.Fatalf("WebSocket dial failed: %v", err)
}
// Send close message and wait for server to close
_ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
_ = conn.Close()
}
}

View file

@ -3,6 +3,7 @@ package benchmarks
import (
"os"
"path/filepath"
"strings"
"testing"
"github.com/jfraeys/fetch_ml/internal/worker"
@ -38,6 +39,9 @@ func BenchmarkArtifactScanNative(b *testing.B) {
for b.Loop() {
_, err := worker.ScanArtifactsNative(tmpDir)
if err != nil {
if strings.Contains(err.Error(), "native artifact scanner requires") {
b.Skip("Native artifact scanner not available: ", err)
}
b.Fatal(err)
}
}
@ -65,6 +69,9 @@ func BenchmarkArtifactScanLarge(b *testing.B) {
for b.Loop() {
_, err := worker.ScanArtifactsNative(tmpDir)
if err != nil {
if strings.Contains(err.Error(), "native artifact scanner requires") {
b.Skip("Native artifact scanner not available: ", err)
}
b.Fatal(err)
}
}

View file

@ -132,10 +132,9 @@ type BenchmarkMonitoringConfig struct {
func BenchmarkConfigYAMLUnmarshal(b *testing.B) {
data := []byte(sampleServerConfig)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
var cfg BenchmarkServerConfig
err := yaml.Unmarshal(data, &cfg)
if err != nil {

View file

@ -49,11 +49,10 @@ func BenchmarkSequentialHashes(b *testing.B) {
testDir := createSmallDataset(b)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
for b.Loop() {
// Simulate viewing 10 datasets (like TUI scrolling)
for j := 0; j < 10; j++ {
for range 10 {
_, err := worker.DirOverallSHA256Hex(testDir)
if err != nil {
b.Fatal(err)

View file

@ -38,7 +38,7 @@ func BenchmarkDirOverallSHA256Hex_Native(b *testing.B) {
if err := os.MkdirAll(metaDir, 0750); err != nil {
b.Fatal(err)
}
for i := 0; i < 10; i++ {
for i := range 10 {
if err := os.WriteFile(
filepath.Join(metaDir, "file"+string(rune('0'+i))+".json"),
[]byte(`{"key": "value"}`),
@ -51,7 +51,7 @@ func BenchmarkDirOverallSHA256Hex_Native(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_, err := worker.DirOverallSHA256Hex(tmpDir)
if err != nil {
b.Fatal(err)
@ -82,10 +82,9 @@ func BenchmarkDirOverallSHA256HexLarge_Native(b *testing.B) {
}
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_, err := worker.DirOverallSHA256Hex(tmpDir)
if err != nil {
b.Fatal(err)

View file

@ -42,7 +42,7 @@ func BenchmarkDirOverallSHA256Hex(b *testing.B) {
if err := os.MkdirAll(metaDir, 0750); err != nil {
b.Fatal(err)
}
for i := 0; i < 10; i++ {
for i := range 10 {
if err := os.WriteFile(
filepath.Join(metaDir, "file"+string(rune('0'+i))+".json"),
[]byte(`{"key": "value"}`),
@ -55,7 +55,7 @@ func BenchmarkDirOverallSHA256Hex(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_, err := worker.DirOverallSHA256Hex(tmpDir)
if err != nil {
b.Fatal(err)
@ -68,7 +68,7 @@ func BenchmarkDirOverallSHA256HexLarge(b *testing.B) {
tmpDir := b.TempDir()
// Create 50 files of 100KB each = ~5MB total
for i := 0; i < 50; i++ {
for i := range 50 {
subdir := filepath.Join(tmpDir, "data", string(rune('a'+i%26)))
if err := os.MkdirAll(subdir, 0750); err != nil {
b.Fatal(err)
@ -88,7 +88,7 @@ func BenchmarkDirOverallSHA256HexLarge(b *testing.B) {
b.Run("Sequential", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_, err := worker.DirOverallSHA256Hex(tmpDir)
if err != nil {
b.Fatal(err)
@ -98,7 +98,7 @@ func BenchmarkDirOverallSHA256HexLarge(b *testing.B) {
b.Run("ParallelGo", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_, err := worker.DirOverallSHA256Hex(tmpDir)
if err != nil {
b.Fatal(err)
@ -107,10 +107,10 @@ func BenchmarkDirOverallSHA256HexLarge(b *testing.B) {
})
b.Run("Native", func(b *testing.B) {
// This requires FETCHML_NATIVE_LIBS=1 to actually use native
// This requires -tags native_libs to actually use native
// Otherwise falls back to Go implementation
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_, err := worker.DirOverallSHA256Hex(tmpDir)
if err != nil {
b.Fatal(err)

View file

@ -3,6 +3,7 @@ package benchmarks
import (
"os"
"path/filepath"
"strings"
"testing"
"github.com/jfraeys/fetch_ml/internal/worker"
@ -10,7 +11,7 @@ import (
)
// BenchmarkDatasetSizeComparison finds the crossover point where native wins
// Run with: FETCHML_NATIVE_LIBS=1 go test -tags native_libs -bench=BenchmarkDatasetSize ./tests/benchmarks/
// Run with: go test -tags native_libs -bench=BenchmarkDatasetSize ./tests/benchmarks/
func BenchmarkDatasetSizeComparison(b *testing.B) {
sizes := []struct {
name string
@ -48,9 +49,12 @@ func BenchmarkDatasetSizeComparison(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
// Use DirOverallSHA256Hex which calls native via build tag
_, err := worker.DirOverallSHA256Hex(tmpDir)
// Use DirOverallSHA256HexNative which calls native C++ implementation
_, err := worker.DirOverallSHA256HexNative(tmpDir)
if err != nil {
if strings.Contains(err.Error(), "native hash requires") {
b.Skip("Native hash not available: ", err)
}
b.Fatal(err)
}
}
@ -64,7 +68,7 @@ func createTestFiles(b *testing.B, dir string, numFiles int, fileSize int) {
data[i] = byte(i % 256)
}
for i := 0; i < numFiles; i++ {
for i := range numFiles {
path := filepath.Join(dir, "data", string(rune('a'+i%26)), "chunk.bin")
if err := os.MkdirAll(filepath.Dir(path), 0750); err != nil {
b.Fatal(err)

View file

@ -12,7 +12,7 @@ func TestGoNativeLeakStress(t *testing.T) {
tmpDir := t.TempDir()
// Create multiple test files
for i := 0; i < 10; i++ {
for i := range 10 {
content := make([]byte, 1024*1024) // 1MB each
for j := range content {
content[j] = byte(i * j)
@ -23,7 +23,7 @@ func TestGoNativeLeakStress(t *testing.T) {
}
// Run 1000 hash operations through Go wrapper
for i := 0; i < 1000; i++ {
for i := range 1000 {
hash, err := worker.DirOverallSHA256Hex(tmpDir)
if err != nil {
t.Fatalf("Hash %d failed: %v", i, err)
@ -49,14 +49,14 @@ func TestGoNativeArtifactScanLeak(t *testing.T) {
tmpDir := t.TempDir()
// Create test files
for i := 0; i < 50; i++ {
for i := range 50 {
if err := os.WriteFile(tmpDir+"/file_"+string(rune('a'+i%26))+".txt", []byte("data"), 0644); err != nil {
t.Fatal(err)
}
}
// Run 100 scans
for i := 0; i < 100; i++ {
for i := range 100 {
_, err := worker.ScanArtifactsNative(tmpDir)
if err != nil {
t.Logf("Scan %d: %v (may be expected if native disabled)", i, err)

View file

@ -17,7 +17,7 @@ func BenchmarkTaskJSONMarshal(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_, err := json.Marshal(task)
if err != nil {
b.Fatal(err)
@ -37,7 +37,7 @@ func BenchmarkTaskJSONUnmarshal(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
var t queue.Task
err := json.Unmarshal(data, &t)
if err != nil {
@ -53,7 +53,7 @@ func BenchmarkTaskJSONRoundTrip(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
data, err := json.Marshal(task)
if err != nil {
b.Fatal(err)
@ -87,7 +87,7 @@ func BenchmarkPrewarmStateJSONMarshal(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_, err := json.Marshal(state)
if err != nil {
b.Fatal(err)
@ -119,7 +119,7 @@ func BenchmarkPrewarmStateJSONUnmarshal(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
var s queue.PrewarmState
err := json.Unmarshal(data, &s)
if err != nil {
@ -139,7 +139,7 @@ func BenchmarkTaskBatchJSONMarshal(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
for _, task := range tasks {
_, err := json.Marshal(task)
if err != nil {

View file

@ -20,10 +20,9 @@ func BenchmarkResolveWorkspacePath(b *testing.B) {
"deep/nested/workspace/name",
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
path := testPaths[i%len(testPaths)]
// Simulate the resolveWorkspacePath logic inline
ws := strings.TrimSpace(path)
@ -50,7 +49,7 @@ func BenchmarkStringTrimSpace(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
_ = strings.TrimSpace(testInputs[i%len(testInputs)])
}
}
@ -69,7 +68,7 @@ func BenchmarkStringSplit(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
_ = strings.Split(testInputs[i%len(testInputs)], ",")
}
}
@ -89,7 +88,7 @@ func BenchmarkStringHasPrefix(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
_ = strings.HasPrefix(testInputs[i%len(testInputs)], prefixes[i%len(prefixes)])
}
}
@ -110,7 +109,7 @@ func BenchmarkStringEqualFold(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
pair := testPairs[i%len(testPairs)]
_ = strings.EqualFold(pair.a, pair.b)
}
@ -131,7 +130,7 @@ func BenchmarkFilepathClean(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
_ = filepath.Clean(testPaths[i%len(testPaths)])
}
}
@ -149,7 +148,7 @@ func BenchmarkFilepathJoin(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
parts := components[i%len(components)]
_ = filepath.Join(parts...)
}
@ -169,7 +168,7 @@ func BenchmarkStrconvAtoi(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
_, _ = strconv.Atoi(testInputs[i%len(testInputs)])
}
}
@ -183,7 +182,7 @@ func BenchmarkTimeFormat(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
_ = now.Format(format)
}
}
@ -197,7 +196,7 @@ func BenchmarkSprintfConcat(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
_ = names[i%len(names)] + "_" + timestamps[i%len(timestamps)]
}
}
@ -216,7 +215,7 @@ func BenchmarkPackageListParsing(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
input := testInputs[i%len(testInputs)]
parts := strings.Split(input, ",")
out := make([]string, 0, len(parts))
@ -242,10 +241,9 @@ func BenchmarkEnvLookup(b *testing.B) {
"NONEXISTENT_VAR",
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
_, _ = os.LookupEnv(keys[i%len(keys)])
}
}
@ -260,10 +258,9 @@ func BenchmarkCombinedJupyterHotPath(b *testing.B) {
"deep/nested/name",
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
// Simulate resolveWorkspacePath + path building
ws := strings.TrimSpace(testWorkspaces[i%len(testWorkspaces)])
clean := filepath.Clean(ws)

View file

@ -26,10 +26,9 @@ func BenchmarkLogSanitizeMessage(b *testing.B) {
"Authentication token: eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.test.signature",
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
msg := messages[i%len(messages)]
_ = logging.SanitizeLogMessage(msg)
}
@ -50,10 +49,9 @@ func BenchmarkLogSanitizeArgs(b *testing.B) {
"timestamp", "2024-01-01T00:00:00Z",
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_ = logging.SanitizeArgs(args)
}
}
@ -73,10 +71,9 @@ func BenchmarkLogSanitizeHighVolume(b *testing.B) {
"Webhook received with authorization=Bearer eyJ0eXAiOiJKV1Qi.test.sig",
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
// Simulate batch processing of multiple messages
for _, msg := range testMessages {
_ = logging.SanitizeLogMessage(msg)

View file

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/alicebob/miniredis/v2"
"github.com/jfraeys/fetch_ml/internal/metrics"
"github.com/jfraeys/fetch_ml/internal/storage"
fixtures "github.com/jfraeys/fetch_ml/tests/fixtures"
@ -87,7 +88,7 @@ func benchmarkMLExperiment(b *testing.B, db *storage.DB, rdb *redis.Client, expT
_ = executionTime // Use executionTime to avoid unused variable warning
// Record experiment metrics
for j := 0; j < 5; j++ {
for j := range 5 {
metricName := fmt.Sprintf("metric_%d_%d", j, i)
err := db.RecordJobMetric(expID, metricName, fmt.Sprintf("%.2f", float64(j)*1.5))
if err != nil {
@ -106,10 +107,10 @@ func benchmarkConcurrentExperiments(b *testing.B, db *storage.DB, rdb *redis.Cli
b.ResetTimer()
// Create experiments concurrently
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
done := make(chan bool, numExperiments)
for exp := 0; exp < numExperiments; exp++ {
for exp := range numExperiments {
go func(expID int) {
defer func() { done <- true }()
@ -161,9 +162,9 @@ func benchmarkExperimentMetrics(b *testing.B, db *storage.DB, _ *redis.Client) {
b.ReportAllocs()
// Record metrics for all jobs
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
for _, jobID := range jobIDs {
for j := 0; j < metricsPerJob; j++ {
for j := range metricsPerJob {
metricName := fmt.Sprintf("metric_%d_%d", j, i)
metricValue := fmt.Sprintf("%.6f", float64(i*j)*0.001)
@ -207,7 +208,7 @@ func BenchmarkDatasetOperations(b *testing.B) {
}
func benchmarkDatasetCreation(b *testing.B, db *storage.DB) {
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
datasetID := fmt.Sprintf("dataset-%d-%d", i, time.Now().UnixNano())
// Create a job first for foreign key constraint
@ -244,7 +245,7 @@ func benchmarkDatasetCreation(b *testing.B, db *storage.DB) {
func benchmarkDatasetRetrieval(b *testing.B, db *storage.DB) {
// Pre-create datasets
numDatasets := 100
for i := 0; i < numDatasets; i++ {
for i := range numDatasets {
datasetID := fmt.Sprintf("dataset-%d-%d", i, time.Now().UnixNano())
// Create a job first
@ -260,7 +261,7 @@ func benchmarkDatasetRetrieval(b *testing.B, db *storage.DB) {
_ = db.RecordJobMetric(datasetID, "dataset_type", "training")
}
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
datasetID := fmt.Sprintf("dataset-%d", i%numDatasets)
// Simulate dataset metadata retrieval
@ -274,7 +275,7 @@ func benchmarkDatasetUpdate(b *testing.B, db *storage.DB) {
// Pre-create datasets
numDatasets := 50
datasetIDs := make([]string, numDatasets)
for i := 0; i < numDatasets; i++ {
for i := range numDatasets {
datasetID := fmt.Sprintf("dataset-%d-%d", i, time.Now().UnixNano())
datasetIDs[i] = datasetID
@ -290,7 +291,7 @@ func benchmarkDatasetUpdate(b *testing.B, db *storage.DB) {
_ = db.RecordJobMetric(datasetID, "dataset_size", fmt.Sprintf("%d", 1024))
}
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
datasetID := datasetIDs[i%numDatasets]
// Update dataset metadata
@ -309,24 +310,24 @@ func benchmarkDatasetUpdate(b *testing.B, db *storage.DB) {
// Helper functions
func setupBenchmarkRedis(b *testing.B) *redis.Client {
// Start in-memory Redis server
s, err := miniredis.Run()
if err != nil {
b.Fatalf("failed to start miniredis: %v", err)
}
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 5, // Use DB 5 for benchmarks
Addr: s.Addr(),
})
ctx := context.Background()
if err := rdb.Ping(ctx).Err(); err != nil {
b.Skipf("Redis not available, skipping benchmark: %v", err)
return nil
b.Fatalf("miniredis ping failed: %v", err)
}
// Clean up the test database
rdb.FlushDB(ctx)
b.Cleanup(func() {
rdb.FlushDB(ctx)
_ = rdb.Close()
rdb.Close()
s.Close()
})
return rdb
@ -350,7 +351,7 @@ func createMLExperiment(db *storage.DB, rdb *redis.Client, expID string, numJobs
}
// Create individual jobs for the experiment
for i := 0; i < numJobs; i++ {
for i := range numJobs {
jobID := fmt.Sprintf("%s-job-%d", expID, i)
payload := generateMLPayload(payloadSize, i)
@ -382,7 +383,7 @@ func executeMLExperiment(db *storage.DB, rdb *redis.Client, expID string, numJob
ctx := context.Background()
// Process all jobs in the experiment
for i := 0; i < numJobs; i++ {
for i := range numJobs {
jobID := fmt.Sprintf("%s-job-%d", expID, i)
// Update job status to running

View file

@ -12,7 +12,7 @@ func BenchmarkNativeQueueBasic(b *testing.B) {
// Only run if native libs available
if !queue.UseNativeQueue {
b.Skip("Native queue not enabled (set FETCHML_NATIVE_LIBS=1)")
b.Skip("Native queue not enabled (build with -tags native_libs)")
}
q, err := queue.NewNativeQueue(tmpDir)
@ -28,10 +28,9 @@ func BenchmarkNativeQueueBasic(b *testing.B) {
Priority: 100,
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
task.ID = "test-" + string(rune('0'+i%10))
if err := q.AddTask(task); err != nil {
b.Fatal(err)

View file

@ -11,7 +11,7 @@ import (
// Expected: 5x speedup, 99% allocation reduction
func BenchmarkNativeQueueRebuildIndex(b *testing.B) {
if !queue.UseNativeQueue {
b.Skip("Native queue not enabled (set FETCHML_NATIVE_LIBS=1 or build with -tags native_libs)")
b.Skip("Native queue not enabled (build with -tags native_libs)")
}
tmpDir := b.TempDir()
@ -52,7 +52,7 @@ func BenchmarkNativeQueueRebuildIndex(b *testing.B) {
// BenchmarkNativeQueueClaimNext profiles task claiming from binary heap
func BenchmarkNativeQueueClaimNext(b *testing.B) {
if !queue.UseNativeQueue {
b.Skip("Native queue not enabled (set FETCHML_NATIVE_LIBS=1 or build with -tags native_libs)")
b.Skip("Native queue not enabled (build with -tags native_libs)")
}
tmpDir := b.TempDir()
@ -63,7 +63,7 @@ func BenchmarkNativeQueueClaimNext(b *testing.B) {
defer q.Close()
// Seed with tasks
for i := 0; i < 100; i++ {
for i := range 100 {
task := &queue.Task{
ID: "task-" + string(rune('0'+i/10)) + string(rune('0'+i%10)),
JobName: "job-" + string(rune('0'+i/10)),
@ -74,10 +74,9 @@ func BenchmarkNativeQueueClaimNext(b *testing.B) {
}
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
// Binary heap pop - no JSON parsing
_, _ = q.PeekNextTask()
}
@ -86,7 +85,7 @@ func BenchmarkNativeQueueClaimNext(b *testing.B) {
// BenchmarkNativeQueueGetAllTasks profiles full task scan from binary index
func BenchmarkNativeQueueGetAllTasks(b *testing.B) {
if !queue.UseNativeQueue {
b.Skip("Native queue not enabled (set FETCHML_NATIVE_LIBS=1 or build with -tags native_libs)")
b.Skip("Native queue not enabled (build with -tags native_libs)")
}
tmpDir := b.TempDir()
@ -97,7 +96,7 @@ func BenchmarkNativeQueueGetAllTasks(b *testing.B) {
defer q.Close()
// Seed with tasks
for i := 0; i < 100; i++ {
for i := range 100 {
task := &queue.Task{
ID: "task-" + string(rune('0'+i/10)) + string(rune('0'+i%10)),
JobName: "job-" + string(rune('0'+i/10)),
@ -108,10 +107,9 @@ func BenchmarkNativeQueueGetAllTasks(b *testing.B) {
}
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_, err := q.GetAllTasks()
if err != nil {
b.Fatal(err)

View file

@ -8,32 +8,33 @@ import (
"testing"
"time"
"github.com/alicebob/miniredis/v2"
"github.com/jfraeys/fetch_ml/internal/metrics"
"github.com/jfraeys/fetch_ml/internal/storage"
fixtures "github.com/jfraeys/fetch_ml/tests/fixtures"
"github.com/redis/go-redis/v9"
)
// setupPerformanceRedis creates a Redis client for performance testing
// setupPerformanceRedis creates a Redis client using miniredis for performance testing
func setupPerformanceRedis(t *testing.T) *redis.Client {
// Start in-memory Redis server
s, err := miniredis.Run()
if err != nil {
t.Fatalf("failed to start miniredis: %v", err)
}
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 4, // Use DB 4 for performance tests to avoid conflicts
Addr: s.Addr(),
})
ctx := context.Background()
if err := rdb.Ping(ctx).Err(); err != nil {
t.Skipf("Redis not available, skipping performance test: %v", err)
return nil
t.Fatalf("miniredis ping failed: %v", err)
}
// Clean up the test database
rdb.FlushDB(ctx)
t.Cleanup(func() {
rdb.FlushDB(ctx)
defer func() { _ = rdb.Close() }()
rdb.Close()
s.Close()
})
return rdb
@ -74,7 +75,7 @@ func TestPayloadPerformanceSmall(t *testing.T) {
start := time.Now()
// Create jobs with small payloads
for i := 0; i < numJobs; i++ {
for i := range numJobs {
jobID := fmt.Sprintf("small-payload-job-%d", i)
// Create small payload
@ -112,7 +113,7 @@ func TestPayloadPerformanceSmall(t *testing.T) {
// Process jobs
start = time.Now()
for i := 0; i < numJobs; i++ {
for i := range numJobs {
jobID := fmt.Sprintf("small-payload-job-%d", i)
// Update job status
@ -143,9 +144,9 @@ func TestPayloadPerformanceSmall(t *testing.T) {
avgTimePerJob := totalTime / time.Duration(numJobs)
t.Logf("Performance Results:")
t.Logf(" Total time: %v", totalTime)
t.Logf(" Jobs per second: %.2f", jobsPerSecond)
t.Logf(" Average time per job: %v", avgTimePerJob)
t.Logf("\tTotal time: %v", totalTime)
t.Logf("\tJobs per second: %.2f", jobsPerSecond)
t.Logf("\tAverage time per job: %v", avgTimePerJob)
// Verify performance thresholds
if jobsPerSecond < 50 { // Should handle at least 50 jobs/second for small payloads
@ -195,7 +196,7 @@ func TestPayloadPerformanceLarge(t *testing.T) {
start := time.Now()
// Create jobs with large payloads
for i := 0; i < numJobs; i++ {
for i := range numJobs {
jobID := fmt.Sprintf("large-payload-job-%d", i)
// Create large payload
@ -233,7 +234,7 @@ func TestPayloadPerformanceLarge(t *testing.T) {
// Process jobs
start = time.Now()
for i := 0; i < numJobs; i++ {
for i := range numJobs {
jobID := fmt.Sprintf("large-payload-job-%d", i)
// Update job status
@ -358,7 +359,7 @@ func TestPayloadPerformanceConcurrent(t *testing.T) {
// Create jobs concurrently
done := make(chan bool, numWorkers)
for worker := 0; worker < numWorkers; worker++ {
for worker := range numWorkers {
go func(w int) {
defer func() { done <- true }()
@ -400,7 +401,7 @@ func TestPayloadPerformanceConcurrent(t *testing.T) {
}
// Wait for all workers to complete
for i := 0; i < numWorkers; i++ {
for range numWorkers {
<-done
}
@ -410,11 +411,11 @@ func TestPayloadPerformanceConcurrent(t *testing.T) {
// Process jobs concurrently
start = time.Now()
for worker := 0; worker < numWorkers; worker++ {
for worker := range numWorkers {
go func(w int) {
defer func() { done <- true }()
for i := 0; i < jobsPerWorker; i++ {
for i := range jobsPerWorker {
jobID := fmt.Sprintf("concurrent-job-w%d-i%d", w, i)
// Update job status
@ -442,7 +443,7 @@ func TestPayloadPerformanceConcurrent(t *testing.T) {
}
// Wait for all workers to complete
for i := 0; i < numWorkers; i++ {
for range numWorkers {
<-done
}
@ -456,10 +457,10 @@ func TestPayloadPerformanceConcurrent(t *testing.T) {
concurrencyFactor := float64(totalJobs) / float64(creationTime.Seconds()) / 50 // Relative to baseline
t.Logf("Concurrent Performance Results:")
t.Logf(" Total time: %v", totalTime)
t.Logf(" Jobs per second: %.2f", jobsPerSecond)
t.Logf(" Average time per job: %v", avgTimePerJob)
t.Logf(" Concurrency factor: %.2f", concurrencyFactor)
t.Logf("\tTotal time: %v", totalTime)
t.Logf("\tJobs per second: %.2f", jobsPerSecond)
t.Logf("\tAverage time per job: %v", avgTimePerJob)
t.Logf("\tConcurrency factor: %.2f", concurrencyFactor)
// Verify concurrent performance benefits
if jobsPerSecond < 100 { // Should handle at least 100 jobs/second with concurrency
@ -545,7 +546,7 @@ func TestPayloadMemoryUsage(t *testing.T) {
runtime.ReadMemStats(&memBefore)
// Create jobs with specific payload size
for i := 0; i < numJobs; i++ {
for i := range numJobs {
jobID := fmt.Sprintf("memory-test-%d-%d", payloadSize, i)
payload := make([]byte, payloadSize)
@ -584,8 +585,12 @@ func TestPayloadMemoryUsage(t *testing.T) {
t.Errorf("Memory overhead too high for %d byte payloads: %.2fx (expected <= 10x)", payloadSize, payloadOverhead)
}
// TODO: Clean up jobs for next iteration
// Note: In a real implementation, we'd need a way to delete jobs
// For now, we'll just continue as the test will cleanup automatically
// Clean up jobs for next iteration
for i := range numJobs {
jobID := fmt.Sprintf("memory-test-%d-%d", payloadSize, i)
if err := db.DeleteJob(jobID); err != nil {
t.Logf("Warning: failed to delete job %s: %v", jobID, err)
}
}
}
}

View file

@ -24,7 +24,7 @@ func BenchmarkFilesystemQueueRebuildIndex(b *testing.B) {
defer q.Close()
// Seed with tasks
for i := 0; i < 100; i++ {
for i := range 100 {
task := &queue.Task{
ID: "task-" + string(rune('0'+i/10)) + string(rune('0'+i%10)),
JobName: "job-" + string(rune('0'+i/10)),
@ -39,7 +39,7 @@ func BenchmarkFilesystemQueueRebuildIndex(b *testing.B) {
b.ReportAllocs()
// Benchmark just the rebuild (not the full AddTask)
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
// Force rebuild by adding one more task
task := &queue.Task{
ID: "bench-task-" + string(rune('0'+i%10)),
@ -62,7 +62,7 @@ func BenchmarkFilesystemQueueClaimNext(b *testing.B) {
defer q.Close()
// Seed with tasks
for i := 0; i < 100; i++ {
for i := range 100 {
task := &queue.Task{
ID: "task-" + string(rune('0'+i/10)) + string(rune('0'+i%10)),
JobName: "job-" + string(rune('0'+i/10)),
@ -76,7 +76,7 @@ func BenchmarkFilesystemQueueClaimNext(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
// This triggers ReadDir + JSON unmarshal + sort
_, _ = q.PeekNextTask()
}
@ -92,7 +92,7 @@ func BenchmarkFilesystemQueueGetAllTasks(b *testing.B) {
defer q.Close()
// Seed with tasks
for i := 0; i < 100; i++ {
for i := range 100 {
task := &queue.Task{
ID: "task-" + string(rune('0'+i/10)) + string(rune('0'+i%10)),
JobName: "job-" + string(rune('0'+i/10)),
@ -106,7 +106,7 @@ func BenchmarkFilesystemQueueGetAllTasks(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
_, err := q.GetAllTasks()
if err != nil {
b.Fatal(err)

View file

@ -75,7 +75,7 @@ func BenchmarkResponsePacketSerialize(b *testing.B) {
func benchmarkSerializePacket(b *testing.B, packet *api.ResponsePacket) {
b.Helper()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
if _, err := packet.Serialize(); err != nil {
b.Fatalf("serialize failed: %v", err)
}
@ -85,7 +85,7 @@ func benchmarkSerializePacket(b *testing.B, packet *api.ResponsePacket) {
func benchmarkLegacySerializePacket(b *testing.B, packet *api.ResponsePacket) {
b.Helper()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
if _, err := legacySerializePacket(packet); err != nil {
b.Fatalf("legacy serialize failed: %v", err)
}

View file

@ -6,6 +6,7 @@ import (
"compress/gzip"
"os"
"path/filepath"
"strings"
"testing"
"github.com/jfraeys/fetch_ml/internal/worker"
@ -16,10 +17,9 @@ func BenchmarkExtractTarGzGo(b *testing.B) {
tmpDir := b.TempDir()
archivePath := createStreamingTestArchive(b, tmpDir, 100, 1024) // 100 files, 1KB each
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
dstDir := filepath.Join(tmpDir, "extract_go_"+string(rune('0'+i%10)))
if err := os.MkdirAll(dstDir, 0750); err != nil {
b.Fatal(err)
@ -36,15 +36,17 @@ func BenchmarkExtractTarGzNative(b *testing.B) {
tmpDir := b.TempDir()
archivePath := createStreamingTestArchive(b, tmpDir, 100, 1024)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
dstDir := filepath.Join(tmpDir, "extract_native_"+string(rune('0'+i%10)))
if err := os.MkdirAll(dstDir, 0750); err != nil {
b.Fatal(err)
}
if err := worker.ExtractTarGzNative(archivePath, dstDir); err != nil {
if strings.Contains(err.Error(), "native tar.gz extractor requires") {
b.Skip("Native tar.gz extractor not available: ", err)
}
b.Fatal(err)
}
}
@ -76,7 +78,7 @@ func BenchmarkExtractTarGzSizes(b *testing.B) {
func benchmarkBoth(b *testing.B, archivePath, tmpDir string) {
b.Run("Go", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
dstDir := filepath.Join(tmpDir, "go_"+string(rune('0'+i%10)))
if err := os.MkdirAll(dstDir, 0750); err != nil {
b.Fatal(err)
@ -89,12 +91,15 @@ func benchmarkBoth(b *testing.B, archivePath, tmpDir string) {
b.Run("Native", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for i := 0; b.Loop(); i++ {
dstDir := filepath.Join(tmpDir, "native_"+string(rune('0'+i%10)))
if err := os.MkdirAll(dstDir, 0750); err != nil {
b.Fatal(err)
}
if err := worker.ExtractTarGzNative(archivePath, dstDir); err != nil {
if strings.Contains(err.Error(), "native tar.gz extractor requires") {
b.Skip("Native tar.gz extractor not available: ", err)
}
b.Fatal(err)
}
}