fetch_ml/internal/queue/backend.go

76 lines
2 KiB
Go

package queue
import (
"errors"
"time"
)
var ErrInvalidQueueBackend = errors.New("invalid queue backend")
type Backend interface {
AddTask(task *Task) error
GetNextTask() (*Task, error)
PeekNextTask() (*Task, error)
GetNextTaskWithLease(workerID string, leaseDuration time.Duration) (*Task, error)
GetNextTaskWithLeaseBlocking(workerID string, leaseDuration, blockTimeout time.Duration) (*Task, error)
RenewLease(taskID string, workerID string, leaseDuration time.Duration) error
ReleaseLease(taskID string, workerID string) error
RetryTask(task *Task) error
MoveToDeadLetterQueue(task *Task, reason string) error
GetTask(taskID string) (*Task, error)
GetAllTasks() ([]*Task, error)
GetTaskByName(jobName string) (*Task, error)
CancelTask(taskID string) error
UpdateTask(task *Task) error
UpdateTaskWithMetrics(task *Task, action string) error
RecordMetric(jobName, metric string, value float64) error
Heartbeat(workerID string) error
QueueDepth() (int64, error)
SetWorkerPrewarmState(state PrewarmState) error
ClearWorkerPrewarmState(workerID string) error
GetWorkerPrewarmState(workerID string) (*PrewarmState, error)
GetAllWorkerPrewarmStates() ([]PrewarmState, error)
SignalPrewarmGC() error
PrewarmGCRequestValue() (string, error)
Close() error
}
type QueueBackend string
const (
QueueBackendRedis QueueBackend = "redis"
QueueBackendSQLite QueueBackend = "sqlite"
)
type BackendConfig struct {
Backend QueueBackend
RedisAddr string
RedisPassword string
RedisDB int
SQLitePath string
MetricsFlushInterval time.Duration
}
func NewBackend(cfg BackendConfig) (Backend, error) {
switch cfg.Backend {
case "", QueueBackendRedis:
return NewTaskQueue(Config{
RedisAddr: cfg.RedisAddr,
RedisPassword: cfg.RedisPassword,
RedisDB: cfg.RedisDB,
MetricsFlushInterval: cfg.MetricsFlushInterval,
})
case QueueBackendSQLite:
return NewSQLiteQueue(cfg.SQLitePath)
default:
return nil, ErrInvalidQueueBackend
}
}