- Integrate RunManifest.Validate with existing Validator - Add manifest Sign() and Verify() methods - Add native C++ hashing libraries (dataset_hash, queue_index) - Add native bridge for Go/C++ integration - Add deduplication support in queue
248 lines
7.3 KiB
C++
248 lines
7.3 KiB
C++
#include "parallel_hash.h"

#include "../io/file_hash.h"
#include "../crypto/sha256_hasher.h"
#include "../../common/include/thread_pool.h"

#include <dirent.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <new>
#include <string>
#include <thread>
#include <utility>
#include <vector>
|
|
|
|
// Lists the regular files directly inside dir_path (flat: subdirectories are
// not descended into). Entries whose names start with '.' (hidden files, "."
// and "..") are skipped. stat() is used, so symlinks to regular files count.
//
// When out_paths is non-null, out_paths[i] receives the NUL-terminated
// "dir_path/name" path of the i-th file found. Each out_paths[i] must point to
// a buffer of at least 4096 bytes. When out_paths is null, files are only
// counted. At most max_files files are reported, in readdir() order.
// Entries whose full path would not fit in 4096 bytes are skipped rather than
// truncated.
//
// Returns the number of files reported. Returns 0 if dir_path is null or the
// directory cannot be opened; that result is indistinguishable from an empty
// directory.
static int collect_files(const char* dir_path, char** out_paths, int max_files) {
    if (!dir_path || max_files <= 0) return 0;

    DIR* dir = opendir(dir_path);
    if (!dir) return 0;

    int count = 0;
    struct dirent* entry;
    // Check the limit first so no extra entry is read once it is reached.
    while (count < max_files && (entry = readdir(dir)) != NULL) {
        if (entry->d_name[0] == '.') continue;  // hidden entries, "." and ".."

        char full_path[4096];
        const int len = snprintf(full_path, sizeof(full_path), "%s/%s", dir_path, entry->d_name);
        // A truncated path would make stat() and hash_file() silently refer to
        // a different (or nonexistent) file, so skip such entries instead.
        if (len < 0 || (size_t)len >= sizeof(full_path)) continue;

        struct stat st;
        if (stat(full_path, &st) != 0 || !S_ISREG(st.st_mode)) continue;

        if (out_paths) {
            // len < 4096, so the path and its NUL fit the caller's buffer.
            memcpy(out_paths[count], full_path, (size_t)len + 1);
        }
        count++;
    }

    closedir(dir);
    return count;
}
|
|
|
|
// Initializes `hasher` with a thread pool of `num_threads` workers and
// records `buffer_size`, the read-buffer size later passed to hash_file().
// A num_threads of 0 means ThreadPool::default_thread_count().
//
// Returns 1 on success. Returns 0 on failure: a null hasher, failed
// allocation, or an exception from the ThreadPool constructor. On failure,
// hasher->pool is left null (when hasher itself is non-null).
int parallel_hasher_init(ParallelHasher* hasher, uint32_t num_threads, size_t buffer_size) {
    if (!hasher) return 0;

    hasher->buffer_size = buffer_size;
    hasher->pool = nullptr;

    if (num_threads == 0) {
        num_threads = ThreadPool::default_thread_count();
    }

    // malloc + placement new pairs with parallel_hasher_cleanup(), which runs
    // ~ThreadPool() explicitly and then free()s the storage.
    void* storage = malloc(sizeof(ThreadPool));
    if (!storage) return 0;

    try {
        hasher->pool = new (storage) ThreadPool(num_threads);
    } catch (...) {
        // Placement new does not release the storage when the constructor
        // throws. Free it here, and do not let the exception escape this
        // C-style API.
        free(storage);
        return 0;
    }
    return 1;
}
|
|
|
|
// Tears down the thread pool created by parallel_hasher_init(): runs the
// destructor of the placement-new'd ThreadPool, frees its malloc'd storage
// and nulls the pointer. A null hasher, or one whose pool is already null,
// is a no-op, so repeated calls are harmless.
void parallel_hasher_cleanup(ParallelHasher* hasher) {
    if (hasher == nullptr) return;

    ThreadPool* const pool = hasher->pool;
    if (pool == nullptr) return;

    pool->~ThreadPool();  // explicit destructor call for a placement-new'd object
    free(pool);           // then release the raw storage from malloc()
    hasher->pool = nullptr;
}
|
|
|
|
// One unit of work for the thread pool: a contiguous slice of a shared file
// list. The worker hashes paths[i] into out_hashes[i] for every i in
// [start_idx, end_idx) and clears *success if any of those files fails.
// The task does not own any of the memory its pointers refer to.
struct BatchHashTask {
    const char** paths;          // shared input paths (the whole list, not just this slice)
    char** out_hashes;           // shared output slots, indexed like paths
    size_t buffer_size;          // read-buffer size forwarded to hash_file()
    int start_idx;               // first index of the slice (inclusive)
    int end_idx;                 // end of the slice (exclusive)
    std::atomic<bool>* success;  // shared result flag; stored false on any failure
};
|
|
|
|
// Worker function for batch processing
|
|
static void batch_hash_worker(BatchHashTask* task) {
|
|
for (int i = task->start_idx; i < task->end_idx; i++) {
|
|
if (hash_file(task->paths[i], task->buffer_size, task->out_hashes[i]) != 0) {
|
|
task->success->store(false);
|
|
}
|
|
}
|
|
}
|
|
|
|
int parallel_hash_directory(ParallelHasher* hasher, const char* path, char* out_hash) {
|
|
if (!hasher || !path || !out_hash) return -1;
|
|
|
|
// Collect files
|
|
char paths[256][4096];
|
|
char* path_ptrs[256];
|
|
for (int i = 0; i < 256; i++) path_ptrs[i] = paths[i];
|
|
|
|
int count = collect_files(path, path_ptrs, 256);
|
|
if (count == 0) {
|
|
// Empty directory - hash empty string
|
|
Sha256State st;
|
|
sha256_init(&st);
|
|
uint8_t result[32];
|
|
sha256_finalize(&st, result);
|
|
static const char hex[] = "0123456789abcdef";
|
|
for (int i = 0; i < 32; i++) {
|
|
out_hash[i*2] = hex[(result[i] >> 4) & 0xf];
|
|
out_hash[i*2+1] = hex[result[i] & 0xf];
|
|
}
|
|
out_hash[64] = '\0';
|
|
return 0;
|
|
}
|
|
|
|
// Convert path_ptrs to const char** for batch task
|
|
const char* path_array[256];
|
|
for (int i = 0; i < count; i++) {
|
|
path_array[i] = path_ptrs[i];
|
|
}
|
|
|
|
// Parallel hash all files using ThreadPool with batched tasks
|
|
char hashes[256][65];
|
|
std::atomic<bool> all_success{true};
|
|
std::atomic<int> completed_batches{0};
|
|
|
|
// Determine batch size - divide files among threads
|
|
uint32_t num_threads = ThreadPool::default_thread_count();
|
|
int batch_size = (count + num_threads - 1) / num_threads;
|
|
if (batch_size < 1) batch_size = 1;
|
|
int num_batches = (count + batch_size - 1) / batch_size;
|
|
|
|
// Allocate batch tasks
|
|
BatchHashTask* batch_tasks = new BatchHashTask[num_batches];
|
|
char* hash_ptrs[256];
|
|
for (int i = 0; i < count; i++) {
|
|
hash_ptrs[i] = hashes[i];
|
|
}
|
|
|
|
for (int b = 0; b < num_batches; b++) {
|
|
int start = b * batch_size;
|
|
int end = start + batch_size;
|
|
if (end > count) end = count;
|
|
|
|
batch_tasks[b].paths = path_array;
|
|
batch_tasks[b].out_hashes = hash_ptrs;
|
|
batch_tasks[b].buffer_size = hasher->buffer_size;
|
|
batch_tasks[b].start_idx = start;
|
|
batch_tasks[b].end_idx = end;
|
|
batch_tasks[b].success = &all_success;
|
|
}
|
|
|
|
// Enqueue batch tasks (one per thread, not one per file)
|
|
for (int b = 0; b < num_batches; b++) {
|
|
hasher->pool->enqueue([batch_tasks, b, &completed_batches]() {
|
|
batch_hash_worker(&batch_tasks[b]);
|
|
completed_batches.fetch_add(1);
|
|
});
|
|
}
|
|
|
|
// Wait for all batches to complete
|
|
while (completed_batches.load() < num_batches) {
|
|
std::this_thread::yield();
|
|
}
|
|
|
|
// Check for errors
|
|
if (!all_success.load()) {
|
|
delete[] batch_tasks;
|
|
return -1;
|
|
}
|
|
|
|
// Combine hashes deterministically (same order as paths)
|
|
Sha256State st;
|
|
sha256_init(&st);
|
|
for (int i = 0; i < count; i++) {
|
|
sha256_update(&st, (uint8_t*)hashes[i], strlen(hashes[i]));
|
|
}
|
|
uint8_t result[32];
|
|
sha256_finalize(&st, result);
|
|
|
|
// Convert to hex
|
|
static const char hex[] = "0123456789abcdef";
|
|
for (int i = 0; i < 32; i++) {
|
|
out_hash[i*2] = hex[(result[i] >> 4) & 0xf];
|
|
out_hash[i*2+1] = hex[result[i] & 0xf];
|
|
}
|
|
out_hash[64] = '\0';
|
|
|
|
delete[] batch_tasks;
|
|
return 0;
|
|
}
|
|
|
|
int parallel_hash_directory_batch(
|
|
ParallelHasher* hasher,
|
|
const char* path,
|
|
char** out_hashes,
|
|
char** out_paths,
|
|
uint32_t max_results,
|
|
uint32_t* out_count) {
|
|
|
|
if (!hasher || !path || !out_hashes) return -1;
|
|
|
|
// Collect files
|
|
int count = collect_files(path, out_paths, (int)max_results);
|
|
if (out_count) *out_count = (uint32_t)count;
|
|
|
|
if (count == 0) {
|
|
return 0;
|
|
}
|
|
|
|
// Convert out_paths to const char** for batch task
|
|
const char* path_array[256];
|
|
for (int i = 0; i < count; i++) {
|
|
path_array[i] = out_paths ? out_paths[i] : nullptr;
|
|
}
|
|
|
|
// Parallel hash all files using ThreadPool with batched tasks
|
|
std::atomic<bool> all_success{true};
|
|
std::atomic<int> completed_batches{0};
|
|
|
|
// Determine batch size
|
|
uint32_t num_threads = ThreadPool::default_thread_count();
|
|
int batch_size = (count + num_threads - 1) / num_threads;
|
|
if (batch_size < 1) batch_size = 1;
|
|
int num_batches = (count + batch_size - 1) / batch_size;
|
|
|
|
// Allocate batch tasks
|
|
BatchHashTask* batch_tasks = new BatchHashTask[num_batches];
|
|
|
|
for (int b = 0; b < num_batches; b++) {
|
|
int start = b * batch_size;
|
|
int end = start + batch_size;
|
|
if (end > count) end = count;
|
|
|
|
batch_tasks[b].paths = path_array;
|
|
batch_tasks[b].out_hashes = out_hashes;
|
|
batch_tasks[b].buffer_size = hasher->buffer_size;
|
|
batch_tasks[b].start_idx = start;
|
|
batch_tasks[b].end_idx = end;
|
|
batch_tasks[b].success = &all_success;
|
|
}
|
|
|
|
// Enqueue batch tasks
|
|
for (int b = 0; b < num_batches; b++) {
|
|
hasher->pool->enqueue([batch_tasks, b, &completed_batches]() {
|
|
batch_hash_worker(&batch_tasks[b]);
|
|
completed_batches.fetch_add(1);
|
|
});
|
|
}
|
|
|
|
// Wait for all batches to complete
|
|
while (completed_batches.load() < num_batches) {
|
|
std::this_thread::yield();
|
|
}
|
|
|
|
delete[] batch_tasks;
|
|
return all_success.load() ? 0 : -1;
|
|
}
|