package queue import ( "errors" "fmt" "strings" "time" ) var ErrInvalidQueueBackend = errors.New("invalid queue backend") type Backend interface { AddTask(task *Task) error GetNextTask() (*Task, error) PeekNextTask() (*Task, error) GetNextTaskWithLease(workerID string, leaseDuration time.Duration) (*Task, error) GetNextTaskWithLeaseBlocking(workerID string, leaseDuration, blockTimeout time.Duration) (*Task, error) RenewLease(taskID string, workerID string, leaseDuration time.Duration) error ReleaseLease(taskID string, workerID string) error RetryTask(task *Task) error MoveToDeadLetterQueue(task *Task, reason string) error GetTask(taskID string) (*Task, error) GetAllTasks() ([]*Task, error) GetTaskByName(jobName string) (*Task, error) CancelTask(taskID string) error UpdateTask(task *Task) error UpdateTaskWithMetrics(task *Task, action string) error RecordMetric(jobName, metric string, value float64) error Heartbeat(workerID string) error QueueDepth() (int64, error) SetWorkerPrewarmState(state PrewarmState) error ClearWorkerPrewarmState(workerID string) error GetWorkerPrewarmState(workerID string) (*PrewarmState, error) GetAllWorkerPrewarmStates() ([]PrewarmState, error) SignalPrewarmGC() error PrewarmGCRequestValue() (string, error) Close() error } type QueueBackend string const ( QueueBackendRedis QueueBackend = "redis" QueueBackendSQLite QueueBackend = "sqlite" QueueBackendFS QueueBackend = "filesystem" ) type BackendConfig struct { Backend QueueBackend RedisAddr string RedisPassword string RedisDB int SQLitePath string FilesystemPath string FallbackToFilesystem bool MetricsFlushInterval time.Duration } func NewBackend(cfg BackendConfig) (Backend, error) { mkFallback := func(err error) (Backend, error) { if !cfg.FallbackToFilesystem { return nil, err } if strings.TrimSpace(cfg.FilesystemPath) == "" { return nil, fmt.Errorf("filesystem queue path is required for fallback") } fsq, fsErr := NewFilesystemQueue(cfg.FilesystemPath) if fsErr != nil { return nil, fmt.Errorf("filesystem queue fallback init failed: %w", fsErr) } return fsq, nil } switch cfg.Backend { case QueueBackendFS: if strings.TrimSpace(cfg.FilesystemPath) == "" { return nil, fmt.Errorf("filesystem queue path is required") } return NewFilesystemQueue(cfg.FilesystemPath) case "", QueueBackendRedis: b, err := NewTaskQueue(Config{ RedisAddr: cfg.RedisAddr, RedisPassword: cfg.RedisPassword, RedisDB: cfg.RedisDB, MetricsFlushInterval: cfg.MetricsFlushInterval, }) if err != nil { return mkFallback(err) } return b, nil case QueueBackendSQLite: b, err := NewSQLiteQueue(cfg.SQLitePath) if err != nil { return mkFallback(err) } return b, nil default: return nil, ErrInvalidQueueBackend } }