#include "queue_index.h"

// NOTE(review): the header names of the original #include lines were lost;
// the list below is reconstructed from what this file actually uses. Confirm
// against the build before merging.
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <memory>
#include <mutex>
#include <new>
#include <shared_mutex>
#include <string>
#include <vector>

// Binary index file format
//   Magic:       "FQI1" (4 bytes)
//   Version:     uint64_t
//   Entry count: uint64_t
//   Entries:     fixed-size records (raw qi_task_t, host byte order)

namespace fs = std::filesystem;

constexpr char INDEX_MAGIC[] = "FQI1";
constexpr size_t INDEX_MAGIC_LEN = 4;
constexpr uint64_t CURRENT_VERSION = 1;
// Bytes preceding the first record: magic + version + entry count.
constexpr uint64_t INDEX_HEADER_SIZE = INDEX_MAGIC_LEN + 2 * sizeof(uint64_t);

// Bump allocator over a fixed in-object buffer (never touches the heap).
// Individual allocations are never freed; reset() discards all of them.
class ArenaAllocator {
    static constexpr size_t BUFFER_SIZE = 256 * 1024;  // 256KB
    alignas(64) char buffer_[BUFFER_SIZE];
    size_t offset_ = 0;
    bool in_use_ = false;

public:
    // Returns `size` bytes aligned to `align`, or nullptr when the request
    // cannot be satisfied. `align` must be a non-zero power of two.
    // Alignment is computed relative to the buffer start, which is itself
    // 64-byte aligned, so alignments above 64 are only honoured relatively.
    void* allocate(size_t size, size_t align = 8) {
        // A zero or non-power-of-two alignment makes the mask arithmetic
        // meaningless (align == 0 used to hand out offset 0 again).
        if (align == 0 || (align & (align - 1)) != 0 || align > BUFFER_SIZE) {
            return nullptr;
        }
        const size_t aligned = (offset_ + align - 1) & ~(align - 1);
        // Written as a subtraction so a huge `size` cannot wrap around.
        if (aligned > BUFFER_SIZE || size > BUFFER_SIZE - aligned) {
            return nullptr;  // Arena exhausted
        }
        void* ptr = buffer_ + aligned;
        offset_ = aligned + size;
        return ptr;
    }

    void reset() { offset_ = 0; }

    void begin() {
        in_use_ = true;
        reset();
    }

    void end() { in_use_ = false; }
};

// Thread-local arena for hot path operations
thread_local ArenaAllocator g_arena;

struct IndexEntry {
    qi_task_t task{};     // Zero-initialised so no field is ever indeterminate
    uint64_t offset = 0;  // File offset for direct access
    bool dirty = false;   // Modified since last save
};

struct qi_index {
    std::string queue_dir;
    std::string index_path;
    std::string data_dir;

    // In-memory data structures
    std::vector<IndexEntry> entries;

    // Priority queue (max-heap by priority, then min-heap by created_at)
    std::vector<size_t> heap;  // Indices into entries

    // Thread safety
    mutable std::shared_mutex mutex;

    // Error state
    std::string last_error;

    // Stats
    uint64_t version = 0;
    int64_t mtime = 0;

    // Memory-mapped file (not used by the code in this file)
    void* mmap_ptr = nullptr;
    size_t mmap_size = 0;
    int mmap_fd = -1;
};

// Heap comparator: higher priority first, then earlier created_at.
// Operates on indices into *entries.
struct HeapComparator {
    const std::vector<IndexEntry>* entries;

    bool operator()(size_t a, size_t b) const {
        const auto& ta = (*entries)[a].task;
        const auto& tb = (*entries)[b].task;
        if (ta.priority != tb.priority) {
            return ta.priority < tb.priority;  // Max-heap: higher priority first
        }
        return ta.created_at > tb.created_at;  // Min-heap: earlier first
    }
};

// Records `msg` as the handle's last error. Never throws; a null `msg`
// is stored as an empty string.
static void set_error(qi_index_t* idx, const char* msg) noexcept {
    if (!idx) {
        return;
    }
    try {
        idx->last_error = msg ? msg : "";
    } catch (...) {
        idx->last_error.clear();  // Out of memory: drop the message
    }
}

// Ensure directory exists (creating parents as needed).
static bool ensure_dir(const char* path) {
    try {
        fs::create_directories(path);
        return true;
    } catch (...) {
        return false;
    }
}

// Writes exactly `len` bytes, retrying on EINTR and short writes.
static bool write_all(int fd, const void* data, size_t len) {
    const char* cursor = static_cast<const char*>(data);
    while (len > 0) {
        const ssize_t n = ::write(fd, cursor, len);
        if (n < 0 && errno == EINTR) {
            continue;
        }
        if (n <= 0) {
            return false;
        }
        cursor += n;
        len -= static_cast<size_t>(n);
    }
    return true;
}

// Reads exactly `len` bytes, retrying on EINTR and short reads.
// Returns false on error or premature end of file.
static bool read_all(int fd, void* data, size_t len) {
    char* cursor = static_cast<char*>(data);
    while (len > 0) {
        const ssize_t n = ::read(fd, cursor, len);
        if (n < 0 && errno == EINTR) {
            continue;
        }
        if (n <= 0) {
            return false;
        }
        cursor += n;
        len -= static_cast<size_t>(n);
    }
    return true;
}

// Build heap from entries: every entry whose status is "queued" is indexed.
static void rebuild_heap(qi_index_t* idx) {
    idx->heap.clear();
    for (size_t i = 0; i < idx->entries.size(); ++i) {
        if (std::strcmp(idx->entries[i].task.status, "queued") == 0) {
            idx->heap.push_back(i);
        }
    }
    std::make_heap(idx->heap.begin(), idx->heap.end(),
                   HeapComparator{&idx->entries});
}

// Write index to disk (binary format).
// The data goes to "<index_path>.tmp", is flushed, and is then renamed over
// the real index so a failed write never replaces a good file.
// Returns 0 on success; on failure returns -1, sets the last error and
// removes the temporary file.
static int write_index(qi_index_t* idx) {
    std::string tmp_path;
    try {
        tmp_path = idx->index_path + ".tmp";
    } catch (...) {
        set_error(idx, "Failed to open index file for writing");
        return -1;
    }

    const int fd = ::open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0640);
    if (fd < 0) {
        set_error(idx, "Failed to open index file for writing");
        return -1;
    }

    // Write header
    const uint64_t version = CURRENT_VERSION;
    const uint64_t count = idx->entries.size();
    bool ok = write_all(fd, INDEX_MAGIC, INDEX_MAGIC_LEN) &&
              write_all(fd, &version, sizeof(version)) &&
              write_all(fd, &count, sizeof(count));

    // Write entries
    for (const auto& entry : idx->entries) {
        if (!ok) {
            break;
        }
        ok = write_all(fd, &entry.task, sizeof(qi_task_t));
    }

    // Flush before the rename so the new name never points at partial data.
    if (ok && ::fsync(fd) != 0) {
        ok = false;
    }
    if (::close(fd) != 0) {
        ok = false;
    }
    if (!ok) {
        ::unlink(tmp_path.c_str());
        set_error(idx, "Failed to write index file");
        return -1;
    }

    // Atomic rename
    if (std::rename(tmp_path.c_str(), idx->index_path.c_str()) != 0) {
        ::unlink(tmp_path.c_str());
        set_error(idx, "Failed to rename index file");
        return -1;
    }

    // Everything in memory is now on disk.
    for (auto& entry : idx->entries) {
        entry.dirty = false;
    }
    idx->version++;
    idx->mtime = std::time(nullptr);
    return 0;
}

// Read index from disk.
// Returns 0 on success or when no index file exists (in-memory state is left
// untouched in that case). Returns -1 and sets the last error when the file
// is unreadable, malformed, or of an unsupported version; idx->entries is
// only replaced after the whole file has been read successfully.
static int read_index(qi_index_t* idx) {
    const int fd = ::open(idx->index_path.c_str(), O_RDONLY);
    if (fd < 0) {
        if (errno == ENOENT) {
            // No existing index - that's ok
            return 0;
        }
        set_error(idx, "Failed to open index file for reading");
        return -1;
    }
    // Closes the descriptor on every return path below.
    struct FdCloser {
        int fd;
        ~FdCloser() { ::close(fd); }
    } closer{fd};

    // Read header
    char magic[INDEX_MAGIC_LEN];
    if (!read_all(fd, magic, sizeof(magic)) ||
        std::memcmp(magic, INDEX_MAGIC, INDEX_MAGIC_LEN) != 0) {
        set_error(idx, "Invalid index file magic");
        return -1;
    }

    uint64_t version = 0;
    if (!read_all(fd, &version, sizeof(version))) {
        set_error(idx, "Failed to read index version");
        return -1;
    }
    if (version != CURRENT_VERSION) {
        set_error(idx, "Unsupported index version");
        return -1;
    }

    uint64_t count = 0;
    if (!read_all(fd, &count, sizeof(count))) {
        set_error(idx, "Failed to read entry count");
        return -1;
    }

    // The count comes from disk: bound it by the file size before using it
    // to size an allocation.
    struct stat st;
    if (::fstat(fd, &st) != 0 || st.st_size < 0 ||
        static_cast<uint64_t>(st.st_size) < INDEX_HEADER_SIZE ||
        count > (static_cast<uint64_t>(st.st_size) - INDEX_HEADER_SIZE) /
                    sizeof(qi_task_t)) {
        set_error(idx, "Invalid index entry count");
        return -1;
    }

    // Read entries into a scratch vector so a failure leaves idx unchanged.
    std::vector<IndexEntry> loaded;
    try {
        loaded.reserve(static_cast<size_t>(count));
    } catch (...) {
        set_error(idx, "Failed to read entry");
        return -1;
    }
    for (uint64_t i = 0; i < count; ++i) {
        IndexEntry entry;
        if (!read_all(fd, &entry.task, sizeof(qi_task_t))) {
            set_error(idx, "Failed to read entry");
            return -1;
        }
        // Lookups strcmp() the id, so it must be NUL-terminated.
        // NOTE(review): status is also compared with strcmp(); terminate it
        // here too once its declared size in queue_index.h is confirmed.
        entry.task.id[sizeof(entry.task.id) - 1] = '\0';
        loaded.push_back(entry);
    }

    idx->entries.swap(loaded);
    rebuild_heap(idx);
    return 0;
}

// Scan data directory to rebuild index from files.
// Every regular "*.json" file becomes a "queued" task whose id is the file
// stem (truncated to fit); all other task fields are zero. The rebuilt index
// is written to disk. Returns 0 on success, -1 on failure.
static int scan_data_directory(qi_index_t* idx) {
    idx->entries.clear();

    try {
        for (const auto& entry : fs::directory_iterator(idx->data_dir)) {
            if (!entry.is_regular_file()) continue;

            const auto path = entry.path();
            if (path.extension() != ".json") continue;

            // Parse task from JSON file (simplified - just extract ID)
            // In full implementation, parse full JSON
            const std::string filename = path.stem().string();

            IndexEntry ie;  // task is zero-initialised by IndexEntry
            std::strncpy(ie.task.id, filename.c_str(), sizeof(ie.task.id) - 1);
            ie.task.id[sizeof(ie.task.id) - 1] = '\0';
            std::strcpy(ie.task.status, "queued");

            idx->entries.push_back(ie);
        }
        rebuild_heap(idx);
    } catch (...) {
        set_error(idx, "Failed to scan data directory");
        return -1;
    }

    return write_index(idx);
}

// C API Implementation

// Opens (creating directories as needed) the index under `queue_dir`.
// Loads "<queue_dir>/pending/.queue.bin"; if that file is unreadable or
// malformed the index is rebuilt from "<queue_dir>/pending/entries".
// Returns nullptr on failure. No exception leaves this function.
qi_index_t* qi_open(const char* queue_dir) {
    if (!queue_dir) return nullptr;

    try {
        auto idx = std::make_unique<qi_index_t>();
        idx->queue_dir = queue_dir;
        idx->index_path =
            (fs::path(queue_dir) / "pending" / ".queue.bin").string();
        idx->data_dir = (fs::path(queue_dir) / "pending" / "entries").string();

        // Ensure directories exist
        if (!ensure_dir(idx->data_dir.c_str())) {
            return nullptr;
        }

        // Try to read existing index, or build from directory
        if (read_index(idx.get()) != 0) {
            // Build from scratch
            if (scan_data_directory(idx.get()) != 0) {
                return nullptr;
            }
            // The rebuild succeeded; do not report the read failure on a
            // handle that opened fine.
            idx->last_error.clear();
        }

        return idx.release();
    } catch (...) {
        return nullptr;
    }
}

// Persists the index (best effort) and destroys the handle.
void qi_close(qi_index_t* idx) {
    if (!idx) return;

    {
        // Sync any pending changes. The lock must be released before the
        // handle is deleted: destroying a locked mutex, and unlocking it
        // afterwards, is undefined behaviour.
        std::unique_lock<std::shared_mutex> lock(idx->mutex);
        write_index(idx);
    }

    delete idx;
}

// Appends `count` tasks, queues those whose status is "queued", and persists
// the index. Returns the result of the write (0 / -1), or -1 on invalid
// arguments or allocation failure (in which case no task is added).
int qi_add_tasks(qi_index_t* idx, const qi_task_t* tasks, uint32_t count) {
    if (!idx || !tasks || count == 0) return -1;

    std::unique_lock<std::shared_mutex> lock(idx->mutex);

    // Use arena for temporary allocation; the guard ends it on every path.
    struct ArenaScope {
        ArenaScope() { g_arena.begin(); }
        ~ArenaScope() { g_arena.end(); }
    } arena_scope;

    const size_t first_new = idx->entries.size();
    const HeapComparator comp{&idx->entries};

    try {
        // Add entries
        for (uint32_t i = 0; i < count; ++i) {
            IndexEntry entry;
            entry.task = tasks[i];
            entry.offset = 0;
            entry.dirty = true;

            idx->entries.push_back(entry);

            // Add to heap if queued
            if (std::strcmp(tasks[i].status, "queued") == 0) {
                idx->heap.push_back(idx->entries.size() - 1);
                std::push_heap(idx->heap.begin(), idx->heap.end(), comp);
            }
        }
    } catch (...) {
        // Roll back the partial batch; none of these steps allocates.
        idx->heap.erase(
            std::remove_if(idx->heap.begin(), idx->heap.end(),
                           [first_new](size_t i) { return i >= first_new; }),
            idx->heap.end());
        idx->entries.resize(first_new);
        std::make_heap(idx->heap.begin(), idx->heap.end(), comp);
        set_error(idx, "Failed to add tasks");
        return -1;
    }

    // Persist
    return write_index(idx);
}

// Pops up to `max_count` highest-priority queued tasks into `out_tasks`,
// marks them "running" in the index, and stores the number popped in
// `*out_count`. The copies handed out keep the status they had before being
// marked. Returns -1 only for invalid arguments.
int qi_get_next_batch(qi_index_t* idx, qi_task_t* out_tasks, uint32_t max_count,
                      uint32_t* out_count) {
    if (!idx || !out_tasks || max_count == 0 || !out_count) return -1;

    std::unique_lock<std::shared_mutex> lock(idx->mutex);

    *out_count = 0;
    const HeapComparator comp{&idx->entries};

    while (*out_count < max_count && !idx->heap.empty()) {
        // Pop highest priority task
        std::pop_heap(idx->heap.begin(), idx->heap.end(), comp);
        const size_t entry_idx = idx->heap.back();
        idx->heap.pop_back();

        IndexEntry& picked = idx->entries[entry_idx];

        // Copy to output
        out_tasks[*out_count] = picked.task;

        // Mark as running
        std::strcpy(picked.task.status, "running");
        picked.dirty = true;

        (*out_count)++;
    }

    if (*out_count > 0) {
        // Best effort: the tasks have already been handed out, so a failed
        // save is reported through qi_last_error() rather than the return
        // value.
        write_index(idx);
    }

    return 0;
}

// Copies the first task whose id equals `task_id` into `*out_task`.
// Returns 0 when found, -1 otherwise.
int qi_get_task_by_id(qi_index_t* idx, const char* task_id,
                      qi_task_t* out_task) {
    if (!idx || !task_id || !out_task) return -1;

    std::shared_lock<std::shared_mutex> lock(idx->mutex);

    const auto hit = std::find_if(
        idx->entries.begin(), idx->entries.end(),
        [task_id](const IndexEntry& entry) {
            return std::strcmp(entry.task.id, task_id) == 0;
        });
    if (hit == idx->entries.end()) {
        return -1;  // Not found
    }

    *out_task = hit->task;
    return 0;
}

// Returns a newly allocated copy of every task in `*out_tasks` (nullptr when
// the index is empty) and their number in `*count`. Release the array with
// qi_free_task_array(). Returns -1 on invalid arguments or allocation
// failure; the outputs are then nullptr / 0.
int qi_get_all_tasks(qi_index_t* idx, qi_task_t** out_tasks, size_t* count) {
    if (!idx || !out_tasks || !count) return -1;

    std::shared_lock<std::shared_mutex> lock(idx->mutex);

    *out_tasks = nullptr;
    *count = 0;

    const size_t total = idx->entries.size();
    if (total == 0) {
        return 0;
    }

    // nothrow: an exception must not cross the C API boundary.
    qi_task_t* copies = new (std::nothrow) qi_task_t[total];
    if (!copies) {
        return -1;
    }
    for (size_t i = 0; i < total; ++i) {
        copies[i] = idx->entries[i].task;
    }

    *out_tasks = copies;
    *count = total;
    return 0;
}

// Frees an array returned by qi_get_all_tasks(); nullptr is accepted.
void qi_free_task_array(qi_task_t* tasks) {
    delete[] tasks;
}

// Returns the last recorded error message, or nullptr when there is none.
// The pointer refers to storage owned by the handle and is read without
// taking the handle's mutex.
const char* qi_last_error(qi_index_t* idx) {
    if (!idx || idx->last_error.empty()) return nullptr;
    return idx->last_error.c_str();
}

// Discards the last recorded error message.
void qi_clear_error(qi_index_t* idx) {
    if (idx) {
        idx->last_error.clear();
    }
}

// Returns the number of successful index writes made through this handle
// (0 for a null handle).
uint64_t qi_get_index_version(qi_index_t* idx) {
    if (!idx) return 0;
    std::shared_lock<std::shared_mutex> lock(idx->mutex);
    return idx->version;
}

// Counts tasks whose status equals `status`; a null or empty `status`
// counts every task.
size_t qi_get_task_count(qi_index_t* idx, const char* status) {
    if (!idx) return 0;

    std::shared_lock<std::shared_mutex> lock(idx->mutex);

    if (!status || status[0] == '\0') {
        return idx->entries.size();
    }

    return static_cast<size_t>(std::count_if(
        idx->entries.begin(), idx->entries.end(),
        [status](const IndexEntry& entry) {
            return std::strcmp(entry.task.status, status) == 0;
        }));
}