fetch_ml/native/queue_index/storage/index_storage.cpp
Jeremie Fraeys 43d241c28d
feat: implement C++ native libraries for performance-critical operations
- Add arena allocator for zero-allocation hot paths
- Add thread pool for parallel operations
- Add mmap utilities for memory-mapped I/O
- Implement queue_index with heap-based priority queue
- Implement dataset_hash with SIMD support (SHA-NI, ARMv8)
- Add runtime SIMD detection for cross-platform correctness
- Add comprehensive tests and benchmarks
2026-02-16 20:38:04 -05:00

240 lines
6.7 KiB
C++

// index_storage.cpp - C-style storage implementation
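// Security: path validation rejects NULL/empty paths and any path containing "..". (A NUL-terminated C string cannot carry an embedded NUL byte; callers converting length-delimited input must reject embedded NULs before conversion.)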
#include "index_storage.h"
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
// Maximum index file size: 100MB
#define MAX_INDEX_SIZE (100 * 1024 * 1024)
// Conservative path validation that rejects traversal attempts.
//
// Returns false for a NULL or empty path, and for any path containing the
// substring "..". This is deliberately stricter than a per-component check:
// names such as "a..b" are rejected as well.
//
// Note: a NUL-terminated C string cannot contain an embedded NUL byte (the
// string simply ends there), so there is nothing to check for NULs at this
// level. Callers that build paths from length-delimited input must reject
// embedded NULs before converting.
static bool is_valid_path(const char* path) {
    if (!path || path[0] == '\0') return false;
    return strstr(path, "..") == nullptr;
}
// Recursively create `path` and any missing parent directories with mode 0755
// (a stand-in for std::filesystem::create_directories).
//
// Returns true if `path` exists as a directory afterwards, whether it was
// created here or already present. Returns false when:
//   - the path is NULL or empty;
//   - the path is 4096 bytes or longer (errno = ENAMETOOLONG), rather than
//     silently truncating it and creating a different directory;
//   - the final component cannot be created;
//   - the final component already exists but is not a directory.
// Failures on intermediate components are ignored; a real problem there
// normally surfaces as a failure of the final mkdir().
static bool mkdir_p(const char* path) {
    if (!path || path[0] == '\0') return false;

    char tmp[4096];
    size_t len = strlen(path);
    if (len >= sizeof(tmp)) {
        errno = ENAMETOOLONG;
        return false;
    }
    memcpy(tmp, path, len + 1);  // includes the terminator

    // Drop all trailing slashes, but keep a lone "/" intact.
    while (len > 1 && tmp[len - 1] == '/') {
        tmp[--len] = '\0';
    }

    // Create each ancestor by temporarily terminating the string at every '/'.
    // Starting at tmp + 1 skips the leading '/' of an absolute path.
    for (char* p = tmp + 1; *p; ++p) {
        if (*p == '/') {
            *p = '\0';
            mkdir(tmp, 0755);  // best effort; see header comment
            *p = '/';
        }
    }

    if (mkdir(tmp, 0755) == 0) return true;
    if (errno != EEXIST) return false;
    // Something already exists at this path: only accept it if it is a directory.
    struct stat st;
    return stat(tmp, &st) == 0 && S_ISDIR(st.st_mode);
}
// Prepare `storage` for the queue rooted at `queue_dir`.
//
// Whenever `storage` is non-null it is zeroed and its fd set to -1 before any
// validation, so it is in a closed state even if this function then fails.
// On success index_path holds "<queue_dir>/index.bin".
// Returns false for a null storage, a queue_dir rejected by is_valid_path(),
// or a queue_dir too long to fit in index_path together with the suffix.
bool storage_init(IndexStorage* storage, const char* queue_dir) {
    if (storage == nullptr) {
        return false;
    }
    memset(storage, 0, sizeof(*storage));
    storage->fd = -1;

    const bool acceptable = is_valid_path(queue_dir);
    if (!acceptable) {
        return false;
    }

    static const char kIndexSuffix[] = "/index.bin";  // 11 bytes including NUL
    const size_t prefix_len = strlen(queue_dir);
    const size_t limit = sizeof(storage->index_path) - sizeof(kIndexSuffix);
    if (prefix_len >= limit) {
        return false;  // would not fit in index_path
    }
    memcpy(storage->index_path, queue_dir, prefix_len);
    memcpy(storage->index_path + prefix_len, kIndexSuffix, sizeof(kIndexSuffix));
    return true;
}
// Release everything held by `storage` by delegating to storage_close().
// Does nothing for a null pointer.
void storage_cleanup(IndexStorage* storage) {
    if (storage != nullptr) {
        storage_close(storage);
    }
}
// Open the index file at storage->index_path for reading and writing,
// creating it (mode 0640) if necessary.
//
// The parent directory is created first, on a best-effort basis. A zero-length
// file, i.e. one that was just created, is initialised with an empty header
// (entry_count == 0). Existing non-empty files are left untouched and are not
// validated here; storage_read_entries() checks the magic.
//
// Returns false if storage is null, already open (fd >= 0), or if open(),
// fstat() or the header write fails. On a failure after open() the storage
// is closed again.
bool storage_open(IndexStorage* storage) {
    if (!storage || storage->fd >= 0) return false;

    // index_path is "<dir>/index.bin": cut at the last '/' and ensure <dir>
    // exists. The result is deliberately ignored. If the directory really is
    // missing, the open() below fails and reports it.
    char parent[4096];
    strncpy(parent, storage->index_path, sizeof(parent) - 1);
    parent[sizeof(parent) - 1] = '\0';
    char* last_slash = strrchr(parent, '/');
    if (last_slash) {
        *last_slash = '\0';
        mkdir_p(parent);
    }

    storage->fd = ::open(storage->index_path, O_RDWR | O_CREAT, 0640);
    if (storage->fd < 0) {
        return false;
    }

    struct stat st;
    if (fstat(storage->fd, &st) < 0) {
        storage_close(storage);
        return false;
    }

    if (st.st_size == 0) {
        // Zero the whole header first. Compiler-inserted padding and any field
        // not assigned below are then written as 0 rather than indeterminate
        // stack bytes.
        FileHeader header;
        memset(&header, 0, sizeof(header));
        memcpy(header.magic, INDEX_MAGIC, 4);
        header.version = CURRENT_VERSION;
        header.entry_count = 0;
        // pwrite at offset 0: independent of the descriptor's file position.
        const ssize_t written = pwrite(storage->fd, &header, sizeof(header), 0);
        if (written != static_cast<ssize_t>(sizeof(header))) {
            storage_close(storage);
            return false;
        }
    }
    return true;
}
// Release any active mapping, then close the descriptor and mark it as -1.
// Null-safe; calling it again on an already-closed storage does nothing.
void storage_close(IndexStorage* storage) {
    if (storage == nullptr) {
        return;
    }
    storage_munmap(storage);

    const int fd = storage->fd;
    if (fd < 0) {
        return;
    }
    storage->fd = -1;
    ::close(fd);
}
// Copy up to `max_count` entries from the on-disk index into `out_entries`.
//
// The header at offset 0 is read and its magic compared with INDEX_MAGIC; the
// version field is not checked. Then min(header.entry_count, max_count)
// DiskEntry records are read from immediately after the header. All reads use
// pread(), so the descriptor's file offset is not changed.
//
// On success, *out_count (if non-null) receives the number of entries copied.
// Returns false for a null storage/out_entries, a closed storage, a magic
// mismatch, or a failed or short read.
bool storage_read_entries(IndexStorage* storage, DiskEntry* out_entries, size_t max_count, size_t* out_count) {
    const bool usable = storage != nullptr && storage->fd >= 0 && out_entries != nullptr;
    if (!usable) {
        return false;
    }

    FileHeader hdr;
    const ssize_t hdr_got = pread(storage->fd, &hdr, sizeof(hdr), 0);
    if (hdr_got != static_cast<ssize_t>(sizeof(hdr))) {
        return false;
    }
    if (memcmp(hdr.magic, INDEX_MAGIC, 4) != 0) {
        return false;
    }

    size_t n = max_count;
    if (hdr.entry_count < max_count) {
        n = hdr.entry_count;
    }

    const size_t nbytes = n * sizeof(DiskEntry);
    const ssize_t got = pread(storage->fd, out_entries, nbytes, sizeof(FileHeader));
    if (got != static_cast<ssize_t>(nbytes)) {
        return false;
    }

    if (out_count != nullptr) {
        *out_count = n;
    }
    return true;
}
// Atomically replace the on-disk index with `count` entries.
//
// How it works:
//   - The header and entries are written to "<index_path>.tmp".
//   - The temp file is flushed with fsync() and renamed over index_path, so
//     the file at index_path is either the old one or the complete new one.
//   - On success storage->fd is switched to the new file. After the rename,
//     the old descriptor refers to the replaced inode, so reads through it
//     (storage_read_entries / storage_mmap_for_read) would return stale data.
//   - Any mapping created earlier by storage_mmap_for_read() is left as-is.
//     Call storage_mmap_for_read() again to map the new contents.
//
// Returns false, removing the temp file if it was created, when:
//   - arguments are invalid or the storage is not open;
//   - the temp path does not fit in the buffer;
//   - `count` is too large to express in bytes or in the header's entry_count;
//   - any write, fsync or rename fails (a short write counts as a failure).
// On failure the storage keeps its original descriptor.
//
// NOTE(review): the containing directory is not fsync'd, so the rename
// itself is not guaranteed durable across a crash.
bool storage_write_entries(IndexStorage* storage, const DiskEntry* entries, size_t count) {
    if (!storage || storage->fd < 0 || !entries) return false;

    // Keep the byte count representable as ssize_t (write()'s return type)
    // so the size computation below cannot overflow.
    const size_t kMaxBytes = static_cast<size_t>(-1) >> 1;
    if (count > (kMaxBytes - sizeof(FileHeader)) / sizeof(DiskEntry)) return false;

    FileHeader header;
    memset(&header, 0, sizeof(header));  // no indeterminate padding bytes on disk
    memcpy(header.magic, INDEX_MAGIC, 4);
    header.version = CURRENT_VERSION;
    header.entry_count = count;
    if (static_cast<size_t>(header.entry_count) != count) {
        return false;  // entry_count field too narrow for this count
    }

    char tmp_path[4096 + 4];
    const int n = snprintf(tmp_path, sizeof(tmp_path), "%s.tmp", storage->index_path);
    if (n < 0 || static_cast<size_t>(n) >= sizeof(tmp_path)) {
        return false;  // refuse to operate on a truncated path
    }

    // O_RDWR (not O_WRONLY): on success this descriptor becomes storage->fd,
    // matching the access mode storage_open() uses.
    int tmp_fd = ::open(tmp_path, O_RDWR | O_CREAT | O_TRUNC, 0640);
    if (tmp_fd < 0) {
        return false;
    }

    const size_t bytes = count * sizeof(DiskEntry);
    const bool written =
        write(tmp_fd, &header, sizeof(header)) == static_cast<ssize_t>(sizeof(header)) &&
        write(tmp_fd, entries, bytes) == static_cast<ssize_t>(bytes) &&
        fsync(tmp_fd) == 0;  // data must be durable before the rename publishes it
    if (!written) {
        ::close(tmp_fd);
        unlink(tmp_path);
        return false;
    }

    // Atomic replace.
    if (rename(tmp_path, storage->index_path) != 0) {
        ::close(tmp_fd);
        unlink(tmp_path);
        return false;
    }

    // tmp_fd now refers to the file at index_path; drop the stale descriptor.
    ::close(storage->fd);
    storage->fd = tmp_fd;
    return true;
}
// Map the current index file read-only (PROT_READ, MAP_PRIVATE).
//
// Any previous mapping is released first, and mmap_ptr/mmap_size start out as
// nullptr/0.
//   - Files no larger than the header count as "empty but valid": the
//     function returns true and nothing is mapped.
//   - Files larger than MAX_INDEX_SIZE are rejected.
//   - On success mmap_ptr/mmap_size describe the whole file.
//   - On any failure they stay nullptr/0 and never hold MAP_FAILED, so
//     callers can simply test mmap_ptr.
bool storage_mmap_for_read(IndexStorage* storage) {
    if (!storage || storage->fd < 0) return false;

    storage_munmap(storage);
    // Any real mapping was released above; also clear a possible MAP_FAILED
    // sentinel or stale size so every early return leaves a clean state.
    storage->mmap_ptr = nullptr;
    storage->mmap_size = 0;

    struct stat st;
    if (fstat(storage->fd, &st) < 0) {
        return false;
    }
    if (st.st_size <= static_cast<off_t>(sizeof(FileHeader))) {
        return true;  // Empty but valid
    }
    if (st.st_size > static_cast<off_t>(MAX_INDEX_SIZE)) {
        return false;  // File too large
    }

    const size_t size = static_cast<size_t>(st.st_size);
    void* mapped = mmap(nullptr, size, PROT_READ, MAP_PRIVATE, storage->fd, 0);
    if (mapped == MAP_FAILED) {
        return false;
    }
    storage->mmap_ptr = mapped;
    storage->mmap_size = size;
    return true;
}
// Release the current mapping, if any, and reset mmap_ptr/mmap_size to
// nullptr/0. A MAP_FAILED value left by an earlier failed mmap() is cleared
// without calling munmap(). Null-safe; repeated calls are harmless.
void storage_munmap(IndexStorage* storage) {
    if (!storage) return;
    if (storage->mmap_ptr && storage->mmap_ptr != MAP_FAILED) {
        munmap(storage->mmap_ptr, storage->mmap_size);
    }
    // Reset unconditionally so no sentinel or stale size lingers.
    storage->mmap_ptr = nullptr;
    storage->mmap_size = 0;
}
// Address of the first DiskEntry in the current mapping, which is the byte
// just past the FileHeader. Returns nullptr when storage is null or nothing
// is mapped (mmap_ptr is nullptr or MAP_FAILED). The number of valid entries
// is reported separately by storage_mmap_entry_count().
const DiskEntry* storage_mmap_entries(IndexStorage* storage) {
    const bool mapped = storage != nullptr
                        && storage->mmap_ptr != nullptr
                        && storage->mmap_ptr != MAP_FAILED;
    if (!mapped) {
        return nullptr;
    }
    const uint8_t* const base = static_cast<const uint8_t*>(storage->mmap_ptr);
    return reinterpret_cast<const DiskEntry*>(base + sizeof(FileHeader));
}
// Number of DiskEntry records safely readable via storage_mmap_entries().
//
// Returns 0 when:
//   - nothing is mapped;
//   - the mapping is smaller than a FileHeader;
//   - the header magic differs from INDEX_MAGIC (the same check
//     storage_read_entries() applies).
// Otherwise returns header->entry_count, clamped to the number of whole
// entries that actually lie inside the mapping. A truncated or corrupt file
// therefore cannot make callers read past the end of the mapped region.
size_t storage_mmap_entry_count(IndexStorage* storage) {
    if (!storage || !storage->mmap_ptr || storage->mmap_ptr == MAP_FAILED) return 0;
    if (storage->mmap_size < sizeof(FileHeader)) return 0;

    const FileHeader* header = static_cast<const FileHeader*>(storage->mmap_ptr);
    if (memcmp(header->magic, INDEX_MAGIC, 4) != 0) return 0;

    const size_t capacity = (storage->mmap_size - sizeof(FileHeader)) / sizeof(DiskEntry);
    if (header->entry_count > capacity) {
        return capacity;  // header claims more entries than the file holds
    }
    return static_cast<size_t>(header->entry_count);
}