//go:build cgo && native_libs // +build cgo,native_libs package queue // #cgo LDFLAGS: -L${SRCDIR}/../../native/build -lqueue_index -Wl,-rpath,${SRCDIR}/../../native/build // #include "../../native/queue_index/queue_index.h" // #include import "C" import ( "errors" "os" "time" "unsafe" "github.com/jfraeys/fetch_ml/internal/domain" ) // UseNativeQueue controls whether to use C++ binary queue index. // Set FETCHML_NATIVE_LIBS=1 to enable native libraries. var UseNativeQueue = os.Getenv("FETCHML_NATIVE_LIBS") == "1" // NativeQueue wraps the C++ binary queue index for high-performance // task operations. It replaces JSON-based filesystem operations with // a memory-mapped binary format. type NativeQueue struct { handle *C.qi_index_t root string } // NewNativeQueue creates a new native binary queue index. // Falls back to standard FilesystemQueue if native libs unavailable. func NewNativeQueue(root string) (Backend, error) { if !UseNativeQueue { return NewFilesystemQueue(root) } croot := C.CString(root) defer C.free(unsafe.Pointer(croot)) handle := C.qi_open(croot) if handle == nil { return nil, errors.New("failed to open native queue index") } return &NativeQueue{ handle: handle, root: root, }, nil } // Close closes the native queue index and syncs to disk. func (q *NativeQueue) Close() error { if q.handle != nil { C.qi_close(q.handle) q.handle = nil } return nil } // AddTask adds a task to the native queue index. func (q *NativeQueue) AddTask(task *Task) error { if q.handle == nil { return errors.New("queue not open") } cTask := taskToC(task) rc := C.qi_add_tasks(q.handle, &cTask, 1) if rc < 0 { err := C.qi_last_error(q.handle) if err != nil { return errors.New(C.GoString(err)) } return errors.New("failed to add task") } TasksQueued.Inc() if depth, derr := q.QueueDepth(); derr == nil { UpdateQueueDepth(depth) } return nil } // GetNextTask retrieves and claims the highest priority task. func (q *NativeQueue) GetNextTask() (*Task, error) { return q.claimNext("", 0, false) } // PeekNextTask returns the highest priority task without claiming. func (q *NativeQueue) PeekNextTask() (*Task, error) { return q.claimNext("", 0, true) } // GetNextTaskWithLease retrieves a task with a lease for a specific worker. func (q *NativeQueue) GetNextTaskWithLease(workerID string, leaseDuration time.Duration) (*Task, error) { return q.claimNext(workerID, leaseDuration, false) } // claimNext retrieves the next task from the priority queue. func (q *NativeQueue) claimNext(workerID string, leaseDuration time.Duration, peek bool) (*Task, error) { if q.handle == nil { return nil, errors.New("queue not open") } var cTask C.qi_task_t if peek { rc := C.qi_peek_next(q.handle, &cTask) if rc != 0 { return nil, nil // No tasks available } } else { var count C.uint32_t rc := C.qi_get_next_batch(q.handle, &cTask, 1, &count) if rc != 0 || count == 0 { return nil, nil // No tasks available } } task := cToTask(&cTask) if !peek && workerID != "" { task.LeasedBy = workerID exp := time.Now().UTC().Add(leaseDuration) task.LeaseExpiry = &exp } if depth, derr := q.QueueDepth(); derr == nil { UpdateQueueDepth(depth) } return task, nil } // GetTask retrieves a task by ID from the native index. func (q *NativeQueue) GetTask(taskID string) (*Task, error) { if q.handle == nil { return nil, errors.New("queue not open") } cID := C.CString(taskID) defer C.free(unsafe.Pointer(cID)) var cTask C.qi_task_t rc := C.qi_get_task_by_id(q.handle, cID, &cTask) if rc != 0 { return nil, os.ErrNotExist } return cToTask(&cTask), nil } // GetAllTasks retrieves all tasks from the native index. func (q *NativeQueue) GetAllTasks() ([]*Task, error) { if q.handle == nil { return nil, errors.New("queue not open") } var cTasks *C.qi_task_t var count C.size_t rc := C.qi_get_all_tasks(q.handle, &cTasks, &count) if rc != 0 { err := C.qi_last_error(q.handle) if err != nil { return nil, errors.New(C.GoString(err)) } return nil, errors.New("failed to get tasks") } if count == 0 { return []*Task{}, nil } tasks := make([]*Task, count) for i := 0; i < int(count); i++ { // Access array element using pointer arithmetic taskPtr := (*C.qi_task_t)(unsafe.Pointer(uintptr(unsafe.Pointer(cTasks)) + uintptr(i)*unsafe.Sizeof(C.qi_task_t{}))) tasks[i] = cToTask(taskPtr) } C.qi_free_task_array(cTasks) return tasks, nil } // UpdateTask updates a task in the native index. func (q *NativeQueue) UpdateTask(task *Task) error { if q.handle == nil { return errors.New("queue not open") } cTask := taskToC(task) rc := C.qi_update_tasks(q.handle, &cTask, 1) if rc != 0 { err := C.qi_last_error(q.handle) if err != nil { return errors.New(C.GoString(err)) } return errors.New("failed to update task") } if depth, derr := q.QueueDepth(); derr == nil { UpdateQueueDepth(depth) } return nil } // UpdateTaskWithMetrics updates a task with metrics (same as UpdateTask for native). func (q *NativeQueue) UpdateTaskWithMetrics(task *Task, _ string) error { return q.UpdateTask(task) } // CancelTask cancels a task by updating its status. func (q *NativeQueue) CancelTask(taskID string) error { task, err := q.GetTask(taskID) if err != nil { return err } task.Status = "cancelled" now := time.Now().UTC() task.EndedAt = &now return q.UpdateTask(task) } // GetTaskByName retrieves the most recent task with the given job name. func (q *NativeQueue) GetTaskByName(jobName string) (*Task, error) { tasks, err := q.GetAllTasks() if err != nil { return nil, err } var best *Task for _, t := range tasks { if t == nil || t.JobName != jobName { continue } if best == nil || t.CreatedAt.After(best.CreatedAt) { best = t } } if best == nil { return nil, os.ErrNotExist } return best, nil } // QueueDepth returns the number of queued tasks. func (q *NativeQueue) QueueDepth() (int64, error) { if q.handle == nil { return 0, errors.New("queue not open") } cStatus := C.CString("queued") defer C.free(unsafe.Pointer(cStatus)) count := C.qi_get_task_count(q.handle, cStatus) return int64(count), nil } // Helper functions for C struct conversion func taskToC(task *Task) C.qi_task_t { var cTask C.qi_task_t copyStringToCBuffer(task.ID, cTask.id[:], 64) copyStringToCBuffer(task.JobName, cTask.job_name[:], 128) cTask.priority = C.int64_t(task.Priority) cTask.created_at = C.int64_t(task.CreatedAt.UnixNano()) if task.NextRetry != nil { cTask.next_retry = C.int64_t(task.NextRetry.UnixNano()) } else { cTask.next_retry = 0 } copyStringToCBuffer(task.Status, cTask.status[:], 16) cTask.retries = C.uint32_t(task.RetryCount) return cTask } func cToTask(cTask *C.qi_task_t) *Task { return &Task{ ID: C.GoString(&cTask.id[0]), JobName: C.GoString(&cTask.job_name[0]), Priority: int64(cTask.priority), CreatedAt: time.Unix(0, int64(cTask.created_at)), Status: C.GoString(&cTask.status[0]), RetryCount: int(cTask.retries), } } func copyStringToCBuffer(src string, dst []C.char, maxLen int) { n := len(src) if n > maxLen-1 { n = maxLen - 1 } for i := 0; i < n; i++ { dst[i] = C.char(src[i]) } dst[n] = 0 } // Stub implementations for queue.Backend interface func (q *NativeQueue) GetNextTaskWithLeaseBlocking(workerID string, leaseDuration, blockTimeout time.Duration) (*Task, error) { return nil, errors.New("blocking get not implemented for native queue") } func (q *NativeQueue) RetryTask(task *Task) error { if q.handle == nil { return errors.New("queue not open") } cID := C.CString(task.ID) defer C.free(unsafe.Pointer(cID)) var nextRetry int64 if task.NextRetry != nil { nextRetry = task.NextRetry.UnixNano() } rc := C.qi_retry_task(q.handle, cID, C.int64_t(nextRetry), C.uint32_t(task.MaxRetries)) if rc != 0 { err := C.qi_last_error(q.handle) if err != nil { return errors.New(C.GoString(err)) } return errors.New("failed to retry task") } RecordTaskRetry(task.JobName, domain.FailureUnknown) return nil } func (q *NativeQueue) MoveToDeadLetterQueue(task *Task, reason string) error { if q.handle == nil { return errors.New("queue not open") } cID := C.CString(task.ID) defer C.free(unsafe.Pointer(cID)) cReason := C.CString(reason) defer C.free(unsafe.Pointer(cReason)) rc := C.qi_move_to_dlq(q.handle, cID, cReason) if rc != 0 { err := C.qi_last_error(q.handle) if err != nil { return errors.New(C.GoString(err)) } return errors.New("failed to move task to DLQ") } RecordDLQAddition(reason) return nil } func (q *NativeQueue) RenewLease(taskID, workerID string, leaseDuration time.Duration) error { if q.handle == nil { return errors.New("queue not open") } cID := C.CString(taskID) defer C.free(unsafe.Pointer(cID)) cWorker := C.CString(workerID) defer C.free(unsafe.Pointer(cWorker)) expiry := time.Now().Add(leaseDuration).UnixNano() rc := C.qi_renew_lease(q.handle, cID, cWorker, C.int64_t(expiry)) if rc != 0 { err := C.qi_last_error(q.handle) if err != nil { return errors.New(C.GoString(err)) } return errors.New("failed to renew lease") } RecordLeaseRenewal(workerID) return nil } func (q *NativeQueue) ReleaseLease(taskID, workerID string) error { if q.handle == nil { return errors.New("queue not open") } cID := C.CString(taskID) defer C.free(unsafe.Pointer(cID)) cWorker := C.CString(workerID) defer C.free(unsafe.Pointer(cWorker)) rc := C.qi_release_lease(q.handle, cID, cWorker) if rc != 0 { err := C.qi_last_error(q.handle) if err != nil { return errors.New(C.GoString(err)) } return errors.New("failed to release lease") } return nil } func (q *NativeQueue) RecordMetric(_, _ string, _ float64) error { return nil } func (q *NativeQueue) Heartbeat(_ string) error { return nil } func (q *NativeQueue) SetWorkerPrewarmState(_ PrewarmState) error { return nil } func (q *NativeQueue) ClearWorkerPrewarmState(_ string) error { return nil } func (q *NativeQueue) GetWorkerPrewarmState(_ string) (*PrewarmState, error) { return nil, nil } func (q *NativeQueue) GetAllWorkerPrewarmStates() ([]PrewarmState, error) { return nil, nil } func (q *NativeQueue) SignalPrewarmGC() error { return nil } func (q *NativeQueue) PrewarmGCRequestValue() (string, error) { return "", nil }