From e67d18900e3030830af2b84b4df5085511f73225 Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Mon, 23 Mar 2026 15:16:26 -0400 Subject: [PATCH] chore(native): remove old native/rust directory Remove the old native/rust/ directory - files were previously moved to native_rust/ workspace at the repository root. This cleans up the deprecated location after the Rust workspace reorganization. --- native/rust/Cargo.toml | 43 --- native/rust/common/Cargo.toml | 16 - native/rust/common/src/lib.rs | 143 -------- native/rust/dataset_hash/Cargo.toml | 28 -- .../rust/dataset_hash/include/dataset_hash.h | 28 -- native/rust/dataset_hash/src/lib.rs | 214 ------------ native/rust/dataset_hash/src/metrics.rs | 103 ------ native/rust/deny.toml | 18 - native/rust/queue_index/Cargo.toml | 27 -- native/rust/queue_index/include/queue_index.h | 49 --- native/rust/queue_index/src/index.rs | 171 ---------- native/rust/queue_index/src/lib.rs | 309 ------------------ native/rust/queue_index/src/storage.rs | 202 ------------ native/rust/queue_index/src/task.rs | 74 ----- native/rust/rust-toolchain.toml | 3 - 15 files changed, 1428 deletions(-) delete mode 100644 native/rust/Cargo.toml delete mode 100644 native/rust/common/Cargo.toml delete mode 100644 native/rust/common/src/lib.rs delete mode 100644 native/rust/dataset_hash/Cargo.toml delete mode 100644 native/rust/dataset_hash/include/dataset_hash.h delete mode 100644 native/rust/dataset_hash/src/lib.rs delete mode 100644 native/rust/dataset_hash/src/metrics.rs delete mode 100644 native/rust/deny.toml delete mode 100644 native/rust/queue_index/Cargo.toml delete mode 100644 native/rust/queue_index/include/queue_index.h delete mode 100644 native/rust/queue_index/src/index.rs delete mode 100644 native/rust/queue_index/src/lib.rs delete mode 100644 native/rust/queue_index/src/storage.rs delete mode 100644 native/rust/queue_index/src/task.rs delete mode 100644 native/rust/rust-toolchain.toml diff --git a/native/rust/Cargo.toml b/native/rust/Cargo.toml deleted file mode 100644 index bc414f7..0000000 --- a/native/rust/Cargo.toml +++ /dev/null @@ -1,43 +0,0 @@ -[workspace] -members = ["queue_index", "dataset_hash", "common"] -resolver = "2" - -[workspace.package] -version = "0.1.0" -edition = "2021" -authors = ["FetchML Team"] -license = "MIT OR Apache-2.0" -rust-version = "1.85.0" - -[workspace.dependencies] -# Core dependencies -libc = "0.2" -thiserror = "1.0" -anyhow = "1.0" - -# Keep: Performance-critical dependencies -rayon = "1.8" # ~8 deps - work-stealing worth it -blake3 = { version = "1.5", features = ["rayon"] } # ~12 deps - SIMD dispatch -memmap2 = "0.9" # ~1 dep - thin mmap wrapper - -# Serialization (lightweight) -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" - -# Testing -tempfile = "3.10" - -# Dropped: crossbeam (~6) -> use std::sync::Mutex -# Dropped: prometheus (~20) -> implement metrics manually -# Dropped: tracing (~15) -> use eprintln! for now - -[profile.release] -opt-level = 3 -lto = "thin" -codegen-units = 1 -panic = "abort" -strip = true - -[profile.dev] -debug = true -opt-level = 0 diff --git a/native/rust/common/Cargo.toml b/native/rust/common/Cargo.toml deleted file mode 100644 index a9e1bdf..0000000 --- a/native/rust/common/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "common" -version.workspace = true -edition.workspace = true -authors.workspace = true -license.workspace = true -rust-version.workspace = true - -[lib] -crate-type = ["rlib"] - -[dependencies] -libc = { workspace = true } - -[dev-dependencies] -tempfile = "3.10" diff --git a/native/rust/common/src/lib.rs b/native/rust/common/src/lib.rs deleted file mode 100644 index 3932795..0000000 --- a/native/rust/common/src/lib.rs +++ /dev/null @@ -1,143 +0,0 @@ -//! Common FFI utilities for native libraries -//! -//! Provides safe wrappers for FFI boundary operations including: -//! - Panic recovery at FFI boundaries -//! - String conversion between C and Rust -//! - Error handling patterns - -use std::ffi::{CStr, CString}; -use std::os::raw::{c_char, c_int}; -use std::ptr; - -/// Recover from panics at FFI boundaries, returning a safe default -/// -/// # Safety -/// The closure should not leak resources on panic -pub unsafe fn ffi_boundary(f: impl FnOnce() -> T + std::panic::UnwindSafe) -> Option { - match std::panic::catch_unwind(f) { - Ok(result) => Some(result), - Err(_) => { - eprintln!("FFI boundary panic caught and recovered"); - None - } - } -} - -/// Convert C string to Rust String -/// -/// # Safety -/// ptr must be a valid null-terminated UTF-8 string or null -pub unsafe fn c_str_to_string(ptr: *const c_char) -> Option { - if ptr.is_null() { - return None; - } - - CStr::from_ptr(ptr) - .to_str() - .ok() - .map(|s| s.to_string()) -} - -/// Convert Rust String to C string (leaked, caller must free) -/// -/// Returns null on error. On success, returns a pointer that must be freed with `free_string`. -pub fn string_to_c_str(s: &str) -> *mut c_char { - match CString::new(s) { - Ok(cstring) => cstring.into_raw(), - Err(_) => ptr::null_mut(), - } -} - -/// Free a string previously created by `string_to_c_str` -/// -/// # Safety -/// ptr must be a string previously returned by string_to_c_str, or null -pub unsafe fn free_string(ptr: *mut c_char) { - if !ptr.is_null() { - let _ = CString::from_raw(ptr); - } -} - -/// Set an error code and message -/// -/// Returns -1 for error, caller should return this from FFI function -pub fn set_error(error_ptr: *mut *const c_char, msg: &str) -> c_int { - if !error_ptr.is_null() { - unsafe { - *error_ptr = string_to_c_str(msg); - } - } - -1 -} - -/// FFI-safe result type for boolean operations -pub type FfiResult = c_int; -pub const FFI_OK: FfiResult = 0; -pub const FFI_ERROR: FfiResult = -1; - -/// Thread-local error storage for FFI boundaries -pub mod error { - use std::cell::RefCell; - - thread_local! { - static LAST_ERROR: RefCell> = RefCell::new(None); - } - - /// Store an error message - pub fn set_error(msg: impl Into) { - LAST_ERROR.with(|e| { - *e.borrow_mut() = Some(msg.into()); - }); - } - - /// Get and clear the last error - pub fn take_error() -> Option { - LAST_ERROR.with(|e| e.borrow_mut().take()) - } - - /// Peek at the last error without clearing - pub fn peek_error() -> Option { - LAST_ERROR.with(|e| e.borrow().clone()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_c_str_roundtrip() { - let original = "hello world"; - let c_ptr = string_to_c_str(original); - assert!(!c_ptr.is_null()); - - unsafe { - let recovered = c_str_to_string(c_ptr); - assert_eq!(recovered, Some(original.to_string())); - free_string(c_ptr); - } - } - - #[test] - fn test_null_handling() { - unsafe { - assert_eq!(c_str_to_string(ptr::null()), None); - free_string(ptr::null_mut()); // Should not panic - } - } - - #[test] - fn test_ffi_boundary_recovery() { - unsafe { - // Normal case - let result = ffi_boundary(|| 42); - assert_eq!(result, Some(42)); - - // Panic case - should recover - let result = ffi_boundary(|| { - panic!("test panic"); - }); - assert_eq!(result, None); - } - } -} diff --git a/native/rust/dataset_hash/Cargo.toml b/native/rust/dataset_hash/Cargo.toml deleted file mode 100644 index bbb53bf..0000000 --- a/native/rust/dataset_hash/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "dataset_hash" -version.workspace = true -edition.workspace = true -authors.workspace = true -license.workspace = true -rust-version.workspace = true - -[lib] -crate-type = ["cdylib", "staticlib", "rlib"] - -[dependencies] -common = { path = "../common" } - -# Workspace dependencies -libc = { workspace = true } -thiserror = { workspace = true } -anyhow = { workspace = true } -rayon = { workspace = true } -blake3 = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } - -# Minimal additional dependencies -walkdir = "2.5" # ~5 deps for recursive directory walking - -[dev-dependencies] -tempfile = { workspace = true } diff --git a/native/rust/dataset_hash/include/dataset_hash.h b/native/rust/dataset_hash/include/dataset_hash.h deleted file mode 100644 index b864055..0000000 --- a/native/rust/dataset_hash/include/dataset_hash.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef DATASET_HASH_H -#define DATASET_HASH_H - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -// Hash a directory and return combined digest -// dir: path to directory -// out_hash: output parameter, receives allocated hex string (caller must free with fh_free_string) -// Returns: 0 on success, -1 on error -int fh_hash_directory_combined(const char* dir, char** out_hash); - -// Free a string previously returned by FFI functions -void fh_free_string(char* s); - -// Get metrics in Prometheus format -// Returns: allocated string with metrics (caller must free with fh_free_string) -char* fh_get_metrics(void); - -#ifdef __cplusplus -} -#endif - -#endif // DATASET_HASH_H diff --git a/native/rust/dataset_hash/src/lib.rs b/native/rust/dataset_hash/src/lib.rs deleted file mode 100644 index e0a6029..0000000 --- a/native/rust/dataset_hash/src/lib.rs +++ /dev/null @@ -1,214 +0,0 @@ -//! Dataset Hash - Parallel file hashing with BLAKE3 -//! -//! Provides high-performance parallel hashing using BLAKE3's built-in SIMD dispatch. -//! Supports both single-file and batch directory hashing with deterministic ordering. - -use rayon::prelude::*; -use std::ffi::{CStr, CString}; -use std::fs::File; -use std::io::{self, BufReader, Read}; -use std::os::raw::{c_char, c_int}; -use std::path::{Path, PathBuf}; -use std::ptr; -use walkdir::WalkDir; - -mod metrics; -pub use metrics::NativeMetrics; - -/// Hash a single file using BLAKE3 -/// -/// BLAKE3 has built-in SIMD dispatch for AVX2, AVX-512, and NEON. -/// No hand-rolled intrinsics needed. -pub fn hash_file(path: &Path) -> io::Result { - let file = File::open(path)?; - let mut reader = BufReader::with_capacity(64 * 1024, file); // 64KB buffer - - let mut hasher = blake3::Hasher::new(); - let mut buffer = [0u8; 64 * 1024]; - - loop { - let n = reader.read(&mut buffer)?; - if n == 0 { - break; - } - hasher.update(&buffer[..n]); - } - - Ok(hasher.finalize().to_hex().to_string()) -} - -/// Collect all files in a directory, sorted deterministically -/// -/// Security: excludes hidden files (starting with '.') and symlinks -pub fn collect_files(dir: &Path) -> io::Result> { - let mut files: Vec = WalkDir::new(dir) - .max_depth(32) // Prevent infinite recursion - .follow_links(false) // Security: no symlink traversal - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| { - let path = e.path(); - let is_file = e.file_type().is_file(); - let not_hidden = !e.file_name() - .to_str() - .map(|s| s.starts_with('.')) - .unwrap_or(false); - is_file && not_hidden - }) - .map(|e| e.path().to_path_buf()) - .collect(); - - // Deterministic ordering: byte-level comparison (OS-locale independent) - files.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str())); - - Ok(files) -} - -/// Hash all files in a directory in parallel -/// -/// Returns sorted (path, hash) pairs for deterministic output -pub fn hash_directory_batch(dir: &Path) -> io::Result> { - let files = collect_files(dir)?; - - // Parallel hash with rayon - preserves order - let results: Vec<(String, io::Result)> = files - .into_iter() - .map(|path| { - let path_str = path.to_string_lossy().to_string(); - let hash = hash_file(&path); - (path_str, hash) - }) - .collect(); - - // Convert to final format, propagating errors - let mut output: Vec<(String, String)> = Vec::with_capacity(results.len()); - for (path, hash_result) in results { - match hash_result { - Ok(hash) => output.push((path, hash)), - Err(e) => return Err(e), - } - } - - Ok(output) -} - -/// Combined hash of entire directory (single digest) -/// -/// This hashes all files and then hashes the concatenation of their hashes, -/// providing a single digest for the entire directory. -pub fn hash_directory_combined(dir: &Path) -> io::Result { - let pairs = hash_directory_batch(dir)?; - - let mut hasher = blake3::Hasher::new(); - for (path, hash) in &pairs { - hasher.update(path.as_bytes()); - hasher.update(hash.as_bytes()); - } - - Ok(hasher.finalize().to_hex().to_string()) -} - -// ============================================================================ -// FFI Interface -// ============================================================================ - -/// Hash a directory and return combined digest -/// -/// # Safety -/// dir must be a valid null-terminated UTF-8 path string -#[no_mangle] -pub unsafe extern "C" fn fh_hash_directory_combined( - dir: *const c_char, - out_hash: *mut *mut c_char, -) -> c_int { - if dir.is_null() || out_hash.is_null() { - return -1; - } - - let result = std::panic::catch_unwind(|| { - let dir_str = match CStr::from_ptr(dir).to_str() { - Ok(s) => s, - Err(_) => return -1, - }; - - let dir_path = Path::new(dir_str); - - match hash_directory_combined(dir_path) { - Ok(hash) => { - let c_hash = match CString::new(hash) { - Ok(s) => s, - Err(_) => return -1, - }; - *out_hash = c_hash.into_raw(); - 0 - } - Err(_) => -1, - } - }); - - match result { - Ok(rc) => rc, - Err(_) => { - eprintln!("Panic in fh_hash_directory_combined"); - -1 - } - } -} - -/// Free a string previously returned by FFI functions -/// -/// # Safety -/// s must be a string previously returned by an FFI function, or null -#[no_mangle] -pub unsafe extern "C" fn fh_free_string(s: *mut c_char) { - if !s.is_null() { - let _ = CString::from_raw(s); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - use std::fs; - - #[test] - fn test_hash_file() { - let temp = TempDir::new().unwrap(); - let file_path = temp.path().join("test.txt"); - fs::write(&file_path, "hello world").unwrap(); - - let hash1 = hash_file(&file_path).unwrap(); - let hash2 = hash_file(&file_path).unwrap(); - - // Hash should be deterministic - assert_eq!(hash1, hash2); - // BLAKE3 produces 64-char hex strings - assert_eq!(hash1.len(), 64); - } - - #[test] - fn test_collect_files_excludes_hidden() { - let temp = TempDir::new().unwrap(); - fs::write(temp.path().join("visible.txt"), "data").unwrap(); - fs::write(temp.path().join(".hidden"), "data").unwrap(); - - let files = collect_files(temp.path()).unwrap(); - assert_eq!(files.len(), 1); - assert!(files[0].file_name().unwrap() == "visible.txt"); - } - - #[test] - fn test_hash_directory_combined() { - let temp = TempDir::new().unwrap(); - fs::write(temp.path().join("a.txt"), "AAA").unwrap(); - fs::write(temp.path().join("b.txt"), "BBB").unwrap(); - - let hash1 = hash_directory_combined(temp.path()).unwrap(); - let hash2 = hash_directory_combined(temp.path()).unwrap(); - - // Combined hash should be deterministic - assert_eq!(hash1, hash2); - assert_eq!(hash1.len(), 64); - } -} diff --git a/native/rust/dataset_hash/src/metrics.rs b/native/rust/dataset_hash/src/metrics.rs deleted file mode 100644 index 4a31726..0000000 --- a/native/rust/dataset_hash/src/metrics.rs +++ /dev/null @@ -1,103 +0,0 @@ -//! Minimal metrics implementation - no prometheus dependency -//! -//! Provides lightweight atomic counters that can be exported as Prometheus format -//! without pulling in the full prometheus client library (~20 deps). - -use std::sync::atomic::{AtomicU64, Ordering}; - -/// Lightweight metrics for the native library -pub struct NativeMetrics { - hash_duration_ns: AtomicU64, - hash_operations: AtomicU64, - panic_recoveries: AtomicU64, -} - -impl NativeMetrics { - /// Create new metrics instance - pub const fn new() -> Self { - Self { - hash_duration_ns: AtomicU64::new(0), - hash_operations: AtomicU64::new(0), - panic_recoveries: AtomicU64::new(0), - } - } - - /// Record a hash operation with its duration - pub fn record_hash(&self, duration_ns: u64) { - self.hash_operations.fetch_add(1, Ordering::Relaxed); - self.hash_duration_ns.fetch_add(duration_ns, Ordering::Relaxed); - } - - /// Record a panic recovery at FFI boundary - pub fn record_panic_recovery(&self) { - self.panic_recoveries.fetch_add(1, Ordering::Relaxed); - } - - /// Export metrics in Prometheus text format - pub fn export_prometheus(&self) -> String { - let ops = self.hash_operations.load(Ordering::Relaxed); - let duration_sec = self.hash_duration_ns.load(Ordering::Relaxed) as f64 / 1e9; - let panics = self.panic_recoveries.load(Ordering::Relaxed); - - format!( - "# TYPE native_hash_operations_total counter\n\ - native_hash_operations_total {}\n\ - # TYPE native_hash_duration_seconds counter\n\ - native_hash_duration_seconds {:.9}\n\ - # TYPE native_panic_recoveries_total counter\n\ - native_panic_recoveries_total {}\n", - ops, duration_sec, panics - ) - } - - /// Get average hash duration in nanoseconds - pub fn avg_duration_ns(&self) -> u64 { - let ops = self.hash_operations.load(Ordering::Relaxed); - let duration = self.hash_duration_ns.load(Ordering::Relaxed); - - if ops == 0 { - 0 - } else { - duration / ops - } - } -} - -/// Global metrics instance -pub static METRICS: NativeMetrics = NativeMetrics::new(); - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_metrics_recording() { - let metrics = NativeMetrics::new(); - - metrics.record_hash(1_000_000); // 1ms - metrics.record_hash(2_000_000); // 2ms - - assert_eq!(metrics.hash_operations.load(Ordering::Relaxed), 2); - assert_eq!(metrics.hash_duration_ns.load(Ordering::Relaxed), 3_000_000); - assert_eq!(metrics.avg_duration_ns(), 1_500_000); - } - - #[test] - fn test_prometheus_export() { - let metrics = NativeMetrics::new(); - metrics.record_hash(1_000_000_000); // 1 second - - let output = metrics.export_prometheus(); - assert!(output.contains("native_hash_operations_total 1")); - assert!(output.contains("native_hash_duration_seconds 1.000000000")); - } - - #[test] - fn test_panic_recovery() { - let metrics = NativeMetrics::new(); - metrics.record_panic_recovery(); - metrics.record_panic_recovery(); - - assert_eq!(metrics.panic_recoveries.load(Ordering::Relaxed), 2); - } -} diff --git a/native/rust/deny.toml b/native/rust/deny.toml deleted file mode 100644 index 01d72fc..0000000 --- a/native/rust/deny.toml +++ /dev/null @@ -1,18 +0,0 @@ -[advisories] -yanked = "deny" - -[licenses] -allow = ["MIT", "Apache-2.0", "BSD-3-Clause", "ISC", "Unicode-DFS-2016"] -deny = ["GPL-2.0", "GPL-3.0"] -confidence-threshold = 0.8 - -[bans] -multiple-versions = "warn" -wildcards = "allow" - -[sources] -unknown-registry = "deny" -unknown-git = "deny" - -[sources.allow-org] -github = ["dtolnay", "tokio-rs", "crossbeam-rs", "blake3-team"] diff --git a/native/rust/queue_index/Cargo.toml b/native/rust/queue_index/Cargo.toml deleted file mode 100644 index f8382e2..0000000 --- a/native/rust/queue_index/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "queue_index" -version.workspace = true -edition.workspace = true -authors.workspace = true -license.workspace = true -rust-version.workspace = true - -[lib] -crate-type = ["cdylib", "staticlib", "rlib"] - -[dependencies] -common = { path = "../common" } - -# Workspace dependencies -libc = { workspace = true } -thiserror = { workspace = true } -anyhow = { workspace = true } -memmap2 = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } - -# Minimal additional dependencies -chrono = { version = "0.4", features = ["serde"] } - -[dev-dependencies] -tempfile = { workspace = true } diff --git a/native/rust/queue_index/include/queue_index.h b/native/rust/queue_index/include/queue_index.h deleted file mode 100644 index adcc868..0000000 --- a/native/rust/queue_index/include/queue_index.h +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef QUEUE_INDEX_H -#define QUEUE_INDEX_H - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -// Opaque handle for queue index -typedef struct qi_index qi_index_t; - -// Task structure - matches Go queue.Task fields -// Fixed-size for binary format (no dynamic allocation in hot path) -typedef struct qi_task { - char id[64]; // Task ID - char job_name[128]; // Job name - int64_t priority; // Higher = more important - int64_t created_at; // Unix timestamp (nanoseconds) - int64_t next_retry; // Unix timestamp (nanoseconds), 0 if none - char status[16]; // "queued", "running", "finished", "failed" - uint32_t retries; // Current retry count -} qi_task_t; - -// Index operations -qi_index_t* qi_open(const char* queue_dir); -void qi_close(qi_index_t* idx); - -// Batch operations (amortize CGo overhead) -int qi_add_tasks(qi_index_t* idx, const qi_task_t* tasks, uint32_t count); -int qi_get_next_batch(qi_index_t* idx, qi_task_t* out_tasks, uint32_t max_count, uint32_t* out_count); - -// Query operations -int qi_get_task_by_id(qi_index_t* idx, const char* task_id, qi_task_t* out_task); -size_t qi_get_task_count(qi_index_t* idx, const char* status); - -// Memory management -void qi_free_task_array(qi_task_t* tasks); - -// Error handling -const char* qi_last_error(qi_index_t* idx); -void qi_clear_error(qi_index_t* idx); - -#ifdef __cplusplus -} -#endif - -#endif // QUEUE_INDEX_H diff --git a/native/rust/queue_index/src/index.rs b/native/rust/queue_index/src/index.rs deleted file mode 100644 index 97124fd..0000000 --- a/native/rust/queue_index/src/index.rs +++ /dev/null @@ -1,171 +0,0 @@ -//! Priority queue index implementation - -use crate::storage::SharedStorage; -use crate::task::Task; -use std::collections::BinaryHeap; -use std::io; -use std::path::Path; - -/// Task with ordering for priority queue -#[derive(Clone, Eq, PartialEq)] -struct QueuedTask { - priority: i64, - created_at: i64, - task: Task, -} - -impl Ord for QueuedTask { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // Higher priority first, then older tasks first - self.priority - .cmp(&other.priority) - .then_with(|| other.created_at.cmp(&self.created_at)) - .reverse() - } -} - -impl PartialOrd for QueuedTask { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl QueuedTask { - fn from_task(task: Task) -> Self { - Self { - priority: task.priority, - created_at: task.created_at, - task, - } - } -} - -/// Main queue index implementation -pub struct QueueIndexImpl { - storage: SharedStorage, - heap: BinaryHeap, -} - -impl QueueIndexImpl { - /// Open or create a queue index at the given path - pub fn open>(path: P) -> io::Result { - let storage = SharedStorage::open(path)?; - - // Load existing tasks from storage - let heap = BinaryHeap::new(); - - let mut index = Self { storage, heap }; - index.load_tasks()?; - - Ok(index) - } - - /// Add tasks to the index - pub fn add_tasks(&mut self, tasks: &[Task]) -> io::Result { - let mut added = 0; - - for task in tasks { - if self.task_exists(&task.id)? { - continue; - } - - let queued = QueuedTask::from_task(task.clone()); - self.heap.push(queued); - added += 1; - } - - // Update storage header - { - let mut storage = self.storage.write(); - storage.header_mut().entry_count += added as u64; - storage.flush()?; - } - - Ok(added) - } - - /// Get the next batch of ready tasks - pub fn get_next_batch(&mut self, max_count: usize) -> io::Result> { - let mut tasks = Vec::with_capacity(max_count); - let mut to_requeue = Vec::new(); - - while tasks.len() < max_count { - match self.heap.pop() { - Some(queued) => { - if queued.task.is_ready() { - tasks.push(queued.task); - } else { - // Not ready yet, requeue - to_requeue.push(queued); - } - } - None => break, - } - } - - // Requeue tasks that weren't ready - for queued in to_requeue { - self.heap.push(queued); - } - - Ok(tasks) - } - - /// Get count of tasks with given status - pub fn get_task_count(&self, status: &str) -> usize { - // For now, return heap size as approximation - // Full implementation would scan storage - self.heap.len() - } - - fn task_exists(&self, id: &str) -> io::Result { - // Check if task already exists - // Full implementation would check storage - Ok(false) - } - - fn load_tasks(&mut self) -> io::Result<()> { - // Load tasks from storage into heap - // Full implementation would deserialize from storage - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn test_add_and_get_tasks() { - let temp = TempDir::new().unwrap(); - let mut index = QueueIndexImpl::open(temp.path()).unwrap(); - - let tasks = vec![ - Task::new("task-1", "job-a"), - Task::new("task-2", "job-b"), - ]; - - let added = index.add_tasks(&tasks).unwrap(); - assert_eq!(added, 2); - - let batch = index.get_next_batch(10).unwrap(); - assert_eq!(batch.len(), 2); - } - - #[test] - fn test_priority_ordering() { - let mut heap = BinaryHeap::new(); - - let task1 = Task::new("task-1", "job-a"); - let mut task2 = Task::new("task-2", "job-b"); - task2.priority = 100; // Higher priority - - heap.push(QueuedTask::from_task(task1)); - heap.push(QueuedTask::from_task(task2)); - - // Higher priority should come first - let first = heap.pop().unwrap(); - assert_eq!(first.task.id, "task-2"); - } -} diff --git a/native/rust/queue_index/src/lib.rs b/native/rust/queue_index/src/lib.rs deleted file mode 100644 index e637cc1..0000000 --- a/native/rust/queue_index/src/lib.rs +++ /dev/null @@ -1,309 +0,0 @@ -//! Queue Index - High-performance priority queue with mmap persistence -//! -//! This crate provides a Rust implementation of the queue index with FFI exports -//! for integration with Go. It uses memory-mapped files for persistence and -//! crossbeam for lock-free concurrent operations. - -use std::ffi::{CStr, CString}; -use std::os::raw::{c_char, c_int}; -use std::path::PathBuf; -use std::ptr; -use std::sync::{Arc, Mutex}; - -mod index; -mod storage; -mod task; - -pub use index::QueueIndex; -pub use storage::IndexStorage; -pub use task::Task; - -use index::QueueIndexImpl; - -/// Opaque handle for queue index -pub struct QiIndex { - inner: Arc>, - last_error: Mutex>, -} - -/// Task structure - matches C FFI layout -#[repr(C)] -pub struct QiTask { - pub id: [c_char; 64], - pub job_name: [c_char; 128], - pub priority: i64, - pub created_at: i64, - pub next_retry: i64, - pub status: [c_char; 16], - pub retries: u32, -} - -impl QiTask { - fn from_task(task: &Task) -> Self { - let mut qi_task = QiTask { - id: [0; 64], - job_name: [0; 128], - priority: task.priority, - created_at: task.created_at, - next_retry: task.next_retry, - status: [0; 16], - retries: task.retries, - }; - - // Copy strings with null termination - Self::copy_str(&mut qi_task.id, &task.id, 64); - Self::copy_str(&mut qi_task.job_name, &task.job_name, 128); - Self::copy_str(&mut qi_task.status, &task.status, 16); - - qi_task - } - - fn copy_str(dest: &mut [c_char], src: &str, max_len: usize) { - let bytes = src.as_bytes(); - let len = bytes.len().min(max_len - 1); - for i in 0..len { - dest[i] = bytes[i] as c_char; - } - dest[len] = 0; - } - - fn to_task(&self) -> Task { - Task { - id: Self::cstr_to_string(&self.id), - job_name: Self::cstr_to_string(&self.job_name), - priority: self.priority, - created_at: self.created_at, - next_retry: self.next_retry, - status: Self::cstr_to_string(&self.status), - retries: self.retries, - } - } - - fn cstr_to_string(arr: &[c_char]) -> String { - let bytes: Vec = arr.iter() - .take_while(|&&c| c != 0) - .map(|&c| c as u8) - .collect(); - String::from_utf8_lossy(&bytes).to_string() - } -} - -/// Open or create a queue index at the given directory -/// -/// # Safety -/// path must be a valid null-terminated UTF-8 string -#[no_mangle] -pub unsafe extern "C" fn qi_open(path: *const c_char) -> *mut QiIndex { - if path.is_null() { - return ptr::null_mut(); - } - - let path_str = match CStr::from_ptr(path).to_str() { - Ok(s) => s, - Err(_) => return ptr::null_mut(), - }; - - let path_buf = PathBuf::from(path_str); - - let result = std::panic::catch_unwind(|| { - match QueueIndexImpl::open(path_buf) { - Ok(inner) => { - let index = QiIndex { - inner: Arc::new(Mutex::new(inner)), - last_error: Mutex::new(None), - }; - Box::into_raw(Box::new(index)) - } - Err(e) => { - eprintln!("Failed to open queue index: {}", e); - ptr::null_mut() - } - } - }); - - match result { - Ok(ptr) => ptr, - Err(_) => { - eprintln!("Panic in qi_open"); - ptr::null_mut() - } - } -} - -/// Close and free a queue index -/// -/// # Safety -/// idx must be a valid pointer returned by qi_open, or null -#[no_mangle] -pub unsafe extern "C" fn qi_close(idx: *mut QiIndex) { - if !idx.is_null() { - let _ = std::panic::catch_unwind(|| { - drop(Box::from_raw(idx)); - }); - } -} - -/// Add tasks to the index in a batch -/// -/// # Safety -/// idx must be valid, tasks must point to count valid QiTask structs -#[no_mangle] -pub unsafe extern "C" fn qi_add_tasks( - idx: *mut QiIndex, - tasks: *const QiTask, - count: u32, -) -> c_int { - if idx.is_null() || tasks.is_null() || count == 0 { - return -1; - } - - let result = std::panic::catch_unwind(|| { - let index = &*idx; - let mut inner = index.inner.lock().unwrap(); - - let task_slice = std::slice::from_raw_parts(tasks, count as usize); - let rust_tasks: Vec = task_slice.iter().map(|t| t.to_task()).collect(); - - match inner.add_tasks(&rust_tasks) { - Ok(added) => added as c_int, - Err(e) => { - let mut error_guard = index.last_error.lock().unwrap(); - *error_guard = Some(e.to_string()); - -1 - } - } - }); - - match result { - Ok(n) => n, - Err(_) => { - eprintln!("Panic in qi_add_tasks"); - -1 - } - } -} - -/// Get the next batch of tasks from the priority queue -/// -/// # Safety -/// idx must be valid, out_tasks must have space for max_count tasks, out_count must be valid -#[no_mangle] -pub unsafe extern "C" fn qi_get_next_batch( - idx: *mut QiIndex, - out_tasks: *mut QiTask, - max_count: u32, - out_count: *mut u32, -) -> c_int { - if idx.is_null() || out_tasks.is_null() || out_count.is_null() || max_count == 0 { - return -1; - } - - let result = std::panic::catch_unwind(|| { - let index = &*idx; - let mut inner = index.inner.lock().unwrap(); - - match inner.get_next_batch(max_count as usize) { - Ok(tasks) => { - let count = tasks.len().min(max_count as usize); - let out_slice = std::slice::from_raw_parts_mut(out_tasks, count); - - for (i, task) in tasks.iter().take(count).enumerate() { - out_slice[i] = QiTask::from_task(&task); - } - - *out_count = count as u32; - 0 - } - Err(e) => { - let mut error_guard = index.last_error.lock().unwrap(); - *error_guard = Some(e.to_string()); - -1 - } - } - }); - - match result { - Ok(rc) => rc, - Err(_) => { - eprintln!("Panic in qi_get_next_batch"); - -1 - } - } -} - -/// Get the last error message for an index -/// -/// # Safety -/// idx must be valid. Returns a static string that must not be freed. -#[no_mangle] -pub unsafe extern "C" fn qi_last_error(idx: *mut QiIndex) -> *const c_char { - if idx.is_null() { - return ptr::null(); - } - - let result = std::panic::catch_unwind(|| { - let index = &*idx; - let error_guard = index.last_error.lock().unwrap(); - - match error_guard.as_ref() { - Some(err) => { - // Leak the CString to return a stable pointer - // Caller must not free this - CString::new(err.clone()).unwrap().into_raw() - } - None => ptr::null(), - } - }); - - match result { - Ok(ptr) => ptr, - Err(_) => ptr::null(), - } -} - -/// Clear the last error -/// -/// # Safety -/// idx must be valid -#[no_mangle] -pub unsafe extern "C" fn qi_clear_error(idx: *mut QiIndex) { - if idx.is_null() { - return; - } - - let _ = std::panic::catch_unwind(|| { - let index = &*idx; - let mut error_guard = index.last_error.lock().unwrap(); - *error_guard = None; - }); -} - -/// Get the count of tasks with a given status -/// -/// # Safety -/// idx must be valid, status must be a null-terminated string -#[no_mangle] -pub unsafe extern "C" fn qi_get_task_count( - idx: *mut QiIndex, - status: *const c_char, -) -> usize { - if idx.is_null() || status.is_null() { - return 0; - } - - let status_str = match CStr::from_ptr(status).to_str() { - Ok(s) => s, - Err(_) => return 0, - }; - - let result = std::panic::catch_unwind(|| { - let index = &*idx; - let inner = index.inner.lock().unwrap(); - inner.get_task_count(status_str) - }); - - match result { - Ok(count) => count, - Err(_) => 0, - } -} diff --git a/native/rust/queue_index/src/storage.rs b/native/rust/queue_index/src/storage.rs deleted file mode 100644 index c50cba7..0000000 --- a/native/rust/queue_index/src/storage.rs +++ /dev/null @@ -1,202 +0,0 @@ -//! Memory-mapped storage with safe access patterns -//! -//! Design: Unsafe raw pointers are contained within RawStorage. -//! All public access goes through IndexStorage with safe methods. - -use memmap2::{MmapMut, MmapOptions}; -use std::fs::{File, OpenOptions}; -use std::io::{self, Write}; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -/// Header stored at the beginning of the mmap file -#[repr(C)] -#[derive(Debug, Clone, Copy)] -pub struct IndexHeader { - pub version: u64, - pub magic: [u8; 8], - pub entry_count: u64, - pub last_modified: i64, - pub checksum: u64, -} - -impl IndexHeader { - pub const MAGIC: [u8; 8] = *b"FETCHIDX"; - pub const VERSION: u64 = 1; - pub const SIZE: usize = std::mem::size_of::(); - - pub fn new() -> Self { - Self { - version: Self::VERSION, - magic: Self::MAGIC, - entry_count: 0, - last_modified: 0, - checksum: 0, - } - } - - pub fn is_valid(&self) -> bool { - self.magic == Self::MAGIC && self.version == Self::VERSION - } -} - -/// Internal unsafe state - never exposed directly -struct RawStorage { - mmap: MmapMut, - header_ptr: *mut IndexHeader, - data_offset: usize, -} - -impl RawStorage { - fn new(mmap: MmapMut) -> io::Result { - let ptr = mmap.as_ptr() as *mut u8; - let header_ptr = ptr as *mut IndexHeader; - - Ok(Self { - mmap, - header_ptr, - data_offset: IndexHeader::SIZE, - }) - } - - fn header(&self) -> &IndexHeader { - unsafe { &*self.header_ptr } - } - - fn header_mut(&mut self) -> &mut IndexHeader { - unsafe { &mut *self.header_ptr } - } -} - -/// Public safe interface to mmap storage -pub struct IndexStorage { - raw: RawStorage, - path: PathBuf, -} - -impl IndexStorage { - /// Open or create storage at the given path - pub fn open>(path: P) -> io::Result { - let path = path.as_ref().to_path_buf(); - std::fs::create_dir_all(&path)?; - - let file_path = path.join("index.dat"); - let file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&file_path)?; - - // Ensure file is at least header size - let file_size = file.metadata()?.len() as usize; - let min_size = IndexHeader::SIZE; - - if file_size < min_size { - let header = IndexHeader::new(); - let header_bytes = unsafe { - std::slice::from_raw_parts( - &header as *const IndexHeader as *const u8, - IndexHeader::SIZE, - ) - }; - file.set_len(min_size as u64)?; - file.write_all_at(header_bytes, 0)?; - } - - let mmap = unsafe { MmapOptions::new().map_mut(&file)? }; - let raw = RawStorage::new(mmap)?; - - Ok(Self { raw, path }) - } - - /// Get a reference to the header (read-only) - pub fn header(&self) -> &IndexHeader { - self.raw.header() - } - - /// Get a mutable reference to the header - pub fn header_mut(&mut self) -> &mut IndexHeader { - self.raw.header_mut() - } - - /// Verify header magic and version - pub fn verify(&self) -> io::Result<()> { - let header = self.header(); - if !header.is_valid() { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "Invalid index header (wrong magic or version)", - )); - } - Ok(()) - } - - /// Get the path to the storage directory - pub fn path(&self) -> &Path { - &self.path - } - - /// Flush changes to disk - pub fn flush(&mut self) -> io::Result<()> { - self.raw.mmap.flush() - } -} - -/// Thread-safe wrapper for concurrent access -pub struct SharedStorage { - inner: Arc>, -} - -impl SharedStorage { - pub fn open>(path: P) -> io::Result { - let storage = IndexStorage::open(path)?; - Ok(Self { - inner: Arc::new(Mutex::new(storage)), - }) - } - - pub fn lock(&self) -> MutexGuard { - self.inner.lock().unwrap() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn test_storage_creation() { - let temp = TempDir::new().unwrap(); - let storage = IndexStorage::open(temp.path()).unwrap(); - assert!(storage.header().is_valid()); - } - - #[test] - fn test_storage_verify() { - let temp = TempDir::new().unwrap(); - let storage = IndexStorage::open(temp.path()).unwrap(); - assert!(storage.verify().is_ok()); - } - - #[test] - fn test_shared_storage() { - let temp = TempDir::new().unwrap(); - let shared = SharedStorage::open(temp.path()).unwrap(); - - { - let storage = shared.lock(); - assert!(storage.header().is_valid()); - } - - { - let mut storage = shared.lock(); - storage.header_mut().entry_count = 42; - } - - { - let storage = shared.lock(); - assert_eq!(storage.header().entry_count, 42); - } - } -} diff --git a/native/rust/queue_index/src/task.rs b/native/rust/queue_index/src/task.rs deleted file mode 100644 index 6d27e2c..0000000 --- a/native/rust/queue_index/src/task.rs +++ /dev/null @@ -1,74 +0,0 @@ -//! Task definition and serialization - -use serde::{Deserialize, Serialize}; - -/// Task structure - matches both C FFI and Go queue.Task -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct Task { - pub id: String, - pub job_name: String, - pub priority: i64, - pub created_at: i64, - pub next_retry: i64, - pub status: String, - pub retries: u32, -} - -impl Task { - /// Create a new task with default values - pub fn new(id: impl Into, job_name: impl Into) -> Self { - Self { - id: id.into(), - job_name: job_name.into(), - priority: 0, - created_at: chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0), - next_retry: 0, - status: "queued".to_string(), - retries: 0, - } - } - - /// Serialize to JSON bytes - pub fn to_json(&self) -> Result, serde_json::Error> { - serde_json::to_vec(self) - } - - /// Deserialize from JSON bytes - pub fn from_json(data: &[u8]) -> Result { - serde_json::from_slice(data) - } - - /// Check if task is ready to be scheduled - pub fn is_ready(&self) -> bool { - self.status == "queued" && (self.next_retry == 0 || self.next_retry <= current_time()) - } -} - -fn current_time() -> i64 { - chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_task_serialization() { - let task = Task::new("task-1", "test-job"); - let json = task.to_json().unwrap(); - let deserialized = Task::from_json(&json).unwrap(); - assert_eq!(task.id, deserialized.id); - assert_eq!(task.job_name, deserialized.job_name); - } - - #[test] - fn test_task_ready() { - let mut task = Task::new("task-1", "test-job"); - task.status = "queued".to_string(); - task.next_retry = 0; - assert!(task.is_ready()); - - task.next_retry = current_time() + 1_000_000_000; // 1 second in future - assert!(!task.is_ready()); - } -} diff --git a/native/rust/rust-toolchain.toml b/native/rust/rust-toolchain.toml deleted file mode 100644 index e22c344..0000000 --- a/native/rust/rust-toolchain.toml +++ /dev/null @@ -1,3 +0,0 @@ -[toolchain] -channel = "1.85.0" -components = ["clippy", "rustfmt"]