fetch_ml/native/common/include/thread_pool.h
Jeremie Fraeys 7efe8bbfbf
native: security hardening, research trustworthiness, and CVE mitigations
Security Fixes:
- CVE-2024-45339: Add O_EXCL flag to temp file creation in storage_write_entries()
  Prevents symlink attacks on predictable .tmp file paths
- CVE-2025-47290: Use openat_nofollow() in storage_open()
  Closes TOCTOU race condition via path_sanitizer infrastructure
- CVE-2025-0838: Add MAX_BATCH_SIZE=10000 to add_tasks()
  Prevents integer overflow in batch operations

Research Trustworthiness (dataset_hash):
- Deterministic file ordering: std::sort after collect_files()
- Recursive directory traversal: depth-limited with cycle detection
- Documented exclusions: hidden files and special files noted in API

Bug Fixes:
- R1: storage_init path validation for non-existent directories
- R2: safe_strncpy return value check before strcat
- R3: parallel_hash 256-file cap replaced with std::vector
- R4: wire qi_compact_index/qi_rebuild_index stubs
- R5: CompletionLatch race condition fix (hold mutex during decrement)
- R6: ARMv8 SHA256 transform fix (save abcd_pre before vsha256hq_u32)
- R7: fuzz_index_storage header format fix
- R8: enforce null termination in add_tasks/update_tasks
- R9: use 64 bytes (not 65) in combined hash to exclude null terminator
- R10: status field persistence in save()

New Tests:
- test_recursive_dataset.cpp: Verify deterministic recursive hashing
- test_storage_symlink_resistance.cpp: Verify CVE-2024-45339 fix
- test_queue_index_batch_limit.cpp: Verify CVE-2025-0838 fix
- test_sha256_arm_kat.cpp: ARMv8 known-answer tests
- test_storage_init_new_dir.cpp: R1 verification
- test_parallel_hash_large_dir.cpp: R3 verification
- test_queue_index_compact.cpp: R4 verification

All 8 native tests passing. Library ready for research lab deployment.
2026-02-21 13:33:45 -05:00


#pragma once

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Fixed-size thread pool for parallel operations.
// Minimizes thread creation overhead for batch operations.
class ThreadPool {
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::condition_variable done_condition_;
    std::atomic<size_t> active_tasks_{0};
    bool stop_ = false;

public:
    explicit ThreadPool(size_t num_threads);
    ~ThreadPool();

    // Add task to queue. Thread-safe.
    void enqueue(std::function<void()> task);

    // Wait for all queued AND executing tasks to complete.
    void wait_all();

    // Get optimal thread count (capped at 8 for I/O bound work)
    static uint32_t default_thread_count();
};

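// Synchronization primitive: wait for N completions
class CompletionLatch {
    std::atomic<size_t> count_{0};
    std::mutex mutex_;
    std::condition_variable cv_;

public:
    explicit CompletionLatch(size_t total) : count_(total) {}

    // Record one completion. The decrement and the notify both happen while
    // the mutex is held, so a waiter cannot observe zero, return, and destroy
    // the latch while this call is still touching its members.
    void arrive() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (--count_ == 0) {
            cv_.notify_all();
        }
    }

    // Block until arrive() has been called `total` times.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return count_.load() == 0; });
    }
};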
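
Usage sketch for the latch pattern declared above. This is a minimal, self-contained illustration, not code from the repository: `DemoLatch` is a hypothetical stand-in with the same `arrive()`/`wait()` interface as `CompletionLatch`, and `run_latch_demo` is a made-up helper that uses plain `std::thread` workers instead of `ThreadPool`, whose implementation is not shown in this header.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for CompletionLatch so this sketch compiles alone.
// In the real tree you would include thread_pool.h instead.
class DemoLatch {
    std::atomic<std::size_t> count_;
    std::mutex mutex_;
    std::condition_variable cv_;

public:
    explicit DemoLatch(std::size_t total) : count_(total) {}

    // Decrement and notify under the mutex.
    void arrive() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (--count_ == 0) {
            cv_.notify_all();
        }
    }

    // Block until the count reaches zero.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return count_.load() == 0; });
    }
};

// Hypothetical helper: start num_workers threads, let each record its work
// and arrive at the latch, then return the number of completed workers
// observed right after wait() returns.
std::size_t run_latch_demo(std::size_t num_workers) {
    DemoLatch latch(num_workers);
    std::atomic<std::size_t> done{0};

    std::vector<std::thread> threads;
    threads.reserve(num_workers);
    for (std::size_t i = 0; i < num_workers; ++i) {
        threads.emplace_back([&] {
            done.fetch_add(1);  // the "work" happens before arrive()
            latch.arrive();
        });
    }

    latch.wait();  // returns only after every worker has arrived
    const std::size_t observed = done.load();
    assert(observed == num_workers);

    // Join before the latch goes out of scope.
    for (auto& t : threads) {
        t.join();
    }
    return observed;
}
```

Calling `run_latch_demo(4)` starts four workers and returns once all of them have arrived. The latch is constructed with the exact number of expected `arrive()` calls; calling `arrive()` more often than that would wrap the unsigned counter, so callers are assumed to keep the two in step.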