Security Fixes: - CVE-2024-45339: Add O_EXCL flag to temp file creation in storage_write_entries() Prevents symlink attacks on predictable .tmp file paths - CVE-2025-47290: Use openat_nofollow() in storage_open() Closes TOCTOU race condition via path_sanitizer infrastructure - CVE-2025-0838: Add MAX_BATCH_SIZE=10000 to add_tasks() Prevents integer overflow in batch operations Research Trustworthiness (dataset_hash): - Deterministic file ordering: std::sort after collect_files() - Recursive directory traversal: depth-limited with cycle detection - Documented exclusions: hidden files and special files noted in API Bug Fixes: - R1: storage_init path validation for non-existent directories - R2: safe_strncpy return value check before strcat - R3: parallel_hash 256-file cap replaced with std::vector - R4: wire qi_compact_index/qi_rebuild_index stubs - R5: CompletionLatch race condition fix (hold mutex during decrement) - R6: ARMv8 SHA256 transform fix (save abcd_pre before vsha256hq_u32) - R7: fuzz_index_storage header format fix - R8: enforce null termination in add_tasks/update_tasks - R9: use 64 bytes (not 65) in combined hash to exclude null terminator - R10: status field persistence in save() New Tests: - test_recursive_dataset.cpp: Verify deterministic recursive hashing - test_storage_symlink_resistance.cpp: Verify CVE-2024-45339 fix - test_queue_index_batch_limit.cpp: Verify CVE-2025-0838 fix - test_sha256_arm_kat.cpp: ARMv8 known-answer tests - test_storage_init_new_dir.cpp: R1 verification - test_parallel_hash_large_dir.cpp: R3 verification - test_queue_index_compact.cpp: R4 verification All native tests passing. Library ready for research lab deployment.
284 lines
8.2 KiB
C++
// priority_queue.cpp - C++ style but using C-style storage
|
|
#include "priority_queue.h"
|
|
#include "../../common/include/secure_mem.h"
|
|
#include <algorithm>
|
|
#include <cstring>
|
|
|
|
using fetchml::common::safe_strncpy;
|
|
|
|
// Construct an index rooted at queue_dir.
//
// The heap is bound to entries_ by reference for the object's lifetime,
// so heap order always reflects the current entry vector. Storage is
// initialized here but not opened; call open() before use.
PriorityQueueIndex::PriorityQueueIndex(const char* queue_dir)
    : heap_(entries_, EntryComparator{}) {
    // Initialize storage (returns false if path invalid, we ignore - open() will fail)
    storage_init(&storage_, queue_dir);
}
|
|
|
|
// Destructor: flushes unsaved changes (via close()) and releases storage.
PriorityQueueIndex::~PriorityQueueIndex() {
    close();
}
|
|
|
|
bool PriorityQueueIndex::open() {
|
|
if (!storage_open(&storage_)) {
|
|
safe_strncpy(last_error_, "Failed to open storage", sizeof(last_error_));
|
|
return false;
|
|
}
|
|
|
|
load_entries();
|
|
rebuild_heap();
|
|
return true;
|
|
}
|
|
|
|
// Flush pending changes and tear down storage and in-memory state.
//
// save() sets last_error_ on failure; the failure is otherwise silent here
// because close() returns void.
void PriorityQueueIndex::close() {
    if (dirty_) {
        save();
    }
    // NOTE(review): close() does not hold mutex_ itself (save() locks
    // internally); it assumes no concurrent API calls during shutdown — confirm.
    storage_close(&storage_);
    storage_cleanup(&storage_);
    entries_.clear();
    heap_.clear();
}
|
|
|
|
void PriorityQueueIndex::load_entries() {
|
|
entries_.clear();
|
|
|
|
// Try memory-mapped access first
|
|
if (storage_mmap_for_read(&storage_)) {
|
|
size_t count = storage_mmap_entry_count(&storage_);
|
|
const DiskEntry* disk_entries = storage_mmap_entries(&storage_);
|
|
|
|
entries_.reserve(count);
|
|
for (size_t i = 0; i < count; ++i) {
|
|
IndexEntry entry;
|
|
memcpy(&entry.task.id, disk_entries[i].id, 64);
|
|
entry.task.id[63] = '\0'; // Ensure null termination
|
|
memcpy(&entry.task.job_name, disk_entries[i].job_name, 128);
|
|
entry.task.job_name[127] = '\0'; // Ensure null termination
|
|
memcpy(&entry.task.status, disk_entries[i].status, 16);
|
|
entry.task.status[15] = '\0';
|
|
entry.task.priority = disk_entries[i].priority;
|
|
entry.task.created_at = disk_entries[i].created_at;
|
|
entry.task.next_retry = disk_entries[i].next_retry;
|
|
entry.offset = i;
|
|
entry.dirty = false;
|
|
entries_.push_back(entry);
|
|
}
|
|
}
|
|
storage_munmap(&storage_);
|
|
|
|
// Rebuild ID index after loading
|
|
rebuild_id_index();
|
|
}
|
|
|
|
// Re-heapify over every entry currently in memory.
//
// NOTE(review): the name "queued_indices" suggests filtering by status,
// but all entries are included regardless of status — behavior preserved
// here exactly as in the original; confirm whether filtering was intended.
void PriorityQueueIndex::rebuild_heap() {
    std::vector<size_t> queued_indices;
    queued_indices.reserve(entries_.size());
    for (size_t pos = 0, n = entries_.size(); pos != n; ++pos) {
        queued_indices.push_back(pos);
    }
    heap_.build(queued_indices);
}
|
|
|
|
// Recompute the id -> entry-position lookup table from scratch.
// Later duplicates of an id overwrite earlier positions, as before.
void PriorityQueueIndex::rebuild_id_index() {
    id_index_.clear();
    id_index_.reserve(entries_.size());
    size_t pos = 0;
    for (const auto& entry : entries_) {
        id_index_[entry.task.id] = pos;
        ++pos;
    }
}
|
|
|
|
int PriorityQueueIndex::add_tasks(const qi_task_t* tasks, uint32_t count) {
|
|
// Validate batch size to prevent integer overflow (CVE-2025-0838)
|
|
if (!tasks || count == 0) return 0;
|
|
if (count > MAX_BATCH_SIZE) {
|
|
safe_strncpy(last_error_, "Batch size exceeds maximum", sizeof(last_error_));
|
|
return -1;
|
|
}
|
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
for (uint32_t i = 0; i < count; ++i) {
|
|
IndexEntry entry;
|
|
entry.task = tasks[i];
|
|
// Enforce null termination on all string fields
|
|
entry.task.id[sizeof(entry.task.id) - 1] = '\0';
|
|
entry.task.job_name[sizeof(entry.task.job_name) - 1] = '\0';
|
|
entry.task.status[sizeof(entry.task.status) - 1] = '\0';
|
|
entry.offset = 0;
|
|
entry.dirty = true;
|
|
entries_.push_back(entry);
|
|
}
|
|
|
|
dirty_ = true;
|
|
rebuild_heap();
|
|
return static_cast<int>(count);
|
|
}
|
|
|
|
// Pop up to max_count tasks from the priority heap into out_tasks.
//
// The number actually produced is written to *out_count when out_count is
// non-null. Returns 0 on success, -1 if out_tasks is null while tasks were
// requested. Stale heap indices (past the end of entries_) are skipped.
int PriorityQueueIndex::get_next_batch(qi_task_t* out_tasks, uint32_t max_count, uint32_t* out_count) {
    std::lock_guard<std::mutex> lock(mutex_);

    // BUG FIX: guard against a null output buffer, which previously
    // dereferenced out_tasks unchecked.
    if (!out_tasks && max_count > 0) {
        if (out_count) {
            *out_count = 0;
        }
        return -1;
    }

    uint32_t got = 0;
    while (got < max_count && !heap_.empty()) {
        size_t idx = heap_.pop();
        if (idx >= entries_.size()) continue;  // stale index — skip

        out_tasks[got] = entries_[idx].task;
        got++;
    }

    if (out_count) {
        *out_count = got;
    }

    return 0;
}
|
|
|
|
int PriorityQueueIndex::save() {
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
// Convert entries to disk format
|
|
std::vector<DiskEntry> disk_entries;
|
|
disk_entries.reserve(entries_.size());
|
|
|
|
for (const auto& entry : entries_) {
|
|
DiskEntry disk;
|
|
memcpy(disk.id, entry.task.id, 64);
|
|
memcpy(disk.job_name, entry.task.job_name, 128);
|
|
memcpy(disk.status, entry.task.status, 16);
|
|
disk.priority = entry.task.priority;
|
|
disk.created_at = entry.task.created_at;
|
|
disk.next_retry = entry.task.next_retry;
|
|
memset(disk.reserved, 0, sizeof(disk.reserved));
|
|
disk_entries.push_back(disk);
|
|
}
|
|
|
|
if (!storage_write_entries(&storage_, disk_entries.data(), disk_entries.size())) {
|
|
safe_strncpy(last_error_, "Failed to write entries", sizeof(last_error_));
|
|
return -1;
|
|
}
|
|
|
|
dirty_ = false;
|
|
return 0;
|
|
}
|
|
|
|
// Copy every task in the index into a freshly allocated array.
//
// On success, *out_tasks points to a new[]-allocated array (nullptr when
// the index is empty) and *out_count holds its length; the CALLER owns the
// array and must release it with delete[] (or the library's documented
// free routine — confirm against the public header). Returns 0 on
// success, -1 on null output pointers.
int PriorityQueueIndex::get_all_tasks(qi_task_t** out_tasks, size_t* out_count) {
    std::lock_guard<std::mutex> lock(mutex_);

    // BUG FIX: null output pointers previously caused an unchecked
    // dereference; reject them like get_task_by_id() does.
    if (!out_tasks || !out_count) return -1;

    if (entries_.empty()) {
        *out_tasks = nullptr;
        *out_count = 0;
        return 0;
    }

    qi_task_t* tasks = new qi_task_t[entries_.size()];

    for (size_t i = 0; i < entries_.size(); ++i) {
        tasks[i] = entries_[i].task;
    }

    *out_tasks = tasks;
    *out_count = entries_.size();
    return 0;
}
|
|
|
|
// Get task by ID (O(1) lookup via hash map)
|
|
int PriorityQueueIndex::get_task_by_id(const char* task_id, qi_task_t* out_task) {
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
if (!task_id || !out_task) return -1;
|
|
|
|
auto it = id_index_.find(task_id);
|
|
if (it == id_index_.end()) {
|
|
safe_strncpy(last_error_, "Task not found", sizeof(last_error_));
|
|
return -1;
|
|
}
|
|
|
|
*out_task = entries_[it->second].task;
|
|
return 0;
|
|
}
|
|
|
|
// Update tasks
|
|
int PriorityQueueIndex::update_tasks(const qi_task_t* tasks, uint32_t count) {
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
if (!tasks || count == 0) return -1;
|
|
|
|
for (uint32_t i = 0; i < count; ++i) {
|
|
auto it = id_index_.find(tasks[i].id);
|
|
if (it != id_index_.end()) {
|
|
entries_[it->second].task = tasks[i];
|
|
// Enforce null termination on all string fields
|
|
entries_[it->second].task.id[sizeof(entries_[it->second].task.id) - 1] = '\0';
|
|
entries_[it->second].task.job_name[sizeof(entries_[it->second].task.job_name) - 1] = '\0';
|
|
entries_[it->second].task.status[sizeof(entries_[it->second].task.status) - 1] = '\0';
|
|
entries_[it->second].dirty = true;
|
|
}
|
|
}
|
|
|
|
dirty_ = true;
|
|
rebuild_heap();
|
|
return static_cast<int>(count);
|
|
}
|
|
|
|
// Remove tasks
//
// Removes up to `count` tasks identified by null-terminated id strings.
// Null id pointers are skipped; unknown ids are ignored. Returns the
// number actually removed, or -1 for a null/empty input array.
//
// Removal uses swap-with-last + pop_back for O(1) erasure per element:
// the entry moved into the vacated slot has its id_index_ position
// rewritten before the removed id is erased. operator[] on the moved id
// cannot insert (that id is already in the map), so no rehash occurs and
// the iterator `it` remains valid for the subsequent erase.
int PriorityQueueIndex::remove_tasks(const char** task_ids, uint32_t count) {
    std::lock_guard<std::mutex> lock(mutex_);

    if (!task_ids || count == 0) return -1;

    int removed = 0;
    for (uint32_t i = 0; i < count; ++i) {
        if (!task_ids[i]) continue;

        auto it = id_index_.find(task_ids[i]);
        if (it != id_index_.end()) {
            size_t idx = it->second;
            // Swap with last and pop (fast removal)
            if (idx < entries_.size() - 1) {
                entries_[idx] = entries_.back();
                // Re-point the moved entry's map slot at its new position.
                id_index_[entries_[idx].task.id] = idx;
            }
            entries_.pop_back();
            id_index_.erase(it);
            removed++;
        }
    }

    // Only mark dirty / re-heapify when something actually changed.
    if (removed > 0) {
        dirty_ = true;
        rebuild_heap();
    }

    return removed;
}
|
|
|
|
// Compact index (remove finished/failed tasks)
|
|
int PriorityQueueIndex::compact_index() {
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
size_t original_size = entries_.size();
|
|
|
|
// Remove entries with "finished" or "failed" status
|
|
auto new_end = std::remove_if(entries_.begin(), entries_.end(),
|
|
[](const IndexEntry& e) {
|
|
return strcmp(e.task.status, "finished") == 0 ||
|
|
strcmp(e.task.status, "failed") == 0;
|
|
});
|
|
|
|
entries_.erase(new_end, entries_.end());
|
|
|
|
if (entries_.size() < original_size) {
|
|
dirty_ = true;
|
|
rebuild_id_index();
|
|
rebuild_heap();
|
|
}
|
|
|
|
return static_cast<int>(original_size - entries_.size());
|
|
}
|
|
|
|
// Rebuild heap
|
|
int PriorityQueueIndex::rebuild() {
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
rebuild_heap();
|
|
return 0;
|
|
}
|