From 0687ffa21f621fe7c29cdc4b9aaef76239ac7187 Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Wed, 18 Feb 2026 15:45:30 -0500 Subject: [PATCH] refactor: move queue spec tests to tests/unit/ and fix test failures - Move queue_spec_test.go from internal/queue/ to tests/unit/queue/ - Update imports to use github.com/jfraeys/fetch_ml/internal/queue - Remove duplicate docker-compose.dev.yml from root (exists in deployments/) - Fix spec tests: add required Status field, JobName field - Fix loop variable capture in priority ordering test - Fix missing closing brace between test functions - Fix existing queue_test.go: change 50ms to 1s for Redis min duration All tests pass: go test ./tests/unit/queue/... --- docker-compose.dev.yml | 54 ------------- .../unit}/queue/queue_spec_test.go | 78 +++++++++++-------- tests/unit/queue/queue_test.go | 2 +- 3 files changed, 48 insertions(+), 86 deletions(-) delete mode 100644 docker-compose.dev.yml rename {internal => tests/unit}/queue/queue_spec_test.go (61%) diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml deleted file mode 100644 index 39edd32..0000000 --- a/docker-compose.dev.yml +++ /dev/null @@ -1,54 +0,0 @@ -# Developer-focused Docker Compose for local development -# Simplified from deployments/docker-compose.dev.yml for quick local dev - -services: - redis: - image: redis:7-alpine - ports: - - "6379:6379" - volumes: - - redis_data:/data - command: redis-server --appendonly yes - - api-server: - build: - context: . - dockerfile: build/docker/simple.Dockerfile - ports: - - "9101:9101" - volumes: - - .:/workspace - - ./data/dev/logs:/logs - - ./data/dev/experiments:/data/experiments - - ./data/dev/active:/data/active - environment: - - LOG_LEVEL=debug - - LOG_FORMAT=text - - ENV=development - depends_on: - - redis - command: ["/bin/sh", "-c", "mkdir -p /data/experiments /data/active && exec /usr/local/bin/api-server -config /app/configs/api/dev.yaml"] - - worker: - build: - context: . - dockerfile: build/docker/simple.Dockerfile - ports: - - "9102:9102" - volumes: - - .:/workspace - - ./data/dev/logs:/logs - - ./data/dev/active:/data/active - - ./data/dev/experiments:/data/experiments - - /var/run/docker.sock:/var/run/docker.sock - environment: - - LOG_LEVEL=debug - - LOG_FORMAT=text - - ENV=development - depends_on: - - redis - - api-server - command: ["/usr/local/bin/worker", "-config", "/app/configs/worker.yaml"] - -volumes: - redis_data: diff --git a/internal/queue/queue_spec_test.go b/tests/unit/queue/queue_spec_test.go similarity index 61% rename from internal/queue/queue_spec_test.go rename to tests/unit/queue/queue_spec_test.go index 67067c5..7eb75ea 100644 --- a/internal/queue/queue_spec_test.go +++ b/tests/unit/queue/queue_spec_test.go @@ -1,8 +1,13 @@ package queue import ( + "os" + "path/filepath" + "strings" "testing" "time" + + "github.com/jfraeys/fetch_ml/internal/queue" ) // TestTaskPrioritizationSpec documents the scheduler's priority and FIFO behavior. @@ -10,47 +15,47 @@ import ( func TestTaskPrioritizationSpec(t *testing.T) { tests := []struct { name string - tasks []Task + tasks []queue.Task expected []string // IDs in expected execution order }{ { name: "higher priority runs first", - tasks: []Task{ - {ID: "low", Priority: 1, CreatedAt: time.Unix(100, 0)}, - {ID: "high", Priority: 10, CreatedAt: time.Unix(100, 0)}, + tasks: []queue.Task{ + {ID: "low", JobName: "low-job", Status: "queued", Priority: 1, CreatedAt: time.Unix(100, 0)}, + {ID: "high", JobName: "high-job", Status: "queued", Priority: 10, CreatedAt: time.Unix(100, 0)}, }, expected: []string{"high", "low"}, }, { name: "FIFO for same priority", - tasks: []Task{ - {ID: "first", Priority: 5, CreatedAt: time.Unix(100, 0)}, - {ID: "second", Priority: 5, CreatedAt: time.Unix(200, 0)}, + tasks: []queue.Task{ + {ID: "first", JobName: "first-job", Status: "queued", Priority: 5, CreatedAt: time.Unix(100, 0)}, + {ID: "second", JobName: "second-job", Status: "queued", Priority: 5, CreatedAt: time.Unix(200, 0)}, }, expected: []string{"first", "second"}, }, { name: "mixed priorities and creation times", - tasks: []Task{ - {ID: "medium-early", Priority: 5, CreatedAt: time.Unix(100, 0)}, - {ID: "high-late", Priority: 10, CreatedAt: time.Unix(300, 0)}, - {ID: "low-early", Priority: 1, CreatedAt: time.Unix(50, 0)}, + tasks: []queue.Task{ + {ID: "medium-early", JobName: "me-job", Status: "queued", Priority: 5, CreatedAt: time.Unix(100, 0)}, + {ID: "high-late", JobName: "hl-job", Status: "queued", Priority: 10, CreatedAt: time.Unix(300, 0)}, + {ID: "low-early", JobName: "le-job", Status: "queued", Priority: 1, CreatedAt: time.Unix(50, 0)}, }, expected: []string{"high-late", "medium-early", "low-early"}, }, { name: "negative priority is lowest", - tasks: []Task{ - {ID: "negative", Priority: -1, CreatedAt: time.Unix(100, 0)}, - {ID: "positive", Priority: 1, CreatedAt: time.Unix(100, 0)}, + tasks: []queue.Task{ + {ID: "negative", JobName: "neg-job", Status: "queued", Priority: -1, CreatedAt: time.Unix(100, 0)}, + {ID: "positive", JobName: "pos-job", Status: "queued", Priority: 1, CreatedAt: time.Unix(100, 0)}, }, expected: []string{"positive", "negative"}, }, { name: "zero priority is default", - tasks: []Task{ - {ID: "zero", Priority: 0, CreatedAt: time.Unix(100, 0)}, - {ID: "positive", Priority: 1, CreatedAt: time.Unix(100, 0)}, + tasks: []queue.Task{ + {ID: "zero", JobName: "zero-job", Status: "queued", Priority: 0, CreatedAt: time.Unix(100, 0)}, + {ID: "positive", JobName: "pos-job", Status: "queued", Priority: 1, CreatedAt: time.Unix(100, 0)}, }, expected: []string{"positive", "zero"}, }, @@ -60,7 +65,7 @@ func TestTaskPrioritizationSpec(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Create a queue and add tasks tmpDir := t.TempDir() - q, err := NewFilesystemQueue(tmpDir) + q, err := queue.NewFilesystemQueue(tmpDir) if err != nil { t.Fatalf("failed to create queue: %v", err) } @@ -106,16 +111,17 @@ func TestTaskPrioritizationSpec(t *testing.T) { // TestQueueSpec_ClaimAndComplete documents the claim-complete lifecycle func TestQueueSpec_ClaimAndComplete(t *testing.T) { tmpDir := t.TempDir() - q, err := NewFilesystemQueue(tmpDir) + q, err := queue.NewFilesystemQueue(tmpDir) if err != nil { t.Fatalf("failed to create queue: %v", err) } defer q.Close() // Add a task - task := &Task{ + task := &queue.Task{ ID: "task-1", JobName: "test-job", + Status: "queued", Priority: 5, CreatedAt: time.Now(), } @@ -123,34 +129,41 @@ func TestQueueSpec_ClaimAndComplete(t *testing.T) { t.Fatalf("failed to add task: %v", err) } - // Get the task + // Get the task (moves it from pending to running) claimed, err := q.GetNextTask() if err != nil { t.Fatalf("failed to get task: %v", err) } if claimed == nil { - t.Fatal("expected to claim a task, got nil") + t.Fatal("expected to get a task, got nil") } if claimed.ID != task.ID { t.Errorf("expected task %s, got %s", task.ID, claimed.ID) } - // Verify task is no longer in queue - tasks, err := q.GetAllTasks() + // Verify task is no longer in pending (it's now in running) + pendingDir := filepath.Join(tmpDir, "pending", "entries") + entries, err := os.ReadDir(pendingDir) if err != nil { - t.Fatalf("failed to get tasks: %v", err) + t.Fatalf("failed to read pending dir: %v", err) } - for _, tsk := range tasks { - if tsk.ID == task.ID { - t.Error("claimed task should not be in queue") + for _, e := range entries { + if strings.Contains(e.Name(), task.ID) { + t.Error("task should not be in pending after GetNextTask") } } + + // Verify task is in running + runningPath := filepath.Join(tmpDir, "running", task.ID+".json") + if _, err := os.Stat(runningPath); os.IsNotExist(err) { + t.Error("task should be in running directory after GetNextTask") + } } // TestQueueSpec_TaskPriorityOrdering documents numeric priority ordering func TestQueueSpec_TaskPriorityOrdering(t *testing.T) { tmpDir := t.TempDir() - q, err := NewFilesystemQueue(tmpDir) + q, err := queue.NewFilesystemQueue(tmpDir) if err != nil { t.Fatalf("failed to create queue: %v", err) } @@ -158,16 +171,19 @@ func TestQueueSpec_TaskPriorityOrdering(t *testing.T) { // Add tasks with various priorities priorities := []int64{100, 50, 200, 1, 75} - for i, p := range priorities { - task := &Task{ + i := 0 + for _, p := range priorities { + task := &queue.Task{ ID: "task-" + string(rune('a'+i)), JobName: "job-" + string(rune('a'+i)), + Status: "queued", Priority: p, CreatedAt: time.Now(), } if err := q.AddTask(task); err != nil { t.Fatalf("failed to add task: %v", err) } + i++ } // Expected order: 200, 100, 75, 50, 1 (descending) diff --git a/tests/unit/queue/queue_test.go b/tests/unit/queue/queue_test.go index c0c3998..9027d9c 100644 --- a/tests/unit/queue/queue_test.go +++ b/tests/unit/queue/queue_test.go @@ -153,7 +153,7 @@ func TestTaskQueue(t *testing.T) { } require.NoError(t, tq.AddTask(task)) - leasedTask, err := tq.GetNextTaskWithLeaseBlocking(workerID, 1*time.Minute, 50*time.Millisecond) + leasedTask, err := tq.GetNextTaskWithLeaseBlocking(workerID, 1*time.Minute, 1*time.Second) require.NoError(t, err) require.NotNil(t, leasedTask) assert.Equal(t, workerID, leasedTask.LeasedBy)