Some checks failed
Documentation / build-and-publish (push) Waiting to run
Test / test (push) Waiting to run
Checkout test / test (push) Successful in 5s
CI with Native Libraries / test-native (push) Has been cancelled
CI with Native Libraries / build-release (push) Has been cancelled
427 lines
11 KiB
C++
427 lines
11 KiB
C++
#include "queue_index.h"

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <system_error>
#include <vector>
|
|
|
|
// Binary index file format:
//   Magic:       "FQI1" (4 bytes)
//   Version:     uint64_t
//   Entry count: uint64_t
//   Entries:     fixed-size records (raw qi_task_t structs)

namespace fs = std::filesystem;

// Magic bytes identifying a valid index file (only the first 4 chars are written).
constexpr char INDEX_MAGIC[] = "FQI1";
// On-disk format version stamped into every index header.
constexpr uint64_t CURRENT_VERSION = 1;
|
|
|
|
// Arena allocator for hot path (no dynamic allocations).
//
// A fixed 256KB buffer is carved up by bump-pointer allocation; individual
// allocations are never freed, the whole arena is reset at once.  Not
// thread-safe by itself — intended to be used as a thread_local instance.
class ArenaAllocator {
    static constexpr size_t BUFFER_SIZE = 256 * 1024; // 256KB

    alignas(64) char buffer_[BUFFER_SIZE]; // cache-line-aligned backing store
    size_t offset_ = 0;                    // bump pointer (bytes consumed)
    bool in_use_ = false;                  // tracks begin()/end() scope

public:
    // Returns a pointer to `size` bytes aligned to `align` (must be a power
    // of two; buffer_ is 64-byte aligned so align <= 64 is honored), or
    // nullptr when the arena cannot satisfy the request.
    void* allocate(size_t size, size_t align = 8) {
        size_t aligned = (offset_ + align - 1) & ~(align - 1);
        // Overflow-safe capacity check: `aligned + size` can wrap around for
        // huge `size`, so compare against the remaining space instead.
        if (aligned > BUFFER_SIZE || size > BUFFER_SIZE - aligned) {
            return nullptr; // Arena exhausted
        }
        void* ptr = buffer_ + aligned;
        offset_ = aligned + size;
        return ptr;
    }

    // Discards all allocations; the buffer is immediately reusable.
    void reset() { offset_ = 0; }

    // Marks the start of an allocation scope and clears previous contents.
    void begin() { in_use_ = true; reset(); }

    // Marks the end of an allocation scope.
    void end() { in_use_ = false; }
};
|
|
|
|
// Thread-local arena for hot path operations.
// Each thread gets its own instance, so no locking is required around it.
thread_local ArenaAllocator g_arena;
|
|
|
|
// One in-memory task record plus bookkeeping for persistence.
struct IndexEntry {
    qi_task_t task;  // Full task payload (fixed-size C struct from queue_index.h)
    uint64_t offset; // File offset for direct access
    bool dirty;      // Modified since last save
};
|
|
|
|
// Opaque handle behind the C API: all state for one on-disk queue.
struct qi_index {
    std::string queue_dir;  // Root queue directory passed to qi_open()
    std::string index_path; // <queue_dir>/pending/.queue.bin
    std::string data_dir;   // <queue_dir>/pending/entries

    // In-memory data structures
    std::vector<IndexEntry> entries;

    // Priority queue (max-heap by priority, then min-heap by created_at);
    // only entries whose status is "queued" appear here.
    std::vector<size_t> heap; // Indices into entries

    // Thread safety: shared for readers, exclusive for mutators.
    mutable std::shared_mutex mutex;

    // Error state (see qi_last_error / qi_clear_error)
    std::string last_error;

    // Stats
    uint64_t version = 0; // Bumped on every successful write_index()
    int64_t mtime = 0;    // Wall-clock time of the last successful write_index()

    // Memory-mapped file — not used by the code visible in this file;
    // presumably reserved for a future mmap-based read path (TODO confirm).
    void* mmap_ptr = nullptr;
    size_t mmap_size = 0;
    int mmap_fd = -1;
};
|
|
|
|
// Heap comparator: higher priority first, then earlier created_at
|
|
struct HeapComparator {
|
|
const std::vector<IndexEntry>* entries;
|
|
|
|
bool operator()(size_t a, size_t b) const {
|
|
const auto& ta = (*entries)[a].task;
|
|
const auto& tb = (*entries)[b].task;
|
|
if (ta.priority != tb.priority) {
|
|
return ta.priority < tb.priority; // Max-heap: higher priority first
|
|
}
|
|
return ta.created_at > tb.created_at; // Min-heap: earlier first
|
|
}
|
|
};
|
|
|
|
// Records a human-readable error message on the handle (no-op on null).
static void set_error(qi_index_t* idx, const char* msg) {
    if (idx == nullptr) {
        return;
    }
    idx->last_error = msg;
}
|
|
|
|
// Ensure directory exists
|
|
static bool ensure_dir(const char* path) {
|
|
try {
|
|
fs::create_directories(path);
|
|
return true;
|
|
} catch (...) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
// Build heap from entries
|
|
static void rebuild_heap(qi_index_t* idx) {
|
|
idx->heap.clear();
|
|
HeapComparator comp{&idx->entries};
|
|
|
|
for (size_t i = 0; i < idx->entries.size(); ++i) {
|
|
if (std::strcmp(idx->entries[i].task.status, "queued") == 0) {
|
|
idx->heap.push_back(i);
|
|
}
|
|
}
|
|
std::make_heap(idx->heap.begin(), idx->heap.end(), comp);
|
|
}
|
|
|
|
// Write index to disk (binary format)
|
|
static int write_index(qi_index_t* idx) {
|
|
std::string tmp_path = idx->index_path + ".tmp";
|
|
|
|
int fd = open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0640);
|
|
if (fd < 0) {
|
|
set_error(idx, "Failed to open index file for writing");
|
|
return -1;
|
|
}
|
|
|
|
// Write header
|
|
write(fd, INDEX_MAGIC, 4);
|
|
uint64_t version = CURRENT_VERSION;
|
|
write(fd, &version, sizeof(version));
|
|
uint64_t count = idx->entries.size();
|
|
write(fd, &count, sizeof(count));
|
|
|
|
// Write entries
|
|
for (const auto& entry : idx->entries) {
|
|
write(fd, &entry.task, sizeof(qi_task_t));
|
|
}
|
|
|
|
close(fd);
|
|
|
|
// Atomic rename
|
|
if (rename(tmp_path.c_str(), idx->index_path.c_str()) != 0) {
|
|
set_error(idx, "Failed to rename index file");
|
|
return -1;
|
|
}
|
|
|
|
idx->version++;
|
|
idx->mtime = time(nullptr);
|
|
|
|
return 0;
|
|
}
|
|
|
|
// Read index from disk
|
|
static int read_index(qi_index_t* idx) {
|
|
int fd = open(idx->index_path.c_str(), O_RDONLY);
|
|
if (fd < 0) {
|
|
if (errno == ENOENT) {
|
|
// No existing index - that's ok
|
|
return 0;
|
|
}
|
|
set_error(idx, "Failed to open index file for reading");
|
|
return -1;
|
|
}
|
|
|
|
// Read header
|
|
char magic[4];
|
|
if (read(fd, magic, 4) != 4 || std::memcmp(magic, INDEX_MAGIC, 4) != 0) {
|
|
close(fd);
|
|
set_error(idx, "Invalid index file magic");
|
|
return -1;
|
|
}
|
|
|
|
uint64_t version;
|
|
if (read(fd, &version, sizeof(version)) != sizeof(version)) {
|
|
close(fd);
|
|
set_error(idx, "Failed to read index version");
|
|
return -1;
|
|
}
|
|
|
|
uint64_t count;
|
|
if (read(fd, &count, sizeof(count)) != sizeof(count)) {
|
|
close(fd);
|
|
set_error(idx, "Failed to read entry count");
|
|
return -1;
|
|
}
|
|
|
|
// Read entries
|
|
idx->entries.clear();
|
|
idx->entries.reserve(count);
|
|
|
|
for (uint64_t i = 0; i < count; ++i) {
|
|
IndexEntry entry;
|
|
if (read(fd, &entry.task, sizeof(qi_task_t)) != sizeof(qi_task_t)) {
|
|
close(fd);
|
|
set_error(idx, "Failed to read entry");
|
|
return -1;
|
|
}
|
|
entry.offset = 0;
|
|
entry.dirty = false;
|
|
idx->entries.push_back(entry);
|
|
}
|
|
|
|
close(fd);
|
|
|
|
rebuild_heap(idx);
|
|
|
|
return 0;
|
|
}
|
|
|
|
// Scan data directory to rebuild index from files
|
|
static int scan_data_directory(qi_index_t* idx) {
|
|
idx->entries.clear();
|
|
|
|
try {
|
|
for (const auto& entry : fs::directory_iterator(idx->data_dir)) {
|
|
if (!entry.is_regular_file()) continue;
|
|
|
|
auto path = entry.path();
|
|
if (path.extension() != ".json") continue;
|
|
|
|
// Parse task from JSON file (simplified - just extract ID)
|
|
// In full implementation, parse full JSON
|
|
std::string filename = path.stem().string();
|
|
|
|
IndexEntry ie;
|
|
std::strncpy(ie.task.id, filename.c_str(), sizeof(ie.task.id) - 1);
|
|
ie.task.id[sizeof(ie.task.id) - 1] = '\0';
|
|
std::strcpy(ie.task.status, "queued");
|
|
ie.offset = 0;
|
|
ie.dirty = false;
|
|
idx->entries.push_back(ie);
|
|
}
|
|
} catch (...) {
|
|
set_error(idx, "Failed to scan data directory");
|
|
return -1;
|
|
}
|
|
|
|
rebuild_heap(idx);
|
|
|
|
return write_index(idx);
|
|
}
|
|
|
|
// C API Implementation
|
|
|
|
qi_index_t* qi_open(const char* queue_dir) {
|
|
if (!queue_dir) return nullptr;
|
|
|
|
auto* idx = new qi_index_t;
|
|
idx->queue_dir = queue_dir;
|
|
idx->index_path = (fs::path(queue_dir) / "pending" / ".queue.bin").string();
|
|
idx->data_dir = (fs::path(queue_dir) / "pending" / "entries").string();
|
|
|
|
// Ensure directories exist
|
|
if (!ensure_dir(idx->data_dir.c_str())) {
|
|
delete idx;
|
|
return nullptr;
|
|
}
|
|
|
|
// Try to read existing index, or build from directory
|
|
if (read_index(idx) != 0) {
|
|
// Build from scratch
|
|
if (scan_data_directory(idx) != 0) {
|
|
delete idx;
|
|
return nullptr;
|
|
}
|
|
}
|
|
|
|
return idx;
|
|
}
|
|
|
|
// Flush pending state and destroy the handle.  Safe to call with nullptr.
void qi_close(qi_index_t* idx) {
    if (!idx) return;

    // Sync any pending changes.  The lock must be released *before* delete:
    // the original deleted idx while the guard still held idx->mutex, so the
    // guard's destructor unlocked a destroyed mutex (undefined behavior).
    {
        std::unique_lock lock(idx->mutex);
        write_index(idx);
    }

    delete idx;
}
|
|
|
|
// Append `count` tasks to the index and persist.  Tasks whose status is
// "queued" also enter the scheduling heap.
// Returns 0 on success, -1 on bad arguments or persistence failure (note:
// on a failed persist the in-memory state still contains the new tasks).
int qi_add_tasks(qi_index_t* idx, const qi_task_t* tasks, uint32_t count) {
    if (!idx || !tasks || count == 0) return -1;

    std::unique_lock lock(idx->mutex);

    // (Removed the g_arena.begin()/end() bracket from the original: nothing
    // in this function ever allocated from the arena, so it was dead code
    // that misleadingly implied arena usage.)

    // Reserve up front so the loop triggers at most one reallocation.
    idx->entries.reserve(idx->entries.size() + count);

    for (uint32_t i = 0; i < count; ++i) {
        IndexEntry entry;
        entry.task = tasks[i];
        entry.offset = 0;
        entry.dirty = true;
        idx->entries.push_back(entry);

        // Only tasks still in "queued" state are schedulable.
        if (std::strcmp(tasks[i].status, "queued") == 0) {
            idx->heap.push_back(idx->entries.size() - 1);
            HeapComparator comp{&idx->entries};
            std::push_heap(idx->heap.begin(), idx->heap.end(), comp);
        }
    }

    // Persist
    return write_index(idx);
}
|
|
|
|
// Pop up to `max_count` highest-priority queued tasks, mark them "running",
// persist the transition, and copy them to `out_tasks`.
// Returns 0 on success (*out_count may be 0 when the queue is empty),
// -1 on bad arguments or persistence failure.
int qi_get_next_batch(qi_index_t* idx, qi_task_t* out_tasks, uint32_t max_count, uint32_t* out_count) {
    if (!idx || !out_tasks || max_count == 0 || !out_count) return -1;

    std::unique_lock lock(idx->mutex);

    *out_count = 0;
    HeapComparator comp{&idx->entries};

    while (*out_count < max_count && !idx->heap.empty()) {
        // Pop highest priority task
        std::pop_heap(idx->heap.begin(), idx->heap.end(), comp);
        size_t entry_idx = idx->heap.back();
        idx->heap.pop_back();

        // Mark as running *before* copying out: the original copied first,
        // handing the caller a stale "queued" status that disagreed with
        // the persisted index state.
        std::strcpy(idx->entries[entry_idx].task.status, "running");
        idx->entries[entry_idx].dirty = true;

        // Copy to output
        out_tasks[*out_count] = idx->entries[entry_idx].task;
        (*out_count)++;
    }

    // Persist the status transitions; surface failure instead of silently
    // ignoring the write_index() result as the original did.
    if (*out_count > 0 && write_index(idx) != 0) {
        return -1;
    }

    return 0;
}
|
|
|
|
// Look up a task by its ID and copy it into *out_task.
// Returns 0 if found, -1 if not found or on bad arguments.
int qi_get_task_by_id(qi_index_t* idx, const char* task_id, qi_task_t* out_task) {
    if (!idx || !task_id || !out_task) return -1;

    std::shared_lock lock(idx->mutex);

    auto match = std::find_if(
        idx->entries.begin(), idx->entries.end(),
        [task_id](const IndexEntry& e) {
            return std::strcmp(e.task.id, task_id) == 0;
        });

    if (match == idx->entries.end()) {
        return -1; // Not found
    }

    *out_task = match->task;
    return 0;
}
|
|
|
|
// Copy every task into a freshly allocated array (*out_tasks) and report its
// length in *count.  The caller releases the array via qi_free_task_array().
// *out_tasks is nullptr when the queue is empty.
// Returns 0 on success, -1 on bad arguments.
int qi_get_all_tasks(qi_index_t* idx, qi_task_t** out_tasks, size_t* count) {
    if (!idx || !out_tasks || !count) return -1;

    std::shared_lock lock(idx->mutex);

    const size_t total = idx->entries.size();
    *count = total;
    if (total == 0) {
        *out_tasks = nullptr;
        return 0;
    }

    auto* tasks = new qi_task_t[total];
    std::transform(idx->entries.begin(), idx->entries.end(), tasks,
                   [](const IndexEntry& e) { return e.task; });
    *out_tasks = tasks;

    return 0;
}
|
|
|
|
// Release an array previously returned by qi_get_all_tasks().
// Safe to call with nullptr (delete[] on null is a no-op).
void qi_free_task_array(qi_task_t* tasks) {
    delete[] tasks;
}
|
|
|
|
// Return the last error message for this handle, or nullptr if none.
// NOTE(review): the returned pointer aliases internal storage and is read
// without holding idx->mutex — a concurrent call that sets or clears the
// error may invalidate it.  Callers should copy the string promptly.
const char* qi_last_error(qi_index_t* idx) {
    if (!idx || idx->last_error.empty()) return nullptr;
    return idx->last_error.c_str();
}
|
|
|
|
// Reset the handle's error state (no-op on nullptr).
void qi_clear_error(qi_index_t* idx) {
    if (idx == nullptr) {
        return;
    }
    idx->last_error.clear();
}
|
|
|
|
// Return the in-memory index version (bumped on every successful persist),
// or 0 for a null handle.
uint64_t qi_get_index_version(qi_index_t* idx) {
    if (idx == nullptr) {
        return 0;
    }
    std::shared_lock lock(idx->mutex);
    return idx->version;
}
|
|
|
|
// Count tasks whose status matches `status`; a null or empty status counts
// every task.  Returns 0 for a null handle.
size_t qi_get_task_count(qi_index_t* idx, const char* status) {
    if (idx == nullptr) return 0;
    std::shared_lock lock(idx->mutex);

    const bool count_all = (status == nullptr) || (status[0] == '\0');
    if (count_all) {
        return idx->entries.size();
    }

    return static_cast<size_t>(std::count_if(
        idx->entries.begin(), idx->entries.end(),
        [status](const IndexEntry& e) {
            return std::strcmp(e.task.status, status) == 0;
        }));
}
|