Security fixes:
- Symlink attack on temp files (same issue class as CVE-2024-45339): add the O_EXCL flag to temp file creation in storage_write_entries(), preventing symlink attacks on predictable .tmp file paths.
- TOCTOU race on open (same issue class as CVE-2025-47290): use openat_nofollow() in storage_open(), closing the race via the path_sanitizer infrastructure.
- Unbounded batch size (same issue class as CVE-2025-0838): add MAX_BATCH_SIZE=10000 to add_tasks(), preventing integer overflow in batch operations.

Research trustworthiness (dataset_hash):
- Deterministic file ordering: std::sort inside collect_files().
- Recursive directory traversal: depth-limited, with cycle detection.
- Documented exclusions: hidden files and special files are noted in the API.

Bug fixes:
- R1: storage_init path validation for non-existent directories
- R2: check the safe_strncpy return value before strcat
- R3: replace the 256-file cap in parallel_hash with std::vector
- R4: wire up the qi_compact_index/qi_rebuild_index stubs
- R5: CompletionLatch race condition fix (hold the mutex during decrement)
- R6: ARMv8 SHA256 transform fix (save abcd_pre before vsha256hq_u32)
- R7: fuzz_index_storage header format fix
- R8: enforce null termination in add_tasks/update_tasks
- R9: use 64 bytes (not 65) in the combined hash to exclude the null terminator
- R10: persist the status field in save()

New tests:
- test_recursive_dataset.cpp: verify deterministic recursive hashing
- test_storage_symlink_resistance.cpp: verify the temp-file symlink fix (CVE-2024-45339 class)
- test_queue_index_batch_limit.cpp: verify the batch-size limit (CVE-2025-0838 class)
- test_sha256_arm_kat.cpp: ARMv8 known-answer tests
- test_storage_init_new_dir.cpp: R1 verification
- test_parallel_hash_large_dir.cpp: R3 verification
- test_queue_index_compact.cpp: R4 verification

All 8 native tests pass. Library ready for research lab deployment.
332 lines
10 KiB
C++
332 lines
10 KiB
C++
#include "parallel_hash.h"
#include "../io/file_hash.h"
#include "../crypto/sha256_hasher.h"
#include "../../common/include/thread_pool.h"
#include "../../common/include/secure_mem.h"

#include <dirent.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>

#include <algorithm>
#include <atomic>
#include <functional>
#include <limits>
#include <memory>
#include <new>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>

using fetchml::common::safe_strncpy;
|
|
|
|
// Upper bound on how many directory levels below the root the traversal will
// descend. It bounds recursion depth (and the number of simultaneously open
// directory handles) independently of the filesystem layout; a tree nested
// deeper than this makes the traversal fail rather than recurse further.
static constexpr int MAX_RECURSION_DEPTH{32};
|
|
|
|
// Identity of a directory on disk: the (device, inode) pair reported by
// stat(2)/lstat(2). Two DirIds are equal exactly when both fields match; the
// traversal keys its "already visited" set on this to avoid re-entering a
// directory.
struct DirId {
    dev_t device;  // st_dev of the directory
    ino_t inode;   // st_ino of the directory

    bool operator==(const DirId& rhs) const {
        const bool same_inode = (inode == rhs.inode);
        return same_inode && device == rhs.device;
    }
};
|
|
|
|
struct DirIdHash {
|
|
size_t operator()(const DirId& id) const {
|
|
return std::hash<dev_t>()(id.device) ^
|
|
(std::hash<ino_t>()(id.inode) << 1);
|
|
}
|
|
};
|
|
|
|
// Forward declaration for recursion
|
|
static int collect_files_recursive(const char* dir_path,
|
|
std::vector<std::string>& out_paths,
|
|
int depth,
|
|
std::unordered_set<DirId, DirIdHash>& visited);
|
|
|
|
// Collect files recursively from directory tree
|
|
// Returns: 0 on success, -1 on I/O error or cycle detected
|
|
static int collect_files(const char* dir_path, std::vector<std::string>& out_paths) {
|
|
out_paths.clear();
|
|
std::unordered_set<DirId, DirIdHash> visited;
|
|
int result = collect_files_recursive(dir_path, out_paths, 0, visited);
|
|
if (result == 0) {
|
|
// Sort for deterministic ordering across filesystems
|
|
std::sort(out_paths.begin(), out_paths.end());
|
|
}
|
|
return result;
|
|
}
|
|
|
|
static int collect_files_recursive(const char* dir_path,
|
|
std::vector<std::string>& out_paths,
|
|
int depth,
|
|
std::unordered_set<DirId, DirIdHash>& visited) {
|
|
if (depth > MAX_RECURSION_DEPTH) {
|
|
return -1; // Depth limit exceeded - possible cycle
|
|
}
|
|
|
|
DIR* dir = opendir(dir_path);
|
|
if (!dir) return -1;
|
|
|
|
struct dirent* entry;
|
|
while ((entry = readdir(dir)) != NULL) {
|
|
if (entry->d_name[0] == '.') continue; // Skip hidden and . / ..
|
|
|
|
char full_path[4096];
|
|
int written = snprintf(full_path, sizeof(full_path), "%s/%s",
|
|
dir_path, entry->d_name);
|
|
if (written < 0 || (size_t)written >= sizeof(full_path)) {
|
|
closedir(dir);
|
|
return -1; // Path too long
|
|
}
|
|
|
|
struct stat st;
|
|
if (stat(full_path, &st) != 0) continue; // Can't stat, skip
|
|
|
|
if (S_ISREG(st.st_mode)) {
|
|
out_paths.emplace_back(full_path);
|
|
} else if (S_ISDIR(st.st_mode)) {
|
|
// Check for cycles via device+inode
|
|
DirId dir_id{st.st_dev, st.st_ino};
|
|
if (visited.find(dir_id) != visited.end()) {
|
|
continue; // Already visited this directory (cycle)
|
|
}
|
|
visited.insert(dir_id);
|
|
|
|
// Recurse into subdirectory
|
|
if (collect_files_recursive(full_path, out_paths, depth + 1, visited) != 0) {
|
|
closedir(dir);
|
|
return -1;
|
|
}
|
|
}
|
|
// Symlinks, devices, and special files are silently skipped
|
|
}
|
|
|
|
closedir(dir);
|
|
return 0;
|
|
}
|
|
|
|
// Prepare `hasher` for use. This records the per-file read buffer size and
// creates its ThreadPool with `num_threads` workers; 0 selects
// ThreadPool::default_thread_count().
// The pool is constructed with placement new in malloc'd storage, so it must
// be released with parallel_hasher_cleanup().
// Returns 1 on success, 0 if hasher is NULL or the allocation fails.
int parallel_hasher_init(ParallelHasher* hasher, uint32_t num_threads, size_t buffer_size) {
    if (hasher == nullptr) {
        return 0;
    }

    hasher->buffer_size = buffer_size;

    void* storage = malloc(sizeof(ThreadPool));
    hasher->pool = static_cast<ThreadPool*>(storage);
    if (storage == nullptr) {
        return 0;
    }

    const uint32_t workers =
        (num_threads != 0) ? num_threads : static_cast<uint32_t>(ThreadPool::default_thread_count());
    new (storage) ThreadPool(workers);
    return 1;
}
|
|
|
|
// Destroy the hasher's ThreadPool and free the malloc'd storage it lives in.
// This is a no-op for a NULL hasher or a NULL pool. The pool pointer is reset
// to nullptr, so repeated calls are harmless.
void parallel_hasher_cleanup(ParallelHasher* hasher) {
    if (hasher != nullptr && hasher->pool != nullptr) {
        ThreadPool* pool = hasher->pool;
        pool->~ThreadPool();  // placement-new'd in parallel_hasher_init
        free(pool);
        hasher->pool = nullptr;
    }
}
|
|
|
|
// Work item for batch_hash_worker(): hash paths[start_idx, end_idx) into the
// matching out_hashes slots. The arrays are shared by all batches of one call
// and owned by the caller. `success` points at a flag shared by those batches,
// which the worker clears when a file fails to hash.
struct BatchHashTask {
    const char** paths = nullptr;           // file paths, indexed like out_hashes
    char** out_hashes = nullptr;            // one output buffer per path
    size_t buffer_size = 0;                 // read buffer size passed to hash_file()
    int start_idx = 0;                      // first index handled (inclusive)
    int end_idx = 0;                        // end of range (exclusive)
    std::atomic<bool>* success = nullptr;   // shared failure flag
};
|
|
|
|
// Worker function for batch processing
|
|
static void batch_hash_worker(BatchHashTask* task) {
|
|
for (int i = task->start_idx; i < task->end_idx; i++) {
|
|
if (hash_file(task->paths[i], task->buffer_size, task->out_hashes[i]) != 0) {
|
|
task->success->store(false);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Lower-case hex encoding of a 32-byte SHA-256 digest into `out`, which must
// hold 65 bytes (64 hex characters followed by '\0').
static void encode_digest_hex(const uint8_t digest[32], char* out) {
    static const char kHex[] = "0123456789abcdef";
    for (int i = 0; i < 32; i++) {
        out[i * 2] = kHex[(digest[i] >> 4) & 0xf];
        out[i * 2 + 1] = kHex[digest[i] & 0xf];
    }
    out[64] = '\0';
}

// Compute a single dataset hash for the directory tree rooted at `path`.
//
// Every regular, non-hidden file found by collect_files() is hashed on the
// hasher's thread pool. The combined hash is SHA-256 over the concatenation of
// the per-file 64-character hex hashes, taken in sorted path order. An empty
// tree therefore hashes to SHA-256 of the empty string.
//
// Parameters:
//   hasher   - initialized ParallelHasher (buffer_size and pool are used).
//   path     - root directory to hash.
//   out_hash - receives 64 lower-case hex characters plus '\0' (>= 65 bytes).
// Returns 0 on success; -1 on invalid arguments, traversal failure, or if any
// file fails to hash.
//
// NOTE(review): file paths are not fed into the combined hash. Renaming a file
// without changing the relative order of the sorted paths leaves the result
// unchanged. Including paths would change every existing dataset hash, so the
// format is left as is.
int parallel_hash_directory(ParallelHasher* hasher, const char* path, char* out_hash) {
    if (!hasher || !hasher->pool || !path || !out_hash) return -1;

    std::vector<std::string> paths;
    if (collect_files(path, paths) != 0) {
        return -1;  // I/O error, over-long path, or depth limit exceeded
    }

    const size_t count = paths.size();
    // BatchHashTask addresses files with int indices.
    if (count > static_cast<size_t>(std::numeric_limits<int>::max())) return -1;

    // One contiguous buffer with a 65-byte slot per file (64 hex chars + '\0').
    constexpr size_t kHashSlot = 65;
    std::vector<char> hash_storage(count * kHashSlot);
    std::vector<char*> hash_ptrs(count);
    std::vector<const char*> path_array(count);
    for (size_t i = 0; i < count; i++) {
        path_array[i] = paths[i].c_str();
        hash_ptrs[i] = hash_storage.data() + i * kHashSlot;
    }

    std::atomic<bool> all_success{true};

    if (count > 0) {
        // Split the files into roughly one contiguous batch per thread.
        size_t num_threads = ThreadPool::default_thread_count();
        if (num_threads == 0) num_threads = 1;  // avoid division by zero
        const size_t batch_size = (count + num_threads - 1) / num_threads;  // >= 1
        const size_t num_batches = (count + batch_size - 1) / batch_size;

        // Owned by the vector. The tasks must outlive the queued work, which
        // the wait_all() below guarantees before this scope ends.
        std::vector<BatchHashTask> tasks(num_batches);
        for (size_t b = 0; b < num_batches; b++) {
            const size_t start = b * batch_size;
            const size_t end = std::min(start + batch_size, count);

            BatchHashTask& task = tasks[b];
            task.paths = path_array.data();
            task.out_hashes = hash_ptrs.data();
            task.buffer_size = hasher->buffer_size;
            task.start_idx = static_cast<int>(start);
            task.end_idx = static_cast<int>(end);
            task.success = &all_success;
        }

        // One queued job per batch, not one per file.
        for (BatchHashTask& task : tasks) {
            BatchHashTask* job = &task;
            hasher->pool->enqueue([job]() { batch_hash_worker(job); });
        }

        hasher->pool->wait_all();
    }

    if (!all_success.load()) {
        return -1;
    }

    // Combine hashes deterministically (same order as the sorted paths). Feed
    // exactly the 64 hex characters of each slot; the trailing '\0' is excluded.
    Sha256State st;
    sha256_init(&st);
    for (size_t i = 0; i < count; i++) {
        sha256_update(&st, reinterpret_cast<uint8_t*>(hash_ptrs[i]), 64);
    }
    uint8_t digest[32];
    sha256_finalize(&st, digest);

    encode_digest_hex(digest, out_hash);
    return 0;
}
|
|
|
|
// Hash every regular, non-hidden file under `path` individually, reporting one
// hash (and optionally one path) per file in sorted path order.
//
// Parameters:
//   hasher      - initialized ParallelHasher (buffer_size and pool are used).
//   path        - root directory to scan recursively.
//   out_hashes  - caller-provided array of at least max_results buffers. Entry i
//                 receives the hash of the i-th file; each buffer is presumably
//                 65 bytes (64 hex chars + '\0'), to be confirmed against hash_file().
//   out_paths   - optional. If non-NULL, entry i receives the i-th path; each
//                 buffer must hold 4096 bytes. Paths from collect_files() are
//                 always shorter than that.
//   max_results - maximum number of files reported. Files beyond this limit (in
//                 sorted path order) are silently dropped, and the call still
//                 succeeds.
//   out_count   - optional. Receives the number of files reported, or 0 when
//                 the call fails before the files are listed.
// Returns 0 on success; -1 on invalid arguments, traversal failure, or if any
// file fails to hash. In the last case out_count still holds the number of
// files attempted.
int parallel_hash_directory_batch(
    ParallelHasher* hasher,
    const char* path,
    char** out_hashes,
    char** out_paths,
    uint32_t max_results,
    uint32_t* out_count) {

    if (out_count) *out_count = 0;
    if (!hasher || !hasher->pool || !path || !out_hashes) return -1;

    std::vector<std::string> paths;
    if (collect_files(path, paths) != 0) {
        return -1;  // I/O error, over-long path, or depth limit exceeded
    }

    // Respect the caller's capacity. Paths are sorted, so the first
    // max_results paths in sorted order are kept.
    if (paths.size() > max_results) {
        paths.resize(max_results);
    }

    const size_t count = paths.size();
    // BatchHashTask addresses files with int indices.
    if (count > static_cast<size_t>(std::numeric_limits<int>::max())) return -1;

    if (out_count) *out_count = static_cast<uint32_t>(count);
    if (count == 0) {
        return 0;
    }

    // Copy paths out for the caller's reference (count <= max_results here).
    if (out_paths) {
        for (size_t i = 0; i < count; i++) {
            safe_strncpy(out_paths[i], paths[i].c_str(), 4096);
        }
    }

    std::vector<const char*> path_array(count);
    for (size_t i = 0; i < count; i++) {
        path_array[i] = paths[i].c_str();
    }

    std::atomic<bool> all_success{true};

    // Split the files into roughly one contiguous batch per thread.
    size_t num_threads = ThreadPool::default_thread_count();
    if (num_threads == 0) num_threads = 1;  // avoid division by zero
    const size_t batch_size = (count + num_threads - 1) / num_threads;  // >= 1
    const size_t num_batches = (count + batch_size - 1) / batch_size;

    // Owned by the vector. The tasks must outlive the queued work, which the
    // wait_all() below guarantees before this function returns.
    std::vector<BatchHashTask> tasks(num_batches);
    for (size_t b = 0; b < num_batches; b++) {
        const size_t start = b * batch_size;
        const size_t end = std::min(start + batch_size, count);

        BatchHashTask& task = tasks[b];
        task.paths = path_array.data();
        task.out_hashes = out_hashes;
        task.buffer_size = hasher->buffer_size;
        task.start_idx = static_cast<int>(start);
        task.end_idx = static_cast<int>(end);
        task.success = &all_success;
    }

    for (BatchHashTask& task : tasks) {
        BatchHashTask* job = &task;
        hasher->pool->enqueue([job]() { batch_hash_worker(job); });
    }

    hasher->pool->wait_all();

    return all_success.load() ? 0 : -1;
}
|