Update queue and storage systems for scheduler integration: - Queue backend with scheduler coordination - Filesystem queue with batch operations - Deduplication with tenant-aware keys - Storage layer with audit logging hooks - Domain models (Task, Events, Errors) with scheduler fields - Database layer with tenant isolation - Dataset storage with integrity checks
578 lines · 14 KiB · Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jfraeys/fetch_ml/internal/config"
|
|
"github.com/jfraeys/fetch_ml/internal/domain"
|
|
"github.com/jfraeys/fetch_ml/internal/fileutil"
|
|
)
|
|
|
|
// FilesystemQueue is a queue backend that stores each task as a JSON file
// under root, organized by status directory (pending/entries, running,
// finished, failed). Claims move files between directories via os.Rename.
type FilesystemQueue struct {
	ctx    context.Context    // canceled by Close; wakes blocking waiters
	cancel context.CancelFunc // cancels ctx
	root   string             // cleaned queue root directory
}
|
|
|
|
// filesystemQueueIndex is the JSON shape of pending/.queue.json — a
// best-effort snapshot of pending tasks, rebuilt after queue mutations.
type filesystemQueueIndex struct {
	UpdatedAt string                     `json:"updated_at"` // RFC3339 rebuild timestamp (UTC)
	Tasks     []filesystemQueueIndexTask `json:"tasks"`      // sorted: priority desc, then created_at asc
	Version   int                        `json:"version"`    // index schema version (currently 1)
}
|
|
|
|
// filesystemQueueIndexTask is one pending-task entry in the index file.
type filesystemQueueIndexTask struct {
	ID        string `json:"id"`
	CreatedAt string `json:"created_at"` // RFC3339Nano, UTC
	Priority  int64  `json:"priority"`
}
|
|
|
|
func NewFilesystemQueue(root string) (*FilesystemQueue, error) {
|
|
root = strings.TrimSpace(root)
|
|
if root == "" {
|
|
return nil, fmt.Errorf("filesystem queue root is required")
|
|
}
|
|
root = filepath.Clean(root)
|
|
|
|
// Use PathRegistry for consistent directory creation
|
|
paths := config.FromEnv()
|
|
|
|
if err := paths.EnsureDir(filepath.Join(root, "pending", "entries")); err != nil {
|
|
return nil, err
|
|
}
|
|
for _, d := range []string{"running", "finished", "failed"} {
|
|
if err := paths.EnsureDir(filepath.Join(root, d)); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
q := &FilesystemQueue{root: root, ctx: ctx, cancel: cancel}
|
|
_ = q.rebuildIndex()
|
|
return q, nil
|
|
}
|
|
|
|
// Close cancels the queue's context, waking any blocked waiters
// (e.g. GetNextTaskWithLeaseBlocking). It never returns an error.
func (q *FilesystemQueue) Close() error {
	q.cancel()
	return nil
}
|
|
|
|
func (q *FilesystemQueue) AddTask(task *Task) error {
|
|
if task == nil {
|
|
return fmt.Errorf("task is nil")
|
|
}
|
|
if strings.TrimSpace(task.ID) == "" {
|
|
return fmt.Errorf("task id is required")
|
|
}
|
|
if strings.TrimSpace(task.JobName) == "" {
|
|
return fmt.Errorf("job name is required")
|
|
}
|
|
if task.MaxRetries == 0 {
|
|
task.MaxRetries = defaultMaxRetries
|
|
}
|
|
if task.CreatedAt.IsZero() {
|
|
task.CreatedAt = time.Now().UTC()
|
|
}
|
|
if strings.TrimSpace(task.Status) == "" {
|
|
task.Status = "queued"
|
|
}
|
|
if task.Status != "queued" {
|
|
// For filesystem backend we only enqueue queued tasks.
|
|
// Other status updates should go through UpdateTask.
|
|
task.Status = "queued"
|
|
}
|
|
|
|
payload, err := json.Marshal(task)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
path := q.pendingEntryPath(task.ID)
|
|
if err := writeFileAtomic(path, payload, 0640); err != nil {
|
|
return err
|
|
}
|
|
TasksQueued.Inc()
|
|
if depth, derr := q.QueueDepth(); derr == nil {
|
|
UpdateQueueDepth(depth)
|
|
}
|
|
_ = q.rebuildIndex()
|
|
return nil
|
|
}
|
|
|
|
// GetNextTask claims the next eligible pending task without recording a
// worker ID; claimNext substitutes the default lease duration for 0.
func (q *FilesystemQueue) GetNextTask() (*Task, error) {
	return q.claimNext("", 0, false)
}

// PeekNextTask returns the task that would be claimed next without moving
// it out of pending or stamping lease fields.
func (q *FilesystemQueue) PeekNextTask() (*Task, error) {
	return q.claimNext("", 0, true)
}

// GetNextTaskWithLease claims the next eligible pending task on behalf of
// workerID with the given lease duration.
func (q *FilesystemQueue) GetNextTaskWithLease(workerID string, leaseDuration time.Duration) (*Task, error) {
	return q.claimNext(workerID, leaseDuration, false)
}
|
|
|
|
func (q *FilesystemQueue) GetNextTaskWithLeaseBlocking(
|
|
workerID string,
|
|
leaseDuration, blockTimeout time.Duration,
|
|
) (*Task, error) {
|
|
if blockTimeout <= 0 {
|
|
blockTimeout = defaultBlockTimeout
|
|
}
|
|
deadline := time.Now().Add(blockTimeout)
|
|
for {
|
|
t, err := q.claimNext(workerID, leaseDuration, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if t != nil {
|
|
return t, nil
|
|
}
|
|
if time.Now().After(deadline) {
|
|
return nil, nil
|
|
}
|
|
select {
|
|
case <-q.ctx.Done():
|
|
return nil, q.ctx.Err()
|
|
case <-time.After(50 * time.Millisecond):
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *FilesystemQueue) RenewLease(taskID string, workerID string, leaseDuration time.Duration) error {
|
|
// Single-worker friendly best-effort: update task lease fields if present.
|
|
t, err := q.GetTask(taskID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if t.LeasedBy != "" && workerID != "" && t.LeasedBy != workerID {
|
|
return fmt.Errorf("task leased by different worker: %s", t.LeasedBy)
|
|
}
|
|
if leaseDuration == 0 {
|
|
leaseDuration = defaultLeaseDuration
|
|
}
|
|
exp := time.Now().UTC().Add(leaseDuration)
|
|
t.LeaseExpiry = &exp
|
|
if workerID != "" {
|
|
t.LeasedBy = workerID
|
|
}
|
|
RecordLeaseRenewal(workerID)
|
|
return q.UpdateTask(t)
|
|
}
|
|
|
|
func (q *FilesystemQueue) ReleaseLease(taskID string, workerID string) error {
|
|
t, err := q.GetTask(taskID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if t.LeasedBy != "" && workerID != "" && t.LeasedBy != workerID {
|
|
return fmt.Errorf("task leased by different worker: %s", t.LeasedBy)
|
|
}
|
|
t.LeaseExpiry = nil
|
|
t.LeasedBy = ""
|
|
return q.UpdateTask(t)
|
|
}
|
|
|
|
// RetryTask re-enqueues a failed task with class-specific backoff, or
// routes it to the dead-letter queue when retries are exhausted or the
// failure class is not auto-retryable.
func (q *FilesystemQueue) RetryTask(task *Task) error {
	if task.RetryCount >= task.MaxRetries {
		RecordDLQAddition("max_retries")
		return q.MoveToDeadLetterQueue(task, "max retries exceeded")
	}

	// Classify the most recent error to decide retryability and backoff.
	failureClass := domain.FailureUnknown
	if task.Error != "" {
		failureClass = domain.ClassifyFailure(0, nil, task.Error)
	}
	if !domain.ShouldAutoRetry(failureClass, task.RetryCount) {
		RecordDLQAddition(string(failureClass))
		return q.MoveToDeadLetterQueue(task, fmt.Sprintf("non-retryable error: %s", failureClass))
	}

	task.RetryCount++
	task.Status = "queued"
	// Preserve the error for diagnostics, then clear it for the new attempt.
	task.LastError = task.Error
	task.Error = ""

	// NextRetry delays re-claim: claimNext skips tasks whose NextRetry is
	// still in the future.
	backoffSeconds := domain.RetryDelayForClass(failureClass, task.RetryCount)
	nextRetry := time.Now().UTC().Add(time.Duration(backoffSeconds) * time.Second)
	task.NextRetry = &nextRetry
	// Drop any stale lease so the retried task is claimable again.
	task.LeaseExpiry = nil
	task.LeasedBy = ""

	RecordTaskRetry(task.JobName, failureClass)
	return q.AddTask(task)
}
|
|
|
|
func (q *FilesystemQueue) MoveToDeadLetterQueue(task *Task, reason string) error {
|
|
if task == nil {
|
|
return fmt.Errorf("task is nil")
|
|
}
|
|
task.Status = "failed"
|
|
task.Error = fmt.Sprintf("DLQ: %s. Last error: %s", reason, task.LastError)
|
|
failureClass := domain.ClassifyFailure(0, nil, task.LastError)
|
|
RecordTaskFailure(task.JobName, failureClass)
|
|
return q.UpdateTask(task)
|
|
}
|
|
|
|
func (q *FilesystemQueue) GetTask(taskID string) (*Task, error) {
|
|
path, err := q.findTaskPath(taskID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var t Task
|
|
if err := json.Unmarshal(data, &t); err != nil {
|
|
return nil, err
|
|
}
|
|
return &t, nil
|
|
}
|
|
|
|
func (q *FilesystemQueue) GetAllTasks() ([]*Task, error) {
|
|
paths := make([]string, 0, 32)
|
|
for _, p := range []string{
|
|
filepath.Join(q.root, "pending", "entries"),
|
|
filepath.Join(q.root, "running"),
|
|
filepath.Join(q.root, "finished"),
|
|
filepath.Join(q.root, "failed"),
|
|
} {
|
|
entries, err := os.ReadDir(p)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
for _, e := range entries {
|
|
if e.IsDir() {
|
|
continue
|
|
}
|
|
if !strings.HasSuffix(e.Name(), ".json") {
|
|
continue
|
|
}
|
|
paths = append(paths, filepath.Join(p, e.Name()))
|
|
}
|
|
}
|
|
|
|
out := make([]*Task, 0, len(paths))
|
|
for _, path := range paths {
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
var t Task
|
|
if err := json.Unmarshal(data, &t); err != nil {
|
|
continue
|
|
}
|
|
out = append(out, &t)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (q *FilesystemQueue) GetTaskByName(jobName string) (*Task, error) {
|
|
jobName = strings.TrimSpace(jobName)
|
|
if jobName == "" {
|
|
return nil, fmt.Errorf("job name is required")
|
|
}
|
|
tasks, err := q.GetAllTasks()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var best *Task
|
|
for _, t := range tasks {
|
|
if t == nil || t.JobName != jobName {
|
|
continue
|
|
}
|
|
if best == nil || t.CreatedAt.After(best.CreatedAt) {
|
|
best = t
|
|
}
|
|
}
|
|
if best == nil {
|
|
return nil, os.ErrNotExist
|
|
}
|
|
return best, nil
|
|
}
|
|
|
|
func (q *FilesystemQueue) CancelTask(taskID string) error {
|
|
t, err := q.GetTask(taskID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.Status = "cancelled"
|
|
now := time.Now().UTC()
|
|
t.EndedAt = &now
|
|
return q.UpdateTask(t)
|
|
}
|
|
|
|
func (q *FilesystemQueue) UpdateTask(task *Task) error {
|
|
if task == nil {
|
|
return fmt.Errorf("task is nil")
|
|
}
|
|
if strings.TrimSpace(task.ID) == "" {
|
|
return fmt.Errorf("task id is required")
|
|
}
|
|
if strings.TrimSpace(task.Status) == "" {
|
|
return fmt.Errorf("task status is required")
|
|
}
|
|
|
|
payload, err := json.Marshal(task)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dst := q.pathForStatus(task.Status, task.ID)
|
|
if dst == "" {
|
|
// For statuses we don't map yet, keep it in running.
|
|
dst = filepath.Join(q.root, "running", task.ID+".json")
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(dst), 0750); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Best-effort: remove any other copies before writing.
|
|
_ = q.removeTaskFromAllDirs(task.ID)
|
|
if err := writeFileAtomic(dst, payload, 0640); err != nil {
|
|
return err
|
|
}
|
|
if depth, derr := q.QueueDepth(); derr == nil {
|
|
UpdateQueueDepth(depth)
|
|
}
|
|
_ = q.rebuildIndex()
|
|
return nil
|
|
}
|
|
|
|
// UpdateTaskWithMetrics updates the task; the metrics payload argument is
// ignored by the filesystem backend.
func (q *FilesystemQueue) UpdateTaskWithMetrics(task *Task, _ string) error {
	return q.UpdateTask(task)
}

// RecordMetric is a no-op for the filesystem backend.
func (q *FilesystemQueue) RecordMetric(_, _ string, _ float64) error {
	return nil
}

// Heartbeat is a no-op for the filesystem backend.
func (q *FilesystemQueue) Heartbeat(_ string) error {
	return nil
}
|
|
|
|
func (q *FilesystemQueue) QueueDepth() (int64, error) {
|
|
entries, err := os.ReadDir(filepath.Join(q.root, "pending", "entries"))
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
var n int64
|
|
for _, e := range entries {
|
|
if e.IsDir() {
|
|
continue
|
|
}
|
|
if strings.HasSuffix(e.Name(), ".json") {
|
|
n++
|
|
}
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// Prewarm coordination is not supported by the filesystem backend; the
// methods below are no-ops that satisfy the queue interface.

func (q *FilesystemQueue) SetWorkerPrewarmState(_ PrewarmState) error { return nil }

func (q *FilesystemQueue) ClearWorkerPrewarmState(_ string) error { return nil }

func (q *FilesystemQueue) GetWorkerPrewarmState(_ string) (*PrewarmState, error) {
	return nil, nil
}

func (q *FilesystemQueue) GetAllWorkerPrewarmStates() ([]PrewarmState, error) {
	return nil, nil
}

func (q *FilesystemQueue) SignalPrewarmGC() error { return nil }

func (q *FilesystemQueue) PrewarmGCRequestValue() (string, error) {
	return "", nil
}
|
|
|
|
// claimNext scans pending/entries, selects the best eligible task
// (highest Priority, then oldest CreatedAt), and — unless peek — claims it
// by renaming its file into running/ and stamping lease fields.
//
// Returns (nil, nil) when nothing is eligible or another process won the
// claim race. peek=true returns the candidate without moving the file or
// touching lease fields.
func (q *FilesystemQueue) claimNext(workerID string, leaseDuration time.Duration, peek bool) (*Task, error) {
	pendingDir := filepath.Join(q.root, "pending", "entries")
	entries, err := os.ReadDir(pendingDir)
	if err != nil {
		return nil, err
	}

	candidates := make([]*Task, 0, len(entries))
	paths := make(map[string]string, len(entries))
	for _, e := range entries {
		if e.IsDir() {
			continue
		}
		if !strings.HasSuffix(e.Name(), ".json") {
			continue
		}
		path := filepath.Join(pendingDir, e.Name())
		data, err := os.ReadFile(path)
		if err != nil {
			// Unreadable or concurrently-removed entries are skipped.
			continue
		}
		var t Task
		if err := json.Unmarshal(data, &t); err != nil {
			continue
		}
		// Tasks with a future NextRetry are still backing off; skip them.
		if t.NextRetry != nil && time.Now().UTC().Before(t.NextRetry.UTC()) {
			continue
		}
		candidates = append(candidates, &t)
		paths[t.ID] = path
	}

	if len(candidates) == 0 {
		return nil, nil
	}

	// Claim order: priority descending, ties broken by oldest CreatedAt.
	sort.Slice(candidates, func(i, j int) bool {
		if candidates[i].Priority != candidates[j].Priority {
			return candidates[i].Priority > candidates[j].Priority
		}
		return candidates[i].CreatedAt.Before(candidates[j].CreatedAt)
	})

	chosen := candidates[0]
	if peek {
		return chosen, nil
	}

	src := paths[chosen.ID]
	if src == "" {
		return nil, nil
	}
	// The rename is the claim: only one process can move the file.
	dst := filepath.Join(q.root, "running", chosen.ID+".json")
	if err := os.Rename(src, dst); err != nil {
		// Another process might have claimed it.
		if errors.Is(err, os.ErrNotExist) {
			return nil, nil
		}
		return nil, err
	}

	// Refresh from the moved file to avoid race on content.
	data, err := os.ReadFile(dst)
	if err != nil {
		return nil, err
	}
	var t Task
	if err := json.Unmarshal(data, &t); err != nil {
		return nil, err
	}

	now := time.Now().UTC()
	if leaseDuration == 0 {
		leaseDuration = defaultLeaseDuration
	}
	exp := now.Add(leaseDuration)
	t.LeaseExpiry = &exp
	if strings.TrimSpace(workerID) != "" {
		t.LeasedBy = workerID
	}
	// Note: status transitions are handled by worker UpdateTask calls.

	// Best-effort: persist the lease stamp; the claim already succeeded.
	payload, err := json.Marshal(&t)
	if err == nil {
		_ = writeFileAtomic(dst, payload, 0640)
	}
	if depth, derr := q.QueueDepth(); derr == nil {
		UpdateQueueDepth(depth)
	}
	_ = q.rebuildIndex()
	return &t, nil
}
|
|
|
|
// pendingEntryPath returns the on-disk path of the pending entry for taskID.
func (q *FilesystemQueue) pendingEntryPath(taskID string) string {
	return filepath.Join(q.root, "pending", "entries", taskID+".json")
}
|
|
|
|
func (q *FilesystemQueue) pathForStatus(status, taskID string) string {
|
|
switch status {
|
|
case "queued":
|
|
return q.pendingEntryPath(taskID)
|
|
case "running":
|
|
return filepath.Join(q.root, "running", taskID+".json")
|
|
case "completed", "finished":
|
|
return filepath.Join(q.root, "finished", taskID+".json")
|
|
case "failed", "cancelled":
|
|
return filepath.Join(q.root, "failed", taskID+".json")
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func (q *FilesystemQueue) findTaskPath(taskID string) (string, error) {
|
|
paths := []string{
|
|
q.pendingEntryPath(taskID),
|
|
filepath.Join(q.root, "running", taskID+".json"),
|
|
filepath.Join(q.root, "finished", taskID+".json"),
|
|
filepath.Join(q.root, "failed", taskID+".json"),
|
|
}
|
|
for _, p := range paths {
|
|
if _, err := os.Stat(p); err == nil {
|
|
return p, nil
|
|
}
|
|
}
|
|
return "", os.ErrNotExist
|
|
}
|
|
|
|
func (q *FilesystemQueue) removeTaskFromAllDirs(taskID string) error {
|
|
paths := []string{
|
|
q.pendingEntryPath(taskID),
|
|
filepath.Join(q.root, "running", taskID+".json"),
|
|
filepath.Join(q.root, "finished", taskID+".json"),
|
|
filepath.Join(q.root, "failed", taskID+".json"),
|
|
}
|
|
var outErr error
|
|
for _, p := range paths {
|
|
if err := os.Remove(p); err != nil && !errors.Is(err, os.ErrNotExist) {
|
|
outErr = err
|
|
}
|
|
}
|
|
return outErr
|
|
}
|
|
|
|
func (q *FilesystemQueue) rebuildIndex() error {
|
|
pendingDir := filepath.Join(q.root, "pending", "entries")
|
|
entries, err := os.ReadDir(pendingDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
idx := filesystemQueueIndex{Version: 1, UpdatedAt: time.Now().UTC().Format(time.RFC3339)}
|
|
for _, e := range entries {
|
|
if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
|
|
continue
|
|
}
|
|
data, err := os.ReadFile(filepath.Join(pendingDir, e.Name()))
|
|
if err != nil {
|
|
continue
|
|
}
|
|
var t Task
|
|
if err := json.Unmarshal(data, &t); err != nil {
|
|
continue
|
|
}
|
|
idx.Tasks = append(idx.Tasks, filesystemQueueIndexTask{ID: t.ID, Priority: t.Priority, CreatedAt: t.CreatedAt.UTC().Format(time.RFC3339Nano)})
|
|
}
|
|
|
|
sort.Slice(idx.Tasks, func(i, j int) bool {
|
|
if idx.Tasks[i].Priority != idx.Tasks[j].Priority {
|
|
return idx.Tasks[i].Priority > idx.Tasks[j].Priority
|
|
}
|
|
return idx.Tasks[i].CreatedAt < idx.Tasks[j].CreatedAt
|
|
})
|
|
|
|
payload, err := json.MarshalIndent(&idx, "", " ")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
path := filepath.Join(q.root, "pending", ".queue.json")
|
|
return writeFileAtomic(path, payload, 0640)
|
|
}
|
|
|
|
func writeFileAtomic(path string, data []byte, perm os.FileMode) error {
|
|
dir := filepath.Dir(path)
|
|
if err := os.MkdirAll(dir, 0750); err != nil {
|
|
return err
|
|
}
|
|
// SECURITY: Use WriteFileSafe for atomic write with fsync
|
|
return fileutil.WriteFileSafe(path, data, perm)
|
|
}
|