fetch_ml/native/queue_index/queue_index.cpp
Jeremie Fraeys d408a60eb1
Some checks failed
Documentation / build-and-publish (push) Waiting to run
Test / test (push) Waiting to run
Checkout test / test (push) Successful in 5s
CI with Native Libraries / test-native (push) Has been cancelled
CI with Native Libraries / build-release (push) Has been cancelled
ci: push all workflow updates
2026-02-12 13:28:15 -05:00

427 lines
11 KiB
C++

#include "queue_index.h"

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <system_error>
#include <vector>
// Binary index file format (fields are raw memory dumps, so multi-byte
// values are in host byte order — not portable across architectures):
//   Magic: "FQI1" (4 bytes)
//   Version: uint64_t
//   Entry count: uint64_t
//   Entries: fixed-size qi_task_t records
namespace fs = std::filesystem;
// 4-byte magic at the start of every index file. Declared as a char array
// ("FQI1" plus a trailing NUL), but only the first 4 bytes are written/compared.
constexpr char INDEX_MAGIC[] = "FQI1";
// Format version stamped into newly written index files.
constexpr uint64_t CURRENT_VERSION = 1;
// Arena allocator for hot path (no dynamic allocations)
//
// Fixed-capacity bump-pointer allocator backed by an in-object buffer.
// allocate() never touches the heap; reset() reclaims everything at once.
// Not thread-safe by itself; it is used through a thread_local instance.
class ArenaAllocator {
    static constexpr size_t BUFFER_SIZE = 256 * 1024; // 256KB
    alignas(64) char buffer_[BUFFER_SIZE]; // cache-line-aligned backing store
    size_t offset_ = 0;   // bump pointer: index of the next free byte
    bool in_use_ = false; // set between begin()/end(); diagnostic only
public:
    // Returns a pointer aligned to `align` (must be a power of two), or
    // nullptr when the remaining space cannot satisfy the request.
    void* allocate(size_t size, size_t align = 8) {
        size_t aligned = (offset_ + align - 1) & ~(align - 1);
        // Overflow-safe capacity check: the original `aligned + size >
        // BUFFER_SIZE` test wraps for huge `size` values (size_t overflow)
        // and would hand out a pointer past the end of the buffer.
        if (aligned > BUFFER_SIZE || size > BUFFER_SIZE - aligned) {
            return nullptr; // Arena exhausted
        }
        void* ptr = buffer_ + aligned;
        offset_ = aligned + size;
        return ptr;
    }
    // Drop all allocations at once (pointers become dangling).
    void reset() { offset_ = 0; }
    void begin() { in_use_ = true; reset(); }
    void end() { in_use_ = false; }
};
// Thread-local arena for hot path operations
thread_local ArenaAllocator g_arena;
// One in-memory index record: the persisted task payload plus transient
// bookkeeping (offset/dirty are reset when the index is loaded from disk).
struct IndexEntry {
    qi_task_t task;  // task record exactly as serialized to the index file
    uint64_t offset; // File offset for direct access
    bool dirty;      // Modified since last save
};
// State behind the opaque qi_index_t handle: everything for one on-disk queue.
struct qi_index {
    std::string queue_dir;  // root directory passed to qi_open
    std::string index_path; // <queue_dir>/pending/.queue.bin
    std::string data_dir;   // <queue_dir>/pending/entries (one .json per task)
    // In-memory data structures
    std::vector<IndexEntry> entries;
    // Priority queue (max-heap by priority, then min-heap by created_at)
    std::vector<size_t> heap; // Indices into entries
    // Thread safety
    mutable std::shared_mutex mutex; // shared for reads, exclusive for mutation
    // Error state
    std::string last_error; // most recent failure message (see qi_last_error)
    // Stats
    uint64_t version = 0; // bumped on every successful write_index
    int64_t mtime = 0;    // wall-clock time of the last successful persist
    // Memory-mapped file
    // NOTE(review): the mmap fields below are never touched in this
    // translation unit — presumably reserved for a future mapped-read
    // path; confirm before removing.
    void* mmap_ptr = nullptr;
    size_t mmap_size = 0;
    int mmap_fd = -1;
};
// Heap comparator: higher priority first, then earlier created_at
struct HeapComparator {
const std::vector<IndexEntry>* entries;
bool operator()(size_t a, size_t b) const {
const auto& ta = (*entries)[a].task;
const auto& tb = (*entries)[b].task;
if (ta.priority != tb.priority) {
return ta.priority < tb.priority; // Max-heap: higher priority first
}
return ta.created_at > tb.created_at; // Min-heap: earlier first
}
};
// Record an error message on the index handle; silently ignores null handles.
static void set_error(qi_index_t* idx, const char* msg) {
    if (idx == nullptr) return;
    idx->last_error = msg;
}
// Ensure directory exists, creating intermediate components as needed.
// Returns true when the directory exists on return, false on failure.
static bool ensure_dir(const char* path) {
    // Use the error_code overload instead of a catch-all try/catch:
    // no exception machinery, and the failure reason stays inspectable.
    std::error_code ec;
    std::filesystem::create_directories(path, ec);
    return !ec; // an already-existing directory leaves ec clear
}
// Build heap from entries
//
// Re-derives the scheduling heap from scratch: one slot per entry whose
// status is exactly "queued", heapified under HeapComparator.
static void rebuild_heap(qi_index_t* idx) {
    auto& heap = idx->heap;
    heap.clear();
    const size_t n = idx->entries.size();
    for (size_t pos = 0; pos != n; ++pos) {
        if (std::strcmp(idx->entries[pos].task.status, "queued") != 0) {
            continue; // only queued tasks are schedulable
        }
        heap.push_back(pos);
    }
    std::make_heap(heap.begin(), heap.end(), HeapComparator{&idx->entries});
}
// Write index to disk (binary format), atomically via tmp file + rename.
// Returns 0 on success, -1 on failure (last_error set, tmp file removed).
// Fixes over the original: write() results are checked (short writes and
// EINTR handled), the data is fsync'd before the rename so the atomic swap
// survives a crash, and a failed attempt no longer leaks the .tmp file.
static int write_index(qi_index_t* idx) {
    std::string tmp_path = idx->index_path + ".tmp";
    int fd = open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0640);
    if (fd < 0) {
        set_error(idx, "Failed to open index file for writing");
        return -1;
    }
    // Full-buffer write with short-write and EINTR handling.
    auto write_all = [fd](const void* buf, size_t len) -> bool {
        const char* p = static_cast<const char*>(buf);
        while (len > 0) {
            ssize_t n = write(fd, p, len);
            if (n < 0) {
                if (errno == EINTR) continue; // retry on signal interruption
                return false;
            }
            p += n;
            len -= static_cast<size_t>(n);
        }
        return true;
    };
    // Header: magic, version, entry count
    bool ok = write_all(INDEX_MAGIC, 4);
    uint64_t version = CURRENT_VERSION;
    ok = ok && write_all(&version, sizeof(version));
    uint64_t count = idx->entries.size();
    ok = ok && write_all(&count, sizeof(count));
    // Entries: fixed-size task records
    for (const auto& entry : idx->entries) {
        if (!ok) break;
        ok = write_all(&entry.task, sizeof(qi_task_t));
    }
    // Flush file contents to stable storage before the rename, otherwise a
    // crash can leave a renamed-but-empty index behind.
    if (ok && fsync(fd) != 0) ok = false;
    close(fd);
    if (!ok) {
        unlink(tmp_path.c_str()); // do not leave a partial tmp file behind
        set_error(idx, "Failed to write index file");
        return -1;
    }
    // Atomic rename
    if (rename(tmp_path.c_str(), idx->index_path.c_str()) != 0) {
        unlink(tmp_path.c_str());
        set_error(idx, "Failed to rename index file");
        return -1;
    }
    idx->version++;
    idx->mtime = time(nullptr);
    return 0;
}
// Read index from disk. Returns 0 on success (including "no index file
// yet"), -1 on failure (last_error set). Rebuilds the in-memory heap.
// Fixes over the original: the format version is actually validated, and
// the declared entry count is sanity-checked against the file size so a
// corrupt header cannot trigger an enormous reserve() / allocation.
static int read_index(qi_index_t* idx) {
    int fd = open(idx->index_path.c_str(), O_RDONLY);
    if (fd < 0) {
        if (errno == ENOENT) {
            // No existing index - that's ok
            return 0;
        }
        set_error(idx, "Failed to open index file for reading");
        return -1;
    }
    // Read header
    char magic[4];
    if (read(fd, magic, 4) != 4 || std::memcmp(magic, INDEX_MAGIC, 4) != 0) {
        close(fd);
        set_error(idx, "Invalid index file magic");
        return -1;
    }
    uint64_t version;
    if (read(fd, &version, sizeof(version)) != sizeof(version)) {
        close(fd);
        set_error(idx, "Failed to read index version");
        return -1;
    }
    // Reject formats we do not understand (keep in sync with CURRENT_VERSION).
    constexpr uint64_t kSupportedVersion = 1;
    if (version != kSupportedVersion) {
        close(fd);
        set_error(idx, "Unsupported index version");
        return -1;
    }
    uint64_t count;
    if (read(fd, &count, sizeof(count)) != sizeof(count)) {
        close(fd);
        set_error(idx, "Failed to read entry count");
        return -1;
    }
    // Bound `count` by what the file could actually hold.
    constexpr uint64_t kHeaderSize = 4 + sizeof(uint64_t) * 2;
    struct stat st;
    if (fstat(fd, &st) != 0 ||
        static_cast<uint64_t>(st.st_size) < kHeaderSize ||
        count > (static_cast<uint64_t>(st.st_size) - kHeaderSize) / sizeof(qi_task_t)) {
        close(fd);
        set_error(idx, "Corrupt index file: entry count exceeds file size");
        return -1;
    }
    // Read entries
    idx->entries.clear();
    idx->entries.reserve(count);
    for (uint64_t i = 0; i < count; ++i) {
        IndexEntry entry{};
        if (read(fd, &entry.task, sizeof(qi_task_t)) != sizeof(qi_task_t)) {
            close(fd);
            set_error(idx, "Failed to read entry");
            return -1;
        }
        entry.offset = 0;  // transient bookkeeping, never persisted
        entry.dirty = false;
        idx->entries.push_back(entry);
    }
    close(fd);
    rebuild_heap(idx);
    return 0;
}
// Scan data directory to rebuild index from files.
// Creates one "queued" entry per *.json file (task id = file stem), then
// rebuilds the heap and persists the fresh index. Returns 0 on success,
// -1 on failure (last_error set).
static int scan_data_directory(qi_index_t* idx) {
    idx->entries.clear();
    try {
        for (const auto& dirent : fs::directory_iterator(idx->data_dir)) {
            if (!dirent.is_regular_file()) continue;
            const auto& path = dirent.path();
            if (path.extension() != ".json") continue;
            // Parse task from JSON file (simplified - just extract ID)
            // In full implementation, parse full JSON
            std::string filename = path.stem().string();
            // Value-initialize: the original `IndexEntry ie;` left every
            // qi_task_t field other than id/status (priority, created_at,
            // ...) as indeterminate garbage, corrupting heap ordering.
            IndexEntry ie{};
            std::strncpy(ie.task.id, filename.c_str(), sizeof(ie.task.id) - 1);
            ie.task.id[sizeof(ie.task.id) - 1] = '\0'; // strncpy may not terminate
            std::strcpy(ie.task.status, "queued");
            idx->entries.push_back(ie);
        }
    } catch (...) {
        set_error(idx, "Failed to scan data directory");
        return -1;
    }
    rebuild_heap(idx);
    return write_index(idx);
}
// C API Implementation

// Open (or create) a queue index rooted at queue_dir.
// Loads the on-disk index when present; otherwise rebuilds it by scanning
// the entries directory. Returns nullptr on failure.
qi_index_t* qi_open(const char* queue_dir) {
    if (!queue_dir) return nullptr;
    auto* idx = new qi_index_t;
    idx->queue_dir = queue_dir;
    idx->index_path = (fs::path(queue_dir) / "pending" / ".queue.bin").string();
    idx->data_dir = (fs::path(queue_dir) / "pending" / "entries").string();
    // Ensure directories exist
    if (!ensure_dir(idx->data_dir.c_str())) {
        delete idx;
        return nullptr;
    }
    // Load the existing index; fall back to a directory scan when the index
    // file is missing or unreadable. The original treated a missing file as
    // a successful empty load (read_index returns 0 on ENOENT), which
    // silently ignored task files already present in the data directory.
    std::error_code ec;
    const bool have_index = fs::exists(idx->index_path, ec) && !ec;
    if (!have_index || read_index(idx) != 0) {
        idx->last_error.clear(); // the scan supersedes any read failure
        // Build from scratch
        if (scan_data_directory(idx) != 0) {
            delete idx;
            return nullptr;
        }
    }
    return idx;
}
// Persist pending changes and destroy the index handle. Safe on nullptr.
void qi_close(qi_index_t* idx) {
    if (!idx) return;
    // Sync any pending changes. The lock MUST be released before `delete`:
    // the original held a unique_lock across the delete, so the lock's
    // destructor ran against an already-destroyed mutex (undefined behavior).
    {
        std::unique_lock lock(idx->mutex);
        write_index(idx);
    }
    delete idx;
}
// Append `count` tasks to the index and persist it.
// Tasks whose status is "queued" also join the scheduling heap.
// Returns 0 on success, -1 on invalid arguments or persistence failure.
int qi_add_tasks(qi_index_t* idx, const qi_task_t* tasks, uint32_t count) {
    if (!idx || !tasks || count == 0) return -1;
    std::unique_lock lock(idx->mutex);
    // Reserve once up front to avoid repeated reallocation while appending.
    // (The g_arena begin()/end() calls in the original were dead code:
    // nothing in this function ever allocated from the arena.)
    idx->entries.reserve(idx->entries.size() + count);
    HeapComparator comp{&idx->entries}; // compares via the vector, safe across growth
    for (uint32_t i = 0; i < count; ++i) {
        IndexEntry entry;
        entry.task = tasks[i];
        entry.offset = 0;
        entry.dirty = true; // not yet persisted
        idx->entries.push_back(entry);
        // Only queued tasks participate in scheduling
        if (std::strcmp(tasks[i].status, "queued") == 0) {
            idx->heap.push_back(idx->entries.size() - 1);
            std::push_heap(idx->heap.begin(), idx->heap.end(), comp);
        }
    }
    // Persist
    return write_index(idx);
}
// Pop up to max_count highest-priority queued tasks, mark them "running"
// internally, and persist the new state. The copies written to out_tasks
// still carry the pre-pop status (unchanged from the original behavior).
// Returns 0 on success, -1 on invalid arguments or if persisting fails
// (the original silently ignored write_index failure, so callers could not
// tell that the "running" transition was not durable).
int qi_get_next_batch(qi_index_t* idx, qi_task_t* out_tasks, uint32_t max_count, uint32_t* out_count) {
    if (!idx || !out_tasks || max_count == 0 || !out_count) return -1;
    std::unique_lock lock(idx->mutex);
    *out_count = 0;
    HeapComparator comp{&idx->entries};
    while (*out_count < max_count && !idx->heap.empty()) {
        // Pop highest priority task
        std::pop_heap(idx->heap.begin(), idx->heap.end(), comp);
        size_t entry_idx = idx->heap.back();
        idx->heap.pop_back();
        // Copy to output (taken before the status flip, as before)
        out_tasks[*out_count] = idx->entries[entry_idx].task;
        // Mark as running
        std::strcpy(idx->entries[entry_idx].task.status, "running");
        idx->entries[entry_idx].dirty = true;
        (*out_count)++;
    }
    if (*out_count > 0) {
        return write_index(idx); // propagate persistence failure
    }
    return 0;
}
// Look up a task by its id and copy it into *out_task.
// Returns 0 when found, -1 when absent or on invalid arguments.
int qi_get_task_by_id(qi_index_t* idx, const char* task_id, qi_task_t* out_task) {
    if (!idx || !task_id || !out_task) return -1;
    std::shared_lock lock(idx->mutex);
    auto match = std::find_if(idx->entries.begin(), idx->entries.end(),
                              [task_id](const IndexEntry& e) {
                                  return std::strcmp(e.task.id, task_id) == 0;
                              });
    if (match == idx->entries.end()) {
        return -1; // Not found
    }
    *out_task = match->task;
    return 0;
}
// Snapshot every task into a freshly allocated array that the caller must
// release with qi_free_task_array. *count receives the task total; when the
// index is empty *out_tasks is set to nullptr. Returns 0 on success, -1 on
// invalid arguments.
int qi_get_all_tasks(qi_index_t* idx, qi_task_t** out_tasks, size_t* count) {
    if (!idx || !out_tasks || !count) return -1;
    std::shared_lock lock(idx->mutex);
    const size_t total = idx->entries.size();
    *count = total;
    if (total == 0) {
        *out_tasks = nullptr;
        return 0;
    }
    qi_task_t* snapshot = new qi_task_t[total];
    size_t pos = 0;
    for (const IndexEntry& e : idx->entries) {
        snapshot[pos++] = e.task;
    }
    *out_tasks = snapshot;
    return 0;
}
// Release an array previously returned by qi_get_all_tasks.
void qi_free_task_array(qi_task_t* tasks) {
    delete[] tasks; // delete[] on nullptr is a well-defined no-op
}
// Return the most recently recorded error message, or nullptr when none.
// NOTE(review): reads last_error without taking idx->mutex, so a concurrent
// set_error() could race with this read — confirm callers poll errors from
// the same thread that issued the failing call.
const char* qi_last_error(qi_index_t* idx) {
    if (idx == nullptr) return nullptr;
    const std::string& err = idx->last_error;
    return err.empty() ? nullptr : err.c_str();
}
// Reset the stored error message; ignores null handles.
void qi_clear_error(qi_index_t* idx) {
    if (!idx) return;
    idx->last_error.clear();
}
// Current index version counter (bumped on every successful persist).
// Returns 0 for a null handle.
uint64_t qi_get_index_version(qi_index_t* idx) {
    if (idx == nullptr) return 0;
    std::shared_lock lock(idx->mutex);
    return idx->version;
}
// Count tasks, optionally filtered by status string. A null or empty status
// counts every task. Returns 0 for a null handle.
size_t qi_get_task_count(qi_index_t* idx, const char* status) {
    if (!idx) return 0;
    std::shared_lock lock(idx->mutex);
    const bool unfiltered = (status == nullptr) || (*status == '\0');
    if (unfiltered) {
        return idx->entries.size();
    }
    return static_cast<size_t>(std::count_if(
        idx->entries.begin(), idx->entries.end(),
        [status](const IndexEntry& e) {
            return std::strcmp(e.task.status, status) == 0;
        }));
}