// NOTE(review): if this file lives inside internal/queue, declaring
// `package queue` while importing internal/queue is an import cycle and
// will not compile — it should then be `package queue_test`. Left as-is
// because the file's location is not visible here; please confirm.
package queue

import (
	"testing"
	"time"

	"github.com/alicebob/miniredis/v2"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/jfraeys/fetch_ml/internal/queue"
)

// workerID is the worker identity shared by every lease-related subtest.
const workerID = "worker-1"

// TestTaskQueue exercises the TaskQueue public API end-to-end against a
// single in-process miniredis instance. The subtests are intentionally
// NOT parallel and order-dependent: later subtests rely on state left by
// earlier ones (e.g. "RenewLease" reuses the task leased in
// "GetNextTaskWithLease", and "PeekNextTask" relies on task-1 added in
// "AddTask" still being queued).
func TestTaskQueue(t *testing.T) {
	t.Parallel()

	// Start an in-process Redis so the test needs no external services.
	s, err := miniredis.Run()
	if err != nil {
		t.Fatalf("failed to start miniredis: %v", err)
	}
	t.Cleanup(s.Close)

	// Create the TaskQueue under test.
	cfg := queue.Config{
		RedisAddr:            s.Addr(),
		MetricsFlushInterval: 10 * time.Millisecond, // fast flush for testing
	}
	tq, err := queue.NewTaskQueue(cfg)
	// require (not assert): every subtest dereferences tq, so continuing
	// past a failed constructor would only produce a confusing nil panic.
	require.NoError(t, err)
	t.Cleanup(func() {
		if err := tq.Close(); err != nil {
			t.Logf("Warning: failed to close task queue: %v", err)
		}
	})

	t.Run("AddTask", func(t *testing.T) {
		task := &queue.Task{
			ID:        "task-1",
			JobName:   "job-1",
			Status:    "queued",
			Priority:  10,
			CreatedAt: time.Now(),
		}
		require.NoError(t, tq.AddTask(task))

		// The task must land in the priority ZSET with its priority as
		// the score.
		score, err := s.ZScore(queue.TaskQueueKey, "task-1")
		require.NoError(t, err)
		assert.Equal(t, float64(10), score)
	})

	t.Run("GetNextTask", func(t *testing.T) {
		// Add a second task that outranks task-1.
		task := &queue.Task{
			ID:        "task-2",
			JobName:   "job-2",
			Status:    "queued",
			Priority:  20, // higher priority than task-1
			CreatedAt: time.Now(),
		}
		require.NoError(t, tq.AddTask(task))

		// task-2 must be dequeued first due to its higher priority.
		nextTask, err := tq.GetNextTask()
		require.NoError(t, err)
		require.NotNil(t, nextTask)
		assert.Equal(t, "task-2", nextTask.ID)
		// task-2 has now been popped; we rely on the TaskQueue
		// implementation to maintain Redis state and don't assert
		// internal Redis structures here.
	})

	t.Run("PeekNextTask", func(t *testing.T) {
		// With task-1 still queued (from AddTask), PeekNextTask should
		// return it without removing it.
		peeked, err := tq.PeekNextTask()
		require.NoError(t, err)
		require.NotNil(t, peeked)
		assert.Equal(t, "task-1", peeked.ID)

		// Confirm the peek was non-destructive: a pop still yields task-1.
		next, err := tq.GetNextTask()
		require.NoError(t, err)
		require.NotNil(t, next)
		assert.Equal(t, "task-1", next.ID)

		// The queue is now empty; peeking should yield nil, not an error.
		emptyPeek, err := tq.PeekNextTask()
		require.NoError(t, err)
		assert.Nil(t, emptyPeek)
	})

	t.Run("GetNextTaskWithLease", func(t *testing.T) {
		task := &queue.Task{
			ID:        "task-lease",
			JobName:   "job-lease",
			Status:    "queued",
			Priority:  15,
			CreatedAt: time.Now(),
		}
		require.NoError(t, tq.AddTask(task))

		leaseDuration := 1 * time.Minute
		leasedTask, err := tq.GetNextTaskWithLease(workerID, leaseDuration)
		require.NoError(t, err)
		require.NotNil(t, leasedTask)
		assert.Equal(t, "task-lease", leasedTask.ID)
		assert.Equal(t, workerID, leasedTask.LeasedBy)
		require.NotNil(t, leasedTask.LeaseExpiry)
		assert.True(t, leasedTask.LeaseExpiry.After(time.Now()))
	})

	t.Run("RenewLease", func(t *testing.T) {
		// Reuses task-lease, leased by the previous subtest.
		const taskID = "task-lease"

		// Capture the current expiry before renewing.
		task, err := tq.GetTask(taskID)
		require.NoError(t, err)
		require.NotNil(t, task.LeaseExpiry) // guards the *initialExpiry deref below
		initialExpiry := task.LeaseExpiry

		// Sleep briefly so the renewed expiry is measurably later.
		time.Sleep(10 * time.Millisecond)

		require.NoError(t, tq.RenewLease(taskID, workerID, 1*time.Minute))

		// The expiry must have moved forward.
		task, err = tq.GetTask(taskID)
		require.NoError(t, err)
		require.NotNil(t, task.LeaseExpiry)
		assert.True(t, task.LeaseExpiry.After(*initialExpiry))
	})

	t.Run("GetNextTaskWithLeaseBlocking", func(t *testing.T) {
		task := &queue.Task{
			ID:        "task-lease-blocking",
			JobName:   "job-lease-blocking",
			Status:    "queued",
			Priority:  5,
			CreatedAt: time.Now(),
		}
		require.NoError(t, tq.AddTask(task))

		// The task is already queued, so the blocking variant should
		// return immediately, well inside the 1s timeout.
		leasedTask, err := tq.GetNextTaskWithLeaseBlocking(workerID, 1*time.Minute, 1*time.Second)
		require.NoError(t, err)
		require.NotNil(t, leasedTask)
		assert.Equal(t, workerID, leasedTask.LeasedBy)
		assert.NotNil(t, leasedTask.LeaseExpiry)
	})

	t.Run("ReleaseLease", func(t *testing.T) {
		const taskID = "task-lease"
		require.NoError(t, tq.ReleaseLease(taskID, workerID))

		// Releasing must clear both lease fields on the stored task.
		task, err := tq.GetTask(taskID)
		require.NoError(t, err)
		assert.Nil(t, task.LeaseExpiry)
		assert.Empty(t, task.LeasedBy)
	})

	t.Run("RetryTaskAndDLQ", func(t *testing.T) {
		// Retry path: a failed task with retries remaining is re-queued
		// with an incremented retry count and the error moved to LastError.
		retryTask := &queue.Task{
			ID:         "task-retry",
			JobName:    "job-retry",
			Status:     "failed",
			Priority:   10,
			CreatedAt:  time.Now(),
			MaxRetries: 3,
			RetryCount: 0,
			Error:      "some transient error",
		}
		require.NoError(t, tq.AddTask(retryTask))

		retryTask.Error = "connection timeout"
		require.NoError(t, tq.RetryTask(retryTask))

		updatedTask, err := tq.GetTask(retryTask.ID)
		require.NoError(t, err)
		assert.Equal(t, 1, updatedTask.RetryCount)
		assert.Equal(t, "queued", updatedTask.Status)
		assert.Empty(t, updatedTask.Error)
		assert.Equal(t, "connection timeout", updatedTask.LastError)
		assert.NotNil(t, updatedTask.NextRetry)

		// DLQ path: a task with retries exhausted (RetryCount ==
		// MaxRetries). We only assert that RetryTask succeeds; DLQ
		// placement is an implementation detail and no internal Redis
		// structures are inspected here.
		dlqTask := &queue.Task{
			ID:         "task-dlq",
			JobName:    "job-dlq",
			Status:     "failed",
			Priority:   10,
			CreatedAt:  time.Now(),
			MaxRetries: 1,
			RetryCount: 1,
			Error:      "fatal error",
		}
		require.NoError(t, tq.AddTask(dlqTask))
		require.NoError(t, tq.RetryTask(dlqTask))
	})

	t.Run("PrewarmState", func(t *testing.T) {
		state := queue.PrewarmState{
			WorkerID:   workerID,
			TaskID:     "task-prewarm",
			StartedAt:  time.Now().UTC().Format(time.RFC3339Nano),
			UpdatedAt:  time.Now().UTC().Format(time.RFC3339Nano),
			Phase:      "datasets",
			DatasetCnt: 2,
			EnvHit:     1,
			EnvMiss:    2,
			EnvBuilt:   3,
			EnvTimeNs:  4,
		}
		require.NoError(t, tq.SetWorkerPrewarmState(state))

		// Round-trip: every stored field must come back unchanged.
		got, err := tq.GetWorkerPrewarmState(workerID)
		require.NoError(t, err)
		require.NotNil(t, got)
		assert.Equal(t, state.WorkerID, got.WorkerID)
		assert.Equal(t, state.TaskID, got.TaskID)
		assert.Equal(t, state.Phase, got.Phase)
		assert.Equal(t, state.DatasetCnt, got.DatasetCnt)
		assert.Equal(t, state.EnvHit, got.EnvHit)
		assert.Equal(t, state.EnvMiss, got.EnvMiss)
		assert.Equal(t, state.EnvBuilt, got.EnvBuilt)
		assert.Equal(t, state.EnvTimeNs, got.EnvTimeNs)

		// Clearing removes the state entirely; a subsequent read yields nil.
		require.NoError(t, tq.ClearWorkerPrewarmState(workerID))
		got, err = tq.GetWorkerPrewarmState(workerID)
		require.NoError(t, err)
		assert.Nil(t, got)
	})
}