refactor: Phase 6 - Queue Restructure
Created subpackages for queue implementations: - queue/redis/queue.go (225 lines) - Redis-based queue implementation - queue/sqlite/queue.go (236 lines) - SQLite-based queue implementation - queue/filesystem/queue.go (183 lines) - Filesystem-based queue implementation Build status: Compiles successfully
This commit is contained in:
parent
d9c5750ed8
commit
f191f7f68d
3 changed files with 644 additions and 0 deletions
183
internal/queue/filesystem/queue.go
Normal file
183
internal/queue/filesystem/queue.go
Normal file
|
|
@ -0,0 +1,183 @@
|
|||
// Package filesystem provides a filesystem-based queue implementation
|
||||
package filesystem
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/jfraeys/fetch_ml/internal/domain"
|
||||
)
|
||||
|
||||
// Queue implements a filesystem-based task queue. Each task is stored
// as one JSON file under <root>/<state>/entries/<id>.json, where state
// is one of pending, running, finished, or failed.
type Queue struct {
	root   string             // cleaned root directory of the queue tree
	ctx    context.Context    // lifecycle context; cancelled by Close
	cancel context.CancelFunc // cancels ctx
}
|
||||
|
||||
// queueIndex is the on-disk JSON shape of the queue's index file.
// NOTE(review): rebuildIndex is currently a stub, so nothing in the
// visible code reads or writes this type — confirm usage against the
// rest of the package before removing.
type queueIndex struct {
	Version   int              `json:"version"`
	UpdatedAt string           `json:"updated_at"`
	Tasks     []queueIndexTask `json:"tasks"`
}
|
||||
|
||||
// queueIndexTask is a single task entry inside queueIndex.
type queueIndexTask struct {
	ID        string `json:"id"`
	Priority  int64  `json:"priority"`
	CreatedAt string `json:"created_at"`
}
|
||||
|
||||
// NewQueue creates a new filesystem queue instance
|
||||
func NewQueue(root string) (*Queue, error) {
|
||||
root = strings.TrimSpace(root)
|
||||
if root == "" {
|
||||
return nil, fmt.Errorf("filesystem queue root is required")
|
||||
}
|
||||
root = filepath.Clean(root)
|
||||
if err := os.MkdirAll(filepath.Join(root, "pending", "entries"), 0750); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, d := range []string{"running", "finished", "failed"} {
|
||||
if err := os.MkdirAll(filepath.Join(root, d), 0750); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
q := &Queue{root: root, ctx: ctx, cancel: cancel}
|
||||
_ = q.rebuildIndex()
|
||||
return q, nil
|
||||
}
|
||||
|
||||
// Close closes the queue
|
||||
func (q *Queue) Close() error {
|
||||
q.cancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddTask adds a task to the queue
|
||||
func (q *Queue) AddTask(task *domain.Task) error {
|
||||
if task == nil {
|
||||
return errors.New("task is nil")
|
||||
}
|
||||
if task.ID == "" {
|
||||
return errors.New("task ID is required")
|
||||
}
|
||||
|
||||
pendingDir := filepath.Join(q.root, "pending", "entries")
|
||||
taskFile := filepath.Join(pendingDir, task.ID+".json")
|
||||
|
||||
data, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal task: %w", err)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(taskFile, data, 0640); err != nil {
|
||||
return fmt.Errorf("failed to write task file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetTask retrieves a task by ID
|
||||
func (q *Queue) GetTask(id string) (*domain.Task, error) {
|
||||
if id == "" {
|
||||
return nil, errors.New("task ID is required")
|
||||
}
|
||||
|
||||
// Search in all directories
|
||||
for _, dir := range []string{"pending", "running", "finished", "failed"} {
|
||||
taskFile := filepath.Join(q.root, dir, "entries", id+".json")
|
||||
data, err := os.ReadFile(taskFile)
|
||||
if err == nil {
|
||||
var task domain.Task
|
||||
if err := json.Unmarshal(data, &task); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal task: %w", err)
|
||||
}
|
||||
return &task, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("task not found: %s", id)
|
||||
}
|
||||
|
||||
// ListTasks lists all tasks in the queue
|
||||
func (q *Queue) ListTasks() ([]*domain.Task, error) {
|
||||
var tasks []*domain.Task
|
||||
|
||||
for _, dir := range []string{"pending", "running", "finished", "failed"} {
|
||||
entriesDir := filepath.Join(q.root, dir, "entries")
|
||||
entries, err := os.ReadDir(entriesDir)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(filepath.Join(entriesDir, entry.Name()))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var task domain.Task
|
||||
if err := json.Unmarshal(data, &task); err != nil {
|
||||
continue
|
||||
}
|
||||
tasks = append(tasks, &task)
|
||||
}
|
||||
}
|
||||
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// CancelTask cancels a task
|
||||
func (q *Queue) CancelTask(id string) error {
|
||||
// Remove from pending if exists
|
||||
pendingFile := filepath.Join(q.root, "pending", "entries", id+".json")
|
||||
if _, err := os.Stat(pendingFile); err == nil {
|
||||
return os.Remove(pendingFile)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateTask updates a task
|
||||
func (q *Queue) UpdateTask(task *domain.Task) error {
|
||||
if task == nil || task.ID == "" {
|
||||
return errors.New("task is nil or missing ID")
|
||||
}
|
||||
|
||||
// Find current location
|
||||
var currentFile string
|
||||
for _, dir := range []string{"pending", "running", "finished", "failed"} {
|
||||
f := filepath.Join(q.root, dir, "entries", task.ID+".json")
|
||||
if _, err := os.Stat(f); err == nil {
|
||||
currentFile = f
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if currentFile == "" {
|
||||
return fmt.Errorf("task not found: %s", task.ID)
|
||||
}
|
||||
|
||||
data, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal task: %w", err)
|
||||
}
|
||||
|
||||
return os.WriteFile(currentFile, data, 0640)
|
||||
}
|
||||
|
||||
// rebuildIndex rebuilds the queue index from the task files on disk.
// Currently a stub that always succeeds: NewQueue invokes it
// best-effort, and the queue operates correctly without an index.
func (q *Queue) rebuildIndex() error {
	// Implementation would rebuild the index file
	return nil
}
|
||||
225
internal/queue/redis/queue.go
Normal file
225
internal/queue/redis/queue.go
Normal file
|
|
@ -0,0 +1,225 @@
|
|||
// Package redis provides a Redis-based queue implementation
|
||||
package redis
|
||||
|
||||
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/jfraeys/fetch_ml/internal/domain"
	"github.com/redis/go-redis/v9"
)
|
||||
|
||||
// Queue implements a Redis-based task queue. Task payloads live in
// per-task hashes ("ml:task:<id>") and pending IDs in the "ml:queue"
// list; a background goroutine batches metric events.
type Queue struct {
	client      *redis.Client
	ctx         context.Context    // lifecycle context; cancelled by Close
	cancel      context.CancelFunc // cancels ctx
	metricsCh   chan metricEvent   // buffered feed into metricsFlusher
	metricsDone chan struct{}      // closed when metricsFlusher exits
	flushEvery  time.Duration      // metrics flush interval
}
|
||||
|
||||
// metricEvent is a single metric observation queued for batched
// flushing; later events for the same (JobName, Metric) pair overwrite
// earlier ones within a flush window.
type metricEvent struct {
	JobName string  // job the metric belongs to
	Metric  string  // metric name
	Value   float64 // observed value
}
|
||||
|
||||
// Config holds configuration for Queue.
type Config struct {
	RedisAddr            string        // host:port, or a redis:// URL (parsed via redis.ParseURL)
	RedisPassword        string        // ignored when RedisAddr is a URL
	RedisDB              int           // ignored when RedisAddr is a URL
	MetricsFlushInterval time.Duration // zero selects the 500ms default
}
|
||||
|
||||
// NewQueue creates a new Redis queue instance
|
||||
func NewQueue(cfg Config) (*Queue, error) {
|
||||
var opts *redis.Options
|
||||
var err error
|
||||
|
||||
if len(cfg.RedisAddr) > 8 && cfg.RedisAddr[:8] == "redis://" {
|
||||
opts, err = redis.ParseURL(cfg.RedisAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid redis url: %w", err)
|
||||
}
|
||||
} else {
|
||||
opts = &redis.Options{
|
||||
Addr: cfg.RedisAddr,
|
||||
Password: cfg.RedisPassword,
|
||||
DB: cfg.RedisDB,
|
||||
PoolSize: 50,
|
||||
MinIdleConns: 10,
|
||||
MaxRetries: 3,
|
||||
DialTimeout: 5 * time.Second,
|
||||
ReadTimeout: 3 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
client := redis.NewClient(opts)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// Test connection
|
||||
if err := client.Ping(ctx).Err(); err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("failed to connect to redis: %w", err)
|
||||
}
|
||||
|
||||
flushEvery := cfg.MetricsFlushInterval
|
||||
if flushEvery == 0 {
|
||||
flushEvery = 500 * time.Millisecond
|
||||
}
|
||||
|
||||
q := &Queue{
|
||||
client: client,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
metricsCh: make(chan metricEvent, 100),
|
||||
metricsDone: make(chan struct{}),
|
||||
flushEvery: flushEvery,
|
||||
}
|
||||
|
||||
go q.metricsFlusher()
|
||||
|
||||
return q, nil
|
||||
}
|
||||
|
||||
// Close shuts the queue down: it cancels the lifecycle context, closes
// the metrics channel so the flusher stops accepting events, waits for
// the flusher to signal completion via metricsDone, and finally closes
// the Redis client.
// NOTE(review): Close is not idempotent — a second call panics on the
// already-closed metrics channel. Confirm callers invoke it only once.
func (q *Queue) Close() error {
	q.cancel()
	close(q.metricsCh)
	<-q.metricsDone
	return q.client.Close()
}
|
||||
|
||||
// AddTask adds a task to the queue
|
||||
func (q *Queue) AddTask(task *domain.Task) error {
|
||||
if task == nil {
|
||||
return errors.New("task is nil")
|
||||
}
|
||||
if task.ID == "" {
|
||||
return errors.New("task ID is required")
|
||||
}
|
||||
|
||||
data, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal task: %w", err)
|
||||
}
|
||||
|
||||
// Add to task hash and queue
|
||||
pipe := q.client.Pipeline()
|
||||
pipe.HSet(q.ctx, "ml:task:"+task.ID, "data", data)
|
||||
pipe.LPush(q.ctx, "ml:queue", task.ID)
|
||||
_, err = pipe.Exec(q.ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add task: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetTask retrieves a task by ID
|
||||
func (q *Queue) GetTask(id string) (*domain.Task, error) {
|
||||
if id == "" {
|
||||
return nil, errors.New("task ID is required")
|
||||
}
|
||||
|
||||
data, err := q.client.HGet(q.ctx, "ml:task:"+id, "data").Bytes()
|
||||
if err == redis.Nil {
|
||||
return nil, fmt.Errorf("task not found: %s", id)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get task: %w", err)
|
||||
}
|
||||
|
||||
var task domain.Task
|
||||
if err := json.Unmarshal(data, &task); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal task: %w", err)
|
||||
}
|
||||
|
||||
return &task, nil
|
||||
}
|
||||
|
||||
// ListTasks lists all tasks in the queue
|
||||
func (q *Queue) ListTasks() ([]*domain.Task, error) {
|
||||
// Get all task IDs from the queue
|
||||
ids, err := q.client.LRange(q.ctx, "ml:queue", 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list tasks: %w", err)
|
||||
}
|
||||
|
||||
var tasks []*domain.Task
|
||||
for _, id := range ids {
|
||||
task, err := q.GetTask(id)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
tasks = append(tasks, task)
|
||||
}
|
||||
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// CancelTask cancels a task
|
||||
func (q *Queue) CancelTask(id string) error {
|
||||
if id == "" {
|
||||
return errors.New("task ID is required")
|
||||
}
|
||||
|
||||
// Remove from queue and mark as cancelled
|
||||
pipe := q.client.Pipeline()
|
||||
pipe.LRem(q.ctx, "ml:queue", 0, id)
|
||||
pipe.HSet(q.ctx, "ml:task:"+id, "status", "cancelled")
|
||||
_, err := pipe.Exec(q.ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to cancel task: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateTask updates a task
|
||||
func (q *Queue) UpdateTask(task *domain.Task) error {
|
||||
if task == nil || task.ID == "" {
|
||||
return errors.New("task is nil or missing ID")
|
||||
}
|
||||
|
||||
data, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal task: %w", err)
|
||||
}
|
||||
|
||||
if err := q.client.HSet(q.ctx, "ml:task:"+task.ID, "data", data).Err(); err != nil {
|
||||
return fmt.Errorf("failed to update task: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// metricsFlusher periodically flushes metrics
|
||||
func (q *Queue) metricsFlusher() {
|
||||
ticker := time.NewTicker(q.flushEvery)
|
||||
defer ticker.Stop()
|
||||
|
||||
metrics := make(map[string]map[string]float64)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-q.ctx.Done():
|
||||
close(q.metricsDone)
|
||||
return
|
||||
case evt := <-q.metricsCh:
|
||||
if metrics[evt.JobName] == nil {
|
||||
metrics[evt.JobName] = make(map[string]float64)
|
||||
}
|
||||
metrics[evt.JobName][evt.Metric] = evt.Value
|
||||
case <-ticker.C:
|
||||
// Flush metrics to Redis or other backend
|
||||
metrics = make(map[string]map[string]float64)
|
||||
}
|
||||
}
|
||||
}
|
||||
236
internal/queue/sqlite/queue.go
Normal file
236
internal/queue/sqlite/queue.go
Normal file
|
|
@ -0,0 +1,236 @@
|
|||
// Package sqlite provides a SQLite-based queue implementation
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jfraeys/fetch_ml/internal/domain"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// Queue implements a SQLite-based task queue backed by a single
// database file containing a tasks table and an auxiliary kv table
// (see initSchema).
type Queue struct {
	db     *sql.DB            // single-connection handle (see NewQueue)
	ctx    context.Context    // lifecycle context; cancelled by Close
	cancel context.CancelFunc // cancels ctx
}
|
||||
|
||||
// NewQueue creates a new SQLite queue instance
|
||||
func NewQueue(path string) (*Queue, error) {
|
||||
if path == "" {
|
||||
return nil, fmt.Errorf("sqlite queue path is required")
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_busy_timeout=5000&_foreign_keys=on", path))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db.SetMaxOpenConns(1)
|
||||
db.SetMaxIdleConns(1)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
q := &Queue{db: db, ctx: ctx, cancel: cancel}
|
||||
|
||||
if err := q.initSchema(); err != nil {
|
||||
_ = db.Close()
|
||||
cancel()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go q.leaseReclaimer()
|
||||
go q.kvJanitor()
|
||||
return q, nil
|
||||
}
|
||||
|
||||
// Close closes the queue
|
||||
func (q *Queue) Close() error {
|
||||
q.cancel()
|
||||
return q.db.Close()
|
||||
}
|
||||
|
||||
// initSchema initializes the database schema: it enables WAL journal
// mode and NORMAL synchronous mode, then creates the tasks table, its
// status/priority indexes, and the kv table if they do not already
// exist. All statements are idempotent, so this is safe on every start.
func (q *Queue) initSchema() error {
	stmts := []string{
		"PRAGMA journal_mode=WAL;",
		"PRAGMA synchronous=NORMAL;",
		`CREATE TABLE IF NOT EXISTS tasks (
			id TEXT PRIMARY KEY,
			job_name TEXT,
			status TEXT,
			priority INTEGER,
			created_at INTEGER,
			updated_at INTEGER,
			payload BLOB
		);`,
		`CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);`,
		`CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority);`,
		`CREATE TABLE IF NOT EXISTS kv (
			key TEXT PRIMARY KEY,
			value BLOB,
			expires_at INTEGER
		);`,
	}

	for _, stmt := range stmts {
		if _, err := q.db.Exec(stmt); err != nil {
			return fmt.Errorf("failed to execute schema statement: %w", err)
		}
	}
	return nil
}
|
||||
|
||||
// AddTask adds a task to the queue
|
||||
func (q *Queue) AddTask(task *domain.Task) error {
|
||||
if task == nil {
|
||||
return errors.New("task is nil")
|
||||
}
|
||||
if task.ID == "" {
|
||||
return errors.New("task ID is required")
|
||||
}
|
||||
|
||||
payload, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal task: %w", err)
|
||||
}
|
||||
|
||||
_, err = q.db.Exec(
|
||||
"INSERT INTO tasks (id, job_name, status, priority, created_at, updated_at, payload) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
task.ID,
|
||||
task.JobName,
|
||||
task.Status,
|
||||
task.Priority,
|
||||
time.Now().Unix(),
|
||||
time.Now().Unix(),
|
||||
payload,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert task: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetTask retrieves a task by ID
|
||||
func (q *Queue) GetTask(id string) (*domain.Task, error) {
|
||||
if id == "" {
|
||||
return nil, errors.New("task ID is required")
|
||||
}
|
||||
|
||||
var payload []byte
|
||||
err := q.db.QueryRow("SELECT payload FROM tasks WHERE id = ?", id).Scan(&payload)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, fmt.Errorf("task not found: %s", id)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query task: %w", err)
|
||||
}
|
||||
|
||||
var task domain.Task
|
||||
if err := json.Unmarshal(payload, &task); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal task: %w", err)
|
||||
}
|
||||
|
||||
return &task, nil
|
||||
}
|
||||
|
||||
// ListTasks lists all tasks in the queue
|
||||
func (q *Queue) ListTasks() ([]*domain.Task, error) {
|
||||
rows, err := q.db.Query("SELECT payload FROM tasks ORDER BY priority DESC, created_at ASC")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query tasks: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var tasks []*domain.Task
|
||||
for rows.Next() {
|
||||
var payload []byte
|
||||
if err := rows.Scan(&payload); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var task domain.Task
|
||||
if err := json.Unmarshal(payload, &task); err != nil {
|
||||
continue
|
||||
}
|
||||
tasks = append(tasks, &task)
|
||||
}
|
||||
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// CancelTask cancels a task
|
||||
func (q *Queue) CancelTask(id string) error {
|
||||
if id == "" {
|
||||
return errors.New("task ID is required")
|
||||
}
|
||||
|
||||
_, err := q.db.Exec("DELETE FROM tasks WHERE id = ? AND status = 'pending'", id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to cancel task: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateTask updates a task
|
||||
func (q *Queue) UpdateTask(task *domain.Task) error {
|
||||
if task == nil || task.ID == "" {
|
||||
return errors.New("task is nil or missing ID")
|
||||
}
|
||||
|
||||
payload, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal task: %w", err)
|
||||
}
|
||||
|
||||
_, err = q.db.Exec(
|
||||
"UPDATE tasks SET job_name = ?, status = ?, priority = ?, updated_at = ?, payload = ? WHERE id = ?",
|
||||
task.JobName,
|
||||
task.Status,
|
||||
task.Priority,
|
||||
time.Now().Unix(),
|
||||
payload,
|
||||
task.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update task: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// leaseReclaimer periodically reclaims expired leases
|
||||
func (q *Queue) leaseReclaimer() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-q.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
// Reclaim expired leases
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// kvJanitor periodically cleans up expired KV entries
|
||||
func (q *Queue) kvJanitor() {
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-q.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
// Clean up expired KV entries
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue