Update queue and storage systems for scheduler integration:
- Queue backend with scheduler coordination
- Filesystem queue with batch operations
- Deduplication with tenant-aware keys
- Storage layer with audit logging hooks
- Domain models (Task, Events, Errors) with scheduler fields
- Database layer with tenant isolation
- Dataset storage with integrity checks
142 lines
4.2 KiB
Go
142 lines
4.2 KiB
Go
package queue
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jfraeys/fetch_ml/internal/scheduler"
|
|
)
|
|
|
|
// ErrInvalidQueueBackend is the sentinel NewBackend returns when
// BackendConfig.Backend does not name a supported backend. Compare against it
// with errors.Is.
var ErrInvalidQueueBackend error = errors.New("invalid queue backend")
|
|
|
|
type Backend interface {
|
|
AddTask(task *Task) error
|
|
GetNextTask() (*Task, error)
|
|
PeekNextTask() (*Task, error)
|
|
|
|
GetNextTaskWithLease(workerID string, leaseDuration time.Duration) (*Task, error)
|
|
GetNextTaskWithLeaseBlocking(workerID string, leaseDuration, blockTimeout time.Duration) (*Task, error)
|
|
RenewLease(taskID string, workerID string, leaseDuration time.Duration) error
|
|
ReleaseLease(taskID string, workerID string) error
|
|
|
|
RetryTask(task *Task) error
|
|
MoveToDeadLetterQueue(task *Task, reason string) error
|
|
|
|
GetTask(taskID string) (*Task, error)
|
|
GetAllTasks() ([]*Task, error)
|
|
GetTaskByName(jobName string) (*Task, error)
|
|
CancelTask(taskID string) error
|
|
|
|
UpdateTask(task *Task) error
|
|
UpdateTaskWithMetrics(task *Task, action string) error
|
|
|
|
RecordMetric(jobName, metric string, value float64) error
|
|
Heartbeat(workerID string) error
|
|
QueueDepth() (int64, error)
|
|
|
|
SetWorkerPrewarmState(state PrewarmState) error
|
|
ClearWorkerPrewarmState(workerID string) error
|
|
GetWorkerPrewarmState(workerID string) (*PrewarmState, error)
|
|
GetAllWorkerPrewarmStates() ([]PrewarmState, error)
|
|
|
|
SignalPrewarmGC() error
|
|
PrewarmGCRequestValue() (string, error)
|
|
|
|
Close() error
|
|
}
|
|
|
|
// QueueBackend names a queue storage implementation; it is the type of
// BackendConfig.Backend.
type QueueBackend string

// Supported QueueBackend values.
const (
	// QueueBackendRedis selects the Redis queue. NewBackend also uses it when
	// BackendConfig.Backend is the empty string.
	QueueBackendRedis = QueueBackend("redis")
	// QueueBackendSQLite selects the SQLite-backed queue.
	QueueBackendSQLite = QueueBackend("sqlite")
	// QueueBackendFS selects the filesystem queue.
	QueueBackendFS = QueueBackend("filesystem")
	// QueueBackendNative selects the native C++ queue_index
	// (requires -tags native_libs).
	QueueBackendNative = QueueBackend("native")
	// QueueBackendScheduler selects distributed mode via WebSocket.
	QueueBackendScheduler = QueueBackend("scheduler")
)
|
|
|
|
// SchedulerConfig holds the connection settings that
// NewSchedulerBackendFromConfig uses to reach a remote scheduler in
// distributed mode.
type SchedulerConfig struct {
	// Address is the scheduler's address, e.g. "192.168.1.10:7777".
	Address string
	// Cert is the path to the scheduler's TLS certificate.
	Cert string
	// Token is the worker authentication token.
	Token string
}
|
|
|
|
type BackendConfig struct {
|
|
Mode string // "standalone" | "distributed"
|
|
Backend QueueBackend
|
|
RedisAddr string
|
|
RedisPassword string
|
|
SQLitePath string
|
|
FilesystemPath string
|
|
RedisDB int
|
|
MetricsFlushInterval time.Duration
|
|
FallbackToFilesystem bool
|
|
Scheduler SchedulerConfig // Config for distributed mode
|
|
}
|
|
|
|
func NewBackend(cfg BackendConfig) (Backend, error) {
|
|
// Distributed mode: use SchedulerBackend
|
|
if cfg.Mode == "distributed" {
|
|
return NewSchedulerBackendFromConfig(cfg.Scheduler)
|
|
}
|
|
|
|
mkFallback := func(err error) (Backend, error) {
|
|
if !cfg.FallbackToFilesystem {
|
|
return nil, err
|
|
}
|
|
if strings.TrimSpace(cfg.FilesystemPath) == "" {
|
|
return nil, fmt.Errorf("filesystem queue path is required for fallback")
|
|
}
|
|
fsq, fsErr := NewFilesystemQueue(cfg.FilesystemPath)
|
|
if fsErr != nil {
|
|
return nil, fmt.Errorf("filesystem queue fallback init failed: %w", fsErr)
|
|
}
|
|
return fsq, nil
|
|
}
|
|
|
|
switch cfg.Backend {
|
|
case QueueBackendFS:
|
|
if strings.TrimSpace(cfg.FilesystemPath) == "" {
|
|
return nil, fmt.Errorf("filesystem queue path is required")
|
|
}
|
|
return NewFilesystemQueue(cfg.FilesystemPath)
|
|
case QueueBackendNative:
|
|
if strings.TrimSpace(cfg.FilesystemPath) == "" {
|
|
return nil, fmt.Errorf("native queue path is required")
|
|
}
|
|
return NewNativeQueue(cfg.FilesystemPath)
|
|
case "", QueueBackendRedis:
|
|
b, err := NewTaskQueue(Config{
|
|
RedisAddr: cfg.RedisAddr,
|
|
RedisPassword: cfg.RedisPassword,
|
|
RedisDB: cfg.RedisDB,
|
|
MetricsFlushInterval: cfg.MetricsFlushInterval,
|
|
})
|
|
if err != nil {
|
|
return mkFallback(err)
|
|
}
|
|
return b, nil
|
|
case QueueBackendSQLite:
|
|
b, err := NewSQLiteQueue(cfg.SQLitePath)
|
|
if err != nil {
|
|
return mkFallback(err)
|
|
}
|
|
return b, nil
|
|
default:
|
|
return nil, ErrInvalidQueueBackend
|
|
}
|
|
}
|
|
|
|
// NewSchedulerBackendFromConfig creates a SchedulerBackend from config
|
|
func NewSchedulerBackendFromConfig(cfg SchedulerConfig) (Backend, error) {
|
|
if cfg.Address == "" {
|
|
return nil, fmt.Errorf("scheduler address is required for distributed mode")
|
|
}
|
|
conn := scheduler.NewSchedulerConn(cfg.Address, cfg.Cert, cfg.Token, "", scheduler.WorkerCapabilities{})
|
|
if err := conn.Connect(); err != nil {
|
|
return nil, fmt.Errorf("connect to scheduler: %w", err)
|
|
}
|
|
return NewSchedulerBackend(conn), nil
|
|
}
|