// priority_queue.cpp - C++ style but using C-style storage #include "priority_queue.h" #include "../../common/include/secure_mem.h" #include #include using fetchml::common::safe_strncpy; PriorityQueueIndex::PriorityQueueIndex(const char* queue_dir) : heap_(entries_, EntryComparator{}) { // Initialize storage (returns false if path invalid, we ignore - open() will fail) storage_init(&storage_, queue_dir); } PriorityQueueIndex::~PriorityQueueIndex() { close(); } bool PriorityQueueIndex::open() { if (!storage_open(&storage_)) { safe_strncpy(last_error_, "Failed to open storage", sizeof(last_error_)); return false; } load_entries(); rebuild_heap(); return true; } void PriorityQueueIndex::close() { if (dirty_) { save(); } storage_close(&storage_); storage_cleanup(&storage_); entries_.clear(); heap_.clear(); } void PriorityQueueIndex::load_entries() { entries_.clear(); // Try memory-mapped access first if (storage_mmap_for_read(&storage_)) { size_t count = storage_mmap_entry_count(&storage_); const DiskEntry* disk_entries = storage_mmap_entries(&storage_); entries_.reserve(count); for (size_t i = 0; i < count; ++i) { IndexEntry entry; memcpy(&entry.task.id, disk_entries[i].id, 64); entry.task.id[63] = '\0'; // Ensure null termination memcpy(&entry.task.job_name, disk_entries[i].job_name, 128); entry.task.job_name[127] = '\0'; // Ensure null termination memcpy(&entry.task.status, disk_entries[i].status, 16); entry.task.status[15] = '\0'; entry.task.priority = disk_entries[i].priority; entry.task.created_at = disk_entries[i].created_at; entry.task.next_retry = disk_entries[i].next_retry; entry.offset = i; entry.dirty = false; entries_.push_back(entry); } } storage_munmap(&storage_); // Rebuild ID index after loading rebuild_id_index(); } void PriorityQueueIndex::rebuild_heap() { std::vector queued_indices; for (size_t i = 0; i < entries_.size(); ++i) { queued_indices.push_back(i); } heap_.build(queued_indices); } void PriorityQueueIndex::rebuild_id_index() { id_index_.clear(); id_index_.reserve(entries_.size()); for (size_t i = 0; i < entries_.size(); ++i) { id_index_[entries_[i].task.id] = i; } } int PriorityQueueIndex::add_tasks(const qi_task_t* tasks, uint32_t count) { // Validate batch size to prevent integer overflow (CVE-2025-0838) if (!tasks || count == 0) return 0; if (count > MAX_BATCH_SIZE) { safe_strncpy(last_error_, "Batch size exceeds maximum", sizeof(last_error_)); return -1; } std::lock_guard lock(mutex_); for (uint32_t i = 0; i < count; ++i) { IndexEntry entry; entry.task = tasks[i]; // Enforce null termination on all string fields entry.task.id[sizeof(entry.task.id) - 1] = '\0'; entry.task.job_name[sizeof(entry.task.job_name) - 1] = '\0'; entry.task.status[sizeof(entry.task.status) - 1] = '\0'; entry.offset = 0; entry.dirty = true; entries_.push_back(entry); } dirty_ = true; rebuild_heap(); return static_cast(count); } int PriorityQueueIndex::get_next_batch(qi_task_t* out_tasks, uint32_t max_count, uint32_t* out_count) { std::lock_guard lock(mutex_); uint32_t got = 0; while (got < max_count && !heap_.empty()) { size_t idx = heap_.pop(); if (idx >= entries_.size()) continue; out_tasks[got] = entries_[idx].task; got++; } if (out_count) { *out_count = got; } return 0; } int PriorityQueueIndex::save() { std::lock_guard lock(mutex_); // Convert entries to disk format std::vector disk_entries; disk_entries.reserve(entries_.size()); for (const auto& entry : entries_) { DiskEntry disk; memcpy(disk.id, entry.task.id, 64); memcpy(disk.job_name, entry.task.job_name, 128); memcpy(disk.status, entry.task.status, 16); disk.priority = entry.task.priority; disk.created_at = entry.task.created_at; disk.next_retry = entry.task.next_retry; memset(disk.reserved, 0, sizeof(disk.reserved)); disk_entries.push_back(disk); } if (!storage_write_entries(&storage_, disk_entries.data(), disk_entries.size())) { safe_strncpy(last_error_, "Failed to write entries", sizeof(last_error_)); return -1; } dirty_ = false; return 0; } int PriorityQueueIndex::get_all_tasks(qi_task_t** out_tasks, size_t* out_count) { std::lock_guard lock(mutex_); if (entries_.empty()) { *out_tasks = nullptr; *out_count = 0; return 0; } qi_task_t* tasks = new qi_task_t[entries_.size()]; for (size_t i = 0; i < entries_.size(); ++i) { tasks[i] = entries_[i].task; } *out_tasks = tasks; *out_count = entries_.size(); return 0; } // Get task by ID (O(1) lookup via hash map) int PriorityQueueIndex::get_task_by_id(const char* task_id, qi_task_t* out_task) { std::lock_guard lock(mutex_); if (!task_id || !out_task) return -1; auto it = id_index_.find(task_id); if (it == id_index_.end()) { safe_strncpy(last_error_, "Task not found", sizeof(last_error_)); return -1; } *out_task = entries_[it->second].task; return 0; } // Update tasks int PriorityQueueIndex::update_tasks(const qi_task_t* tasks, uint32_t count) { std::lock_guard lock(mutex_); if (!tasks || count == 0) return -1; for (uint32_t i = 0; i < count; ++i) { auto it = id_index_.find(tasks[i].id); if (it != id_index_.end()) { entries_[it->second].task = tasks[i]; // Enforce null termination on all string fields entries_[it->second].task.id[sizeof(entries_[it->second].task.id) - 1] = '\0'; entries_[it->second].task.job_name[sizeof(entries_[it->second].task.job_name) - 1] = '\0'; entries_[it->second].task.status[sizeof(entries_[it->second].task.status) - 1] = '\0'; entries_[it->second].dirty = true; } } dirty_ = true; rebuild_heap(); return static_cast(count); } // Remove tasks int PriorityQueueIndex::remove_tasks(const char** task_ids, uint32_t count) { std::lock_guard lock(mutex_); if (!task_ids || count == 0) return -1; int removed = 0; for (uint32_t i = 0; i < count; ++i) { if (!task_ids[i]) continue; auto it = id_index_.find(task_ids[i]); if (it != id_index_.end()) { size_t idx = it->second; // Swap with last and pop (fast removal) if (idx < entries_.size() - 1) { entries_[idx] = entries_.back(); id_index_[entries_[idx].task.id] = idx; } entries_.pop_back(); id_index_.erase(it); removed++; } } if (removed > 0) { dirty_ = true; rebuild_heap(); } return removed; } // Compact index (remove finished/failed tasks) int PriorityQueueIndex::compact_index() { std::lock_guard lock(mutex_); size_t original_size = entries_.size(); // Remove entries with "finished" or "failed" status auto new_end = std::remove_if(entries_.begin(), entries_.end(), [](const IndexEntry& e) { return strcmp(e.task.status, "finished") == 0 || strcmp(e.task.status, "failed") == 0; }); entries_.erase(new_end, entries_.end()); if (entries_.size() < original_size) { dirty_ = true; rebuild_id_index(); rebuild_heap(); } return static_cast(original_size - entries_.size()); } // Rebuild heap int PriorityQueueIndex::rebuild() { std::lock_guard lock(mutex_); rebuild_heap(); return 0; }