fetch_ml/native/queue_index/index/priority_queue.cpp
Jeremie Fraeys 7efe8bbfbf
native: security hardening, research trustworthiness, and CVE mitigations
Security Fixes:
- CVE-2024-45339: Add O_EXCL flag to temp file creation in storage_write_entries()
  Prevents symlink attacks on predictable .tmp file paths
- CVE-2025-47290: Use openat_nofollow() in storage_open()
  Closes TOCTOU race condition via path_sanitizer infrastructure
- CVE-2025-0838: Add MAX_BATCH_SIZE=10000 to add_tasks()
  Prevents integer overflow in batch operations

Research Trustworthiness (dataset_hash):
- Deterministic file ordering: std::sort after collect_files()
- Recursive directory traversal: depth-limited with cycle detection
- Documented exclusions: hidden files and special files noted in API

Bug Fixes:
- R1: storage_init path validation for non-existent directories
- R2: safe_strncpy return value check before strcat
- R3: parallel_hash 256-file cap replaced with std::vector
- R4: wire qi_compact_index/qi_rebuild_index stubs
- R5: CompletionLatch race condition fix (hold mutex during decrement)
- R6: ARMv8 SHA256 transform fix (save abcd_pre before vsha256hq_u32)
- R7: fuzz_index_storage header format fix
- R8: enforce null termination in add_tasks/update_tasks
- R9: use 64 bytes (not 65) in combined hash to exclude null terminator
- R10: status field persistence in save()

New Tests:
- test_recursive_dataset.cpp: Verify deterministic recursive hashing
- test_storage_symlink_resistance.cpp: Verify CVE-2024-45339 fix
- test_queue_index_batch_limit.cpp: Verify CVE-2025-0838 fix
- test_sha256_arm_kat.cpp: ARMv8 known-answer tests
- test_storage_init_new_dir.cpp: Verify R1 fix (storage_init on a non-existent directory)
- test_parallel_hash_large_dir.cpp: Verify R3 fix (parallel_hash beyond 256 files)
- test_queue_index_compact.cpp: Verify R4 fix (qi_compact_index wiring)

All 8 native tests passing. Library ready for research lab deployment.
2026-02-21 13:33:45 -05:00

284 lines
8.2 KiB
C++

// priority_queue.cpp - C++ style but using C-style storage
#include "priority_queue.h"
#include "../../common/include/secure_mem.h"
#include <algorithm>
#include <cstring>
using fetchml::common::safe_strncpy;
// Binds the heap to entries_ and prepares storage_ for queue_dir.
// Nothing is loaded here; open() does that.
PriorityQueueIndex::PriorityQueueIndex(const char* queue_dir)
    : heap_(entries_, EntryComparator{}) {
    // The result of storage_init() is intentionally discarded: an invalid
    // path is surfaced later, when open() fails.
    (void)storage_init(&storage_, queue_dir);
}
// Destructor: delegates all teardown to close(). close() saves when the
// index is dirty; any save() failure is not reported from here.
PriorityQueueIndex::~PriorityQueueIndex() {
    this->close();
}
// Opens the underlying storage, then loads all entries and rebuilds the heap.
// Returns true on success. On failure last_error_ is set and no loading is
// attempted.
bool PriorityQueueIndex::open() {
    const bool opened = storage_open(&storage_);
    if (opened) {
        load_entries();
        rebuild_heap();
    } else {
        safe_strncpy(last_error_, "Failed to open storage", sizeof(last_error_));
    }
    return opened;
}
// Flushes pending changes (when dirty_), closes and cleans up storage_, and
// drops all in-memory state.
// NOTE: save()'s return value is not checked; on a failed save the in-memory
// entries are still discarded.
void PriorityQueueIndex::close() {
    if (dirty_) save();

    storage_close(&storage_);
    storage_cleanup(&storage_);

    entries_.clear();
    heap_.clear();
}
// Replaces entries_ with the records exposed by the storage's read mapping.
// If storage_mmap_for_read() fails, entries_ is left empty; there is no
// non-mmap fallback. storage_munmap() is called unconditionally afterwards,
// and id_index_ is rebuilt to match the loaded entries.
void PriorityQueueIndex::load_entries() {
    entries_.clear();
    if (storage_mmap_for_read(&storage_)) {
        size_t count = storage_mmap_entry_count(&storage_);
        const DiskEntry* disk_entries = storage_mmap_entries(&storage_);
        // Never dereference a missing mapping, whatever count claims.
        if (disk_entries == nullptr) {
            count = 0;
        }
        entries_.reserve(count);
        for (size_t i = 0; i < count; ++i) {
            // Value-initialize so any qi_task_t fields not populated from the
            // disk record below are zero rather than indeterminate (they are
            // later copied out to callers).
            IndexEntry entry{};
            // Fixed sizes mirror save(); assumes qi_task_t and DiskEntry
            // fields are at least 64/128/16 bytes -- TODO confirm in headers.
            memcpy(&entry.task.id, disk_entries[i].id, 64);
            entry.task.id[63] = '\0';  // disk data is untrusted: force termination
            memcpy(&entry.task.job_name, disk_entries[i].job_name, 128);
            entry.task.job_name[127] = '\0';
            memcpy(&entry.task.status, disk_entries[i].status, 16);
            entry.task.status[15] = '\0';
            entry.task.priority = disk_entries[i].priority;
            entry.task.created_at = disk_entries[i].created_at;
            entry.task.next_retry = disk_entries[i].next_retry;
            entry.offset = i;
            entry.dirty = false;
            entries_.push_back(entry);
        }
    }
    storage_munmap(&storage_);
    rebuild_id_index();
}
// Rebuilds heap_ from scratch over every position in entries_.
// NOTE(review): no status filter is applied, so get_next_batch() can pop
// tasks in any status (including "finished"/"failed" until compact_index()
// runs) -- confirm this is intended.
void PriorityQueueIndex::rebuild_heap() {
    std::vector<size_t> all_positions(entries_.size());
    for (size_t pos = 0; pos < all_positions.size(); ++pos) {
        all_positions[pos] = pos;
    }
    heap_.build(all_positions);
}
// Regenerates the id -> position map from entries_. When ids repeat, the
// entry with the highest position wins because later writes overwrite.
void PriorityQueueIndex::rebuild_id_index() {
    id_index_.clear();
    id_index_.reserve(entries_.size());
    size_t pos = 0;
    for (const auto& e : entries_) {
        id_index_[e.task.id] = pos++;
    }
}
int PriorityQueueIndex::add_tasks(const qi_task_t* tasks, uint32_t count) {
// Validate batch size to prevent integer overflow (CVE-2025-0838)
if (!tasks || count == 0) return 0;
if (count > MAX_BATCH_SIZE) {
safe_strncpy(last_error_, "Batch size exceeds maximum", sizeof(last_error_));
return -1;
}
std::lock_guard<std::mutex> lock(mutex_);
for (uint32_t i = 0; i < count; ++i) {
IndexEntry entry;
entry.task = tasks[i];
// Enforce null termination on all string fields
entry.task.id[sizeof(entry.task.id) - 1] = '\0';
entry.task.job_name[sizeof(entry.task.job_name) - 1] = '\0';
entry.task.status[sizeof(entry.task.status) - 1] = '\0';
entry.offset = 0;
entry.dirty = true;
entries_.push_back(entry);
}
dirty_ = true;
rebuild_heap();
return static_cast<int>(count);
}
// Pops up to `max_count` entries off the heap and copies their tasks into
// `out_tasks`. Popped entries stay in entries_; only the heap is consumed.
// Heap indices that are out of range for entries_ are skipped.
// Returns 0 on success, or -1 if `out_tasks` is null while max_count > 0
// (nothing is popped in that case). When non-null, *out_count receives the
// number of tasks written (0 on error).
int PriorityQueueIndex::get_next_batch(qi_task_t* out_tasks, uint32_t max_count, uint32_t* out_count) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (out_count) {
        *out_count = 0;
    }
    // Reject before popping so a bad call does not silently drain the heap.
    if (!out_tasks && max_count > 0) {
        return -1;
    }
    uint32_t got = 0;
    while (got < max_count && !heap_.empty()) {
        const size_t idx = heap_.pop();
        if (idx >= entries_.size()) continue;
        out_tasks[got++] = entries_[idx].task;
    }
    if (out_count) {
        *out_count = got;
    }
    return 0;
}
int PriorityQueueIndex::save() {
std::lock_guard<std::mutex> lock(mutex_);
// Convert entries to disk format
std::vector<DiskEntry> disk_entries;
disk_entries.reserve(entries_.size());
for (const auto& entry : entries_) {
DiskEntry disk;
memcpy(disk.id, entry.task.id, 64);
memcpy(disk.job_name, entry.task.job_name, 128);
memcpy(disk.status, entry.task.status, 16);
disk.priority = entry.task.priority;
disk.created_at = entry.task.created_at;
disk.next_retry = entry.task.next_retry;
memset(disk.reserved, 0, sizeof(disk.reserved));
disk_entries.push_back(disk);
}
if (!storage_write_entries(&storage_, disk_entries.data(), disk_entries.size())) {
safe_strncpy(last_error_, "Failed to write entries", sizeof(last_error_));
return -1;
}
dirty_ = false;
return 0;
}
// Copies every task into a newly allocated array (`new[]`). On success,
// *out_tasks receives the array (nullptr when there are no entries) and
// *out_count its length. The caller takes ownership of the array.
// Returns 0 on success, -1 if either out-pointer is null.
int PriorityQueueIndex::get_all_tasks(qi_task_t** out_tasks, size_t* out_count) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!out_tasks || !out_count) return -1;
    if (entries_.empty()) {
        *out_tasks = nullptr;
        *out_count = 0;
        return 0;
    }
    qi_task_t* tasks = new qi_task_t[entries_.size()];
    for (size_t i = 0; i < entries_.size(); ++i) {
        tasks[i] = entries_[i].task;
    }
    *out_tasks = tasks;
    *out_count = entries_.size();
    return 0;
}
// Copies the task whose id equals `task_id` into *out_task, looking it up
// through id_index_ (average O(1) assuming id_index_ is a hash map).
// Returns 0 when found. Returns -1 if an argument is null, or if the id is
// unknown (in which case last_error_ is also set).
int PriorityQueueIndex::get_task_by_id(const char* task_id, qi_task_t* out_task) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (task_id == nullptr || out_task == nullptr) {
        return -1;
    }
    const auto found = id_index_.find(task_id);
    if (found != id_index_.end()) {
        *out_task = entries_[found->second].task;
        return 0;
    }
    safe_strncpy(last_error_, "Task not found", sizeof(last_error_));
    return -1;
}
// Update tasks
int PriorityQueueIndex::update_tasks(const qi_task_t* tasks, uint32_t count) {
std::lock_guard<std::mutex> lock(mutex_);
if (!tasks || count == 0) return -1;
for (uint32_t i = 0; i < count; ++i) {
auto it = id_index_.find(tasks[i].id);
if (it != id_index_.end()) {
entries_[it->second].task = tasks[i];
// Enforce null termination on all string fields
entries_[it->second].task.id[sizeof(entries_[it->second].task.id) - 1] = '\0';
entries_[it->second].task.job_name[sizeof(entries_[it->second].task.job_name) - 1] = '\0';
entries_[it->second].task.status[sizeof(entries_[it->second].task.status) - 1] = '\0';
entries_[it->second].dirty = true;
}
}
dirty_ = true;
rebuild_heap();
return static_cast<int>(count);
}
// Remove tasks: deletes each entry whose id is listed in task_ids, using
// swap-with-last + pop_back (entry order is not preserved). Null ids and
// unknown ids are skipped. Returns the number of entries removed, or -1 if
// task_ids is null or count is 0.
int PriorityQueueIndex::remove_tasks(const char** task_ids, uint32_t count) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!task_ids || count == 0) return -1;
    int removed = 0;
    for (uint32_t i = 0; i < count; ++i) {
        if (!task_ids[i]) continue;
        auto it = id_index_.find(task_ids[i]);
        if (it == id_index_.end()) continue;
        const size_t idx = it->second;
        // Erase the mapping before touching any other key: the operator[]
        // below may insert (if the moved entry is not indexed) and rehash,
        // which would invalidate `it`.
        id_index_.erase(it);
        if (idx >= entries_.size()) continue;  // stale mapping, nothing to remove
        const size_t last = entries_.size() - 1;
        if (idx < last) {
            entries_[idx] = entries_[last];
            id_index_[entries_[idx].task.id] = idx;
        }
        entries_.pop_back();
        removed++;
    }
    if (removed > 0) {
        dirty_ = true;
        rebuild_heap();
    }
    return removed;
}
// Compact index: drops every entry whose status is exactly "finished" or
// "failed". If anything was dropped, the index is marked dirty and both
// id_index_ and the heap are rebuilt. Returns the number of entries dropped.
int PriorityQueueIndex::compact_index() {
    std::lock_guard<std::mutex> lock(mutex_);
    const auto is_compactable = [](const IndexEntry& e) {
        return strcmp(e.task.status, "finished") == 0 ||
               strcmp(e.task.status, "failed") == 0;
    };
    const size_t before = entries_.size();
    entries_.erase(std::remove_if(entries_.begin(), entries_.end(), is_compactable),
                   entries_.end());
    const size_t dropped = before - entries_.size();
    if (dropped != 0) {
        dirty_ = true;
        rebuild_id_index();
        rebuild_heap();
    }
    return static_cast<int>(dropped);
}
// Rebuild heap: re-heaps all current entries while holding the index mutex.
// Always returns 0.
int PriorityQueueIndex::rebuild() {
    const std::lock_guard<std::mutex> guard(mutex_);
    rebuild_heap();
    return 0;
}