From be67cb77d3552fa8d24ede31d95703c8960d0c70 Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Mon, 23 Feb 2026 18:03:54 -0500 Subject: [PATCH] test(benchmarks): update benchmark tests with job cleanup and improvements **Payload Performance Test:** - Add job cleanup after each iteration using DeleteJob() - Ensure isolated memory measurements between test runs **All Benchmark Tests:** - Migrate manual b.N loops to b.Loop() and drop redundant b.ResetTimer() calls - Replace the external Redis dependency with in-memory miniredis in Redis-backed tests - Skip native-library benchmarks when built without -tags native_libs - Tighten benchmark HTTP client timeouts and make the test /ws handler a real WebSocket echo endpoint --- tests/benchmarks/api_benchmark_test.go | 73 ++++++++++++------- .../benchmarks/artifact_scanner_bench_test.go | 7 ++ tests/benchmarks/config_parsing_bench_test.go | 3 +- tests/benchmarks/context_reuse_bench_test.go | 5 +- .../dataset_hash_bench_native_test.go | 7 +- tests/benchmarks/dataset_hash_bench_test.go | 14 ++-- .../dataset_size_comparison_test.go | 12 ++- tests/benchmarks/go_native_leak_test.go | 8 +- .../json_serialization_bench_test.go | 12 +-- .../benchmarks/jupyter_service_bench_test.go | 29 ++++---- tests/benchmarks/log_sanitize_bench_test.go | 9 +-- .../ml_experiment_benchmark_test.go | 45 ++++++------ tests/benchmarks/native_queue_basic_test.go | 5 +- tests/benchmarks/native_queue_bench_test.go | 16 ++-- tests/benchmarks/payload_performance_test.go | 67 +++++++++-------- tests/benchmarks/queue_bench_test.go | 12 +-- .../response_packet_benchmark_test.go | 4 +- tests/benchmarks/streaming_io_bench_test.go | 17 +++-- 18 files changed, 188 insertions(+), 157 deletions(-) diff --git a/tests/benchmarks/api_benchmark_test.go b/tests/benchmarks/api_benchmark_test.go index 08fb8bb..e3048ca 100644 --- a/tests/benchmarks/api_benchmark_test.go +++ b/tests/benchmarks/api_benchmark_test.go @@ -21,19 +21,20 @@ func newBenchmarkHTTPClient() *http.Client { transport := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, + Timeout: 5 * time.Second, KeepAlive: 30 * time.Second, }).DialContext, - ForceAttemptHTTP2: true, - MaxIdleConns: 256, - MaxIdleConnsPerHost: 256, + ForceAttemptHTTP2: false, // Disable HTTP/2 for benchmark stability + MaxIdleConns: 500, + MaxIdleConnsPerHost: 500, + MaxConnsPerHost: 500, IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, + TLSHandshakeTimeout: 5 * time.Second, ExpectContinueTimeout: 1 * time.Second, } return &http.Client{ - Timeout: 30 * time.Second, + Timeout: 10 * time.Second, Transport: transport, } } @@ -77,13 +78,12 @@ func BenchmarkAPIServerCreateJobSimple(b *testing.B) { client := &http.Client{Timeout: 30 * time.Second} baseURL := "http://" + addr - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { - jobData := map[string]interface{}{ + for i := 0; b.Loop(); i++ { + jobData := map[string]any{ "job_name": fmt.Sprintf("benchmark-job-%d", i), - "args": map[string]interface{}{ + "args": map[string]any{ "model": "test-model", "data": generateTestPayload(1024), }, @@ -136,10 +136,9 @@ func BenchmarkMetricsCollection(b *testing.B) { registry.MustRegister(counter, histogram) - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { counter.Inc() histogram.Observe(float64(i) * 0.001) } @@ -205,8 +204,23 @@ func setupTestAPIServer(_ *testing.B) *httptest.Server { w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) }) - mux.HandleFunc("/ws", func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusOK) + mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(_ *http.Request) bool
{ return true }, + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + // Echo back any messages + for { + mt, message, err := conn.ReadMessage() + if err != nil { + break + } + _ = conn.WriteMessage(mt, message) + } }) return httptest.NewServer(mux) @@ -214,12 +228,18 @@ func setupTestAPIServer(_ *testing.B) *httptest.Server { // benchmarkCreateJob tests job creation performance func benchmarkCreateJob(b *testing.B, baseURL string, client *http.Client) { - for i := 0; i < b.N; i++ { - jobData := map[string]interface{}{ - "job_name": fmt.Sprintf("benchmark-job-%d", i), - "args": map[string]interface{}{ + // Pre-generate test payload to avoid allocation overhead in hot path + testPayload := generateTestPayload(1024) + // Use static job name to avoid fmt.Sprintf overhead in benchmark loop + jobName := "benchmark-job" + + for i := 0; b.Loop(); i++ { + _ = i // Avoid unused variable warning + jobData := map[string]any{ + "job_name": jobName, + "args": map[string]any{ "model": "test-model", - "data": generateTestPayload(1024), // 1KB payload + "data": testPayload, }, "priority": 0, } @@ -244,7 +264,7 @@ func benchmarkCreateJob(b *testing.B, baseURL string, client *http.Client) { // benchmarkListJobs tests job listing performance func benchmarkListJobs(b *testing.B, baseURL string, client *http.Client) { - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { req, err := http.NewRequestWithContext(context.Background(), "GET", baseURL+"/api/v1/jobs", nil) if err != nil { b.Fatalf("Failed to create request: %v", err) @@ -274,20 +294,21 @@ func BenchmarkAPIServerListJobs(b *testing.B) { func BenchmarkWebSocketConnection(b *testing.B) { server := setupTestAPIServer(b) defer server.Close() + b.ReportAllocs() - for i := 0; i < b.N; i++ { - // Convert HTTP URL to WebSocket URL - wsURL := strings.Replace(server.URL, "http://", "ws://", 1) - wsURL += "/ws" + // Convert HTTP URL to WebSocket URL once + wsURL := strings.Replace(server.URL, "http://", "ws://", 1) + "/ws" + for b.Loop() { conn, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) if resp != nil && resp.Body != nil { _ = resp.Body.Close() } if err != nil { - // Skip iteration if WebSocket server isn't available - continue + b.Fatalf("WebSocket dial failed: %v", err) } + // Send close message and wait for server to close + _ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) _ = conn.Close() } } diff --git a/tests/benchmarks/artifact_scanner_bench_test.go b/tests/benchmarks/artifact_scanner_bench_test.go index f237ef2..2b31b95 100644 --- a/tests/benchmarks/artifact_scanner_bench_test.go +++ b/tests/benchmarks/artifact_scanner_bench_test.go @@ -3,6 +3,7 @@ package benchmarks import ( "os" "path/filepath" + "strings" "testing" "github.com/jfraeys/fetch_ml/internal/worker" @@ -38,6 +39,9 @@ func BenchmarkArtifactScanNative(b *testing.B) { for b.Loop() { _, err := worker.ScanArtifactsNative(tmpDir) if err != nil { + if strings.Contains(err.Error(), "native artifact scanner requires") { + b.Skip("Native artifact scanner not available: ", err) + } b.Fatal(err) } } @@ -65,6 +69,9 @@ func BenchmarkArtifactScanLarge(b *testing.B) { for b.Loop() { _, err := worker.ScanArtifactsNative(tmpDir) if err != nil { + if strings.Contains(err.Error(), "native artifact scanner requires") { + b.Skip("Native artifact scanner not available: ", err) + } b.Fatal(err) } } diff --git a/tests/benchmarks/config_parsing_bench_test.go 
b/tests/benchmarks/config_parsing_bench_test.go index e5ad50a..4c27e08 100644 --- a/tests/benchmarks/config_parsing_bench_test.go +++ b/tests/benchmarks/config_parsing_bench_test.go @@ -132,10 +132,9 @@ type BenchmarkMonitoringConfig struct { func BenchmarkConfigYAMLUnmarshal(b *testing.B) { data := []byte(sampleServerConfig) - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { var cfg BenchmarkServerConfig err := yaml.Unmarshal(data, &cfg) if err != nil { diff --git a/tests/benchmarks/context_reuse_bench_test.go b/tests/benchmarks/context_reuse_bench_test.go index c569b44..7786898 100644 --- a/tests/benchmarks/context_reuse_bench_test.go +++ b/tests/benchmarks/context_reuse_bench_test.go @@ -49,11 +49,10 @@ func BenchmarkSequentialHashes(b *testing.B) { testDir := createSmallDataset(b) b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { + for b.Loop() { // Simulate viewing 10 datasets (like TUI scrolling) - for j := 0; j < 10; j++ { + for range 10 { _, err := worker.DirOverallSHA256Hex(testDir) if err != nil { b.Fatal(err) diff --git a/tests/benchmarks/dataset_hash_bench_native_test.go b/tests/benchmarks/dataset_hash_bench_native_test.go index c88a25a..3164aa3 100644 --- a/tests/benchmarks/dataset_hash_bench_native_test.go +++ b/tests/benchmarks/dataset_hash_bench_native_test.go @@ -38,7 +38,7 @@ func BenchmarkDirOverallSHA256Hex_Native(b *testing.B) { if err := os.MkdirAll(metaDir, 0750); err != nil { b.Fatal(err) } - for i := 0; i < 10; i++ { + for i := range 10 { if err := os.WriteFile( filepath.Join(metaDir, "file"+string(rune('0'+i))+".json"), []byte(`{"key": "value"}`), @@ -51,7 +51,7 @@ func BenchmarkDirOverallSHA256Hex_Native(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _, err := worker.DirOverallSHA256Hex(tmpDir) if err != nil { b.Fatal(err) @@ -82,10 +82,9 @@ func BenchmarkDirOverallSHA256HexLarge_Native(b *testing.B) { } } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _, err := worker.DirOverallSHA256Hex(tmpDir) if err != nil { b.Fatal(err) diff --git a/tests/benchmarks/dataset_hash_bench_test.go b/tests/benchmarks/dataset_hash_bench_test.go index a32b2d9..bdf260c 100644 --- a/tests/benchmarks/dataset_hash_bench_test.go +++ b/tests/benchmarks/dataset_hash_bench_test.go @@ -42,7 +42,7 @@ func BenchmarkDirOverallSHA256Hex(b *testing.B) { if err := os.MkdirAll(metaDir, 0750); err != nil { b.Fatal(err) } - for i := 0; i < 10; i++ { + for i := range 10 { if err := os.WriteFile( filepath.Join(metaDir, "file"+string(rune('0'+i))+".json"), []byte(`{"key": "value"}`), @@ -55,7 +55,7 @@ func BenchmarkDirOverallSHA256Hex(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _, err := worker.DirOverallSHA256Hex(tmpDir) if err != nil { b.Fatal(err) @@ -68,7 +68,7 @@ func BenchmarkDirOverallSHA256HexLarge(b *testing.B) { tmpDir := b.TempDir() // Create 50 files of 100KB each = ~5MB total - for i := 0; i < 50; i++ { + for i := range 50 { subdir := filepath.Join(tmpDir, "data", string(rune('a'+i%26))) if err := os.MkdirAll(subdir, 0750); err != nil { b.Fatal(err) @@ -88,7 +88,7 @@ func BenchmarkDirOverallSHA256HexLarge(b *testing.B) { b.Run("Sequential", func(b *testing.B) { b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _, err := worker.DirOverallSHA256Hex(tmpDir) if err != nil { b.Fatal(err) @@ -98,7 +98,7 @@ func BenchmarkDirOverallSHA256HexLarge(b *testing.B) { b.Run("ParallelGo", func(b *testing.B) { 
b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _, err := worker.DirOverallSHA256Hex(tmpDir) if err != nil { b.Fatal(err) @@ -107,10 +107,10 @@ func BenchmarkDirOverallSHA256HexLarge(b *testing.B) { }) b.Run("Native", func(b *testing.B) { - // This requires FETCHML_NATIVE_LIBS=1 to actually use native + // This requires -tags native_libs to actually use native // Otherwise falls back to Go implementation b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _, err := worker.DirOverallSHA256Hex(tmpDir) if err != nil { b.Fatal(err) diff --git a/tests/benchmarks/dataset_size_comparison_test.go b/tests/benchmarks/dataset_size_comparison_test.go index 63084d9..1b6a802 100644 --- a/tests/benchmarks/dataset_size_comparison_test.go +++ b/tests/benchmarks/dataset_size_comparison_test.go @@ -3,6 +3,7 @@ package benchmarks import ( "os" "path/filepath" + "strings" "testing" "github.com/jfraeys/fetch_ml/internal/worker" @@ -10,7 +11,7 @@ import ( ) // BenchmarkDatasetSizeComparison finds the crossover point where native wins -// Run with: FETCHML_NATIVE_LIBS=1 go test -tags native_libs -bench=BenchmarkDatasetSize ./tests/benchmarks/ +// Run with: go test -tags native_libs -bench=BenchmarkDatasetSize ./tests/benchmarks/ func BenchmarkDatasetSizeComparison(b *testing.B) { sizes := []struct { name string @@ -48,9 +49,12 @@ func BenchmarkDatasetSizeComparison(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - // Use DirOverallSHA256Hex which calls native via build tag - _, err := worker.DirOverallSHA256Hex(tmpDir) + // Use DirOverallSHA256HexNative which calls native C++ implementation + _, err := worker.DirOverallSHA256HexNative(tmpDir) if err != nil { + if strings.Contains(err.Error(), "native hash requires") { + b.Skip("Native hash not available: ", err) + } b.Fatal(err) } } @@ -64,7 +68,7 @@ func createTestFiles(b *testing.B, dir string, numFiles int, fileSize int) { data[i] = byte(i % 256) } - for i := 0; i < numFiles; i++ { + for i := range numFiles { path := filepath.Join(dir, "data", string(rune('a'+i%26)), "chunk.bin") if err := os.MkdirAll(filepath.Dir(path), 0750); err != nil { b.Fatal(err) diff --git a/tests/benchmarks/go_native_leak_test.go b/tests/benchmarks/go_native_leak_test.go index 81d15b2..1b67370 100644 --- a/tests/benchmarks/go_native_leak_test.go +++ b/tests/benchmarks/go_native_leak_test.go @@ -12,7 +12,7 @@ func TestGoNativeLeakStress(t *testing.T) { tmpDir := t.TempDir() // Create multiple test files - for i := 0; i < 10; i++ { + for i := range 10 { content := make([]byte, 1024*1024) // 1MB each for j := range content { content[j] = byte(i * j) @@ -23,7 +23,7 @@ func TestGoNativeLeakStress(t *testing.T) { } // Run 1000 hash operations through Go wrapper - for i := 0; i < 1000; i++ { + for i := range 1000 { hash, err := worker.DirOverallSHA256Hex(tmpDir) if err != nil { t.Fatalf("Hash %d failed: %v", i, err) @@ -49,14 +49,14 @@ func TestGoNativeArtifactScanLeak(t *testing.T) { tmpDir := t.TempDir() // Create test files - for i := 0; i < 50; i++ { + for i := range 50 { if err := os.WriteFile(tmpDir+"/file_"+string(rune('a'+i%26))+".txt", []byte("data"), 0644); err != nil { t.Fatal(err) } } // Run 100 scans - for i := 0; i < 100; i++ { + for i := range 100 { _, err := worker.ScanArtifactsNative(tmpDir) if err != nil { t.Logf("Scan %d: %v (may be expected if native disabled)", i, err) diff --git a/tests/benchmarks/json_serialization_bench_test.go b/tests/benchmarks/json_serialization_bench_test.go index 2d49917..432ba3a 100644 --- 
a/tests/benchmarks/json_serialization_bench_test.go +++ b/tests/benchmarks/json_serialization_bench_test.go @@ -17,7 +17,7 @@ func BenchmarkTaskJSONMarshal(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _, err := json.Marshal(task) if err != nil { b.Fatal(err) @@ -37,7 +37,7 @@ func BenchmarkTaskJSONUnmarshal(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { var t queue.Task err := json.Unmarshal(data, &t) if err != nil { @@ -53,7 +53,7 @@ func BenchmarkTaskJSONRoundTrip(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { data, err := json.Marshal(task) if err != nil { b.Fatal(err) @@ -87,7 +87,7 @@ func BenchmarkPrewarmStateJSONMarshal(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _, err := json.Marshal(state) if err != nil { b.Fatal(err) @@ -119,7 +119,7 @@ func BenchmarkPrewarmStateJSONUnmarshal(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { var s queue.PrewarmState err := json.Unmarshal(data, &s) if err != nil { @@ -139,7 +139,7 @@ func BenchmarkTaskBatchJSONMarshal(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { for _, task := range tasks { _, err := json.Marshal(task) if err != nil { diff --git a/tests/benchmarks/jupyter_service_bench_test.go b/tests/benchmarks/jupyter_service_bench_test.go index 39d9aaa..4804b81 100644 --- a/tests/benchmarks/jupyter_service_bench_test.go +++ b/tests/benchmarks/jupyter_service_bench_test.go @@ -20,10 +20,9 @@ func BenchmarkResolveWorkspacePath(b *testing.B) { "deep/nested/workspace/name", } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { path := testPaths[i%len(testPaths)] // Simulate the resolveWorkspacePath logic inline ws := strings.TrimSpace(path) @@ -50,7 +49,7 @@ func BenchmarkStringTrimSpace(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { _ = strings.TrimSpace(testInputs[i%len(testInputs)]) } } @@ -69,7 +68,7 @@ func BenchmarkStringSplit(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { _ = strings.Split(testInputs[i%len(testInputs)], ",") } } @@ -89,7 +88,7 @@ func BenchmarkStringHasPrefix(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { _ = strings.HasPrefix(testInputs[i%len(testInputs)], prefixes[i%len(prefixes)]) } } @@ -110,7 +109,7 @@ func BenchmarkStringEqualFold(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { pair := testPairs[i%len(testPairs)] _ = strings.EqualFold(pair.a, pair.b) } @@ -131,7 +130,7 @@ func BenchmarkFilepathClean(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { _ = filepath.Clean(testPaths[i%len(testPaths)]) } } @@ -149,7 +148,7 @@ func BenchmarkFilepathJoin(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { parts := components[i%len(components)] _ = filepath.Join(parts...) 
} @@ -169,7 +168,7 @@ func BenchmarkStrconvAtoi(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { _, _ = strconv.Atoi(testInputs[i%len(testInputs)]) } } @@ -183,7 +182,7 @@ func BenchmarkTimeFormat(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { _ = now.Format(format) } } @@ -197,7 +196,7 @@ func BenchmarkSprintfConcat(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { _ = names[i%len(names)] + "_" + timestamps[i%len(timestamps)] } } @@ -216,7 +215,7 @@ func BenchmarkPackageListParsing(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { input := testInputs[i%len(testInputs)] parts := strings.Split(input, ",") out := make([]string, 0, len(parts)) @@ -242,10 +241,9 @@ func BenchmarkEnvLookup(b *testing.B) { "NONEXISTENT_VAR", } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { _, _ = os.LookupEnv(keys[i%len(keys)]) } } @@ -260,10 +258,9 @@ func BenchmarkCombinedJupyterHotPath(b *testing.B) { "deep/nested/name", } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { // Simulate resolveWorkspacePath + path building ws := strings.TrimSpace(testWorkspaces[i%len(testWorkspaces)]) clean := filepath.Clean(ws) diff --git a/tests/benchmarks/log_sanitize_bench_test.go b/tests/benchmarks/log_sanitize_bench_test.go index 7635d04..8965c37 100644 --- a/tests/benchmarks/log_sanitize_bench_test.go +++ b/tests/benchmarks/log_sanitize_bench_test.go @@ -26,10 +26,9 @@ func BenchmarkLogSanitizeMessage(b *testing.B) { "Authentication token: eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.test.signature", } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { msg := messages[i%len(messages)] _ = logging.SanitizeLogMessage(msg) } @@ -50,10 +49,9 @@ func BenchmarkLogSanitizeArgs(b *testing.B) { "timestamp", "2024-01-01T00:00:00Z", } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _ = logging.SanitizeArgs(args) } } @@ -73,10 +71,9 @@ func BenchmarkLogSanitizeHighVolume(b *testing.B) { "Webhook received with authorization=Bearer eyJ0eXAiOiJKV1Qi.test.sig", } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { // Simulate batch processing of multiple messages for _, msg := range testMessages { _ = logging.SanitizeLogMessage(msg) diff --git a/tests/benchmarks/ml_experiment_benchmark_test.go b/tests/benchmarks/ml_experiment_benchmark_test.go index 8f07d33..6efb959 100644 --- a/tests/benchmarks/ml_experiment_benchmark_test.go +++ b/tests/benchmarks/ml_experiment_benchmark_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/alicebob/miniredis/v2" "github.com/jfraeys/fetch_ml/internal/metrics" "github.com/jfraeys/fetch_ml/internal/storage" fixtures "github.com/jfraeys/fetch_ml/tests/fixtures" @@ -87,7 +88,7 @@ func benchmarkMLExperiment(b *testing.B, db *storage.DB, rdb *redis.Client, expT _ = executionTime // Use executionTime to avoid unused variable warning // Record experiment metrics - for j := 0; j < 5; j++ { + for j := range 5 { metricName := fmt.Sprintf("metric_%d_%d", j, i) err := db.RecordJobMetric(expID, metricName, fmt.Sprintf("%.2f", float64(j)*1.5)) if err != nil { @@ -106,10 +107,10 @@ func benchmarkConcurrentExperiments(b *testing.B, db *storage.DB, rdb *redis.Cli b.ResetTimer() // Create experiments 
concurrently - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { done := make(chan bool, numExperiments) - for exp := 0; exp < numExperiments; exp++ { + for exp := range numExperiments { go func(expID int) { defer func() { done <- true }() @@ -161,9 +162,9 @@ func benchmarkExperimentMetrics(b *testing.B, db *storage.DB, _ *redis.Client) { b.ReportAllocs() // Record metrics for all jobs - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { for _, jobID := range jobIDs { - for j := 0; j < metricsPerJob; j++ { + for j := range metricsPerJob { metricName := fmt.Sprintf("metric_%d_%d", j, i) metricValue := fmt.Sprintf("%.6f", float64(i*j)*0.001) @@ -207,7 +208,7 @@ func BenchmarkDatasetOperations(b *testing.B) { } func benchmarkDatasetCreation(b *testing.B, db *storage.DB) { - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { datasetID := fmt.Sprintf("dataset-%d-%d", i, time.Now().UnixNano()) // Create a job first for foreign key constraint @@ -244,7 +245,7 @@ func benchmarkDatasetCreation(b *testing.B, db *storage.DB) { func benchmarkDatasetRetrieval(b *testing.B, db *storage.DB) { // Pre-create datasets numDatasets := 100 - for i := 0; i < numDatasets; i++ { + for i := range numDatasets { datasetID := fmt.Sprintf("dataset-%d-%d", i, time.Now().UnixNano()) // Create a job first @@ -260,7 +261,7 @@ func benchmarkDatasetRetrieval(b *testing.B, db *storage.DB) { _ = db.RecordJobMetric(datasetID, "dataset_type", "training") } - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { datasetID := fmt.Sprintf("dataset-%d", i%numDatasets) // Simulate dataset metadata retrieval @@ -274,7 +275,7 @@ func benchmarkDatasetUpdate(b *testing.B, db *storage.DB) { // Pre-create datasets numDatasets := 50 datasetIDs := make([]string, numDatasets) - for i := 0; i < numDatasets; i++ { + for i := range numDatasets { datasetID := fmt.Sprintf("dataset-%d-%d", i, time.Now().UnixNano()) datasetIDs[i] = datasetID @@ -290,7 +291,7 @@ func benchmarkDatasetUpdate(b *testing.B, db *storage.DB) { _ = db.RecordJobMetric(datasetID, "dataset_size", fmt.Sprintf("%d", 1024)) } - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { datasetID := datasetIDs[i%numDatasets] // Update dataset metadata @@ -309,24 +310,24 @@ func benchmarkDatasetUpdate(b *testing.B, db *storage.DB) { // Helper functions func setupBenchmarkRedis(b *testing.B) *redis.Client { + // Start in-memory Redis server + s, err := miniredis.Run() + if err != nil { + b.Fatalf("failed to start miniredis: %v", err) + } + rdb := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 5, // Use DB 5 for benchmarks + Addr: s.Addr(), }) ctx := context.Background() if err := rdb.Ping(ctx).Err(); err != nil { - b.Skipf("Redis not available, skipping benchmark: %v", err) - return nil + b.Fatalf("miniredis ping failed: %v", err) } - // Clean up the test database - rdb.FlushDB(ctx) - b.Cleanup(func() { - rdb.FlushDB(ctx) - _ = rdb.Close() + rdb.Close() + s.Close() }) return rdb @@ -350,7 +351,7 @@ func createMLExperiment(db *storage.DB, rdb *redis.Client, expID string, numJobs } // Create individual jobs for the experiment - for i := 0; i < numJobs; i++ { + for i := range numJobs { jobID := fmt.Sprintf("%s-job-%d", expID, i) payload := generateMLPayload(payloadSize, i) @@ -382,7 +383,7 @@ func executeMLExperiment(db *storage.DB, rdb *redis.Client, expID string, numJob ctx := context.Background() // Process all jobs in the experiment - for i := 0; i < numJobs; i++ { + for i := range numJobs { jobID := fmt.Sprintf("%s-job-%d", 
expID, i) // Update job status to running diff --git a/tests/benchmarks/native_queue_basic_test.go b/tests/benchmarks/native_queue_basic_test.go index 63946e1..6bd1280 100644 --- a/tests/benchmarks/native_queue_basic_test.go +++ b/tests/benchmarks/native_queue_basic_test.go @@ -12,7 +12,7 @@ func BenchmarkNativeQueueBasic(b *testing.B) { // Only run if native libs available if !queue.UseNativeQueue { - b.Skip("Native queue not enabled (set FETCHML_NATIVE_LIBS=1)") + b.Skip("Native queue not enabled (build with -tags native_libs)") } q, err := queue.NewNativeQueue(tmpDir) @@ -28,10 +28,9 @@ func BenchmarkNativeQueueBasic(b *testing.B) { Priority: 100, } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { task.ID = "test-" + string(rune('0'+i%10)) if err := q.AddTask(task); err != nil { b.Fatal(err) diff --git a/tests/benchmarks/native_queue_bench_test.go b/tests/benchmarks/native_queue_bench_test.go index 6b7652d..a36a64e 100644 --- a/tests/benchmarks/native_queue_bench_test.go +++ b/tests/benchmarks/native_queue_bench_test.go @@ -11,7 +11,7 @@ import ( // Expected: 5x speedup, 99% allocation reduction func BenchmarkNativeQueueRebuildIndex(b *testing.B) { if !queue.UseNativeQueue { - b.Skip("Native queue not enabled (set FETCHML_NATIVE_LIBS=1 or build with -tags native_libs)") + b.Skip("Native queue not enabled (build with -tags native_libs)") } tmpDir := b.TempDir() @@ -52,7 +52,7 @@ func BenchmarkNativeQueueRebuildIndex(b *testing.B) { // BenchmarkNativeQueueClaimNext profiles task claiming from binary heap func BenchmarkNativeQueueClaimNext(b *testing.B) { if !queue.UseNativeQueue { - b.Skip("Native queue not enabled (set FETCHML_NATIVE_LIBS=1 or build with -tags native_libs)") + b.Skip("Native queue not enabled (build with -tags native_libs)") } tmpDir := b.TempDir() @@ -63,7 +63,7 @@ func BenchmarkNativeQueueClaimNext(b *testing.B) { defer q.Close() // Seed with tasks - for i := 0; i < 100; i++ { + for i := range 100 { task := &queue.Task{ ID: "task-" + string(rune('0'+i/10)) + string(rune('0'+i%10)), JobName: "job-" + string(rune('0'+i/10)), @@ -74,10 +74,9 @@ func BenchmarkNativeQueueClaimNext(b *testing.B) { } } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { // Binary heap pop - no JSON parsing _, _ = q.PeekNextTask() } @@ -86,7 +85,7 @@ func BenchmarkNativeQueueClaimNext(b *testing.B) { // BenchmarkNativeQueueGetAllTasks profiles full task scan from binary index func BenchmarkNativeQueueGetAllTasks(b *testing.B) { if !queue.UseNativeQueue { - b.Skip("Native queue not enabled (set FETCHML_NATIVE_LIBS=1 or build with -tags native_libs)") + b.Skip("Native queue not enabled (build with -tags native_libs)") } tmpDir := b.TempDir() @@ -97,7 +96,7 @@ func BenchmarkNativeQueueGetAllTasks(b *testing.B) { defer q.Close() // Seed with tasks - for i := 0; i < 100; i++ { + for i := range 100 { task := &queue.Task{ ID: "task-" + string(rune('0'+i/10)) + string(rune('0'+i%10)), JobName: "job-" + string(rune('0'+i/10)), @@ -108,10 +107,9 @@ func BenchmarkNativeQueueGetAllTasks(b *testing.B) { } } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _, err := q.GetAllTasks() if err != nil { b.Fatal(err) diff --git a/tests/benchmarks/payload_performance_test.go b/tests/benchmarks/payload_performance_test.go index 42bb42d..76e29a4 100644 --- a/tests/benchmarks/payload_performance_test.go +++ b/tests/benchmarks/payload_performance_test.go @@ -8,32 +8,33 @@ import ( "testing" "time" + 
"github.com/alicebob/miniredis/v2" "github.com/jfraeys/fetch_ml/internal/metrics" "github.com/jfraeys/fetch_ml/internal/storage" fixtures "github.com/jfraeys/fetch_ml/tests/fixtures" "github.com/redis/go-redis/v9" ) -// setupPerformanceRedis creates a Redis client for performance testing +// setupPerformanceRedis creates a Redis client using miniredis for performance testing func setupPerformanceRedis(t *testing.T) *redis.Client { + // Start in-memory Redis server + s, err := miniredis.Run() + if err != nil { + t.Fatalf("failed to start miniredis: %v", err) + } + rdb := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 4, // Use DB 4 for performance tests to avoid conflicts + Addr: s.Addr(), }) ctx := context.Background() if err := rdb.Ping(ctx).Err(); err != nil { - t.Skipf("Redis not available, skipping performance test: %v", err) - return nil + t.Fatalf("miniredis ping failed: %v", err) } - // Clean up the test database - rdb.FlushDB(ctx) - t.Cleanup(func() { - rdb.FlushDB(ctx) - defer func() { _ = rdb.Close() }() + rdb.Close() + s.Close() }) return rdb @@ -74,7 +75,7 @@ func TestPayloadPerformanceSmall(t *testing.T) { start := time.Now() // Create jobs with small payloads - for i := 0; i < numJobs; i++ { + for i := range numJobs { jobID := fmt.Sprintf("small-payload-job-%d", i) // Create small payload @@ -112,7 +113,7 @@ func TestPayloadPerformanceSmall(t *testing.T) { // Process jobs start = time.Now() - for i := 0; i < numJobs; i++ { + for i := range numJobs { jobID := fmt.Sprintf("small-payload-job-%d", i) // Update job status @@ -143,9 +144,9 @@ func TestPayloadPerformanceSmall(t *testing.T) { avgTimePerJob := totalTime / time.Duration(numJobs) t.Logf("Performance Results:") - t.Logf(" Total time: %v", totalTime) - t.Logf(" Jobs per second: %.2f", jobsPerSecond) - t.Logf(" Average time per job: %v", avgTimePerJob) + t.Logf("\tTotal time: %v", totalTime) + t.Logf("\tJobs per second: %.2f", jobsPerSecond) + t.Logf("\tAverage time per job: %v", avgTimePerJob) // Verify performance thresholds if jobsPerSecond < 50 { // Should handle at least 50 jobs/second for small payloads @@ -195,7 +196,7 @@ func TestPayloadPerformanceLarge(t *testing.T) { start := time.Now() // Create jobs with large payloads - for i := 0; i < numJobs; i++ { + for i := range numJobs { jobID := fmt.Sprintf("large-payload-job-%d", i) // Create large payload @@ -233,7 +234,7 @@ func TestPayloadPerformanceLarge(t *testing.T) { // Process jobs start = time.Now() - for i := 0; i < numJobs; i++ { + for i := range numJobs { jobID := fmt.Sprintf("large-payload-job-%d", i) // Update job status @@ -358,7 +359,7 @@ func TestPayloadPerformanceConcurrent(t *testing.T) { // Create jobs concurrently done := make(chan bool, numWorkers) - for worker := 0; worker < numWorkers; worker++ { + for worker := range numWorkers { go func(w int) { defer func() { done <- true }() @@ -400,7 +401,7 @@ func TestPayloadPerformanceConcurrent(t *testing.T) { } // Wait for all workers to complete - for i := 0; i < numWorkers; i++ { + for range numWorkers { <-done } @@ -410,11 +411,11 @@ func TestPayloadPerformanceConcurrent(t *testing.T) { // Process jobs concurrently start = time.Now() - for worker := 0; worker < numWorkers; worker++ { + for worker := range numWorkers { go func(w int) { defer func() { done <- true }() - for i := 0; i < jobsPerWorker; i++ { + for i := range jobsPerWorker { jobID := fmt.Sprintf("concurrent-job-w%d-i%d", w, i) // Update job status @@ -442,7 +443,7 @@ func TestPayloadPerformanceConcurrent(t 
*testing.T) { } // Wait for all workers to complete - for i := 0; i < numWorkers; i++ { + for range numWorkers { <-done } @@ -456,10 +457,10 @@ func TestPayloadPerformanceConcurrent(t *testing.T) { concurrencyFactor := float64(totalJobs) / float64(creationTime.Seconds()) / 50 // Relative to baseline t.Logf("Concurrent Performance Results:") - t.Logf(" Total time: %v", totalTime) - t.Logf(" Jobs per second: %.2f", jobsPerSecond) - t.Logf(" Average time per job: %v", avgTimePerJob) - t.Logf(" Concurrency factor: %.2f", concurrencyFactor) + t.Logf("\tTotal time: %v", totalTime) + t.Logf("\tJobs per second: %.2f", jobsPerSecond) + t.Logf("\tAverage time per job: %v", avgTimePerJob) + t.Logf("\tConcurrency factor: %.2f", concurrencyFactor) // Verify concurrent performance benefits if jobsPerSecond < 100 { // Should handle at least 100 jobs/second with concurrency @@ -545,7 +546,7 @@ func TestPayloadMemoryUsage(t *testing.T) { runtime.ReadMemStats(&memBefore) // Create jobs with specific payload size - for i := 0; i < numJobs; i++ { + for i := range numJobs { jobID := fmt.Sprintf("memory-test-%d-%d", payloadSize, i) payload := make([]byte, payloadSize) @@ -584,8 +585,12 @@ func TestPayloadMemoryUsage(t *testing.T) { t.Errorf("Memory overhead too high for %d byte payloads: %.2fx (expected <= 10x)", payloadSize, payloadOverhead) } - // TODO: Clean up jobs for next iteration - // Note: In a real implementation, we'd need a way to delete jobs - // For now, we'll just continue as the test will cleanup automatically + // Clean up jobs for next iteration + for i := range numJobs { + jobID := fmt.Sprintf("memory-test-%d-%d", payloadSize, i) + if err := db.DeleteJob(jobID); err != nil { + t.Logf("Warning: failed to delete job %s: %v", jobID, err) + } + } } } diff --git a/tests/benchmarks/queue_bench_test.go b/tests/benchmarks/queue_bench_test.go index 640371c..7631046 100644 --- a/tests/benchmarks/queue_bench_test.go +++ b/tests/benchmarks/queue_bench_test.go @@ -24,7 +24,7 @@ func BenchmarkFilesystemQueueRebuildIndex(b *testing.B) { defer q.Close() // Seed with tasks - for i := 0; i < 100; i++ { + for i := range 100 { task := &queue.Task{ ID: "task-" + string(rune('0'+i/10)) + string(rune('0'+i%10)), JobName: "job-" + string(rune('0'+i/10)), @@ -39,7 +39,7 @@ func BenchmarkFilesystemQueueRebuildIndex(b *testing.B) { b.ReportAllocs() // Benchmark just the rebuild (not the full AddTask) - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { // Force rebuild by adding one more task task := &queue.Task{ ID: "bench-task-" + string(rune('0'+i%10)), @@ -62,7 +62,7 @@ func BenchmarkFilesystemQueueClaimNext(b *testing.B) { defer q.Close() // Seed with tasks - for i := 0; i < 100; i++ { + for i := range 100 { task := &queue.Task{ ID: "task-" + string(rune('0'+i/10)) + string(rune('0'+i%10)), JobName: "job-" + string(rune('0'+i/10)), @@ -76,7 +76,7 @@ func BenchmarkFilesystemQueueClaimNext(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { // This triggers ReadDir + JSON unmarshal + sort _, _ = q.PeekNextTask() } @@ -92,7 +92,7 @@ func BenchmarkFilesystemQueueGetAllTasks(b *testing.B) { defer q.Close() // Seed with tasks - for i := 0; i < 100; i++ { + for i := range 100 { task := &queue.Task{ ID: "task-" + string(rune('0'+i/10)) + string(rune('0'+i%10)), JobName: "job-" + string(rune('0'+i/10)), @@ -106,7 +106,7 @@ func BenchmarkFilesystemQueueGetAllTasks(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { _, err := 
q.GetAllTasks() if err != nil { b.Fatal(err) diff --git a/tests/benchmarks/response_packet_benchmark_test.go b/tests/benchmarks/response_packet_benchmark_test.go index f5200ca..cf52617 100644 --- a/tests/benchmarks/response_packet_benchmark_test.go +++ b/tests/benchmarks/response_packet_benchmark_test.go @@ -75,7 +75,7 @@ func BenchmarkResponsePacketSerialize(b *testing.B) { func benchmarkSerializePacket(b *testing.B, packet *api.ResponsePacket) { b.Helper() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { if _, err := packet.Serialize(); err != nil { b.Fatalf("serialize failed: %v", err) } @@ -85,7 +85,7 @@ func benchmarkSerializePacket(b *testing.B, packet *api.ResponsePacket) { func benchmarkLegacySerializePacket(b *testing.B, packet *api.ResponsePacket) { b.Helper() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { if _, err := legacySerializePacket(packet); err != nil { b.Fatalf("legacy serialize failed: %v", err) } diff --git a/tests/benchmarks/streaming_io_bench_test.go b/tests/benchmarks/streaming_io_bench_test.go index cc30359..5010b03 100644 --- a/tests/benchmarks/streaming_io_bench_test.go +++ b/tests/benchmarks/streaming_io_bench_test.go @@ -6,6 +6,7 @@ import ( "compress/gzip" "os" "path/filepath" + "strings" "testing" "github.com/jfraeys/fetch_ml/internal/worker" @@ -16,10 +17,9 @@ func BenchmarkExtractTarGzGo(b *testing.B) { tmpDir := b.TempDir() archivePath := createStreamingTestArchive(b, tmpDir, 100, 1024) // 100 files, 1KB each - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { dstDir := filepath.Join(tmpDir, "extract_go_"+string(rune('0'+i%10))) if err := os.MkdirAll(dstDir, 0750); err != nil { b.Fatal(err) @@ -36,15 +36,17 @@ func BenchmarkExtractTarGzNative(b *testing.B) { tmpDir := b.TempDir() archivePath := createStreamingTestArchive(b, tmpDir, 100, 1024) - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { dstDir := filepath.Join(tmpDir, "extract_native_"+string(rune('0'+i%10))) if err := os.MkdirAll(dstDir, 0750); err != nil { b.Fatal(err) } if err := worker.ExtractTarGzNative(archivePath, dstDir); err != nil { + if strings.Contains(err.Error(), "native tar.gz extractor requires") { + b.Skip("Native tar.gz extractor not available: ", err) + } b.Fatal(err) } } @@ -76,7 +78,7 @@ func BenchmarkExtractTarGzSizes(b *testing.B) { func benchmarkBoth(b *testing.B, archivePath, tmpDir string) { b.Run("Go", func(b *testing.B) { b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { dstDir := filepath.Join(tmpDir, "go_"+string(rune('0'+i%10))) if err := os.MkdirAll(dstDir, 0750); err != nil { b.Fatal(err) @@ -89,12 +91,15 @@ func benchmarkBoth(b *testing.B, archivePath, tmpDir string) { b.Run("Native", func(b *testing.B) { b.ReportAllocs() - for i := 0; i < b.N; i++ { + for i := 0; b.Loop(); i++ { dstDir := filepath.Join(tmpDir, "native_"+string(rune('0'+i%10))) if err := os.MkdirAll(dstDir, 0750); err != nil { b.Fatal(err) } if err := worker.ExtractTarGzNative(archivePath, dstDir); err != nil { + if strings.Contains(err.Error(), "native tar.gz extractor requires") { + b.Skip("Native tar.gz extractor not available: ", err) + } b.Fatal(err) } }