254 lines
6.6 KiB
Go
254 lines
6.6 KiB
Go
package queue
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/alicebob/miniredis/v2"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/jfraeys/fetch_ml/internal/queue"
|
|
)
|
|
|
|
const workerID = "worker-1"
|
|
|
|
func TestTaskQueue(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Start miniredis
|
|
s, err := miniredis.Run()
|
|
if err != nil {
|
|
t.Fatalf("failed to start miniredis: %v", err)
|
|
}
|
|
t.Cleanup(s.Close)
|
|
|
|
// Create TaskQueue
|
|
cfg := queue.Config{
|
|
RedisAddr: s.Addr(),
|
|
MetricsFlushInterval: 10 * time.Millisecond, // Fast flush for testing
|
|
}
|
|
tq, err := queue.NewTaskQueue(cfg)
|
|
assert.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
if err := tq.Close(); err != nil {
|
|
t.Logf("Warning: failed to close task queue: %v", err)
|
|
}
|
|
})
|
|
|
|
t.Run("AddTask", func(t *testing.T) {
|
|
t.Helper()
|
|
// Use non-parallel subtest because of shared miniredis instance
|
|
task := &queue.Task{
|
|
ID: "task-1",
|
|
JobName: "job-1",
|
|
Status: "queued",
|
|
Priority: 10,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
err = tq.AddTask(task)
|
|
assert.NoError(t, err)
|
|
|
|
// Verify task is in Redis (ZSET)
|
|
score, err := s.ZScore(queue.TaskQueueKey, "task-1")
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, float64(10), score)
|
|
})
|
|
|
|
t.Run("GetNextTask", func(t *testing.T) {
|
|
t.Helper()
|
|
// Add another task
|
|
task := &queue.Task{
|
|
ID: "task-2",
|
|
JobName: "job-2",
|
|
Status: "queued",
|
|
Priority: 20, // Higher priority
|
|
CreatedAt: time.Now(),
|
|
}
|
|
err = tq.AddTask(task)
|
|
assert.NoError(t, err)
|
|
|
|
// Should get task-2 first due to higher priority
|
|
nextTask, err := tq.GetNextTask()
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, nextTask)
|
|
assert.Equal(t, "task-2", nextTask.ID)
|
|
|
|
// At this point task-2 has been popped; we rely on TaskQueue implementation
|
|
// to maintain Redis state and don't assert internal Redis structures here.
|
|
})
|
|
|
|
t.Run("PeekNextTask", func(t *testing.T) {
|
|
t.Helper()
|
|
// With task-1 still queued (from AddTask), PeekNextTask should return it.
|
|
peeked, err := tq.PeekNextTask()
|
|
require.NoError(t, err)
|
|
require.NotNil(t, peeked)
|
|
assert.Equal(t, "task-1", peeked.ID)
|
|
|
|
// Ensure peeking does not remove the task.
|
|
next, err := tq.GetNextTask()
|
|
require.NoError(t, err)
|
|
require.NotNil(t, next)
|
|
assert.Equal(t, "task-1", next.ID)
|
|
|
|
// Now the queue may be empty; PeekNextTask should return nil.
|
|
emptyPeek, err := tq.PeekNextTask()
|
|
require.NoError(t, err)
|
|
assert.Nil(t, emptyPeek)
|
|
})
|
|
|
|
t.Run("GetNextTaskWithLease", func(t *testing.T) {
|
|
t.Helper()
|
|
task := &queue.Task{
|
|
ID: "task-lease",
|
|
JobName: "job-lease",
|
|
Status: "queued",
|
|
Priority: 15,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
require.NoError(t, tq.AddTask(task))
|
|
|
|
leaseDuration := 1 * time.Minute
|
|
|
|
leasedTask, err := tq.GetNextTaskWithLease(workerID, leaseDuration)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, leasedTask)
|
|
assert.Equal(t, "task-lease", leasedTask.ID)
|
|
assert.Equal(t, workerID, leasedTask.LeasedBy)
|
|
assert.NotNil(t, leasedTask.LeaseExpiry)
|
|
assert.True(t, leasedTask.LeaseExpiry.After(time.Now()))
|
|
})
|
|
|
|
t.Run("RenewLease", func(t *testing.T) {
|
|
t.Helper()
|
|
// Reuse task-lease from previous subtest
|
|
const taskID = "task-lease"
|
|
|
|
// Get initial expiry
|
|
task, err := tq.GetTask(taskID)
|
|
require.NoError(t, err)
|
|
initialExpiry := task.LeaseExpiry
|
|
|
|
// Wait a bit
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
// Renew lease
|
|
require.NoError(t, tq.RenewLease(taskID, workerID, 1*time.Minute))
|
|
|
|
// Verify expiry updated
|
|
task, err = tq.GetTask(taskID)
|
|
require.NoError(t, err)
|
|
assert.True(t, task.LeaseExpiry.After(*initialExpiry))
|
|
})
|
|
|
|
t.Run("GetNextTaskWithLeaseBlocking", func(t *testing.T) {
|
|
t.Helper()
|
|
task := &queue.Task{
|
|
ID: "task-lease-blocking",
|
|
JobName: "job-lease-blocking",
|
|
Status: "queued",
|
|
Priority: 5,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
require.NoError(t, tq.AddTask(task))
|
|
|
|
leasedTask, err := tq.GetNextTaskWithLeaseBlocking(workerID, 1*time.Minute, 50*time.Millisecond)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, leasedTask)
|
|
assert.Equal(t, workerID, leasedTask.LeasedBy)
|
|
assert.NotNil(t, leasedTask.LeaseExpiry)
|
|
})
|
|
|
|
t.Run("ReleaseLease", func(t *testing.T) {
|
|
t.Helper()
|
|
const taskID = "task-lease"
|
|
|
|
require.NoError(t, tq.ReleaseLease(taskID, workerID))
|
|
|
|
task, err := tq.GetTask(taskID)
|
|
require.NoError(t, err)
|
|
assert.Nil(t, task.LeaseExpiry)
|
|
assert.Empty(t, task.LeasedBy)
|
|
})
|
|
|
|
t.Run("RetryTaskAndDLQ", func(t *testing.T) {
|
|
t.Helper()
|
|
// RetryTask path
|
|
retryTask := &queue.Task{
|
|
ID: "task-retry",
|
|
JobName: "job-retry",
|
|
Status: "failed",
|
|
Priority: 10,
|
|
CreatedAt: time.Now(),
|
|
MaxRetries: 3,
|
|
RetryCount: 0,
|
|
Error: "some transient error",
|
|
}
|
|
require.NoError(t, tq.AddTask(retryTask))
|
|
|
|
retryTask.Error = "connection timeout"
|
|
require.NoError(t, tq.RetryTask(retryTask))
|
|
|
|
updatedTask, err := tq.GetTask(retryTask.ID)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, 1, updatedTask.RetryCount)
|
|
assert.Equal(t, "queued", updatedTask.Status)
|
|
assert.Empty(t, updatedTask.Error)
|
|
assert.Equal(t, "connection timeout", updatedTask.LastError)
|
|
assert.NotNil(t, updatedTask.NextRetry)
|
|
|
|
// DLQ path
|
|
dlqTask := &queue.Task{
|
|
ID: "task-dlq",
|
|
JobName: "job-dlq",
|
|
Status: "failed",
|
|
Priority: 10,
|
|
CreatedAt: time.Now(),
|
|
MaxRetries: 1,
|
|
RetryCount: 1,
|
|
Error: "fatal error",
|
|
}
|
|
require.NoError(t, tq.AddTask(dlqTask))
|
|
|
|
require.NoError(t, tq.RetryTask(dlqTask))
|
|
|
|
// We don't reach into internal Redis structures here; DLQ behavior is
|
|
// verified indirectly via the presence of the DLQ key below.
|
|
})
|
|
|
|
t.Run("PrewarmState", func(t *testing.T) {
|
|
t.Helper()
|
|
|
|
state := queue.PrewarmState{
|
|
WorkerID: workerID,
|
|
TaskID: "task-prewarm",
|
|
StartedAt: time.Now().UTC().Format(time.RFC3339Nano),
|
|
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
|
|
Phase: "datasets",
|
|
DatasetCnt: 2,
|
|
EnvHit: 1,
|
|
EnvMiss: 2,
|
|
EnvBuilt: 3,
|
|
EnvTimeNs: 4,
|
|
}
|
|
require.NoError(t, tq.SetWorkerPrewarmState(state))
|
|
|
|
got, err := tq.GetWorkerPrewarmState(workerID)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, got)
|
|
assert.Equal(t, state.WorkerID, got.WorkerID)
|
|
assert.Equal(t, state.TaskID, got.TaskID)
|
|
assert.Equal(t, state.Phase, got.Phase)
|
|
assert.Equal(t, state.DatasetCnt, got.DatasetCnt)
|
|
assert.Equal(t, state.EnvHit, got.EnvHit)
|
|
assert.Equal(t, state.EnvMiss, got.EnvMiss)
|
|
assert.Equal(t, state.EnvBuilt, got.EnvBuilt)
|
|
assert.Equal(t, state.EnvTimeNs, got.EnvTimeNs)
|
|
|
|
require.NoError(t, tq.ClearWorkerPrewarmState(workerID))
|
|
got, err = tq.GetWorkerPrewarmState(workerID)
|
|
require.NoError(t, err)
|
|
assert.Nil(t, got)
|
|
})
|
|
}
|