Move unit tests from tests/unit/ to internal/, following Go conventions:

- tests/unit/queue/* -> internal/queue/* (dedup, filesystem_fallback, queue_permissions, queue_spec, queue, sqlite_queue tests)
- tests/unit/gpu/* -> internal/resources/* (gpu_detector, gpu_golden tests)
- tests/unit/resources/* -> internal/resources/* (manager_test.go)

Import paths in the test files are updated to reflect their new locations.

Note: the GPU tests are consolidated into the resources package, since GPU detection is part of resource management. The manager tests also gain significant new coverage (166 lines).
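For reference, the relocated queue tests keep the external _test package and import the package under test by its module path, as in the file below. A minimal sketch of that header shape; the test function is a hypothetical placeholder (not part of the moved files), included only so the snippet compiles on its own:

package queue_test

import (
	"testing"

	"github.com/jfraeys/fetch_ml/internal/queue"
)

// Hypothetical placeholder: checks that the queue package still resolves from
// its internal/ location after the move.
func TestQueuePackageLocation(t *testing.T) {
	var _ queue.Task
}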
254 lines
6.6 KiB
Go
package queue_test

import (
	"testing"
	"time"

	"github.com/alicebob/miniredis/v2"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/jfraeys/fetch_ml/internal/queue"
)

const workerID = "worker-1"

func TestTaskQueue(t *testing.T) {
	t.Parallel()

	// Start miniredis
	s, err := miniredis.Run()
	if err != nil {
		t.Fatalf("failed to start miniredis: %v", err)
	}
	t.Cleanup(s.Close)

	// Create TaskQueue
	cfg := queue.Config{
		RedisAddr:            s.Addr(),
		MetricsFlushInterval: 10 * time.Millisecond, // Fast flush for testing
	}
	tq, err := queue.NewTaskQueue(cfg)
	require.NoError(t, err) // require: cleanup and the subtests below need a usable queue
	t.Cleanup(func() {
		if err := tq.Close(); err != nil {
			t.Logf("Warning: failed to close task queue: %v", err)
		}
	})

t.Run("AddTask", func(t *testing.T) {
|
|
t.Helper()
|
|
// Use non-parallel subtest because of shared miniredis instance
|
|
task := &queue.Task{
|
|
ID: "task-1",
|
|
JobName: "job-1",
|
|
Status: "queued",
|
|
Priority: 10,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
err = tq.AddTask(task)
|
|
assert.NoError(t, err)
|
|
|
|
// Verify task is in Redis (ZSET)
|
|
score, err := s.ZScore(queue.TaskQueueKey, "task-1")
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, float64(10), score)
|
|
})
|
|
|
|
t.Run("GetNextTask", func(t *testing.T) {
|
|
t.Helper()
|
|
// Add another task
|
|
task := &queue.Task{
|
|
ID: "task-2",
|
|
JobName: "job-2",
|
|
Status: "queued",
|
|
Priority: 20, // Higher priority
|
|
CreatedAt: time.Now(),
|
|
}
|
|
err = tq.AddTask(task)
|
|
assert.NoError(t, err)
|
|
|
|
// Should get task-2 first due to higher priority
|
|
nextTask, err := tq.GetNextTask()
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, nextTask)
|
|
assert.Equal(t, "task-2", nextTask.ID)
|
|
|
|
// At this point task-2 has been popped; we rely on TaskQueue implementation
|
|
// to maintain Redis state and don't assert internal Redis structures here.
|
|
})
|
|
|
|
t.Run("PeekNextTask", func(t *testing.T) {
|
|
t.Helper()
|
|
// With task-1 still queued (from AddTask), PeekNextTask should return it.
|
|
peeked, err := tq.PeekNextTask()
|
|
require.NoError(t, err)
|
|
require.NotNil(t, peeked)
|
|
assert.Equal(t, "task-1", peeked.ID)
|
|
|
|
// Ensure peeking does not remove the task.
|
|
next, err := tq.GetNextTask()
|
|
require.NoError(t, err)
|
|
require.NotNil(t, next)
|
|
assert.Equal(t, "task-1", next.ID)
|
|
|
|
// Now the queue may be empty; PeekNextTask should return nil.
|
|
emptyPeek, err := tq.PeekNextTask()
|
|
require.NoError(t, err)
|
|
assert.Nil(t, emptyPeek)
|
|
})
|
|
|
|
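	// Lease workflow: the subtests below acquire a lease on "task-lease" for a
	// worker, renew it, and later release it (with a blocking acquisition
	// variant exercised in between).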
t.Run("GetNextTaskWithLease", func(t *testing.T) {
|
|
t.Helper()
|
|
task := &queue.Task{
|
|
ID: "task-lease",
|
|
JobName: "job-lease",
|
|
Status: "queued",
|
|
Priority: 15,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
require.NoError(t, tq.AddTask(task))
|
|
|
|
leaseDuration := 1 * time.Minute
|
|
|
|
leasedTask, err := tq.GetNextTaskWithLease(workerID, leaseDuration)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, leasedTask)
|
|
assert.Equal(t, "task-lease", leasedTask.ID)
|
|
assert.Equal(t, workerID, leasedTask.LeasedBy)
|
|
assert.NotNil(t, leasedTask.LeaseExpiry)
|
|
assert.True(t, leasedTask.LeaseExpiry.After(time.Now()))
|
|
})
|
|
|
|
t.Run("RenewLease", func(t *testing.T) {
|
|
t.Helper()
|
|
// Reuse task-lease from previous subtest
|
|
const taskID = "task-lease"
|
|
|
|
// Get initial expiry
|
|
task, err := tq.GetTask(taskID)
|
|
require.NoError(t, err)
|
|
initialExpiry := task.LeaseExpiry
|
|
|
|
// Wait a bit
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
// Renew lease
|
|
require.NoError(t, tq.RenewLease(taskID, workerID, 1*time.Minute))
|
|
|
|
// Verify expiry updated
|
|
task, err = tq.GetTask(taskID)
|
|
require.NoError(t, err)
|
|
assert.True(t, task.LeaseExpiry.After(*initialExpiry))
|
|
})
|
|
|
|
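	// Blocking variant of lease acquisition: a task is queued first, so the
	// call should return promptly with a leased task.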
t.Run("GetNextTaskWithLeaseBlocking", func(t *testing.T) {
|
|
t.Helper()
|
|
task := &queue.Task{
|
|
ID: "task-lease-blocking",
|
|
JobName: "job-lease-blocking",
|
|
Status: "queued",
|
|
Priority: 5,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
require.NoError(t, tq.AddTask(task))
|
|
|
|
leasedTask, err := tq.GetNextTaskWithLeaseBlocking(workerID, 1*time.Minute, 1*time.Second)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, leasedTask)
|
|
assert.Equal(t, workerID, leasedTask.LeasedBy)
|
|
assert.NotNil(t, leasedTask.LeaseExpiry)
|
|
})
|
|
|
|
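	// Releasing the lease on "task-lease" should clear its LeasedBy and LeaseExpiry fields.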
t.Run("ReleaseLease", func(t *testing.T) {
|
|
t.Helper()
|
|
const taskID = "task-lease"
|
|
|
|
require.NoError(t, tq.ReleaseLease(taskID, workerID))
|
|
|
|
task, err := tq.GetTask(taskID)
|
|
require.NoError(t, err)
|
|
assert.Nil(t, task.LeaseExpiry)
|
|
assert.Empty(t, task.LeasedBy)
|
|
})
|
|
|
|
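	// RetryTask re-queues a failed task while retries remain, incrementing
	// RetryCount and moving the error message into LastError; the second task,
	// whose retries are already exhausted, exercises the dead-letter path.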
t.Run("RetryTaskAndDLQ", func(t *testing.T) {
|
|
t.Helper()
|
|
// RetryTask path
|
|
retryTask := &queue.Task{
|
|
ID: "task-retry",
|
|
JobName: "job-retry",
|
|
Status: "failed",
|
|
Priority: 10,
|
|
CreatedAt: time.Now(),
|
|
MaxRetries: 3,
|
|
RetryCount: 0,
|
|
Error: "some transient error",
|
|
}
|
|
require.NoError(t, tq.AddTask(retryTask))
|
|
|
|
retryTask.Error = "connection timeout"
|
|
require.NoError(t, tq.RetryTask(retryTask))
|
|
|
|
updatedTask, err := tq.GetTask(retryTask.ID)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, 1, updatedTask.RetryCount)
|
|
assert.Equal(t, "queued", updatedTask.Status)
|
|
assert.Empty(t, updatedTask.Error)
|
|
assert.Equal(t, "connection timeout", updatedTask.LastError)
|
|
assert.NotNil(t, updatedTask.NextRetry)
|
|
|
|
// DLQ path
|
|
dlqTask := &queue.Task{
|
|
ID: "task-dlq",
|
|
JobName: "job-dlq",
|
|
Status: "failed",
|
|
Priority: 10,
|
|
CreatedAt: time.Now(),
|
|
MaxRetries: 1,
|
|
RetryCount: 1,
|
|
Error: "fatal error",
|
|
}
|
|
require.NoError(t, tq.AddTask(dlqTask))
|
|
|
|
require.NoError(t, tq.RetryTask(dlqTask))
|
|
|
|
// We don't reach into internal Redis structures here; DLQ behavior is
|
|
// verified indirectly via the presence of the DLQ key below.
|
|
})
|
|
|
|
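	// Per-worker prewarm state: set it, read it back field by field, then clear it.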
t.Run("PrewarmState", func(t *testing.T) {
|
|
t.Helper()
|
|
|
|
state := queue.PrewarmState{
|
|
WorkerID: workerID,
|
|
TaskID: "task-prewarm",
|
|
StartedAt: time.Now().UTC().Format(time.RFC3339Nano),
|
|
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
|
|
Phase: "datasets",
|
|
DatasetCnt: 2,
|
|
EnvHit: 1,
|
|
EnvMiss: 2,
|
|
EnvBuilt: 3,
|
|
EnvTimeNs: 4,
|
|
}
|
|
require.NoError(t, tq.SetWorkerPrewarmState(state))
|
|
|
|
got, err := tq.GetWorkerPrewarmState(workerID)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, got)
|
|
assert.Equal(t, state.WorkerID, got.WorkerID)
|
|
assert.Equal(t, state.TaskID, got.TaskID)
|
|
assert.Equal(t, state.Phase, got.Phase)
|
|
assert.Equal(t, state.DatasetCnt, got.DatasetCnt)
|
|
assert.Equal(t, state.EnvHit, got.EnvHit)
|
|
assert.Equal(t, state.EnvMiss, got.EnvMiss)
|
|
assert.Equal(t, state.EnvBuilt, got.EnvBuilt)
|
|
assert.Equal(t, state.EnvTimeNs, got.EnvTimeNs)
|
|
|
|
require.NoError(t, tq.ClearWorkerPrewarmState(workerID))
|
|
got, err = tq.GetWorkerPrewarmState(workerID)
|
|
require.NoError(t, err)
|
|
assert.Nil(t, got)
|
|
})
|
|
}
|