#include "parallel_hash.h" #include "../io/file_hash.h" #include "../crypto/sha256_hasher.h" #include "../../common/include/thread_pool.h" #include "../../common/include/secure_mem.h" #include #include #include #include #include #include #include #include #include #include using fetchml::common::safe_strncpy; // Maximum recursion depth to prevent stack overflow on symlink cycles static constexpr int MAX_RECURSION_DEPTH = 32; // Track visited directories by device+inode pair for cycle detection struct DirId { dev_t device; ino_t inode; bool operator==(const DirId& other) const { return device == other.device && inode == other.inode; } }; struct DirIdHash { size_t operator()(const DirId& id) const { return std::hash()(id.device) ^ (std::hash()(id.inode) << 1); } }; // Forward declaration for recursion static int collect_files_recursive(const char* dir_path, std::vector& out_paths, int depth, std::unordered_set& visited); // Collect files recursively from directory tree // Returns: 0 on success, -1 on I/O error or cycle detected static int collect_files(const char* dir_path, std::vector& out_paths) { out_paths.clear(); std::unordered_set visited; int result = collect_files_recursive(dir_path, out_paths, 0, visited); if (result == 0) { // Sort for deterministic ordering across filesystems std::sort(out_paths.begin(), out_paths.end()); } return result; } static int collect_files_recursive(const char* dir_path, std::vector& out_paths, int depth, std::unordered_set& visited) { if (depth > MAX_RECURSION_DEPTH) { return -1; // Depth limit exceeded - possible cycle } DIR* dir = opendir(dir_path); if (!dir) return -1; struct dirent* entry; while ((entry = readdir(dir)) != NULL) { if (entry->d_name[0] == '.') continue; // Skip hidden and . / .. char full_path[4096]; int written = snprintf(full_path, sizeof(full_path), "%s/%s", dir_path, entry->d_name); if (written < 0 || (size_t)written >= sizeof(full_path)) { closedir(dir); return -1; // Path too long } struct stat st; if (stat(full_path, &st) != 0) continue; // Can't stat, skip if (S_ISREG(st.st_mode)) { out_paths.emplace_back(full_path); } else if (S_ISDIR(st.st_mode)) { // Check for cycles via device+inode DirId dir_id{st.st_dev, st.st_ino}; if (visited.find(dir_id) != visited.end()) { continue; // Already visited this directory (cycle) } visited.insert(dir_id); // Recurse into subdirectory if (collect_files_recursive(full_path, out_paths, depth + 1, visited) != 0) { closedir(dir); return -1; } } // Symlinks, devices, and special files are silently skipped } closedir(dir); return 0; } int parallel_hasher_init(ParallelHasher* hasher, uint32_t num_threads, size_t buffer_size) { if (!hasher) return 0; hasher->buffer_size = buffer_size; hasher->pool = (ThreadPool*)malloc(sizeof(ThreadPool)); if (!hasher->pool) return 0; if (num_threads == 0) { num_threads = ThreadPool::default_thread_count(); } new (hasher->pool) ThreadPool(num_threads); return 1; } void parallel_hasher_cleanup(ParallelHasher* hasher) { if (!hasher || !hasher->pool) return; hasher->pool->~ThreadPool(); free(hasher->pool); hasher->pool = nullptr; } // Batch hash task - processes a range of files struct BatchHashTask { const char** paths; char** out_hashes; size_t buffer_size; int start_idx; int end_idx; std::atomic* success; }; // Worker function for batch processing static void batch_hash_worker(BatchHashTask* task) { for (int i = task->start_idx; i < task->end_idx; i++) { if (hash_file(task->paths[i], task->buffer_size, task->out_hashes[i]) != 0) { task->success->store(false); } } } int parallel_hash_directory(ParallelHasher* hasher, const char* path, char* out_hash) { if (!hasher || !path || !out_hash) return -1; // Collect files into vector (no limit) std::vector paths; if (collect_files(path, paths) != 0) { return -1; // I/O error } size_t count = paths.size(); if (count == 0) { // Empty directory - hash empty string Sha256State st; sha256_init(&st); uint8_t result[32]; sha256_finalize(&st, result); static const char hex[] = "0123456789abcdef"; for (int i = 0; i < 32; i++) { out_hash[i*2] = hex[(result[i] >> 4) & 0xf]; out_hash[i*2+1] = hex[result[i] & 0xf]; } out_hash[64] = '\0'; return 0; } // Convert to const char* array for batch task std::vector path_array; path_array.reserve(count); for (size_t i = 0; i < count; i++) { path_array.push_back(paths[i].c_str()); } // Parallel hash all files using ThreadPool with batched tasks std::vector hashes(count); for (size_t i = 0; i < count; i++) { hashes[i].resize(65); } // Create array of pointers to hash buffers for batch task std::vector hash_ptrs(count); for (size_t i = 0; i < count; i++) { hash_ptrs[i] = &hashes[i][0]; } std::atomic all_success{true}; // Determine batch size - divide files among threads uint32_t num_threads = ThreadPool::default_thread_count(); int batch_size = (static_cast(count) + num_threads - 1) / num_threads; if (batch_size < 1) batch_size = 1; int num_batches = (static_cast(count) + batch_size - 1) / batch_size; // Allocate batch tasks BatchHashTask* batch_tasks = new BatchHashTask[num_batches]; for (int b = 0; b < num_batches; b++) { int start = b * batch_size; int end = start + batch_size; if (end > static_cast(count)) end = static_cast(count); batch_tasks[b].paths = path_array.data(); batch_tasks[b].out_hashes = hash_ptrs.data(); batch_tasks[b].buffer_size = hasher->buffer_size; batch_tasks[b].start_idx = start; batch_tasks[b].end_idx = end; batch_tasks[b].success = &all_success; } // Enqueue batch tasks (one per thread, not one per file) for (int b = 0; b < num_batches; b++) { hasher->pool->enqueue([batch_tasks, b]() { batch_hash_worker(&batch_tasks[b]); }); } // Use wait_all() instead of spin-loop with stack-local atomic // This ensures workers complete before batch_tasks goes out of scope hasher->pool->wait_all(); // Check for errors if (!all_success.load()) { delete[] batch_tasks; return -1; } // Combine hashes deterministically (same order as paths) - use 64 chars, not 65 Sha256State st; sha256_init(&st); for (size_t i = 0; i < count; i++) { sha256_update(&st, (uint8_t*)hashes[i].c_str(), 64); // 64 hex chars, not 65 } uint8_t result[32]; sha256_finalize(&st, result); // Convert to hex static const char hex[] = "0123456789abcdef"; for (int i = 0; i < 32; i++) { out_hash[i*2] = hex[(result[i] >> 4) & 0xf]; out_hash[i*2+1] = hex[result[i] & 0xf]; } out_hash[64] = '\0'; delete[] batch_tasks; return 0; } int parallel_hash_directory_batch( ParallelHasher* hasher, const char* path, char** out_hashes, char** out_paths, uint32_t max_results, uint32_t* out_count) { if (!hasher || !path || !out_hashes) return -1; // Collect files into vector (no limit) std::vector paths; if (collect_files(path, paths) != 0) { if (out_count) *out_count = 0; return -1; // I/O error } // Respect max_results limit if provided if (paths.size() > max_results) { paths.resize(max_results); } size_t count = paths.size(); if (out_count) *out_count = static_cast(count); if (count == 0) { return 0; } // Copy paths to out_paths if provided (for caller's reference) if (out_paths) { for (size_t i = 0; i < count && i < max_results; i++) { safe_strncpy(out_paths[i], paths[i].c_str(), 4096); } } // Convert to const char* array for batch task std::vector path_array; path_array.reserve(count); for (size_t i = 0; i < count; i++) { path_array.push_back(paths[i].c_str()); } // Parallel hash all files using ThreadPool with batched tasks std::atomic all_success{true}; // Determine batch size uint32_t num_threads = ThreadPool::default_thread_count(); int batch_size = (static_cast(count) + num_threads - 1) / num_threads; if (batch_size < 1) batch_size = 1; int num_batches = (static_cast(count) + batch_size - 1) / batch_size; // Allocate batch tasks BatchHashTask* batch_tasks = new BatchHashTask[num_batches]; for (int b = 0; b < num_batches; b++) { int start = b * batch_size; int end = start + batch_size; if (end > static_cast(count)) end = static_cast(count); batch_tasks[b].paths = path_array.data(); batch_tasks[b].out_hashes = out_hashes; batch_tasks[b].buffer_size = hasher->buffer_size; batch_tasks[b].start_idx = start; batch_tasks[b].end_idx = end; batch_tasks[b].success = &all_success; } // Enqueue batch tasks for (int b = 0; b < num_batches; b++) { hasher->pool->enqueue([batch_tasks, b]() { batch_hash_worker(&batch_tasks[b]); }); } // Use wait_all() instead of spin-loop hasher->pool->wait_all(); delete[] batch_tasks; return all_success.load() ? 0 : -1; }