diff --git a/internal/queue/filesystem/queue.go b/internal/queue/filesystem/queue.go new file mode 100644 index 0000000..e9291f0 --- /dev/null +++ b/internal/queue/filesystem/queue.go @@ -0,0 +1,183 @@ +// Package filesystem provides a filesystem-based queue implementation +package filesystem + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/jfraeys/fetch_ml/internal/domain" +) + +// Queue implements a filesystem-based task queue +type Queue struct { + root string + ctx context.Context + cancel context.CancelFunc +} + +type queueIndex struct { + Version int `json:"version"` + UpdatedAt string `json:"updated_at"` + Tasks []queueIndexTask `json:"tasks"` +} + +type queueIndexTask struct { + ID string `json:"id"` + Priority int64 `json:"priority"` + CreatedAt string `json:"created_at"` +} + +// NewQueue creates a new filesystem queue instance +func NewQueue(root string) (*Queue, error) { + root = strings.TrimSpace(root) + if root == "" { + return nil, fmt.Errorf("filesystem queue root is required") + } + root = filepath.Clean(root) + if err := os.MkdirAll(filepath.Join(root, "pending", "entries"), 0750); err != nil { + return nil, err + } + for _, d := range []string{"running", "finished", "failed"} { + if err := os.MkdirAll(filepath.Join(root, d), 0750); err != nil { + return nil, err + } + } + + ctx, cancel := context.WithCancel(context.Background()) + q := &Queue{root: root, ctx: ctx, cancel: cancel} + _ = q.rebuildIndex() + return q, nil +} + +// Close closes the queue +func (q *Queue) Close() error { + q.cancel() + return nil +} + +// AddTask adds a task to the queue +func (q *Queue) AddTask(task *domain.Task) error { + if task == nil { + return errors.New("task is nil") + } + if task.ID == "" { + return errors.New("task ID is required") + } + + pendingDir := filepath.Join(q.root, "pending", "entries") + taskFile := filepath.Join(pendingDir, task.ID+".json") + + data, err := json.Marshal(task) + if err != nil { + return fmt.Errorf("failed to marshal task: %w", err) + } + + if err := os.WriteFile(taskFile, data, 0640); err != nil { + return fmt.Errorf("failed to write task file: %w", err) + } + + return nil +} + +// GetTask retrieves a task by ID +func (q *Queue) GetTask(id string) (*domain.Task, error) { + if id == "" { + return nil, errors.New("task ID is required") + } + + // Search in all directories + for _, dir := range []string{"pending", "running", "finished", "failed"} { + taskFile := filepath.Join(q.root, dir, "entries", id+".json") + data, err := os.ReadFile(taskFile) + if err == nil { + var task domain.Task + if err := json.Unmarshal(data, &task); err != nil { + return nil, fmt.Errorf("failed to unmarshal task: %w", err) + } + return &task, nil + } + } + + return nil, fmt.Errorf("task not found: %s", id) +} + +// ListTasks lists all tasks in the queue +func (q *Queue) ListTasks() ([]*domain.Task, error) { + var tasks []*domain.Task + + for _, dir := range []string{"pending", "running", "finished", "failed"} { + entriesDir := filepath.Join(q.root, dir, "entries") + entries, err := os.ReadDir(entriesDir) + if err != nil { + continue + } + + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") { + continue + } + + data, err := os.ReadFile(filepath.Join(entriesDir, entry.Name())) + if err != nil { + continue + } + + var task domain.Task + if err := json.Unmarshal(data, &task); err != nil { + continue + } + tasks = append(tasks, &task) + } + } + + return tasks, nil +} + +// CancelTask cancels a task +func (q *Queue) CancelTask(id string) error { + // Remove from pending if exists + pendingFile := filepath.Join(q.root, "pending", "entries", id+".json") + if _, err := os.Stat(pendingFile); err == nil { + return os.Remove(pendingFile) + } + return nil +} + +// UpdateTask updates a task +func (q *Queue) UpdateTask(task *domain.Task) error { + if task == nil || task.ID == "" { + return errors.New("task is nil or missing ID") + } + + // Find current location + var currentFile string + for _, dir := range []string{"pending", "running", "finished", "failed"} { + f := filepath.Join(q.root, dir, "entries", task.ID+".json") + if _, err := os.Stat(f); err == nil { + currentFile = f + break + } + } + + if currentFile == "" { + return fmt.Errorf("task not found: %s", task.ID) + } + + data, err := json.Marshal(task) + if err != nil { + return fmt.Errorf("failed to marshal task: %w", err) + } + + return os.WriteFile(currentFile, data, 0640) +} + +// rebuildIndex rebuilds the queue index +func (q *Queue) rebuildIndex() error { + // Implementation would rebuild the index file + return nil +} diff --git a/internal/queue/redis/queue.go b/internal/queue/redis/queue.go new file mode 100644 index 0000000..d1cb00c --- /dev/null +++ b/internal/queue/redis/queue.go @@ -0,0 +1,225 @@ +// Package redis provides a Redis-based queue implementation +package redis + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/jfraeys/fetch_ml/internal/domain" + "github.com/redis/go-redis/v9" +) + +// Queue implements a Redis-based task queue +type Queue struct { + client *redis.Client + ctx context.Context + cancel context.CancelFunc + metricsCh chan metricEvent + metricsDone chan struct{} + flushEvery time.Duration +} + +type metricEvent struct { + JobName string + Metric string + Value float64 +} + +// Config holds configuration for Queue + type Config struct { + RedisAddr string + RedisPassword string + RedisDB int + MetricsFlushInterval time.Duration +} + +// NewQueue creates a new Redis queue instance +func NewQueue(cfg Config) (*Queue, error) { + var opts *redis.Options + var err error + + if len(cfg.RedisAddr) > 8 && cfg.RedisAddr[:8] == "redis://" { + opts, err = redis.ParseURL(cfg.RedisAddr) + if err != nil { + return nil, fmt.Errorf("invalid redis url: %w", err) + } + } else { + opts = &redis.Options{ + Addr: cfg.RedisAddr, + Password: cfg.RedisPassword, + DB: cfg.RedisDB, + PoolSize: 50, + MinIdleConns: 10, + MaxRetries: 3, + DialTimeout: 5 * time.Second, + ReadTimeout: 3 * time.Second, + } + } + + client := redis.NewClient(opts) + + ctx, cancel := context.WithCancel(context.Background()) + + // Test connection + if err := client.Ping(ctx).Err(); err != nil { + cancel() + return nil, fmt.Errorf("failed to connect to redis: %w", err) + } + + flushEvery := cfg.MetricsFlushInterval + if flushEvery == 0 { + flushEvery = 500 * time.Millisecond + } + + q := &Queue{ + client: client, + ctx: ctx, + cancel: cancel, + metricsCh: make(chan metricEvent, 100), + metricsDone: make(chan struct{}), + flushEvery: flushEvery, + } + + go q.metricsFlusher() + + return q, nil +} + +// Close closes the queue +func (q *Queue) Close() error { + q.cancel() + close(q.metricsCh) + <-q.metricsDone + return q.client.Close() +} + +// AddTask adds a task to the queue +func (q *Queue) AddTask(task *domain.Task) error { + if task == nil { + return errors.New("task is nil") + } + if task.ID == "" { + return errors.New("task ID is required") + } + + data, err := json.Marshal(task) + if err != nil { + return fmt.Errorf("failed to marshal task: %w", err) + } + + // Add to task hash and queue + pipe := q.client.Pipeline() + pipe.HSet(q.ctx, "ml:task:"+task.ID, "data", data) + pipe.LPush(q.ctx, "ml:queue", task.ID) + _, err = pipe.Exec(q.ctx) + if err != nil { + return fmt.Errorf("failed to add task: %w", err) + } + + return nil +} + +// GetTask retrieves a task by ID +func (q *Queue) GetTask(id string) (*domain.Task, error) { + if id == "" { + return nil, errors.New("task ID is required") + } + + data, err := q.client.HGet(q.ctx, "ml:task:"+id, "data").Bytes() + if err == redis.Nil { + return nil, fmt.Errorf("task not found: %s", id) + } + if err != nil { + return nil, fmt.Errorf("failed to get task: %w", err) + } + + var task domain.Task + if err := json.Unmarshal(data, &task); err != nil { + return nil, fmt.Errorf("failed to unmarshal task: %w", err) + } + + return &task, nil +} + +// ListTasks lists all tasks in the queue +func (q *Queue) ListTasks() ([]*domain.Task, error) { + // Get all task IDs from the queue + ids, err := q.client.LRange(q.ctx, "ml:queue", 0, -1).Result() + if err != nil { + return nil, fmt.Errorf("failed to list tasks: %w", err) + } + + var tasks []*domain.Task + for _, id := range ids { + task, err := q.GetTask(id) + if err != nil { + continue + } + tasks = append(tasks, task) + } + + return tasks, nil +} + +// CancelTask cancels a task +func (q *Queue) CancelTask(id string) error { + if id == "" { + return errors.New("task ID is required") + } + + // Remove from queue and mark as cancelled + pipe := q.client.Pipeline() + pipe.LRem(q.ctx, "ml:queue", 0, id) + pipe.HSet(q.ctx, "ml:task:"+id, "status", "cancelled") + _, err := pipe.Exec(q.ctx) + if err != nil { + return fmt.Errorf("failed to cancel task: %w", err) + } + + return nil +} + +// UpdateTask updates a task +func (q *Queue) UpdateTask(task *domain.Task) error { + if task == nil || task.ID == "" { + return errors.New("task is nil or missing ID") + } + + data, err := json.Marshal(task) + if err != nil { + return fmt.Errorf("failed to marshal task: %w", err) + } + + if err := q.client.HSet(q.ctx, "ml:task:"+task.ID, "data", data).Err(); err != nil { + return fmt.Errorf("failed to update task: %w", err) + } + + return nil +} + +// metricsFlusher periodically flushes metrics +func (q *Queue) metricsFlusher() { + ticker := time.NewTicker(q.flushEvery) + defer ticker.Stop() + + metrics := make(map[string]map[string]float64) + + for { + select { + case <-q.ctx.Done(): + close(q.metricsDone) + return + case evt := <-q.metricsCh: + if metrics[evt.JobName] == nil { + metrics[evt.JobName] = make(map[string]float64) + } + metrics[evt.JobName][evt.Metric] = evt.Value + case <-ticker.C: + // Flush metrics to Redis or other backend + metrics = make(map[string]map[string]float64) + } + } +} diff --git a/internal/queue/sqlite/queue.go b/internal/queue/sqlite/queue.go new file mode 100644 index 0000000..eac24c6 --- /dev/null +++ b/internal/queue/sqlite/queue.go @@ -0,0 +1,236 @@ +// Package sqlite provides a SQLite-based queue implementation +package sqlite + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/jfraeys/fetch_ml/internal/domain" + _ "github.com/mattn/go-sqlite3" +) + +// Queue implements a SQLite-based task queue +type Queue struct { + db *sql.DB + ctx context.Context + cancel context.CancelFunc +} + +// NewQueue creates a new SQLite queue instance +func NewQueue(path string) (*Queue, error) { + if path == "" { + return nil, fmt.Errorf("sqlite queue path is required") + } + + db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_busy_timeout=5000&_foreign_keys=on", path)) + if err != nil { + return nil, err + } + + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + + ctx, cancel := context.WithCancel(context.Background()) + q := &Queue{db: db, ctx: ctx, cancel: cancel} + + if err := q.initSchema(); err != nil { + _ = db.Close() + cancel() + return nil, err + } + + go q.leaseReclaimer() + go q.kvJanitor() + return q, nil +} + +// Close closes the queue +func (q *Queue) Close() error { + q.cancel() + return q.db.Close() +} + +// initSchema initializes the database schema +func (q *Queue) initSchema() error { + stmts := []string{ + "PRAGMA journal_mode=WAL;", + "PRAGMA synchronous=NORMAL;", + `CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + job_name TEXT, + status TEXT, + priority INTEGER, + created_at INTEGER, + updated_at INTEGER, + payload BLOB + );`, + `CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);`, + `CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority);`, + `CREATE TABLE IF NOT EXISTS kv ( + key TEXT PRIMARY KEY, + value BLOB, + expires_at INTEGER + );`, + } + + for _, stmt := range stmts { + if _, err := q.db.Exec(stmt); err != nil { + return fmt.Errorf("failed to execute schema statement: %w", err) + } + } + return nil +} + +// AddTask adds a task to the queue +func (q *Queue) AddTask(task *domain.Task) error { + if task == nil { + return errors.New("task is nil") + } + if task.ID == "" { + return errors.New("task ID is required") + } + + payload, err := json.Marshal(task) + if err != nil { + return fmt.Errorf("failed to marshal task: %w", err) + } + + _, err = q.db.Exec( + "INSERT INTO tasks (id, job_name, status, priority, created_at, updated_at, payload) VALUES (?, ?, ?, ?, ?, ?, ?)", + task.ID, + task.JobName, + task.Status, + task.Priority, + time.Now().Unix(), + time.Now().Unix(), + payload, + ) + if err != nil { + return fmt.Errorf("failed to insert task: %w", err) + } + + return nil +} + +// GetTask retrieves a task by ID +func (q *Queue) GetTask(id string) (*domain.Task, error) { + if id == "" { + return nil, errors.New("task ID is required") + } + + var payload []byte + err := q.db.QueryRow("SELECT payload FROM tasks WHERE id = ?", id).Scan(&payload) + if err == sql.ErrNoRows { + return nil, fmt.Errorf("task not found: %s", id) + } + if err != nil { + return nil, fmt.Errorf("failed to query task: %w", err) + } + + var task domain.Task + if err := json.Unmarshal(payload, &task); err != nil { + return nil, fmt.Errorf("failed to unmarshal task: %w", err) + } + + return &task, nil +} + +// ListTasks lists all tasks in the queue +func (q *Queue) ListTasks() ([]*domain.Task, error) { + rows, err := q.db.Query("SELECT payload FROM tasks ORDER BY priority DESC, created_at ASC") + if err != nil { + return nil, fmt.Errorf("failed to query tasks: %w", err) + } + defer rows.Close() + + var tasks []*domain.Task + for rows.Next() { + var payload []byte + if err := rows.Scan(&payload); err != nil { + continue + } + + var task domain.Task + if err := json.Unmarshal(payload, &task); err != nil { + continue + } + tasks = append(tasks, &task) + } + + return tasks, nil +} + +// CancelTask cancels a task +func (q *Queue) CancelTask(id string) error { + if id == "" { + return errors.New("task ID is required") + } + + _, err := q.db.Exec("DELETE FROM tasks WHERE id = ? AND status = 'pending'", id) + if err != nil { + return fmt.Errorf("failed to cancel task: %w", err) + } + + return nil +} + +// UpdateTask updates a task +func (q *Queue) UpdateTask(task *domain.Task) error { + if task == nil || task.ID == "" { + return errors.New("task is nil or missing ID") + } + + payload, err := json.Marshal(task) + if err != nil { + return fmt.Errorf("failed to marshal task: %w", err) + } + + _, err = q.db.Exec( + "UPDATE tasks SET job_name = ?, status = ?, priority = ?, updated_at = ?, payload = ? WHERE id = ?", + task.JobName, + task.Status, + task.Priority, + time.Now().Unix(), + payload, + task.ID, + ) + if err != nil { + return fmt.Errorf("failed to update task: %w", err) + } + + return nil +} + +// leaseReclaimer periodically reclaims expired leases +func (q *Queue) leaseReclaimer() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-q.ctx.Done(): + return + case <-ticker.C: + // Reclaim expired leases + } + } +} + +// kvJanitor periodically cleans up expired KV entries +func (q *Queue) kvJanitor() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-q.ctx.Done(): + return + case <-ticker.C: + // Clean up expired KV entries + } + } +}