package worker_test

import (
	"context"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/alicebob/miniredis/v2"
	"github.com/jfraeys/fetch_ml/internal/queue"
	"github.com/jfraeys/fetch_ml/internal/worker"
	tests "github.com/jfraeys/fetch_ml/tests/fixtures"
)

// prewarmSeedSnapshotCache writes a one-file snapshot under base, hashes it with
// worker.DirOverallSHA256Hex, and renames it to
// dataDir/snapshots/sha256/<sha>. It returns the hex SHA used as the directory
// name. Any filesystem or hashing failure aborts the calling test.
func prewarmSeedSnapshotCache(t *testing.T, base, dataDir string) string {
	t.Helper()

	srcSnapshot := filepath.Join(base, "snapshot-src")
	if err := os.MkdirAll(srcSnapshot, 0750); err != nil {
		t.Fatalf("mkdir src snapshot: %v", err)
	}
	if err := os.WriteFile(filepath.Join(srcSnapshot, "file.txt"), []byte("ok"), 0600); err != nil {
		t.Fatalf("write snapshot file: %v", err)
	}

	sha, err := worker.DirOverallSHA256Hex(srcSnapshot)
	if err != nil {
		t.Fatalf("DirOverallSHA256Hex: %v", err)
	}

	cacheDir := filepath.Join(dataDir, "snapshots", "sha256", sha)
	if err := os.MkdirAll(filepath.Dir(cacheDir), 0750); err != nil {
		t.Fatalf("mkdir cache parent: %v", err)
	}
	if err := os.Rename(srcSnapshot, cacheDir); err != nil {
		t.Fatalf("rename into cache: %v", err)
	}
	return sha
}

// prewarmSnapshotTask builds the queued task used by the snapshot prewarm
// tests; its metadata carries the snapshot SHA prefixed with "sha256:".
func prewarmSnapshotTask(sha string) *queue.Task {
	return &queue.Task{
		ID:         "task-1",
		JobName:    "job-1",
		Status:     "queued",
		Priority:   10,
		CreatedAt:  time.Now().UTC(),
		SnapshotID: "snap-1",
		Metadata: map[string]string{
			"snapshot_sha256": "sha256:" + sha,
		},
	}
}

// prewarmEnabledConfig returns the worker configuration shared by the tests
// that run with PrewarmEnabled set to true.
func prewarmEnabledConfig(base, dataDir string) *worker.Config {
	return &worker.Config{
		WorkerID:        "worker-1",
		BasePath:        base,
		DataDir:         dataDir,
		PrewarmEnabled:  true,
		AutoFetchData:   false,
		LocalMode:       true,
		PollInterval:    1,
		MaxWorkers:      1,
		DatasetCacheTTL: 30 * time.Minute,
	}
}

// prewarmRequireAbsent fails the test with msg when path exists, and also
// fails when os.Stat reports anything other than "not exist", so an unrelated
// stat error (e.g. permissions) is never mistaken for absence.
func prewarmRequireAbsent(t *testing.T, path, msg string) {
	t.Helper()

	_, err := os.Stat(path)
	switch {
	case err == nil:
		t.Fatalf(msg)
	case !os.IsNotExist(err):
		t.Fatalf("stat %s: %v", path, err)
	}
}

// TestPrewarmNextOnce_Snapshot_WritesPrewarmDir checks that, with one queued
// snapshot task and a matching cached snapshot, PrewarmNextOnce reports ok and
// the snapshot file appears under base/.prewarm/snapshots/<task ID>/.
func TestPrewarmNextOnce_Snapshot_WritesPrewarmDir(t *testing.T) {
	s, err := miniredis.Run()
	if err != nil {
		t.Fatalf("miniredis: %v", err)
	}
	t.Cleanup(s.Close)

	base := t.TempDir()
	dataDir := filepath.Join(base, "data")

	// Create a snapshot directory, compute its overall SHA, and place it in the cache.
	sha := prewarmSeedSnapshotCache(t, base, dataDir)

	// Queue has one task; prewarm should stage it into base/.prewarm/snapshots/.
	tq, err := queue.NewTaskQueue(queue.Config{RedisAddr: s.Addr(), MetricsFlushInterval: 5 * time.Millisecond})
	if err != nil {
		t.Fatalf("NewTaskQueue: %v", err)
	}
	t.Cleanup(func() { _ = tq.Close() })

	task := prewarmSnapshotTask(sha)
	if err := tq.AddTask(task); err != nil {
		t.Fatalf("AddTask: %v", err)
	}

	cfg := prewarmEnabledConfig(base, dataDir)
	w := tests.NewTestWorkerWithQueue(cfg, tq)

	ok, err := w.PrewarmNextOnce(context.Background())
	if err != nil {
		t.Fatalf("PrewarmNextOnce: %v", err)
	}
	if !ok {
		t.Fatalf("expected ok=true")
	}

	prewarmed := filepath.Join(base, ".prewarm", "snapshots", task.ID, "file.txt")
	if _, err := os.Stat(prewarmed); err != nil {
		t.Fatalf("expected prewarmed file to exist: %v", err)
	}
}

// TestPrewarmNextOnce_Disabled_NoOp checks that with PrewarmEnabled=false,
// PrewarmNextOnce returns ok=false without error and creates no .prewarm
// directory under the base path.
func TestPrewarmNextOnce_Disabled_NoOp(t *testing.T) {
	s, err := miniredis.Run()
	if err != nil {
		t.Fatalf("miniredis: %v", err)
	}
	t.Cleanup(s.Close)

	base := t.TempDir()
	dataDir := filepath.Join(base, "data")

	tq, err := queue.NewTaskQueue(queue.Config{RedisAddr: s.Addr(), MetricsFlushInterval: 5 * time.Millisecond})
	if err != nil {
		t.Fatalf("NewTaskQueue: %v", err)
	}
	t.Cleanup(func() { _ = tq.Close() })

	task := &queue.Task{ID: "task-1", JobName: "job-1", Status: "queued", Priority: 10, CreatedAt: time.Now().UTC()}
	if err := tq.AddTask(task); err != nil {
		t.Fatalf("AddTask: %v", err)
	}

	cfg := &worker.Config{WorkerID: "worker-1", BasePath: base, DataDir: dataDir, PrewarmEnabled: false}
	w := tests.NewTestWorkerWithQueue(cfg, tq)

	ok, err := w.PrewarmNextOnce(context.Background())
	if err != nil {
		t.Fatalf("PrewarmNextOnce: %v", err)
	}
	if ok {
		t.Fatalf("expected ok=false")
	}

	prewarmRequireAbsent(t, filepath.Join(base, ".prewarm"), "expected no .prewarm dir when disabled")
}

// TestPrewarmNextOnce_QueueEmpty_DoesNotDeleteState prewarms one task, drains
// the queue, runs PrewarmNextOnce again, and then checks that the worker's
// prewarm state is still retrievable from the queue.
func TestPrewarmNextOnce_QueueEmpty_DoesNotDeleteState(t *testing.T) {
	s, err := miniredis.Run()
	if err != nil {
		t.Fatalf("miniredis: %v", err)
	}
	t.Cleanup(s.Close)

	base := t.TempDir()
	dataDir := filepath.Join(base, "data")

	// Create a snapshot directory, compute its overall SHA, and place it in the cache.
	sha := prewarmSeedSnapshotCache(t, base, dataDir)

	tq, err := queue.NewTaskQueue(queue.Config{RedisAddr: s.Addr(), MetricsFlushInterval: 5 * time.Millisecond})
	if err != nil {
		t.Fatalf("NewTaskQueue: %v", err)
	}
	t.Cleanup(func() { _ = tq.Close() })

	if err := tq.AddTask(prewarmSnapshotTask(sha)); err != nil {
		t.Fatalf("AddTask: %v", err)
	}

	cfg := prewarmEnabledConfig(base, dataDir)
	w := tests.NewTestWorkerWithQueue(cfg, tq)

	ok, err := w.PrewarmNextOnce(context.Background())
	if err != nil {
		t.Fatalf("PrewarmNextOnce: %v", err)
	}
	if !ok {
		t.Fatalf("expected ok=true")
	}

	// Empty the queue and run again. This should not delete the prewarm state; it should
	// simply cancel its internal state and let the Redis TTL expire naturally.
	//
	// The drain must succeed: if it failed, the queue would still hold the task and
	// the assertion below would pass without ever exercising the empty-queue path.
	if _, err := tq.GetNextTask(); err != nil {
		t.Fatalf("GetNextTask (drain): %v", err)
	}

	// The outcome of this call is not what the test asserts on, so an error is only
	// logged; the state check below is the actual verdict.
	if _, err := w.PrewarmNextOnce(context.Background()); err != nil {
		t.Logf("PrewarmNextOnce on empty queue returned error: %v", err)
	}

	state, err := tq.GetWorkerPrewarmState(cfg.WorkerID)
	if err != nil {
		t.Fatalf("GetWorkerPrewarmState: %v", err)
	}
	if state == nil {
		t.Fatalf("expected prewarm state to remain present when queue is empty")
	}
}

// TestStageSnapshotFromPath_UsesPrewarm checks that when a prewarmed directory
// exists for the task, StageSnapshotFromPath places its content in
// <jobDir>/snapshot and the prewarmed directory no longer exists afterwards.
func TestStageSnapshotFromPath_UsesPrewarm(t *testing.T) {
	base := t.TempDir()
	taskID := "task-1"

	jobDir := filepath.Join(base, "pending", "job-1", "run")
	if err := os.MkdirAll(jobDir, 0750); err != nil {
		t.Fatalf("mkdir jobDir: %v", err)
	}

	// Create a prewarmed snapshot directory.
	prewarmSrc := filepath.Join(base, ".prewarm", "snapshots", taskID)
	if err := os.MkdirAll(prewarmSrc, 0750); err != nil {
		t.Fatalf("mkdir prewarm parent: %v", err)
	}
	if err := os.WriteFile(filepath.Join(prewarmSrc, "file.txt"), []byte("prewarmed"), 0600); err != nil {
		t.Fatalf("write prewarm file: %v", err)
	}

	// Call StageSnapshotFromPath with a dummy srcPath that does not exist; it
	// should prefer the prewarmed dir.
	dummySrc := filepath.Join(base, "unused")
	if err := worker.StageSnapshotFromPath(base, taskID, dummySrc, jobDir); err != nil {
		t.Fatalf("StageSnapshotFromPath: %v", err)
	}

	// Verify the prewarmed content ended up in the job snapshot dir.
	dstFile := filepath.Join(jobDir, "snapshot", "file.txt")
	if _, err := os.Stat(dstFile); err != nil {
		t.Fatalf("expected prewarmed file in job snapshot dir: %v", err)
	}
	prewarmRequireAbsent(t, prewarmSrc, "expected prewarm src to be moved (rename) not copied")

	// Verify the content is correct.
	got, err := os.ReadFile(dstFile)
	if err != nil {
		t.Fatalf("read dst file: %v", err)
	}
	if string(got) != "prewarmed" {
		t.Fatalf("expected content 'prewarmed', got %q", string(got))
	}
}

// TestStageSnapshotFromPath_FallsBackToCopy_WhenNoPrewarm checks that without a
// prewarmed directory, StageSnapshotFromPath populates <jobDir>/snapshot from
// the given source path and leaves the source file in place.
func TestStageSnapshotFromPath_FallsBackToCopy_WhenNoPrewarm(t *testing.T) {
	base := t.TempDir()
	taskID := "task-1"

	jobDir := filepath.Join(base, "pending", "job-1", "run")
	if err := os.MkdirAll(jobDir, 0750); err != nil {
		t.Fatalf("mkdir jobDir: %v", err)
	}

	// Create a source snapshot dir (simulating ResolveSnapshot result).
	src := filepath.Join(base, "src")
	if err := os.MkdirAll(src, 0750); err != nil {
		t.Fatalf("mkdir src: %v", err)
	}
	if err := os.WriteFile(filepath.Join(src, "file.txt"), []byte("source"), 0600); err != nil {
		t.Fatalf("write src file: %v", err)
	}

	// No prewarm dir exists; should copy from src.
	if err := worker.StageSnapshotFromPath(base, taskID, src, jobDir); err != nil {
		t.Fatalf("StageSnapshotFromPath: %v", err)
	}

	dstFile := filepath.Join(jobDir, "snapshot", "file.txt")
	if _, err := os.Stat(dstFile); err != nil {
		t.Fatalf("expected file in job snapshot dir: %v", err)
	}

	got, err := os.ReadFile(dstFile)
	if err != nil {
		t.Fatalf("read dst file: %v", err)
	}
	if string(got) != "source" {
		t.Fatalf("expected content 'source', got %q", string(got))
	}

	// Verify src is still present (copy, not move).
	if _, err := os.Stat(filepath.Join(src, "file.txt")); err != nil {
		t.Fatalf("expected src file to remain after copy: %v", err)
	}
}