fetch_ml/internal/queue/backend.go
Jeremie Fraeys 48d00b8322
feat: integrate native queue backend into worker and API
- Add QueueBackendNative constant to backend.go
- Add case for native queue in NewBackend() switch
- Native queue uses same FilesystemPath config
- Build tag -tags native_libs enables native implementation

Native library integration now complete:
- dataset_hash: Worker (hash_selector), CLI (verify auto-hash)
- queue_index: Worker/API (backend selection with 'native' type)
2026-02-21 14:11:10 -05:00

114 lines
3.1 KiB
Go

package queue
import (
"errors"
"fmt"
"strings"
"time"
)
// ErrInvalidQueueBackend is returned by NewBackend when BackendConfig.Backend
// is not one of the recognized QueueBackend values (or the empty string).
var ErrInvalidQueueBackend = errors.New("invalid queue backend")
// Backend is the set of operations a task queue implementation must provide.
// NewBackend returns a value of this type for whichever backend the
// configuration selects.
//
// The grouping comments below follow the method names only; the precise
// semantics (ordering, blocking, lease expiry, and so on) are defined by each
// implementation and are not visible from this declaration.
type Backend interface {
// Task submission and retrieval.
AddTask(task *Task) error
GetNextTask() (*Task, error)
PeekNextTask() (*Task, error)

// Lease-based retrieval; the lease is associated with workerID for
// leaseDuration. blockTimeout presumably bounds how long the blocking
// variant waits for a task — confirm against the implementations.
GetNextTaskWithLease(workerID string, leaseDuration time.Duration) (*Task, error)
GetNextTaskWithLeaseBlocking(workerID string, leaseDuration, blockTimeout time.Duration) (*Task, error)
RenewLease(taskID string, workerID string, leaseDuration time.Duration) error
ReleaseLease(taskID string, workerID string) error

// Retry and dead-letter handling.
RetryTask(task *Task) error
MoveToDeadLetterQueue(task *Task, reason string) error

// Task lookup, cancellation, and updates.
GetTask(taskID string) (*Task, error)
GetAllTasks() ([]*Task, error)
GetTaskByName(jobName string) (*Task, error)
CancelTask(taskID string) error
UpdateTask(task *Task) error
UpdateTaskWithMetrics(task *Task, action string) error

// Metrics, worker liveness, and queue size.
RecordMetric(jobName, metric string, value float64) error
Heartbeat(workerID string) error
QueueDepth() (int64, error)

// Per-worker prewarm state and prewarm garbage-collection signalling.
SetWorkerPrewarmState(state PrewarmState) error
ClearWorkerPrewarmState(workerID string) error
GetWorkerPrewarmState(workerID string) (*PrewarmState, error)
GetAllWorkerPrewarmStates() ([]PrewarmState, error)
SignalPrewarmGC() error
PrewarmGCRequestValue() (string, error)

// Close shuts the backend down; presumably releases whatever resources the
// implementation holds — confirm per implementation.
Close() error
}
// QueueBackend names a queue implementation. NewBackend switches on this
// value (taken from BackendConfig.Backend) to decide which constructor to call.
type QueueBackend string

// Recognized QueueBackend values.
const (
// QueueBackendRedis selects the queue built by NewTaskQueue. NewBackend also
// uses this backend when BackendConfig.Backend is the empty string.
QueueBackendRedis QueueBackend = "redis"
// QueueBackendSQLite selects the queue built by NewSQLiteQueue.
QueueBackendSQLite QueueBackend = "sqlite"
// QueueBackendFS selects the queue built by NewFilesystemQueue.
QueueBackendFS QueueBackend = "filesystem"
// QueueBackendNative selects the queue built by NewNativeQueue, which is
// given the same FilesystemPath as the filesystem backend.
QueueBackendNative QueueBackend = "native" // Native C++ queue_index (requires -tags native_libs)
)
// BackendConfig carries the settings NewBackend needs to construct a queue
// backend. Only the fields relevant to the selected backend (and, optionally,
// the filesystem fallback) are read.
type BackendConfig struct {
// Backend selects the implementation; the empty string is treated like
// QueueBackendRedis by NewBackend.
Backend QueueBackend
// RedisAddr, RedisPassword and RedisDB are forwarded to NewTaskQueue for
// the Redis backend.
RedisAddr string
RedisPassword string
RedisDB int
// SQLitePath is passed to NewSQLiteQueue for the SQLite backend.
SQLitePath string
// FilesystemPath is passed to the filesystem and native backends, and to
// the filesystem fallback. It must not be blank when any of those is used.
FilesystemPath string
// FallbackToFilesystem makes NewBackend try a filesystem queue at
// FilesystemPath when constructing the Redis or SQLite backend fails.
FallbackToFilesystem bool
// MetricsFlushInterval is forwarded to NewTaskQueue for the Redis backend.
MetricsFlushInterval time.Duration
}
// NewBackend constructs the queue backend selected by cfg.Backend.
//
// Selection rules:
//   - QueueBackendFS and QueueBackendNative both require a non-blank
//     cfg.FilesystemPath and never fall back to another backend.
//   - QueueBackendRedis (also used when cfg.Backend is empty) and
//     QueueBackendSQLite fall back to a filesystem queue rooted at
//     cfg.FilesystemPath when construction fails and
//     cfg.FallbackToFilesystem is set.
//   - Any other value yields ErrInvalidQueueBackend.
//
// On every error path the returned Backend is a literal nil. When the
// fallback itself cannot be set up, the returned error reports both the
// fallback failure and the primary backend error that triggered it, so the
// root cause is not lost.
func NewBackend(cfg BackendConfig) (Backend, error) {
	// fallback tries the filesystem queue after the primary backend failed
	// with primaryErr. Without opt-in it simply surfaces primaryErr.
	fallback := func(primaryErr error) (Backend, error) {
		if !cfg.FallbackToFilesystem {
			return nil, primaryErr
		}
		if strings.TrimSpace(cfg.FilesystemPath) == "" {
			// Keep the primary error in the chain: it is the reason the
			// fallback was attempted in the first place.
			return nil, fmt.Errorf(
				"filesystem queue path is required for fallback (primary backend error: %w)",
				primaryErr,
			)
		}
		fsq, fsErr := NewFilesystemQueue(cfg.FilesystemPath)
		if fsErr != nil {
			// fsErr stays the wrapped error (as before); the primary error
			// is appended as text for diagnosis.
			return nil, fmt.Errorf(
				"filesystem queue fallback init failed: %w (primary backend error: %v)",
				fsErr, primaryErr,
			)
		}
		return fsq, nil
	}

	switch cfg.Backend {
	case QueueBackendFS:
		if strings.TrimSpace(cfg.FilesystemPath) == "" {
			return nil, fmt.Errorf("filesystem queue path is required")
		}
		// Return an explicit nil on failure: if the constructor returns a
		// concrete pointer type, forwarding its results directly would hand
		// the caller a non-nil Backend wrapping a nil pointer.
		q, err := NewFilesystemQueue(cfg.FilesystemPath)
		if err != nil {
			return nil, err
		}
		return q, nil

	case QueueBackendNative:
		if strings.TrimSpace(cfg.FilesystemPath) == "" {
			return nil, fmt.Errorf("native queue path is required")
		}
		// Same explicit-nil handling as the filesystem backend.
		q, err := NewNativeQueue(cfg.FilesystemPath)
		if err != nil {
			return nil, err
		}
		return q, nil

	case "", QueueBackendRedis:
		q, err := NewTaskQueue(Config{
			RedisAddr:            cfg.RedisAddr,
			RedisPassword:        cfg.RedisPassword,
			RedisDB:              cfg.RedisDB,
			MetricsFlushInterval: cfg.MetricsFlushInterval,
		})
		if err != nil {
			return fallback(err)
		}
		return q, nil

	case QueueBackendSQLite:
		q, err := NewSQLiteQueue(cfg.SQLitePath)
		if err != nil {
			return fallback(err)
		}
		return q, nil

	default:
		return nil, ErrInvalidQueueBackend
	}
}