fetch_ml/internal/queue/filesystem_queue.go
Jeremie Fraeys 6580917ba8
refactor: extract domain types and consolidate error system (Phases 1-2)
Phase 1: Extract Domain Types
=============================
- Create internal/domain/ package with canonical types:
  - domain/task.go: Task, Attempt structs
  - domain/tracking.go: TrackingConfig and MLflow/TensorBoard/Wandb configs
  - domain/dataset.go: DatasetSpec
  - domain/status.go: JobStatus constants
  - domain/errors.go: FailureClass system with classification functions
  - domain/doc.go: package documentation

- Update queue/task.go to re-export domain types (backward compatibility)
- Update TUI model/state.go to use domain types via type aliases
- Simplify TUI services: remove ~60 lines of conversion functions

Phase 2: Delete ErrorCategory System
====================================
- Remove deprecated ErrorCategory type and constants
- Remove TaskError struct and related functions
- Remove mapping functions: ClassifyError, IsRetryable, GetUserMessage, RetryDelay
- Update all queue implementations to use domain.FailureClass directly:
  - queue/metrics.go: RecordTaskFailure/Retry now take FailureClass
  - queue/queue.go: RetryTask uses domain.ClassifyFailure
  - queue/filesystem_queue.go: RetryTask and MoveToDeadLetterQueue updated
  - queue/sqlite_queue.go: RetryTask and MoveToDeadLetterQueue updated

Lines eliminated: ~190 lines of conversion and mapping code
Result: Single source of truth for domain types and error classification
2026-02-17 12:34:28 -05:00

575 lines
14 KiB
Go

package queue
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/jfraeys/fetch_ml/internal/domain"
)
// FilesystemQueue is a file-backed task queue. Each task is stored as a
// single JSON file; the task's status determines which directory under root
// holds it (pending/entries, running, finished, failed).
type FilesystemQueue struct {
	root   string             // queue root directory (cleaned, absolute or relative)
	ctx    context.Context    // cancelled by Close to wake blocking waits
	cancel context.CancelFunc // cancels ctx
}
// filesystemQueueIndex is the schema of pending/.queue.json — a best-effort
// snapshot of the pending tasks, rebuilt after every enqueue/update.
type filesystemQueueIndex struct {
	Version   int                        `json:"version"`
	UpdatedAt string                     `json:"updated_at"` // RFC3339 UTC, set at rebuild time
	Tasks     []filesystemQueueIndexTask `json:"tasks"`      // sorted: priority desc, then created_at asc
}
// filesystemQueueIndexTask is one pending-task entry in the queue index.
type filesystemQueueIndexTask struct {
	ID        string `json:"id"`
	Priority  int64  `json:"priority"`
	CreatedAt string `json:"created_at"` // RFC3339Nano UTC
}
// NewFilesystemQueue opens (creating directories as needed) a file-backed
// queue rooted at root. The layout is pending/entries, running, finished,
// and failed. An empty root is rejected.
func NewFilesystemQueue(root string) (*FilesystemQueue, error) {
	root = strings.TrimSpace(root)
	if root == "" {
		return nil, fmt.Errorf("filesystem queue root is required")
	}
	root = filepath.Clean(root)
	dirs := []string{
		filepath.Join(root, "pending", "entries"),
		filepath.Join(root, "running"),
		filepath.Join(root, "finished"),
		filepath.Join(root, "failed"),
	}
	for _, dir := range dirs {
		if err := os.MkdirAll(dir, 0750); err != nil {
			return nil, err
		}
	}
	ctx, cancel := context.WithCancel(context.Background())
	q := &FilesystemQueue{root: root, ctx: ctx, cancel: cancel}
	// Best-effort: the index is regenerated on every later write anyway.
	_ = q.rebuildIndex()
	return q, nil
}
// Close cancels the queue's context, waking any blocked waiters
// (e.g. GetNextTaskWithLeaseBlocking). It always returns nil.
func (q *FilesystemQueue) Close() error {
	q.cancel()
	return nil
}
// AddTask validates task, applies enqueue defaults (MaxRetries, CreatedAt),
// and writes it atomically into the pending directory.
//
// The filesystem backend only ever enqueues tasks in the "queued" status;
// any other status is forced back to "queued" here. Status transitions must
// go through UpdateTask instead.
func (q *FilesystemQueue) AddTask(task *Task) error {
	if task == nil {
		return errors.New("task is nil")
	}
	if strings.TrimSpace(task.ID) == "" {
		return errors.New("task id is required")
	}
	if strings.TrimSpace(task.JobName) == "" {
		return errors.New("job name is required")
	}
	if task.MaxRetries == 0 {
		task.MaxRetries = defaultMaxRetries
	}
	if task.CreatedAt.IsZero() {
		task.CreatedAt = time.Now().UTC()
	}
	// The original empty-status branch followed by a not-"queued" branch
	// always normalized the status to "queued"; say so directly.
	task.Status = "queued"
	payload, err := json.Marshal(task)
	if err != nil {
		return err
	}
	if err := writeFileAtomic(q.pendingEntryPath(task.ID), payload, 0640); err != nil {
		return err
	}
	TasksQueued.Inc()
	if depth, derr := q.QueueDepth(); derr == nil {
		UpdateQueueDepth(depth)
	}
	// Index rebuild is best-effort; the next write retries it.
	_ = q.rebuildIndex()
	return nil
}
// GetNextTask claims the next pending task with the default lease duration
// and no worker ID. Returns (nil, nil) when nothing is claimable.
func (q *FilesystemQueue) GetNextTask() (*Task, error) {
	return q.claimNext("", 0, false)
}
// PeekNextTask returns the task that would be claimed next without moving
// it out of pending or leasing it. Returns (nil, nil) when nothing is claimable.
func (q *FilesystemQueue) PeekNextTask() (*Task, error) {
	return q.claimNext("", 0, true)
}
// GetNextTaskWithLease claims the next pending task on behalf of workerID
// with the given lease duration (0 means the default lease duration).
func (q *FilesystemQueue) GetNextTaskWithLease(workerID string, leaseDuration time.Duration) (*Task, error) {
	return q.claimNext(workerID, leaseDuration, false)
}
// GetNextTaskWithLeaseBlocking polls for a claimable task until one appears,
// blockTimeout elapses (returning nil, nil), or the queue is closed
// (returning the context error). A non-positive blockTimeout falls back to
// defaultBlockTimeout.
func (q *FilesystemQueue) GetNextTaskWithLeaseBlocking(
	workerID string,
	leaseDuration, blockTimeout time.Duration,
) (*Task, error) {
	if blockTimeout <= 0 {
		blockTimeout = defaultBlockTimeout
	}
	deadline := time.Now().Add(blockTimeout)
	// One reusable ticker instead of a fresh time.After timer per poll
	// iteration (avoids a timer allocation on every 50ms loop).
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		t, err := q.claimNext(workerID, leaseDuration, false)
		if err != nil {
			return nil, err
		}
		if t != nil {
			return t, nil
		}
		if time.Now().After(deadline) {
			return nil, nil
		}
		select {
		case <-q.ctx.Done():
			return nil, q.ctx.Err()
		case <-ticker.C:
		}
	}
}
// RenewLease extends the lease on taskID for workerID. Best-effort and
// single-worker friendly: it reloads the task, refuses if another worker
// holds the lease, and rewrites the lease fields.
func (q *FilesystemQueue) RenewLease(taskID string, workerID string, leaseDuration time.Duration) error {
	task, err := q.GetTask(taskID)
	if err != nil {
		return err
	}
	// Refuse to renew a lease currently held by a different worker.
	if workerID != "" && task.LeasedBy != "" && task.LeasedBy != workerID {
		return fmt.Errorf("task leased by different worker: %s", task.LeasedBy)
	}
	if leaseDuration == 0 {
		leaseDuration = defaultLeaseDuration
	}
	expiry := time.Now().UTC().Add(leaseDuration)
	task.LeaseExpiry = &expiry
	if workerID != "" {
		task.LeasedBy = workerID
	}
	RecordLeaseRenewal(workerID)
	return q.UpdateTask(task)
}
// ReleaseLease clears the lease fields on taskID so another worker may
// claim it. Fails if a different worker currently holds the lease.
func (q *FilesystemQueue) ReleaseLease(taskID string, workerID string) error {
	task, err := q.GetTask(taskID)
	if err != nil {
		return err
	}
	if workerID != "" && task.LeasedBy != "" && task.LeasedBy != workerID {
		return fmt.Errorf("task leased by different worker: %s", task.LeasedBy)
	}
	task.LeasedBy = ""
	task.LeaseExpiry = nil
	return q.UpdateTask(task)
}
// RetryTask re-queues task with a classified backoff, or moves it to the
// dead-letter queue when retries are exhausted or the failure class is not
// auto-retryable. The current Error is rotated into LastError and the lease
// is cleared before re-enqueueing.
func (q *FilesystemQueue) RetryTask(task *Task) error {
	// Guard against nil — the siblings AddTask and MoveToDeadLetterQueue
	// both check this; previously a nil task panicked on RetryCount.
	if task == nil {
		return fmt.Errorf("task is nil")
	}
	if task.RetryCount >= task.MaxRetries {
		RecordDLQAddition("max_retries")
		return q.MoveToDeadLetterQueue(task, "max retries exceeded")
	}
	failureClass := domain.FailureUnknown
	if task.Error != "" {
		failureClass = domain.ClassifyFailure(0, nil, task.Error)
	}
	if !domain.ShouldAutoRetry(failureClass, task.RetryCount) {
		RecordDLQAddition(string(failureClass))
		return q.MoveToDeadLetterQueue(task, fmt.Sprintf("non-retryable error: %s", failureClass))
	}
	task.RetryCount++
	task.Status = "queued"
	task.LastError = task.Error
	task.Error = ""
	// Backoff policy is delegated to the domain package (seconds, by class).
	backoffSeconds := domain.RetryDelayForClass(failureClass, task.RetryCount)
	nextRetry := time.Now().UTC().Add(time.Duration(backoffSeconds) * time.Second)
	task.NextRetry = &nextRetry
	task.LeaseExpiry = nil
	task.LeasedBy = ""
	RecordTaskRetry(task.JobName, failureClass)
	return q.AddTask(task)
}
// MoveToDeadLetterQueue marks task as permanently failed, recording the
// reason and the failure class derived from its last error.
func (q *FilesystemQueue) MoveToDeadLetterQueue(task *Task, reason string) error {
	if task == nil {
		return fmt.Errorf("task is nil")
	}
	// Classify from LastError before overwriting Error below.
	class := domain.ClassifyFailure(0, nil, task.LastError)
	task.Status = "failed"
	task.Error = fmt.Sprintf("DLQ: %s. Last error: %s", reason, task.LastError)
	RecordTaskFailure(task.JobName, class)
	return q.UpdateTask(task)
}
// GetTask loads the task with the given ID from whichever status directory
// currently holds it. Returns os.ErrNotExist (via findTaskPath) when absent.
func (q *FilesystemQueue) GetTask(taskID string) (*Task, error) {
	path, err := q.findTaskPath(taskID)
	if err != nil {
		return nil, err
	}
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	task := new(Task)
	if err := json.Unmarshal(raw, task); err != nil {
		return nil, err
	}
	return task, nil
}
// GetAllTasks returns every task found in any status directory. Missing
// directories and unreadable or unparsable files are silently skipped.
func (q *FilesystemQueue) GetAllTasks() ([]*Task, error) {
	dirs := []string{
		filepath.Join(q.root, "pending", "entries"),
		filepath.Join(q.root, "running"),
		filepath.Join(q.root, "finished"),
		filepath.Join(q.root, "failed"),
	}
	var tasks []*Task
	for _, dir := range dirs {
		entries, err := os.ReadDir(dir)
		if err != nil {
			// A status directory may not exist yet; ignore it.
			continue
		}
		for _, entry := range entries {
			if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
				continue
			}
			raw, err := os.ReadFile(filepath.Join(dir, entry.Name()))
			if err != nil {
				continue
			}
			var task Task
			if err := json.Unmarshal(raw, &task); err != nil {
				continue
			}
			tasks = append(tasks, &task)
		}
	}
	return tasks, nil
}
// GetTaskByName returns the most recently created task whose JobName matches
// jobName, or os.ErrNotExist when none exists.
func (q *FilesystemQueue) GetTaskByName(jobName string) (*Task, error) {
	jobName = strings.TrimSpace(jobName)
	if jobName == "" {
		return nil, fmt.Errorf("job name is required")
	}
	tasks, err := q.GetAllTasks()
	if err != nil {
		return nil, err
	}
	var latest *Task
	for _, candidate := range tasks {
		if candidate == nil || candidate.JobName != jobName {
			continue
		}
		if latest == nil || candidate.CreatedAt.After(latest.CreatedAt) {
			latest = candidate
		}
	}
	if latest == nil {
		return nil, os.ErrNotExist
	}
	return latest, nil
}
// CancelTask marks the task as cancelled and stamps its end time, then
// persists it (UpdateTask files "cancelled" tasks under failed/).
func (q *FilesystemQueue) CancelTask(taskID string) error {
	task, err := q.GetTask(taskID)
	if err != nil {
		return err
	}
	ended := time.Now().UTC()
	task.Status = "cancelled"
	task.EndedAt = &ended
	return q.UpdateTask(task)
}
// UpdateTask persists task into the directory corresponding to its status,
// then removes any stale copies in the other status directories.
//
// The new copy is written BEFORE stale copies are removed, so a crash
// between the two steps can at worst leave a duplicate — never a window in
// which the task exists nowhere on disk (the previous remove-then-write
// order had exactly that window).
func (q *FilesystemQueue) UpdateTask(task *Task) error {
	if task == nil {
		return fmt.Errorf("task is nil")
	}
	if strings.TrimSpace(task.ID) == "" {
		return fmt.Errorf("task id is required")
	}
	if strings.TrimSpace(task.Status) == "" {
		return fmt.Errorf("task status is required")
	}
	payload, err := json.Marshal(task)
	if err != nil {
		return err
	}
	dst := q.pathForStatus(task.Status, task.ID)
	if dst == "" {
		// Statuses without a mapped directory stay in running.
		dst = filepath.Join(q.root, "running", task.ID+".json")
	}
	if err := os.MkdirAll(filepath.Dir(dst), 0750); err != nil {
		return err
	}
	if err := writeFileAtomic(dst, payload, 0640); err != nil {
		return err
	}
	// Best-effort cleanup of copies under other statuses; skip the file we
	// just wrote.
	for _, p := range []string{
		q.pendingEntryPath(task.ID),
		filepath.Join(q.root, "running", task.ID+".json"),
		filepath.Join(q.root, "finished", task.ID+".json"),
		filepath.Join(q.root, "failed", task.ID+".json"),
	} {
		if p == dst {
			continue
		}
		_ = os.Remove(p)
	}
	if depth, derr := q.QueueDepth(); derr == nil {
		UpdateQueueDepth(depth)
	}
	_ = q.rebuildIndex()
	return nil
}
// UpdateTaskWithMetrics persists task like UpdateTask; the filesystem
// backend has no metrics store, so the metrics argument is ignored.
func (q *FilesystemQueue) UpdateTaskWithMetrics(task *Task, _ string) error {
	return q.UpdateTask(task)
}

// RecordMetric is a no-op for the filesystem backend.
func (q *FilesystemQueue) RecordMetric(_, _ string, _ float64) error {
	return nil
}

// Heartbeat is a no-op for the filesystem backend.
func (q *FilesystemQueue) Heartbeat(_ string) error {
	return nil
}
// QueueDepth counts the JSON entries currently sitting in pending/entries.
func (q *FilesystemQueue) QueueDepth() (int64, error) {
	entries, err := os.ReadDir(filepath.Join(q.root, "pending", "entries"))
	if err != nil {
		return 0, err
	}
	var depth int64
	for _, entry := range entries {
		if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".json") {
			depth++
		}
	}
	return depth, nil
}
// Prewarm-state tracking is not implemented by the filesystem backend; the
// methods below are no-ops that report "no state".

// SetWorkerPrewarmState is a no-op for the filesystem backend.
func (q *FilesystemQueue) SetWorkerPrewarmState(_ PrewarmState) error { return nil }

// ClearWorkerPrewarmState is a no-op for the filesystem backend.
func (q *FilesystemQueue) ClearWorkerPrewarmState(_ string) error { return nil }

// GetWorkerPrewarmState always reports no state for the filesystem backend.
func (q *FilesystemQueue) GetWorkerPrewarmState(_ string) (*PrewarmState, error) {
	return nil, nil
}

// GetAllWorkerPrewarmStates always reports no states for the filesystem backend.
func (q *FilesystemQueue) GetAllWorkerPrewarmStates() ([]PrewarmState, error) {
	return nil, nil
}

// SignalPrewarmGC is a no-op for the filesystem backend.
func (q *FilesystemQueue) SignalPrewarmGC() error { return nil }

// PrewarmGCRequestValue always reports an empty request for the filesystem backend.
func (q *FilesystemQueue) PrewarmGCRequestValue() (string, error) {
	return "", nil
}
// claimNext scans pending/entries, picks the best claimable task, and —
// unless peek is true — claims it by renaming its file into running/ and
// stamping a lease.
//
// Ordering: higher Priority wins; ties go to the older CreatedAt. Tasks
// whose NextRetry is still in the future are skipped, as are unreadable or
// unparsable entries. Returns (nil, nil) when nothing is claimable or when
// another process wins the rename race for the chosen task.
func (q *FilesystemQueue) claimNext(workerID string, leaseDuration time.Duration, peek bool) (*Task, error) {
	pendingDir := filepath.Join(q.root, "pending", "entries")
	entries, err := os.ReadDir(pendingDir)
	if err != nil {
		return nil, err
	}
	candidates := make([]*Task, 0, len(entries))
	paths := make(map[string]string, len(entries))
	for _, e := range entries {
		if e.IsDir() {
			continue
		}
		if !strings.HasSuffix(e.Name(), ".json") {
			continue
		}
		path := filepath.Join(pendingDir, e.Name())
		data, err := os.ReadFile(path)
		if err != nil {
			continue
		}
		var t Task
		if err := json.Unmarshal(data, &t); err != nil {
			continue
		}
		// Respect retry backoff: not claimable until NextRetry has passed.
		if t.NextRetry != nil && time.Now().UTC().Before(t.NextRetry.UTC()) {
			continue
		}
		candidates = append(candidates, &t)
		paths[t.ID] = path
	}
	if len(candidates) == 0 {
		return nil, nil
	}
	sort.Slice(candidates, func(i, j int) bool {
		if candidates[i].Priority != candidates[j].Priority {
			return candidates[i].Priority > candidates[j].Priority
		}
		return candidates[i].CreatedAt.Before(candidates[j].CreatedAt)
	})
	chosen := candidates[0]
	if peek {
		// Peek: report the would-be winner without moving or leasing it.
		return chosen, nil
	}
	src := paths[chosen.ID]
	if src == "" {
		return nil, nil
	}
	dst := filepath.Join(q.root, "running", chosen.ID+".json")
	// The rename IS the claim: only one process can successfully move the file.
	if err := os.Rename(src, dst); err != nil {
		// Another process might have claimed it.
		if errors.Is(err, os.ErrNotExist) {
			return nil, nil
		}
		return nil, err
	}
	// Refresh from the moved file to avoid race on content.
	data, err := os.ReadFile(dst)
	if err != nil {
		return nil, err
	}
	var t Task
	if err := json.Unmarshal(data, &t); err != nil {
		return nil, err
	}
	now := time.Now().UTC()
	if leaseDuration == 0 {
		leaseDuration = defaultLeaseDuration
	}
	exp := now.Add(leaseDuration)
	t.LeaseExpiry = &exp
	if strings.TrimSpace(workerID) != "" {
		t.LeasedBy = workerID
	}
	// Note: status transitions are handled by worker UpdateTask calls.
	// Persisting the lease is best-effort: the returned task carries it either way.
	payload, err := json.Marshal(&t)
	if err == nil {
		_ = writeFileAtomic(dst, payload, 0640)
	}
	if depth, derr := q.QueueDepth(); derr == nil {
		UpdateQueueDepth(depth)
	}
	_ = q.rebuildIndex()
	return &t, nil
}
// pendingEntryPath returns the on-disk path of a pending task's JSON file.
func (q *FilesystemQueue) pendingEntryPath(taskID string) string {
	return filepath.Join(q.root, "pending", "entries", taskID+".json")
}
// pathForStatus maps a task status to the file path where that task should
// live. Unknown statuses yield "" (the caller falls back to running/).
func (q *FilesystemQueue) pathForStatus(status, taskID string) string {
	var dir string
	switch status {
	case "queued":
		return q.pendingEntryPath(taskID)
	case "running":
		dir = "running"
	case "completed", "finished":
		dir = "finished"
	case "failed", "cancelled":
		dir = "failed"
	default:
		return ""
	}
	return filepath.Join(q.root, dir, taskID+".json")
}
// findTaskPath locates the JSON file for taskID, probing each status
// directory in a fixed order. Returns os.ErrNotExist when it is nowhere.
func (q *FilesystemQueue) findTaskPath(taskID string) (string, error) {
	filename := taskID + ".json"
	candidates := []string{
		q.pendingEntryPath(taskID),
		filepath.Join(q.root, "running", filename),
		filepath.Join(q.root, "finished", filename),
		filepath.Join(q.root, "failed", filename),
	}
	for _, candidate := range candidates {
		if _, err := os.Stat(candidate); err == nil {
			return candidate, nil
		}
	}
	return "", os.ErrNotExist
}
// removeTaskFromAllDirs deletes every on-disk copy of taskID. Missing files
// are not errors; the last unexpected removal error (if any) is returned.
func (q *FilesystemQueue) removeTaskFromAllDirs(taskID string) error {
	filename := taskID + ".json"
	var lastErr error
	for _, candidate := range []string{
		q.pendingEntryPath(taskID),
		filepath.Join(q.root, "running", filename),
		filepath.Join(q.root, "finished", filename),
		filepath.Join(q.root, "failed", filename),
	} {
		if err := os.Remove(candidate); err != nil && !errors.Is(err, os.ErrNotExist) {
			lastErr = err
		}
	}
	return lastErr
}
// rebuildIndex regenerates pending/.queue.json, a priority-ordered snapshot
// of the pending tasks. Unreadable or unparsable entries are skipped.
func (q *FilesystemQueue) rebuildIndex() error {
	pendingDir := filepath.Join(q.root, "pending", "entries")
	entries, err := os.ReadDir(pendingDir)
	if err != nil {
		return err
	}
	index := filesystemQueueIndex{
		Version:   1,
		UpdatedAt: time.Now().UTC().Format(time.RFC3339),
	}
	for _, entry := range entries {
		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
			continue
		}
		raw, err := os.ReadFile(filepath.Join(pendingDir, entry.Name()))
		if err != nil {
			continue
		}
		var task Task
		if err := json.Unmarshal(raw, &task); err != nil {
			continue
		}
		index.Tasks = append(index.Tasks, filesystemQueueIndexTask{
			ID:        task.ID,
			Priority:  task.Priority,
			CreatedAt: task.CreatedAt.UTC().Format(time.RFC3339Nano),
		})
	}
	// Highest priority first; ties broken by oldest creation timestamp
	// (RFC3339Nano strings compare chronologically).
	sort.Slice(index.Tasks, func(i, j int) bool {
		if index.Tasks[i].Priority != index.Tasks[j].Priority {
			return index.Tasks[i].Priority > index.Tasks[j].Priority
		}
		return index.Tasks[i].CreatedAt < index.Tasks[j].CreatedAt
	})
	payload, err := json.MarshalIndent(&index, "", " ")
	if err != nil {
		return err
	}
	return writeFileAtomic(filepath.Join(q.root, "pending", ".queue.json"), payload, 0640)
}
func writeFileAtomic(path string, data []byte, perm os.FileMode) error {
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0750); err != nil {
return err
}
tmp := path + ".tmp"
if err := os.WriteFile(tmp, data, perm); err != nil {
return err
}
return os.Rename(tmp, path)
}