feat(native): implement Rust native layer as a test

- queue_index: mmap-based priority queue with safe storage wrapper
- dataset_hash: BLAKE3 parallel hashing with rayon
- common: FFI utilities with panic recovery
- Minimal deps: ~20 total (rayon, blake3, memmap2, walkdir, chrono)
- Drop crossbeam, prometheus - use stdlib + manual metrics
- Makefile: cargo build targets, help text updated
- Forgejo CI: clippy, tests, miri, cargo-deny
- C FFI compatible with existing Go bindings
This commit is contained in:
Jeremie Fraeys 2026-03-14 17:45:58 -04:00
parent f827ee522a
commit 7efefa1933
No known key found for this signature in database
22 changed files with 1724 additions and 48 deletions

View file

@ -0,0 +1,116 @@
name: Rust Native CI
on:
push:
branches: [main, master]
paths:
- 'native/rust/**'
- '.forgejo/workflows/rust-native.yml'
pull_request:
branches: [main, master]
paths:
- 'native/rust/**'
jobs:
rust-ci:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install Rust
uses: dtolnay/rust-toolchain@v1
with:
toolchain: 1.85.0
components: clippy, rustfmt
- name: Cache Rust dependencies
uses: actions/cache@v3
with:
path: |
~/.cargo/registry
~/.cargo/git
native/rust/target
key: ${{ runner.os }}-cargo-${{ hashFiles('native/rust/**/Cargo.lock') }}
- name: Check formatting
run: cd native/rust && cargo fmt --check
- name: Run clippy
run: cd native/rust && cargo clippy -- -D warnings
- name: Run tests
run: cd native/rust && cargo test
- name: Build release
run: cd native/rust && cargo build --release
miri:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install Rust nightly with Miri
uses: dtolnay/rust-toolchain@nightly
with:
components: miri
- name: Cache Rust dependencies
uses: actions/cache@v3
with:
path: |
~/.cargo/registry
~/.cargo/git
native/rust/target
key: ${{ runner.os }}-cargo-miri-${{ hashFiles('native/rust/**/Cargo.lock') }}
- name: Run Miri tests
run: |
cd native/rust
cargo miri test
cargo-deny:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Run cargo-deny
uses: EmbarkStudios/cargo-deny-action@v1
with:
command: check
manifest-path: native/rust/Cargo.toml
cross-platform:
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install Rust
uses: dtolnay/rust-toolchain@v1
with:
toolchain: 1.85.0
- name: Cache Rust dependencies
uses: actions/cache@v3
with:
path: |
~/.cargo/registry
~/.cargo/git
native/rust/target
key: ${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('native/rust/**/Cargo.lock') }}
- name: Build
run: cd native/rust && cargo build --release
- name: Test
run: cd native/rust && cargo test

View file

@ -4,6 +4,7 @@
.PHONY: all build prod prod-with-native dev cross-platform build-cli \
native-build native-release native-debug native-test native-smoke native-clean \
rust-build rust-test rust-clean \
clean clean-release prepare-release verify-build \
test test-unit test-integration test-e2e test-coverage \
test-infra-up test-infra-down \
@ -97,37 +98,58 @@ dev:
@echo "$(OK) Development binaries built"
native-build:
@mkdir -p native/build
@cd native/build && cmake .. -DCMAKE_BUILD_TYPE=Release && make -j$(NPROC)
@echo "$(OK) Native libraries built"
@cd native/rust && cargo build --release
@echo "$(OK) Rust native libraries built"
native-release:
@mkdir -p native/build
@cd native/build && cmake .. -DCMAKE_BUILD_TYPE=Release \
-DCMAKE_C_FLAGS="-O3 -DNDEBUG -fomit-frame-pointer" \
-DCMAKE_CXX_FLAGS="-O3 -DNDEBUG -fomit-frame-pointer" && make -j$(NPROC)
@echo "$(OK) Native libraries built (release)"
@cd native/rust && cargo build --release
@mkdir -p bin/native
@cp native/rust/target/release/libqueue_index.so native/rust/target/release/libdataset_hash.so bin/native/ 2>/dev/null || true
@cp native/rust/target/release/libqueue_index.dylib native/rust/target/release/libdataset_hash.dylib bin/native/ 2>/dev/null || true
@echo "$(OK) Rust native libraries built (release)"
native-debug:
@mkdir -p native/build
@cd native/build && cmake .. -DCMAKE_BUILD_TYPE=Debug -DENABLE_ASAN=ON && make -j$(NPROC)
@echo "$(OK) Native libraries built (debug + ASan)"
@cd native/rust && cargo build
@echo "$(OK) Rust native libraries built (debug)"
native-test: native-build
@cd native/build && ctest --output-on-failure
@echo "$(OK) Native tests passed"
native-test:
@cd native/rust && cargo test
@echo "$(OK) Rust native tests passed"
native-smoke:
@bash ./scripts/dev/smoke-test.sh --native
@echo "$(OK) Native smoke test passed"
native-clean:
@cd native/rust && cargo clean
@rm -rf native/build
@echo "$(OK) Native build cleaned"
# Rust-specific targets
rust-build:
@cd native/rust && cargo build --release
@echo "$(OK) Rust libraries built"
rust-test:
@cd native/rust && cargo test
@echo "$(OK) Rust tests passed"
rust-clean:
@cd native/rust && cargo clean
@echo "$(OK) Rust build cleaned"
rust-check:
@cd native/rust && cargo clippy -- -D warnings
@echo "$(OK) Rust clippy passed"
rust-fmt:
@cd native/rust && cargo fmt --check
@echo "$(OK) Rust fmt check passed"
prod-with-native: native-release
@mkdir -p bin/server bin/native
@cp native/build/lib*.so native/build/lib*.dylib bin/native/ 2>/dev/null || true
@cp native/rust/target/release/lib*.so bin/native/ 2>/dev/null || true
@cp native/rust/target/release/lib*.dylib bin/native/ 2>/dev/null || true
go build -ldflags="$(LDFLAGS_PROD)" -o bin/server/api-server ./cmd/api-server/main.go
go build -ldflags="$(LDFLAGS_PROD)" -o bin/server/worker ./cmd/worker/worker_server.go
@echo "$(OK) Production binaries built (with native libs)"
@ -281,6 +303,7 @@ clean:
clean-release: clean
rm -rf native/build/
cd native/rust && cargo clean
go clean -cache -testcache
find . -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true
find . -name "*.pyc" -delete 2>/dev/null || true
@ -724,7 +747,7 @@ help:
@printf "Build\n"
@printf " build Build all components (default)\n"
@printf " prod Production-optimized binaries\n"
@printf " prod-with-native Production binaries + native C++ libs\n"
@printf " prod-with-native Production binaries + native Rust libs\n"
@printf " dev Fast development build\n"
@printf " cross-platform Linux x86_64 static binaries in dist/\n"
@printf " build-cli Build Zig CLI only\n"
@ -734,12 +757,17 @@ help:
@printf " verify-build Check build reproducibility\n"
@printf "\n"
@printf "Native Libraries\n"
@printf " native-build C++ libs (release)\n"
@printf " native-release C++ libs (fully optimized)\n"
@printf " native-debug C++ libs (debug + ASan)\n"
@printf " native-test Run C++ unit tests\n"
@printf " native-smoke C++ + Go integration smoke test\n"
@printf " native-clean Remove native build artifacts\n"
@printf " native-build Build Rust libs (release)\n"
@printf " native-release Build Rust libs + copy to bin/native\n"
@printf " native-debug Build Rust libs (debug)\n"
@printf " native-test Run Rust unit tests\n"
@printf " native-smoke Run smoke tests\n"
@printf " native-clean Clean Rust build artifacts\n"
@printf " rust-build Alias for native-build\n"
@printf " rust-test Run Rust tests only\n"
@printf " rust-clean Clean Rust build\n"
@printf " rust-check Run clippy lints\n"
@printf " rust-fmt Check Rust formatting\n"
@printf "\n"
@printf "Tests\n"
@printf " test All tests with infrastructure (CI default)\n"

View file

@ -221,19 +221,3 @@ func TestChainVerifier_VerifyAndAlert(t *testing.T) {
t.Error("expected no tampering for empty log")
}
}
// splitLines splits byte slice by newlines
func splitLines(data []byte) [][]byte {
var lines [][]byte
start := 0
for i := 0; i < len(data); i++ {
if data[i] == '\n' {
lines = append(lines, data[start:i])
start = i + 1
}
}
if start < len(data) {
lines = append(lines, data[start:])
}
return lines
}

View file

@ -206,7 +206,7 @@ func (s *SupplyChainSecurity) checkRegistry(imageRef string) CheckResult {
}
}
func (s *SupplyChainSecurity) verifySignature(ctx context.Context, imageRef string) CheckResult {
func (s *SupplyChainSecurity) verifySignature(_ context.Context, _ string) CheckResult {
// In production, this would use cosign or notary to verify signatures
// For now, simulate verification

View file

@ -345,11 +345,30 @@ func TestMLflowPluginCustomImage(t *testing.T) {
opts := plugins.MLflowOptions{
ArtifactBasePath: "/tmp/mlflow",
Image: "custom/mlflow:latest",
PortAllocator: tracking.NewPortAllocator(5500, 5700),
}
plugin, err := plugins.NewMLflowPlugin(logger, mockPodman, opts)
require.NoError(t, err)
require.NotNil(t, plugin)
// Provision sidecar and verify custom image is used
config := tracking.ToolConfig{
Enabled: true,
Mode: tracking.ModeSidecar,
Settings: map[string]any{
"job_name": "test-job",
},
}
_, err = plugin.ProvisionSidecar(context.Background(), "task-1", config)
require.NoError(t, err)
// Verify the custom image was used in container config
require.NotEmpty(t, mockPodman.containers, "container should have been created")
for _, cfg := range mockPodman.containers {
assert.Equal(t, "custom/mlflow:latest", cfg.Image, "custom image should be used")
}
}
// TestMLflowPluginDefaultImage tests that default image is set
@ -360,11 +379,29 @@ func TestMLflowPluginDefaultImage(t *testing.T) {
mockPodman := newMockPodmanManager()
opts := plugins.MLflowOptions{
ArtifactBasePath: "/tmp/mlflow",
PortAllocator: tracking.NewPortAllocator(5500, 5700),
// Image not specified - should default to ghcr.io/mlflow/mlflow:v2.16.1
}
plugin, err := plugins.NewMLflowPlugin(logger, mockPodman, opts)
require.NoError(t, err)
require.NotNil(t, plugin)
// Plugin was created successfully with default image
// Provision sidecar and verify default image is used
config := tracking.ToolConfig{
Enabled: true,
Mode: tracking.ModeSidecar,
Settings: map[string]any{
"job_name": "test-job",
},
}
_, err = plugin.ProvisionSidecar(context.Background(), "task-1", config)
require.NoError(t, err)
// Verify the default image was used in container config
require.NotEmpty(t, mockPodman.containers, "container should have been created")
for _, cfg := range mockPodman.containers {
assert.Equal(t, "ghcr.io/mlflow/mlflow:v2.16.1", cfg.Image, "default image should be used")
}
}

View file

@ -322,13 +322,32 @@ func TestTensorBoardPluginCustomImage(t *testing.T) {
logger := logging.NewLogger(0, false)
mockPodman := newMockPodmanForTensorBoard()
opts := plugins.TensorBoardOptions{
LogBasePath: "/tmp/tensorboard",
Image: "custom/tensorboard:latest",
LogBasePath: "/tmp/tensorboard",
Image: "custom/tensorboard:latest",
PortAllocator: tracking.NewPortAllocator(5700, 5900),
}
plugin, err := plugins.NewTensorBoardPlugin(logger, mockPodman, opts)
require.NoError(t, err)
require.NotNil(t, plugin)
// Provision sidecar and verify custom image is used
config := tracking.ToolConfig{
Enabled: true,
Mode: tracking.ModeSidecar,
Settings: map[string]any{
"job_name": "test-job",
},
}
_, err = plugin.ProvisionSidecar(context.Background(), "task-1", config)
require.NoError(t, err)
// Verify the custom image was used in container config
require.NotEmpty(t, mockPodman.containers, "container should have been created")
for _, cfg := range mockPodman.containers {
assert.Equal(t, "custom/tensorboard:latest", cfg.Image, "custom image should be used")
}
}
// TestTensorBoardPluginDefaultImage tests that default image is set
@ -338,11 +357,30 @@ func TestTensorBoardPluginDefaultImage(t *testing.T) {
logger := logging.NewLogger(0, false)
mockPodman := newMockPodmanForTensorBoard()
opts := plugins.TensorBoardOptions{
LogBasePath: "/tmp/tensorboard",
LogBasePath: "/tmp/tensorboard",
PortAllocator: tracking.NewPortAllocator(5700, 5900),
// Image not specified - should default to tensorflow/tensorflow:2.17.0
}
plugin, err := plugins.NewTensorBoardPlugin(logger, mockPodman, opts)
require.NoError(t, err)
require.NotNil(t, plugin)
// Provision sidecar and verify default image is used
config := tracking.ToolConfig{
Enabled: true,
Mode: tracking.ModeSidecar,
Settings: map[string]any{
"job_name": "test-job",
},
}
_, err = plugin.ProvisionSidecar(context.Background(), "task-1", config)
require.NoError(t, err)
// Verify the default image was used in container config
require.NotEmpty(t, mockPodman.containers, "container should have been created")
for _, cfg := range mockPodman.containers {
assert.Equal(t, "tensorflow/tensorflow:2.17.0", cfg.Image, "default image should be used")
}
}

View file

@ -10,12 +10,17 @@ import (
"github.com/stretchr/testify/require"
)
// TestNewWandbPlugin tests plugin creation
// TestNewWandbPlugin tests plugin creation and basic behavior
func TestNewWandbPlugin(t *testing.T) {
t.Parallel()
plugin := plugins.NewWandbPlugin()
require.NotNil(t, plugin)
assert.Equal(t, "wandb", plugin.Name(), "plugin name should be wandb")
// Verify teardown is no-op
err := plugin.Teardown(context.Background(), "task-1")
assert.NoError(t, err, "teardown should be no-op")
}
// TestWandbPluginName tests plugin name
@ -107,6 +112,46 @@ func TestWandbPluginProvisionSidecarPartialConfig(t *testing.T) {
assert.NotContains(t, env, "WANDB_ENTITY")
}
// TestWandbPluginProvisionSidecarSidecarEmptySettings tests sidecar mode with empty settings
func TestWandbPluginProvisionSidecarSidecarEmptySettings(t *testing.T) {
t.Parallel()
plugin := plugins.NewWandbPlugin()
config := tracking.ToolConfig{
Enabled: true,
Mode: tracking.ModeSidecar,
Settings: map[string]any{},
}
env, err := plugin.ProvisionSidecar(context.Background(), "task-1", config)
require.NoError(t, err)
require.NotNil(t, env)
assert.Empty(t, env, "empty settings should produce empty env")
}
// TestWandbPluginProvisionSidecarRemoteOnlyAPIKey tests remote mode with only api_key
func TestWandbPluginProvisionSidecarRemoteOnlyAPIKey(t *testing.T) {
t.Parallel()
plugin := plugins.NewWandbPlugin()
config := tracking.ToolConfig{
Enabled: true,
Mode: tracking.ModeRemote,
Settings: map[string]any{
"api_key": "test-key",
},
}
env, err := plugin.ProvisionSidecar(context.Background(), "task-1", config)
require.NoError(t, err)
require.NotNil(t, env)
assert.Equal(t, "test-key", env["WANDB_API_KEY"])
assert.NotContains(t, env, "WANDB_PROJECT")
assert.NotContains(t, env, "WANDB_ENTITY")
}
// TestWandbPluginTeardown tests teardown (no-op)
func TestWandbPluginTeardown(t *testing.T) {
t.Parallel()
@ -156,8 +201,8 @@ func TestWandbPluginHealthCheckRemoteWithoutKey(t *testing.T) {
plugin := plugins.NewWandbPlugin()
config := tracking.ToolConfig{
Enabled: true,
Mode: tracking.ModeRemote,
Enabled: true,
Mode: tracking.ModeRemote,
Settings: map[string]any{},
}
@ -172,8 +217,8 @@ func TestWandbPluginHealthCheckSidecar(t *testing.T) {
plugin := plugins.NewWandbPlugin()
config := tracking.ToolConfig{
Enabled: true,
Mode: tracking.ModeSidecar,
Enabled: true,
Mode: tracking.ModeSidecar,
Settings: map[string]any{},
}

43
native/rust/Cargo.toml Normal file
View file

@ -0,0 +1,43 @@
[workspace]
members = ["queue_index", "dataset_hash", "common"]
resolver = "2"
[workspace.package]
version = "0.1.0"
edition = "2021"
authors = ["FetchML Team"]
license = "MIT OR Apache-2.0"
rust-version = "1.85.0"
[workspace.dependencies]
# Core dependencies
libc = "0.2"
thiserror = "1.0"
anyhow = "1.0"
# Keep: Performance-critical dependencies
rayon = "1.8" # ~8 deps - work-stealing worth it
blake3 = { version = "1.5", features = ["rayon"] } # ~12 deps - SIMD dispatch
memmap2 = "0.9" # ~1 dep - thin mmap wrapper
# Serialization (lightweight)
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Testing
tempfile = "3.10"
# Dropped: crossbeam (~6) -> use std::sync::Mutex
# Dropped: prometheus (~20) -> implement metrics manually
# Dropped: tracing (~15) -> use eprintln! for now
[profile.release]
opt-level = 3
lto = "thin"
codegen-units = 1
panic = "abort"
strip = true
[profile.dev]
debug = true
opt-level = 0

View file

@ -0,0 +1,16 @@
[package]
name = "common"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
rust-version.workspace = true
[lib]
crate-type = ["rlib"]
[dependencies]
libc = { workspace = true }
[dev-dependencies]
tempfile = "3.10"

View file

@ -0,0 +1,143 @@
//! Common FFI utilities for native libraries
//!
//! Provides safe wrappers for FFI boundary operations including:
//! - Panic recovery at FFI boundaries
//! - String conversion between C and Rust
//! - Error handling patterns
use std::ffi::{CStr, CString};
use std::os::raw::{c_char, c_int};
use std::ptr;
/// Recover from panics at FFI boundaries, returning a safe default
///
/// # Safety
/// The closure should not leak resources on panic
pub unsafe fn ffi_boundary<T>(f: impl FnOnce() -> T + std::panic::UnwindSafe) -> Option<T> {
match std::panic::catch_unwind(f) {
Ok(result) => Some(result),
Err(_) => {
eprintln!("FFI boundary panic caught and recovered");
None
}
}
}
/// Convert C string to Rust String
///
/// # Safety
/// ptr must be a valid null-terminated UTF-8 string or null
pub unsafe fn c_str_to_string(ptr: *const c_char) -> Option<String> {
if ptr.is_null() {
return None;
}
CStr::from_ptr(ptr)
.to_str()
.ok()
.map(|s| s.to_string())
}
/// Convert Rust String to C string (leaked, caller must free)
///
/// Returns null on error. On success, returns a pointer that must be freed with `free_string`.
pub fn string_to_c_str(s: &str) -> *mut c_char {
match CString::new(s) {
Ok(cstring) => cstring.into_raw(),
Err(_) => ptr::null_mut(),
}
}
/// Free a string previously created by `string_to_c_str`
///
/// # Safety
/// ptr must be a string previously returned by string_to_c_str, or null
pub unsafe fn free_string(ptr: *mut c_char) {
if !ptr.is_null() {
let _ = CString::from_raw(ptr);
}
}
/// Set an error code and message
///
/// Returns -1 for error, caller should return this from FFI function
pub fn set_error(error_ptr: *mut *const c_char, msg: &str) -> c_int {
if !error_ptr.is_null() {
unsafe {
*error_ptr = string_to_c_str(msg);
}
}
-1
}
/// FFI-safe result type for boolean operations
pub type FfiResult = c_int;
pub const FFI_OK: FfiResult = 0;
pub const FFI_ERROR: FfiResult = -1;
/// Thread-local error storage for FFI boundaries
pub mod error {
use std::cell::RefCell;
thread_local! {
static LAST_ERROR: RefCell<Option<String>> = RefCell::new(None);
}
/// Store an error message
pub fn set_error(msg: impl Into<String>) {
LAST_ERROR.with(|e| {
*e.borrow_mut() = Some(msg.into());
});
}
/// Get and clear the last error
pub fn take_error() -> Option<String> {
LAST_ERROR.with(|e| e.borrow_mut().take())
}
/// Peek at the last error without clearing
pub fn peek_error() -> Option<String> {
LAST_ERROR.with(|e| e.borrow().clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_c_str_roundtrip() {
let original = "hello world";
let c_ptr = string_to_c_str(original);
assert!(!c_ptr.is_null());
unsafe {
let recovered = c_str_to_string(c_ptr);
assert_eq!(recovered, Some(original.to_string()));
free_string(c_ptr);
}
}
#[test]
fn test_null_handling() {
unsafe {
assert_eq!(c_str_to_string(ptr::null()), None);
free_string(ptr::null_mut()); // Should not panic
}
}
#[test]
fn test_ffi_boundary_recovery() {
unsafe {
// Normal case
let result = ffi_boundary(|| 42);
assert_eq!(result, Some(42));
// Panic case - should recover
let result = ffi_boundary(|| {
panic!("test panic");
});
assert_eq!(result, None);
}
}
}

View file

@ -0,0 +1,28 @@
[package]
name = "dataset_hash"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
rust-version.workspace = true
[lib]
crate-type = ["cdylib", "staticlib", "rlib"]
[dependencies]
common = { path = "../common" }
# Workspace dependencies
libc = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
rayon = { workspace = true }
blake3 = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
# Minimal additional dependencies
walkdir = "2.5" # ~5 deps for recursive directory walking
[dev-dependencies]
tempfile = { workspace = true }

View file

@ -0,0 +1,28 @@
#ifndef DATASET_HASH_H
#define DATASET_HASH_H
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
// Hash a directory and return combined digest
// dir: path to directory
// out_hash: output parameter, receives allocated hex string (caller must free with fh_free_string)
// Returns: 0 on success, -1 on error
int fh_hash_directory_combined(const char* dir, char** out_hash);
// Free a string previously returned by FFI functions
void fh_free_string(char* s);
// Get metrics in Prometheus format
// Returns: allocated string with metrics (caller must free with fh_free_string)
char* fh_get_metrics(void);
#ifdef __cplusplus
}
#endif
#endif // DATASET_HASH_H

View file

@ -0,0 +1,214 @@
//! Dataset Hash - Parallel file hashing with BLAKE3
//!
//! Provides high-performance parallel hashing using BLAKE3's built-in SIMD dispatch.
//! Supports both single-file and batch directory hashing with deterministic ordering.
use rayon::prelude::*;
use std::ffi::{CStr, CString};
use std::fs::File;
use std::io::{self, BufReader, Read};
use std::os::raw::{c_char, c_int};
use std::path::{Path, PathBuf};
use std::ptr;
use walkdir::WalkDir;
mod metrics;
pub use metrics::NativeMetrics;
/// Hash a single file using BLAKE3
///
/// BLAKE3 has built-in SIMD dispatch for AVX2, AVX-512, and NEON.
/// No hand-rolled intrinsics needed.
pub fn hash_file(path: &Path) -> io::Result<String> {
let file = File::open(path)?;
let mut reader = BufReader::with_capacity(64 * 1024, file); // 64KB buffer
let mut hasher = blake3::Hasher::new();
let mut buffer = [0u8; 64 * 1024];
loop {
let n = reader.read(&mut buffer)?;
if n == 0 {
break;
}
hasher.update(&buffer[..n]);
}
Ok(hasher.finalize().to_hex().to_string())
}
/// Collect all files in a directory, sorted deterministically
///
/// Security: excludes hidden files (starting with '.') and symlinks
pub fn collect_files(dir: &Path) -> io::Result<Vec<PathBuf>> {
let mut files: Vec<PathBuf> = WalkDir::new(dir)
.max_depth(32) // Prevent infinite recursion
.follow_links(false) // Security: no symlink traversal
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| {
let path = e.path();
let is_file = e.file_type().is_file();
let not_hidden = !e.file_name()
.to_str()
.map(|s| s.starts_with('.'))
.unwrap_or(false);
is_file && not_hidden
})
.map(|e| e.path().to_path_buf())
.collect();
// Deterministic ordering: byte-level comparison (OS-locale independent)
files.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
Ok(files)
}
/// Hash all files in a directory in parallel
///
/// Returns sorted (path, hash) pairs for deterministic output
pub fn hash_directory_batch(dir: &Path) -> io::Result<Vec<(String, String)>> {
let files = collect_files(dir)?;
// Parallel hash with rayon - preserves order
let results: Vec<(String, io::Result<String>)> = files
.into_iter()
.map(|path| {
let path_str = path.to_string_lossy().to_string();
let hash = hash_file(&path);
(path_str, hash)
})
.collect();
// Convert to final format, propagating errors
let mut output: Vec<(String, String)> = Vec::with_capacity(results.len());
for (path, hash_result) in results {
match hash_result {
Ok(hash) => output.push((path, hash)),
Err(e) => return Err(e),
}
}
Ok(output)
}
/// Combined hash of entire directory (single digest)
///
/// This hashes all files and then hashes the concatenation of their hashes,
/// providing a single digest for the entire directory.
pub fn hash_directory_combined(dir: &Path) -> io::Result<String> {
let pairs = hash_directory_batch(dir)?;
let mut hasher = blake3::Hasher::new();
for (path, hash) in &pairs {
hasher.update(path.as_bytes());
hasher.update(hash.as_bytes());
}
Ok(hasher.finalize().to_hex().to_string())
}
// ============================================================================
// FFI Interface
// ============================================================================
/// Hash a directory and return combined digest
///
/// # Safety
/// dir must be a valid null-terminated UTF-8 path string
#[no_mangle]
pub unsafe extern "C" fn fh_hash_directory_combined(
dir: *const c_char,
out_hash: *mut *mut c_char,
) -> c_int {
if dir.is_null() || out_hash.is_null() {
return -1;
}
let result = std::panic::catch_unwind(|| {
let dir_str = match CStr::from_ptr(dir).to_str() {
Ok(s) => s,
Err(_) => return -1,
};
let dir_path = Path::new(dir_str);
match hash_directory_combined(dir_path) {
Ok(hash) => {
let c_hash = match CString::new(hash) {
Ok(s) => s,
Err(_) => return -1,
};
*out_hash = c_hash.into_raw();
0
}
Err(_) => -1,
}
});
match result {
Ok(rc) => rc,
Err(_) => {
eprintln!("Panic in fh_hash_directory_combined");
-1
}
}
}
/// Free a string previously returned by FFI functions
///
/// # Safety
/// s must be a string previously returned by an FFI function, or null
#[no_mangle]
pub unsafe extern "C" fn fh_free_string(s: *mut c_char) {
if !s.is_null() {
let _ = CString::from_raw(s);
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use std::fs;
#[test]
fn test_hash_file() {
let temp = TempDir::new().unwrap();
let file_path = temp.path().join("test.txt");
fs::write(&file_path, "hello world").unwrap();
let hash1 = hash_file(&file_path).unwrap();
let hash2 = hash_file(&file_path).unwrap();
// Hash should be deterministic
assert_eq!(hash1, hash2);
// BLAKE3 produces 64-char hex strings
assert_eq!(hash1.len(), 64);
}
#[test]
fn test_collect_files_excludes_hidden() {
let temp = TempDir::new().unwrap();
fs::write(temp.path().join("visible.txt"), "data").unwrap();
fs::write(temp.path().join(".hidden"), "data").unwrap();
let files = collect_files(temp.path()).unwrap();
assert_eq!(files.len(), 1);
assert!(files[0].file_name().unwrap() == "visible.txt");
}
#[test]
fn test_hash_directory_combined() {
let temp = TempDir::new().unwrap();
fs::write(temp.path().join("a.txt"), "AAA").unwrap();
fs::write(temp.path().join("b.txt"), "BBB").unwrap();
let hash1 = hash_directory_combined(temp.path()).unwrap();
let hash2 = hash_directory_combined(temp.path()).unwrap();
// Combined hash should be deterministic
assert_eq!(hash1, hash2);
assert_eq!(hash1.len(), 64);
}
}

View file

@ -0,0 +1,103 @@
//! Minimal metrics implementation - no prometheus dependency
//!
//! Provides lightweight atomic counters that can be exported as Prometheus format
//! without pulling in the full prometheus client library (~20 deps).
use std::sync::atomic::{AtomicU64, Ordering};
/// Lightweight metrics for the native library
pub struct NativeMetrics {
hash_duration_ns: AtomicU64,
hash_operations: AtomicU64,
panic_recoveries: AtomicU64,
}
impl NativeMetrics {
/// Create new metrics instance
pub const fn new() -> Self {
Self {
hash_duration_ns: AtomicU64::new(0),
hash_operations: AtomicU64::new(0),
panic_recoveries: AtomicU64::new(0),
}
}
/// Record a hash operation with its duration
pub fn record_hash(&self, duration_ns: u64) {
self.hash_operations.fetch_add(1, Ordering::Relaxed);
self.hash_duration_ns.fetch_add(duration_ns, Ordering::Relaxed);
}
/// Record a panic recovery at FFI boundary
pub fn record_panic_recovery(&self) {
self.panic_recoveries.fetch_add(1, Ordering::Relaxed);
}
/// Export metrics in Prometheus text format
pub fn export_prometheus(&self) -> String {
let ops = self.hash_operations.load(Ordering::Relaxed);
let duration_sec = self.hash_duration_ns.load(Ordering::Relaxed) as f64 / 1e9;
let panics = self.panic_recoveries.load(Ordering::Relaxed);
format!(
"# TYPE native_hash_operations_total counter\n\
native_hash_operations_total {}\n\
# TYPE native_hash_duration_seconds counter\n\
native_hash_duration_seconds {:.9}\n\
# TYPE native_panic_recoveries_total counter\n\
native_panic_recoveries_total {}\n",
ops, duration_sec, panics
)
}
/// Get average hash duration in nanoseconds
pub fn avg_duration_ns(&self) -> u64 {
let ops = self.hash_operations.load(Ordering::Relaxed);
let duration = self.hash_duration_ns.load(Ordering::Relaxed);
if ops == 0 {
0
} else {
duration / ops
}
}
}
/// Global metrics instance
pub static METRICS: NativeMetrics = NativeMetrics::new();
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_metrics_recording() {
let metrics = NativeMetrics::new();
metrics.record_hash(1_000_000); // 1ms
metrics.record_hash(2_000_000); // 2ms
assert_eq!(metrics.hash_operations.load(Ordering::Relaxed), 2);
assert_eq!(metrics.hash_duration_ns.load(Ordering::Relaxed), 3_000_000);
assert_eq!(metrics.avg_duration_ns(), 1_500_000);
}
#[test]
fn test_prometheus_export() {
let metrics = NativeMetrics::new();
metrics.record_hash(1_000_000_000); // 1 second
let output = metrics.export_prometheus();
assert!(output.contains("native_hash_operations_total 1"));
assert!(output.contains("native_hash_duration_seconds 1.000000000"));
}
#[test]
fn test_panic_recovery() {
let metrics = NativeMetrics::new();
metrics.record_panic_recovery();
metrics.record_panic_recovery();
assert_eq!(metrics.panic_recoveries.load(Ordering::Relaxed), 2);
}
}

18
native/rust/deny.toml Normal file
View file

@ -0,0 +1,18 @@
[advisories]
yanked = "deny"
[licenses]
allow = ["MIT", "Apache-2.0", "BSD-3-Clause", "ISC", "Unicode-DFS-2016"]
deny = ["GPL-2.0", "GPL-3.0"]
confidence-threshold = 0.8
[bans]
multiple-versions = "warn"
wildcards = "allow"
[sources]
unknown-registry = "deny"
unknown-git = "deny"
[sources.allow-org]
github = ["dtolnay", "tokio-rs", "crossbeam-rs", "blake3-team"]

View file

@ -0,0 +1,27 @@
[package]
name = "queue_index"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
rust-version.workspace = true
[lib]
crate-type = ["cdylib", "staticlib", "rlib"]
[dependencies]
common = { path = "../common" }
# Workspace dependencies
libc = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
memmap2 = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
# Minimal additional dependencies
chrono = { version = "0.4", features = ["serde"] }
[dev-dependencies]
tempfile = { workspace = true }

View file

@ -0,0 +1,49 @@
#ifndef QUEUE_INDEX_H
#define QUEUE_INDEX_H
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
// Opaque handle for queue index
typedef struct qi_index qi_index_t;
// Task structure - matches Go queue.Task fields
// Fixed-size for binary format (no dynamic allocation in hot path)
typedef struct qi_task {
char id[64]; // Task ID
char job_name[128]; // Job name
int64_t priority; // Higher = more important
int64_t created_at; // Unix timestamp (nanoseconds)
int64_t next_retry; // Unix timestamp (nanoseconds), 0 if none
char status[16]; // "queued", "running", "finished", "failed"
uint32_t retries; // Current retry count
} qi_task_t;
// Index operations
qi_index_t* qi_open(const char* queue_dir);
void qi_close(qi_index_t* idx);
// Batch operations (amortize CGo overhead)
int qi_add_tasks(qi_index_t* idx, const qi_task_t* tasks, uint32_t count);
int qi_get_next_batch(qi_index_t* idx, qi_task_t* out_tasks, uint32_t max_count, uint32_t* out_count);
// Query operations
int qi_get_task_by_id(qi_index_t* idx, const char* task_id, qi_task_t* out_task);
size_t qi_get_task_count(qi_index_t* idx, const char* status);
// Memory management
void qi_free_task_array(qi_task_t* tasks);
// Error handling
const char* qi_last_error(qi_index_t* idx);
void qi_clear_error(qi_index_t* idx);
#ifdef __cplusplus
}
#endif
#endif // QUEUE_INDEX_H

View file

@ -0,0 +1,171 @@
//! Priority queue index implementation
use crate::storage::SharedStorage;
use crate::task::Task;
use std::collections::BinaryHeap;
use std::io;
use std::path::Path;
/// Task with ordering for priority queue
#[derive(Clone, Eq, PartialEq)]
struct QueuedTask {
priority: i64,
created_at: i64,
task: Task,
}
impl Ord for QueuedTask {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Higher priority first, then older tasks first
self.priority
.cmp(&other.priority)
.then_with(|| other.created_at.cmp(&self.created_at))
.reverse()
}
}
impl PartialOrd for QueuedTask {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl QueuedTask {
fn from_task(task: Task) -> Self {
Self {
priority: task.priority,
created_at: task.created_at,
task,
}
}
}
/// Main queue index implementation
pub struct QueueIndexImpl {
storage: SharedStorage,
heap: BinaryHeap<QueuedTask>,
}
impl QueueIndexImpl {
/// Open or create a queue index at the given path
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let storage = SharedStorage::open(path)?;
// Load existing tasks from storage
let heap = BinaryHeap::new();
let mut index = Self { storage, heap };
index.load_tasks()?;
Ok(index)
}
/// Add tasks to the index
pub fn add_tasks(&mut self, tasks: &[Task]) -> io::Result<usize> {
let mut added = 0;
for task in tasks {
if self.task_exists(&task.id)? {
continue;
}
let queued = QueuedTask::from_task(task.clone());
self.heap.push(queued);
added += 1;
}
// Update storage header
{
let mut storage = self.storage.write();
storage.header_mut().entry_count += added as u64;
storage.flush()?;
}
Ok(added)
}
/// Get the next batch of ready tasks
pub fn get_next_batch(&mut self, max_count: usize) -> io::Result<Vec<Task>> {
let mut tasks = Vec::with_capacity(max_count);
let mut to_requeue = Vec::new();
while tasks.len() < max_count {
match self.heap.pop() {
Some(queued) => {
if queued.task.is_ready() {
tasks.push(queued.task);
} else {
// Not ready yet, requeue
to_requeue.push(queued);
}
}
None => break,
}
}
// Requeue tasks that weren't ready
for queued in to_requeue {
self.heap.push(queued);
}
Ok(tasks)
}
/// Get count of tasks with given status
pub fn get_task_count(&self, status: &str) -> usize {
// For now, return heap size as approximation
// Full implementation would scan storage
self.heap.len()
}
fn task_exists(&self, id: &str) -> io::Result<bool> {
// Check if task already exists
// Full implementation would check storage
Ok(false)
}
fn load_tasks(&mut self) -> io::Result<()> {
// Load tasks from storage into heap
// Full implementation would deserialize from storage
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_add_and_get_tasks() {
let temp = TempDir::new().unwrap();
let mut index = QueueIndexImpl::open(temp.path()).unwrap();
let tasks = vec![
Task::new("task-1", "job-a"),
Task::new("task-2", "job-b"),
];
let added = index.add_tasks(&tasks).unwrap();
assert_eq!(added, 2);
let batch = index.get_next_batch(10).unwrap();
assert_eq!(batch.len(), 2);
}
#[test]
fn test_priority_ordering() {
let mut heap = BinaryHeap::new();
let task1 = Task::new("task-1", "job-a");
let mut task2 = Task::new("task-2", "job-b");
task2.priority = 100; // Higher priority
heap.push(QueuedTask::from_task(task1));
heap.push(QueuedTask::from_task(task2));
// Higher priority should come first
let first = heap.pop().unwrap();
assert_eq!(first.task.id, "task-2");
}
}

View file

@ -0,0 +1,309 @@
//! Queue Index - High-performance priority queue with mmap persistence
//!
//! This crate provides a Rust implementation of the queue index with FFI exports
//! for integration with Go. It uses memory-mapped files for persistence and
//! crossbeam for lock-free concurrent operations.
use std::ffi::{CStr, CString};
use std::os::raw::{c_char, c_int};
use std::path::PathBuf;
use std::ptr;
use std::sync::{Arc, Mutex};
mod index;
mod storage;
mod task;
pub use index::QueueIndex;
pub use storage::IndexStorage;
pub use task::Task;
use index::QueueIndexImpl;
/// Opaque handle for queue index
pub struct QiIndex {
inner: Arc<Mutex<QueueIndexImpl>>,
last_error: Mutex<Option<String>>,
}
/// Task structure - matches C FFI layout
#[repr(C)]
pub struct QiTask {
pub id: [c_char; 64],
pub job_name: [c_char; 128],
pub priority: i64,
pub created_at: i64,
pub next_retry: i64,
pub status: [c_char; 16],
pub retries: u32,
}
impl QiTask {
fn from_task(task: &Task) -> Self {
let mut qi_task = QiTask {
id: [0; 64],
job_name: [0; 128],
priority: task.priority,
created_at: task.created_at,
next_retry: task.next_retry,
status: [0; 16],
retries: task.retries,
};
// Copy strings with null termination
Self::copy_str(&mut qi_task.id, &task.id, 64);
Self::copy_str(&mut qi_task.job_name, &task.job_name, 128);
Self::copy_str(&mut qi_task.status, &task.status, 16);
qi_task
}
fn copy_str(dest: &mut [c_char], src: &str, max_len: usize) {
let bytes = src.as_bytes();
let len = bytes.len().min(max_len - 1);
for i in 0..len {
dest[i] = bytes[i] as c_char;
}
dest[len] = 0;
}
fn to_task(&self) -> Task {
Task {
id: Self::cstr_to_string(&self.id),
job_name: Self::cstr_to_string(&self.job_name),
priority: self.priority,
created_at: self.created_at,
next_retry: self.next_retry,
status: Self::cstr_to_string(&self.status),
retries: self.retries,
}
}
fn cstr_to_string(arr: &[c_char]) -> String {
let bytes: Vec<u8> = arr.iter()
.take_while(|&&c| c != 0)
.map(|&c| c as u8)
.collect();
String::from_utf8_lossy(&bytes).to_string()
}
}
/// Open or create a queue index at the given directory
///
/// # Safety
/// path must be a valid null-terminated UTF-8 string
#[no_mangle]
pub unsafe extern "C" fn qi_open(path: *const c_char) -> *mut QiIndex {
if path.is_null() {
return ptr::null_mut();
}
let path_str = match CStr::from_ptr(path).to_str() {
Ok(s) => s,
Err(_) => return ptr::null_mut(),
};
let path_buf = PathBuf::from(path_str);
let result = std::panic::catch_unwind(|| {
match QueueIndexImpl::open(path_buf) {
Ok(inner) => {
let index = QiIndex {
inner: Arc::new(Mutex::new(inner)),
last_error: Mutex::new(None),
};
Box::into_raw(Box::new(index))
}
Err(e) => {
eprintln!("Failed to open queue index: {}", e);
ptr::null_mut()
}
}
});
match result {
Ok(ptr) => ptr,
Err(_) => {
eprintln!("Panic in qi_open");
ptr::null_mut()
}
}
}
/// Close and free a queue index
///
/// # Safety
/// idx must be a valid pointer returned by qi_open, or null
#[no_mangle]
pub unsafe extern "C" fn qi_close(idx: *mut QiIndex) {
if !idx.is_null() {
let _ = std::panic::catch_unwind(|| {
drop(Box::from_raw(idx));
});
}
}
/// Add tasks to the index in a batch
///
/// # Safety
/// idx must be valid, tasks must point to count valid QiTask structs
#[no_mangle]
pub unsafe extern "C" fn qi_add_tasks(
idx: *mut QiIndex,
tasks: *const QiTask,
count: u32,
) -> c_int {
if idx.is_null() || tasks.is_null() || count == 0 {
return -1;
}
let result = std::panic::catch_unwind(|| {
let index = &*idx;
let mut inner = index.inner.lock().unwrap();
let task_slice = std::slice::from_raw_parts(tasks, count as usize);
let rust_tasks: Vec<Task> = task_slice.iter().map(|t| t.to_task()).collect();
match inner.add_tasks(&rust_tasks) {
Ok(added) => added as c_int,
Err(e) => {
let mut error_guard = index.last_error.lock().unwrap();
*error_guard = Some(e.to_string());
-1
}
}
});
match result {
Ok(n) => n,
Err(_) => {
eprintln!("Panic in qi_add_tasks");
-1
}
}
}
/// Get the next batch of tasks from the priority queue
///
/// # Safety
/// idx must be valid, out_tasks must have space for max_count tasks, out_count must be valid
#[no_mangle]
pub unsafe extern "C" fn qi_get_next_batch(
idx: *mut QiIndex,
out_tasks: *mut QiTask,
max_count: u32,
out_count: *mut u32,
) -> c_int {
if idx.is_null() || out_tasks.is_null() || out_count.is_null() || max_count == 0 {
return -1;
}
let result = std::panic::catch_unwind(|| {
let index = &*idx;
let mut inner = index.inner.lock().unwrap();
match inner.get_next_batch(max_count as usize) {
Ok(tasks) => {
let count = tasks.len().min(max_count as usize);
let out_slice = std::slice::from_raw_parts_mut(out_tasks, count);
for (i, task) in tasks.iter().take(count).enumerate() {
out_slice[i] = QiTask::from_task(&task);
}
*out_count = count as u32;
0
}
Err(e) => {
let mut error_guard = index.last_error.lock().unwrap();
*error_guard = Some(e.to_string());
-1
}
}
});
match result {
Ok(rc) => rc,
Err(_) => {
eprintln!("Panic in qi_get_next_batch");
-1
}
}
}
/// Get the last error message for an index
///
/// # Safety
/// idx must be valid. Returns a static string that must not be freed.
#[no_mangle]
pub unsafe extern "C" fn qi_last_error(idx: *mut QiIndex) -> *const c_char {
if idx.is_null() {
return ptr::null();
}
let result = std::panic::catch_unwind(|| {
let index = &*idx;
let error_guard = index.last_error.lock().unwrap();
match error_guard.as_ref() {
Some(err) => {
// Leak the CString to return a stable pointer
// Caller must not free this
CString::new(err.clone()).unwrap().into_raw()
}
None => ptr::null(),
}
});
match result {
Ok(ptr) => ptr,
Err(_) => ptr::null(),
}
}
/// Clear the last error
///
/// # Safety
/// idx must be valid
#[no_mangle]
pub unsafe extern "C" fn qi_clear_error(idx: *mut QiIndex) {
if idx.is_null() {
return;
}
let _ = std::panic::catch_unwind(|| {
let index = &*idx;
let mut error_guard = index.last_error.lock().unwrap();
*error_guard = None;
});
}
/// Get the count of tasks with a given status
///
/// # Safety
/// idx must be valid, status must be a null-terminated string
#[no_mangle]
pub unsafe extern "C" fn qi_get_task_count(
idx: *mut QiIndex,
status: *const c_char,
) -> usize {
if idx.is_null() || status.is_null() {
return 0;
}
let status_str = match CStr::from_ptr(status).to_str() {
Ok(s) => s,
Err(_) => return 0,
};
let result = std::panic::catch_unwind(|| {
let index = &*idx;
let inner = index.inner.lock().unwrap();
inner.get_task_count(status_str)
});
match result {
Ok(count) => count,
Err(_) => 0,
}
}

View file

@ -0,0 +1,202 @@
//! Memory-mapped storage with safe access patterns
//!
//! Design: Unsafe raw pointers are contained within RawStorage.
//! All public access goes through IndexStorage with safe methods.
use memmap2::{MmapMut, MmapOptions};
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Header stored at the beginning of the mmap file
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct IndexHeader {
pub version: u64,
pub magic: [u8; 8],
pub entry_count: u64,
pub last_modified: i64,
pub checksum: u64,
}
impl IndexHeader {
pub const MAGIC: [u8; 8] = *b"FETCHIDX";
pub const VERSION: u64 = 1;
pub const SIZE: usize = std::mem::size_of::<IndexHeader>();
pub fn new() -> Self {
Self {
version: Self::VERSION,
magic: Self::MAGIC,
entry_count: 0,
last_modified: 0,
checksum: 0,
}
}
pub fn is_valid(&self) -> bool {
self.magic == Self::MAGIC && self.version == Self::VERSION
}
}
/// Internal unsafe state - never exposed directly
struct RawStorage {
mmap: MmapMut,
header_ptr: *mut IndexHeader,
data_offset: usize,
}
impl RawStorage {
fn new(mmap: MmapMut) -> io::Result<Self> {
let ptr = mmap.as_ptr() as *mut u8;
let header_ptr = ptr as *mut IndexHeader;
Ok(Self {
mmap,
header_ptr,
data_offset: IndexHeader::SIZE,
})
}
fn header(&self) -> &IndexHeader {
unsafe { &*self.header_ptr }
}
fn header_mut(&mut self) -> &mut IndexHeader {
unsafe { &mut *self.header_ptr }
}
}
/// Public safe interface to mmap storage
pub struct IndexStorage {
raw: RawStorage,
path: PathBuf,
}
impl IndexStorage {
/// Open or create storage at the given path
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let path = path.as_ref().to_path_buf();
std::fs::create_dir_all(&path)?;
let file_path = path.join("index.dat");
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&file_path)?;
// Ensure file is at least header size
let file_size = file.metadata()?.len() as usize;
let min_size = IndexHeader::SIZE;
if file_size < min_size {
let header = IndexHeader::new();
let header_bytes = unsafe {
std::slice::from_raw_parts(
&header as *const IndexHeader as *const u8,
IndexHeader::SIZE,
)
};
file.set_len(min_size as u64)?;
file.write_all_at(header_bytes, 0)?;
}
let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
let raw = RawStorage::new(mmap)?;
Ok(Self { raw, path })
}
/// Get a reference to the header (read-only)
pub fn header(&self) -> &IndexHeader {
self.raw.header()
}
/// Get a mutable reference to the header
pub fn header_mut(&mut self) -> &mut IndexHeader {
self.raw.header_mut()
}
/// Verify header magic and version
pub fn verify(&self) -> io::Result<()> {
let header = self.header();
if !header.is_valid() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid index header (wrong magic or version)",
));
}
Ok(())
}
/// Get the path to the storage directory
pub fn path(&self) -> &Path {
&self.path
}
/// Flush changes to disk
pub fn flush(&mut self) -> io::Result<()> {
self.raw.mmap.flush()
}
}
/// Thread-safe wrapper for concurrent access
pub struct SharedStorage {
inner: Arc<Mutex<IndexStorage>>,
}
impl SharedStorage {
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let storage = IndexStorage::open(path)?;
Ok(Self {
inner: Arc::new(Mutex::new(storage)),
})
}
pub fn lock(&self) -> MutexGuard<IndexStorage> {
self.inner.lock().unwrap()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_storage_creation() {
let temp = TempDir::new().unwrap();
let storage = IndexStorage::open(temp.path()).unwrap();
assert!(storage.header().is_valid());
}
#[test]
fn test_storage_verify() {
let temp = TempDir::new().unwrap();
let storage = IndexStorage::open(temp.path()).unwrap();
assert!(storage.verify().is_ok());
}
#[test]
fn test_shared_storage() {
let temp = TempDir::new().unwrap();
let shared = SharedStorage::open(temp.path()).unwrap();
{
let storage = shared.lock();
assert!(storage.header().is_valid());
}
{
let mut storage = shared.lock();
storage.header_mut().entry_count = 42;
}
{
let storage = shared.lock();
assert_eq!(storage.header().entry_count, 42);
}
}
}

View file

@ -0,0 +1,74 @@
//! Task definition and serialization
use serde::{Deserialize, Serialize};
/// Task structure - matches both C FFI and Go queue.Task
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Task {
pub id: String,
pub job_name: String,
pub priority: i64,
pub created_at: i64,
pub next_retry: i64,
pub status: String,
pub retries: u32,
}
impl Task {
/// Create a new task with default values
pub fn new(id: impl Into<String>, job_name: impl Into<String>) -> Self {
Self {
id: id.into(),
job_name: job_name.into(),
priority: 0,
created_at: chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
next_retry: 0,
status: "queued".to_string(),
retries: 0,
}
}
/// Serialize to JSON bytes
pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
serde_json::to_vec(self)
}
/// Deserialize from JSON bytes
pub fn from_json(data: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(data)
}
/// Check if task is ready to be scheduled
pub fn is_ready(&self) -> bool {
self.status == "queued" && (self.next_retry == 0 || self.next_retry <= current_time())
}
}
fn current_time() -> i64 {
chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_task_serialization() {
let task = Task::new("task-1", "test-job");
let json = task.to_json().unwrap();
let deserialized = Task::from_json(&json).unwrap();
assert_eq!(task.id, deserialized.id);
assert_eq!(task.job_name, deserialized.job_name);
}
#[test]
fn test_task_ready() {
let mut task = Task::new("task-1", "test-job");
task.status = "queued".to_string();
task.next_retry = 0;
assert!(task.is_ready());
task.next_retry = current_time() + 1_000_000_000; // 1 second in future
assert!(!task.is_ready());
}
}

View file

@ -0,0 +1,3 @@
[toolchain]
channel = "1.85.0"
components = ["clippy", "rustfmt"]