refactor: move queue spec tests to tests/unit/ and fix test failures
- Move queue_spec_test.go from internal/queue/ to tests/unit/queue/ - Update imports to use github.com/jfraeys/fetch_ml/internal/queue - Remove duplicate docker-compose.dev.yml from root (exists in deployments/) - Fix spec tests: add required Status field, JobName field - Fix loop variable capture in priority ordering test - Fix missing closing brace between test functions - Fix existing queue_test.go: change 50ms to 1s for Redis min duration All tests pass: go test ./tests/unit/queue/...
This commit is contained in:
parent
8271277dc3
commit
0687ffa21f
3 changed files with 48 additions and 86 deletions
|
|
@ -1,54 +0,0 @@
|
|||
# Developer-focused Docker Compose for local development
|
||||
# Simplified from deployments/docker-compose.dev.yml for quick local dev
|
||||
|
||||
services:
|
||||
redis:
|
||||
image: redis:7-alpine
|
||||
ports:
|
||||
- "6379:6379"
|
||||
volumes:
|
||||
- redis_data:/data
|
||||
command: redis-server --appendonly yes
|
||||
|
||||
api-server:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: build/docker/simple.Dockerfile
|
||||
ports:
|
||||
- "9101:9101"
|
||||
volumes:
|
||||
- .:/workspace
|
||||
- ./data/dev/logs:/logs
|
||||
- ./data/dev/experiments:/data/experiments
|
||||
- ./data/dev/active:/data/active
|
||||
environment:
|
||||
- LOG_LEVEL=debug
|
||||
- LOG_FORMAT=text
|
||||
- ENV=development
|
||||
depends_on:
|
||||
- redis
|
||||
command: ["/bin/sh", "-c", "mkdir -p /data/experiments /data/active && exec /usr/local/bin/api-server -config /app/configs/api/dev.yaml"]
|
||||
|
||||
worker:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: build/docker/simple.Dockerfile
|
||||
ports:
|
||||
- "9102:9102"
|
||||
volumes:
|
||||
- .:/workspace
|
||||
- ./data/dev/logs:/logs
|
||||
- ./data/dev/active:/data/active
|
||||
- ./data/dev/experiments:/data/experiments
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
environment:
|
||||
- LOG_LEVEL=debug
|
||||
- LOG_FORMAT=text
|
||||
- ENV=development
|
||||
depends_on:
|
||||
- redis
|
||||
- api-server
|
||||
command: ["/usr/local/bin/worker", "-config", "/app/configs/worker.yaml"]
|
||||
|
||||
volumes:
|
||||
redis_data:
|
||||
|
|
@ -1,8 +1,13 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jfraeys/fetch_ml/internal/queue"
|
||||
)
|
||||
|
||||
// TestTaskPrioritizationSpec documents the scheduler's priority and FIFO behavior.
|
||||
|
|
@ -10,47 +15,47 @@ import (
|
|||
func TestTaskPrioritizationSpec(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
tasks []Task
|
||||
tasks []queue.Task
|
||||
expected []string // IDs in expected execution order
|
||||
}{
|
||||
{
|
||||
name: "higher priority runs first",
|
||||
tasks: []Task{
|
||||
{ID: "low", Priority: 1, CreatedAt: time.Unix(100, 0)},
|
||||
{ID: "high", Priority: 10, CreatedAt: time.Unix(100, 0)},
|
||||
tasks: []queue.Task{
|
||||
{ID: "low", JobName: "low-job", Status: "queued", Priority: 1, CreatedAt: time.Unix(100, 0)},
|
||||
{ID: "high", JobName: "high-job", Status: "queued", Priority: 10, CreatedAt: time.Unix(100, 0)},
|
||||
},
|
||||
expected: []string{"high", "low"},
|
||||
},
|
||||
{
|
||||
name: "FIFO for same priority",
|
||||
tasks: []Task{
|
||||
{ID: "first", Priority: 5, CreatedAt: time.Unix(100, 0)},
|
||||
{ID: "second", Priority: 5, CreatedAt: time.Unix(200, 0)},
|
||||
tasks: []queue.Task{
|
||||
{ID: "first", JobName: "first-job", Status: "queued", Priority: 5, CreatedAt: time.Unix(100, 0)},
|
||||
{ID: "second", JobName: "second-job", Status: "queued", Priority: 5, CreatedAt: time.Unix(200, 0)},
|
||||
},
|
||||
expected: []string{"first", "second"},
|
||||
},
|
||||
{
|
||||
name: "mixed priorities and creation times",
|
||||
tasks: []Task{
|
||||
{ID: "medium-early", Priority: 5, CreatedAt: time.Unix(100, 0)},
|
||||
{ID: "high-late", Priority: 10, CreatedAt: time.Unix(300, 0)},
|
||||
{ID: "low-early", Priority: 1, CreatedAt: time.Unix(50, 0)},
|
||||
tasks: []queue.Task{
|
||||
{ID: "medium-early", JobName: "me-job", Status: "queued", Priority: 5, CreatedAt: time.Unix(100, 0)},
|
||||
{ID: "high-late", JobName: "hl-job", Status: "queued", Priority: 10, CreatedAt: time.Unix(300, 0)},
|
||||
{ID: "low-early", JobName: "le-job", Status: "queued", Priority: 1, CreatedAt: time.Unix(50, 0)},
|
||||
},
|
||||
expected: []string{"high-late", "medium-early", "low-early"},
|
||||
},
|
||||
{
|
||||
name: "negative priority is lowest",
|
||||
tasks: []Task{
|
||||
{ID: "negative", Priority: -1, CreatedAt: time.Unix(100, 0)},
|
||||
{ID: "positive", Priority: 1, CreatedAt: time.Unix(100, 0)},
|
||||
tasks: []queue.Task{
|
||||
{ID: "negative", JobName: "neg-job", Status: "queued", Priority: -1, CreatedAt: time.Unix(100, 0)},
|
||||
{ID: "positive", JobName: "pos-job", Status: "queued", Priority: 1, CreatedAt: time.Unix(100, 0)},
|
||||
},
|
||||
expected: []string{"positive", "negative"},
|
||||
},
|
||||
{
|
||||
name: "zero priority is default",
|
||||
tasks: []Task{
|
||||
{ID: "zero", Priority: 0, CreatedAt: time.Unix(100, 0)},
|
||||
{ID: "positive", Priority: 1, CreatedAt: time.Unix(100, 0)},
|
||||
tasks: []queue.Task{
|
||||
{ID: "zero", JobName: "zero-job", Status: "queued", Priority: 0, CreatedAt: time.Unix(100, 0)},
|
||||
{ID: "positive", JobName: "pos-job", Status: "queued", Priority: 1, CreatedAt: time.Unix(100, 0)},
|
||||
},
|
||||
expected: []string{"positive", "zero"},
|
||||
},
|
||||
|
|
@ -60,7 +65,7 @@ func TestTaskPrioritizationSpec(t *testing.T) {
|
|||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Create a queue and add tasks
|
||||
tmpDir := t.TempDir()
|
||||
q, err := NewFilesystemQueue(tmpDir)
|
||||
q, err := queue.NewFilesystemQueue(tmpDir)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create queue: %v", err)
|
||||
}
|
||||
|
|
@ -106,16 +111,17 @@ func TestTaskPrioritizationSpec(t *testing.T) {
|
|||
// TestQueueSpec_ClaimAndComplete documents the claim-complete lifecycle
|
||||
func TestQueueSpec_ClaimAndComplete(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
q, err := NewFilesystemQueue(tmpDir)
|
||||
q, err := queue.NewFilesystemQueue(tmpDir)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create queue: %v", err)
|
||||
}
|
||||
defer q.Close()
|
||||
|
||||
// Add a task
|
||||
task := &Task{
|
||||
task := &queue.Task{
|
||||
ID: "task-1",
|
||||
JobName: "test-job",
|
||||
Status: "queued",
|
||||
Priority: 5,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
|
@ -123,34 +129,41 @@ func TestQueueSpec_ClaimAndComplete(t *testing.T) {
|
|||
t.Fatalf("failed to add task: %v", err)
|
||||
}
|
||||
|
||||
// Get the task
|
||||
// Get the task (moves it from pending to running)
|
||||
claimed, err := q.GetNextTask()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get task: %v", err)
|
||||
}
|
||||
if claimed == nil {
|
||||
t.Fatal("expected to claim a task, got nil")
|
||||
t.Fatal("expected to get a task, got nil")
|
||||
}
|
||||
if claimed.ID != task.ID {
|
||||
t.Errorf("expected task %s, got %s", task.ID, claimed.ID)
|
||||
}
|
||||
|
||||
// Verify task is no longer in queue
|
||||
tasks, err := q.GetAllTasks()
|
||||
// Verify task is no longer in pending (it's now in running)
|
||||
pendingDir := filepath.Join(tmpDir, "pending", "entries")
|
||||
entries, err := os.ReadDir(pendingDir)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get tasks: %v", err)
|
||||
t.Fatalf("failed to read pending dir: %v", err)
|
||||
}
|
||||
for _, tsk := range tasks {
|
||||
if tsk.ID == task.ID {
|
||||
t.Error("claimed task should not be in queue")
|
||||
for _, e := range entries {
|
||||
if strings.Contains(e.Name(), task.ID) {
|
||||
t.Error("task should not be in pending after GetNextTask")
|
||||
}
|
||||
}
|
||||
|
||||
// Verify task is in running
|
||||
runningPath := filepath.Join(tmpDir, "running", task.ID+".json")
|
||||
if _, err := os.Stat(runningPath); os.IsNotExist(err) {
|
||||
t.Error("task should be in running directory after GetNextTask")
|
||||
}
|
||||
}
|
||||
|
||||
// TestQueueSpec_TaskPriorityOrdering documents numeric priority ordering
|
||||
func TestQueueSpec_TaskPriorityOrdering(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
q, err := NewFilesystemQueue(tmpDir)
|
||||
q, err := queue.NewFilesystemQueue(tmpDir)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create queue: %v", err)
|
||||
}
|
||||
|
|
@ -158,16 +171,19 @@ func TestQueueSpec_TaskPriorityOrdering(t *testing.T) {
|
|||
|
||||
// Add tasks with various priorities
|
||||
priorities := []int64{100, 50, 200, 1, 75}
|
||||
for i, p := range priorities {
|
||||
task := &Task{
|
||||
i := 0
|
||||
for _, p := range priorities {
|
||||
task := &queue.Task{
|
||||
ID: "task-" + string(rune('a'+i)),
|
||||
JobName: "job-" + string(rune('a'+i)),
|
||||
Status: "queued",
|
||||
Priority: p,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
if err := q.AddTask(task); err != nil {
|
||||
t.Fatalf("failed to add task: %v", err)
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// Expected order: 200, 100, 75, 50, 1 (descending)
|
||||
|
|
@ -153,7 +153,7 @@ func TestTaskQueue(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, tq.AddTask(task))
|
||||
|
||||
leasedTask, err := tq.GetNextTaskWithLeaseBlocking(workerID, 1*time.Minute, 50*time.Millisecond)
|
||||
leasedTask, err := tq.GetNextTaskWithLeaseBlocking(workerID, 1*time.Minute, 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, leasedTask)
|
||||
assert.Equal(t, workerID, leasedTask.LeasedBy)
|
||||
|
|
|
|||
Loading…
Reference in a new issue