package queue import ( "path/filepath" "testing" "time" "github.com/stretchr/testify/require" "github.com/jfraeys/fetch_ml/internal/queue" ) func TestSQLiteQueue_PersistenceAcrossRestart(t *testing.T) { base := t.TempDir() dbPath := filepath.Join(base, "queue.db") q1, err := queue.NewSQLiteQueue(dbPath) require.NoError(t, err) task := &queue.Task{ ID: "task-1", JobName: "job-1", Status: "queued", Priority: 10, CreatedAt: time.Now().UTC(), } require.NoError(t, q1.AddTask(task)) require.NoError(t, q1.Close()) q2, err := queue.NewSQLiteQueue(dbPath) require.NoError(t, err) t.Cleanup(func() { _ = q2.Close() }) got, err := q2.PeekNextTask() require.NoError(t, err) require.NotNil(t, got) require.Equal(t, task.ID, got.ID) leased, err := q2.GetNextTaskWithLease("worker-1", 30*time.Second) require.NoError(t, err) require.NotNil(t, leased) require.Equal(t, task.ID, leased.ID) } func TestSQLiteQueue_LeaseAndRelease(t *testing.T) { base := t.TempDir() dbPath := filepath.Join(base, "queue.db") q, err := queue.NewSQLiteQueue(dbPath) require.NoError(t, err) t.Cleanup(func() { _ = q.Close() }) task := &queue.Task{ID: "task-1", JobName: "job-1", Status: "queued", Priority: 1, CreatedAt: time.Now().UTC()} require.NoError(t, q.AddTask(task)) leased, err := q.GetNextTaskWithLease("worker-1", 30*time.Second) require.NoError(t, err) require.NotNil(t, leased) require.Equal(t, "worker-1", leased.LeasedBy) require.NotNil(t, leased.LeaseExpiry) require.NoError(t, q.ReleaseLease(task.ID, "worker-1")) stored, err := q.GetTask(task.ID) require.NoError(t, err) require.Empty(t, stored.LeasedBy) require.Nil(t, stored.LeaseExpiry) } func TestSQLiteQueue_GetNextTaskWithLeaseBlocking(t *testing.T) { base := t.TempDir() dbPath := filepath.Join(base, "queue.db") q, err := queue.NewSQLiteQueue(dbPath) require.NoError(t, err) t.Cleanup(func() { _ = q.Close() }) start := time.Now() go func() { time.Sleep(100 * time.Millisecond) _ = q.AddTask(&queue.Task{ ID: "task-1", JobName: "job-1", Status: "queued", Priority: 1, CreatedAt: time.Now().UTC(), }) }() got, err := q.GetNextTaskWithLeaseBlocking("worker-1", 30*time.Second, 800*time.Millisecond) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, "task-1", got.ID) require.GreaterOrEqual(t, time.Since(start), 100*time.Millisecond) } func TestSQLiteQueue_RetryTask_SchedulesNextRetryAndNotImmediatelyAvailable(t *testing.T) { base := t.TempDir() dbPath := filepath.Join(base, "queue.db") q, err := queue.NewSQLiteQueue(dbPath) require.NoError(t, err) t.Cleanup(func() { _ = q.Close() }) task := &queue.Task{ ID: "task-1", JobName: "job-1", Status: "running", Priority: 1, CreatedAt: time.Now().UTC(), MaxRetries: 3, RetryCount: 0, Error: "timeout", } require.NoError(t, q.AddTask(task)) require.NoError(t, q.RetryTask(task)) stored, err := q.GetTask(task.ID) require.NoError(t, err) require.Equal(t, "queued", stored.Status) require.Equal(t, 1, stored.RetryCount) require.NotNil(t, stored.NextRetry) require.True(t, stored.NextRetry.After(time.Now().UTC().Add(-1*time.Second))) peek, err := q.PeekNextTask() require.NoError(t, err) require.Nil(t, peek) } func TestSQLiteQueue_RetryTask_MaxRetriesMovesToDLQ(t *testing.T) { base := t.TempDir() dbPath := filepath.Join(base, "queue.db") q, err := queue.NewSQLiteQueue(dbPath) require.NoError(t, err) t.Cleanup(func() { _ = q.Close() }) task := &queue.Task{ ID: "task-1", JobName: "job-1", Status: "running", Priority: 1, CreatedAt: time.Now().UTC(), MaxRetries: 2, RetryCount: 2, LastError: "boom", Error: "boom", } require.NoError(t, q.AddTask(task)) require.NoError(t, q.RetryTask(task)) stored, err := q.GetTask(task.ID) require.NoError(t, err) require.Equal(t, "failed", stored.Status) require.Contains(t, stored.Error, "DLQ:") depth, err := q.QueueDepth() require.NoError(t, err) require.EqualValues(t, 0, depth) } func TestSQLiteQueue_PrewarmState_CRUD(t *testing.T) { base := t.TempDir() dbPath := filepath.Join(base, "queue.db") q, err := queue.NewSQLiteQueue(dbPath) require.NoError(t, err) t.Cleanup(func() { _ = q.Close() }) st := queue.PrewarmState{WorkerID: "worker-1", TaskID: "task-1", Phase: "env", DatasetCnt: 0} require.NoError(t, q.SetWorkerPrewarmState(st)) got, err := q.GetWorkerPrewarmState("worker-1") require.NoError(t, err) require.NotNil(t, got) require.Equal(t, "worker-1", got.WorkerID) all, err := q.GetAllWorkerPrewarmStates() require.NoError(t, err) require.NotEmpty(t, all) require.NoError(t, q.ClearWorkerPrewarmState("worker-1")) got2, err := q.GetWorkerPrewarmState("worker-1") require.NoError(t, err) require.Nil(t, got2) } func TestSQLiteQueue_ReclaimExpiredLeases_QueuesRetry(t *testing.T) { base := t.TempDir() dbPath := filepath.Join(base, "queue.db") q, err := queue.NewSQLiteQueue(dbPath) require.NoError(t, err) t.Cleanup(func() { _ = q.Close() }) expired := time.Now().UTC().Add(-1 * time.Second) task := &queue.Task{ ID: "task-1", JobName: "job-1", Status: "running", Priority: 1, CreatedAt: time.Now().UTC(), MaxRetries: 3, RetryCount: 0, LeasedBy: "worker-1", LeaseExpiry: &expired, } require.NoError(t, q.AddTask(task)) require.NoError(t, q.ReclaimExpiredLeases()) stored, err := q.GetTask(task.ID) require.NoError(t, err) require.Equal(t, "queued", stored.Status) require.Equal(t, 1, stored.RetryCount) require.Empty(t, stored.LeasedBy) require.Nil(t, stored.LeaseExpiry) require.NotNil(t, stored.NextRetry) require.NotEmpty(t, stored.LastError) } func TestSQLiteQueue_PrewarmGCSignal(t *testing.T) { base := t.TempDir() dbPath := filepath.Join(base, "queue.db") q, err := queue.NewSQLiteQueue(dbPath) require.NoError(t, err) t.Cleanup(func() { _ = q.Close() }) v0, err := q.PrewarmGCRequestValue() require.NoError(t, err) require.Equal(t, "", v0) require.NoError(t, q.SignalPrewarmGC()) v1, err := q.PrewarmGCRequestValue() require.NoError(t, err) require.NotEmpty(t, v1) time.Sleep(2 * time.Millisecond) require.NoError(t, q.SignalPrewarmGC()) v2, err := q.PrewarmGCRequestValue() require.NoError(t, err) require.NotEmpty(t, v2) require.NotEqual(t, v1, v2) }