ci: push all workflow updates
Some checks failed
Documentation / build-and-publish (push) Waiting to run
Test / test (push) Waiting to run
Checkout test / test (push) Successful in 5s
CI with Native Libraries / test-native (push) Has been cancelled
CI with Native Libraries / build-release (push) Has been cancelled

This commit is contained in:
Jeremie Fraeys 2026-02-12 13:28:15 -05:00
parent 06690230e2
commit d408a60eb1
No known key found for this signature in database
27 changed files with 2303 additions and 14 deletions

View file

@ -5,7 +5,7 @@ on:
jobs:
benchmark:
runs-on: ubuntu-latest
runs-on: self-hosted
steps:
- name: Checkout code

View file

@ -6,7 +6,7 @@ on:
- main
jobs:
test:
runs-on: docker
runs-on: self-hosted
steps:
- name: Checkout
uses: actions/checkout@v4

View file

@ -21,7 +21,7 @@ env:
jobs:
build-and-publish:
runs-on: ubuntu-latest
runs-on: self-hosted
steps:
- name: Checkout
uses: actions/checkout@v5

View file

@ -5,7 +5,7 @@ on:
jobs:
label:
runs-on: ubuntu-latest
runs-on: self-hosted
permissions:
contents: read
pull-requests: write

View file

@ -14,7 +14,7 @@ env:
jobs:
prepare_rsync:
name: Prepare rsync metadata
runs-on: ubuntu-latest
runs-on: self-hosted
outputs:
matrix: ${{ steps.manifest.outputs.matrix }}
steps:
@ -46,7 +46,7 @@ jobs:
build-cli:
name: Build CLI - ${{ matrix.platform }}
needs: prepare_rsync
runs-on: ubuntu-latest
runs-on: self-hosted
strategy:
matrix: ${{ fromJson(needs.prepare_rsync.outputs.matrix) }}
steps:
@ -112,7 +112,7 @@ jobs:
build-go-backends:
name: Build Go Backends
runs-on: ubuntu-latest
runs-on: self-hosted
steps:
- name: Checkout code
uses: actions/checkout@v5
@ -148,7 +148,7 @@ jobs:
mirror-github-release:
name: Mirror release to GitHub
needs: [build-cli, build-go-backends]
runs-on: ubuntu-latest
runs-on: self-hosted
steps:
- name: Validate mirror configuration
env:

View file

@ -11,12 +11,12 @@ permissions:
jobs:
stale:
runs-on: ubuntu-latest
runs-on: self-hosted
steps:
- name: Mark stale issues and PRs
uses: actions/stale@v8
with:
repo-token: ${{ secrets.GITHUB_TOKEN || secrets.GITEA_TOKEN }}
repo-token: ${{ secrets.GITEA_TOKEN }}
days-before-stale: 30
days-before-close: 14
stale-issue-label: "stale"

View file

@ -17,6 +17,46 @@ build:
$(MAKE) -C cli all
@echo "${OK} All components built"
# Build native C++ libraries for production (optimized, stripped).
# Configure step passes explicit release flags; sub-make uses $(MAKE) so the
# parent's jobserver and -n/-k flags propagate (a bare `make` would not).
.PHONY: native-release
native-release:
	@mkdir -p native/build
	@cd native/build && cmake .. -DCMAKE_BUILD_TYPE=Release \
		-DCMAKE_C_FLAGS="-O3 -DNDEBUG -fomit-frame-pointer" \
		-DCMAKE_CXX_FLAGS="-O3 -DNDEBUG -fomit-frame-pointer" && \
		$(MAKE) -j$(shell nproc 2>/dev/null || sysctl -n hw.ncpu)
	@echo "${OK} Native libraries built (release)"
# Build Go binaries with native library support.
# Depends on native-release so the shared libraries exist before being copied
# next to the Go binaries. The `|| true` keeps the copy best-effort: only
# lib*.so exists on Linux, only lib*.dylib on macOS, so one glob always misses.
prod-with-native: native-release
	@mkdir -p bin
	@cp native/build/lib*.so native/build/lib*.dylib bin/ 2>/dev/null || true
	@go build -ldflags="-s -w" -o bin/api-server cmd/api-server/main.go
	@go build -ldflags="-s -w" -o bin/worker cmd/worker/worker_server.go
	@echo "${OK} Production binaries built (with native libs)"
	@echo "Copy native libraries from bin/ alongside your binaries"
# Build native C++ libraries for performance optimization (default dev build).
# Uses $(MAKE) for the sub-make so parallelism flags propagate from the parent.
.PHONY: native-build
native-build:
	@mkdir -p native/build
	@cd native/build && cmake .. -DCMAKE_BUILD_TYPE=Release && \
		$(MAKE) -j$(shell nproc 2>/dev/null || sysctl -n hw.ncpu)
	@echo "${OK} Native libraries built"
# Build native libraries with AddressSanitizer (for testing).
# Debug build with ENABLE_ASAN=ON; sub-make invoked via $(MAKE) so jobserver
# and dry-run flags propagate correctly.
.PHONY: native-debug
native-debug:
	@mkdir -p native/build
	@cd native/build && cmake .. -DCMAKE_BUILD_TYPE=Debug -DENABLE_ASAN=ON && \
		$(MAKE) -j$(shell nproc 2>/dev/null || sysctl -n hw.ncpu)
	@echo "${OK} Native libraries built (debug mode)"
# Clean native build artifacts (removes the whole CMake build tree).
native-clean:
	@$(RM) -r native/build
	@echo "${OK} Native build cleaned"
# Run native library tests. Builds first, then drives CTest against the
# build tree (--test-dir requires CMake/CTest >= 3.20, which the project
# already mandates).
native-test: native-build
	@ctest --test-dir native/build --output-on-failure
	@echo "${OK} Native tests passed"
# Build production-optimized binaries
prod:
go build -ldflags="-s -w" -o bin/api-server cmd/api-server/main.go
@ -273,9 +313,17 @@ help:
@echo "Build Targets:"
@echo " make build - Build all components (default)"
@echo " make prod - Build production-optimized binaries"
@echo " make prod-with-native - Build production binaries with native C++ libs"
@echo " make dev - Build development binaries (faster)"
@echo " make clean - Remove build artifacts"
@echo ""
@echo "Native Library Targets:"
@echo " make native-build - Build native C++ libraries"
@echo " make native-release - Build native libs (release optimized)"
@echo " make native-debug - Build native libs (debug with ASan)"
@echo " make native-test - Run native library tests"
@echo " make native-clean - Clean native build artifacts"
@echo ""
@echo "Docker Targets:"
@echo " make docker-build - Build Docker image"
@echo ""

View file

@ -115,6 +115,37 @@ See `CHANGELOG.md`.
## Build
### Native C++ Libraries (Optional Performance Optimization)
FetchML includes optional C++ native libraries for performance-critical operations:
- **dataset_hash**: mmap + SIMD SHA256 hashing (78% syscall reduction)
- **queue_index**: Binary index format (96% syscall reduction)
- **artifact_scanner**: Fast directory traversal (87% syscall reduction)
- **streaming_io**: Parallel gzip extraction (95% syscall reduction)
**Requirements:** CMake 3.20+ and a C++20-capable compiler (matching `native/CMakeLists.txt`), zlib
```bash
# Build native libraries
make native-build # Development build
make native-release # Production optimized (-O3)
make native-debug # Debug build with ASan
# Enable native libraries at runtime
export FETCHML_NATIVE_LIBS=1
# Build Go binaries with native library support
make prod-with-native # Copies .so/.dylib files to bin/
```
**Deployment:** Ship the native libraries alongside your Go binaries:
- Linux: `lib*.so` files
- macOS: `lib*.dylib` files
The libraries are linked in via cgo and resolved by the system's dynamic loader at startup. Setting `FETCHML_NATIVE_LIBS=1` opts in to the native code paths; when it is unset, FetchML uses its pure-Go implementations.
### Standard Build
```bash
# CLI (Zig)
cd cli && make all # release-small

BIN
cpu.prof

Binary file not shown.

View file

@ -413,7 +413,7 @@ func normalizeSHA256ChecksumHex(checksum string) (string, error) {
return strings.ToLower(checksum), nil
}
func dirOverallSHA256Hex(root string) (string, error) {
func dirOverallSHA256HexGo(root string) (string, error) {
root = filepath.Clean(root)
info, err := os.Stat(root)
if err != nil {
@ -581,7 +581,7 @@ func (w *Worker) verifyDatasetSpecs(ctx context.Context, task *queue.Task) error
return fmt.Errorf("dataset %q: invalid name: %w", ds.Name, err)
}
path := filepath.Join(w.config.DataDir, ds.Name)
got, err := dirOverallSHA256Hex(path)
got, err := dirOverallSHA256HexGo(path)
if err != nil {
return fmt.Errorf("dataset %q: checksum verification failed: %w", ds.Name, err)
}
@ -716,7 +716,7 @@ func (w *Worker) enforceTaskProvenance(ctx context.Context, task *queue.Task) er
return fmt.Errorf("snapshot %q: resolve failed: %w", task.SnapshotID, err)
}
} else {
sha, err := dirOverallSHA256Hex(resolved)
sha, err := dirOverallSHA256HexGo(resolved)
if err == nil {
snapshotCur = sha
} else if w.config != nil && w.config.ProvenanceBestEffort {
@ -732,7 +732,7 @@ func (w *Worker) enforceTaskProvenance(ctx context.Context, task *queue.Task) er
// Best-effort fallback: if the caller didn't provide snapshot_sha256,
// compute from the local snapshot directory if it exists.
localPath := filepath.Join(w.config.DataDir, "snapshots", strings.TrimSpace(task.SnapshotID))
if sha, err := dirOverallSHA256Hex(localPath); err == nil {
if sha, err := dirOverallSHA256HexGo(localPath); err == nil {
snapshotCur = sha
}
}

View file

@ -0,0 +1,276 @@
package worker
// #cgo LDFLAGS: -L${SRCDIR}/../../native/build -Wl,-rpath,${SRCDIR}/../../native/build -lqueue_index -ldataset_hash -lartifact_scanner -lstreaming_io
// #include "../../native/queue_index/queue_index.h"
// #include "../../native/dataset_hash/dataset_hash.h"
// #include "../../native/artifact_scanner/artifact_scanner.h"
// #include "../../native/streaming_io/streaming_io.h"
// #include <stdlib.h>
import "C"
import (
"errors"
"log"
"os"
"time"
"unsafe"
"github.com/jfraeys/fetch_ml/internal/manifest"
"github.com/jfraeys/fetch_ml/internal/queue"
)
// UseNativeLibs controls whether to use C++ implementations.
// Set FETCHML_NATIVE_LIBS=1 to enable native libraries.
// The value is read once at process start; changing the environment
// variable afterwards has no effect on a running process.
var UseNativeLibs = os.Getenv("FETCHML_NATIVE_LIBS") == "1"

// init logs which implementation family is active so operators can confirm
// the FETCHML_NATIVE_LIBS toggle from startup logs.
func init() {
	if UseNativeLibs {
		log.Printf("[native] Native libraries enabled (SIMD: %s)", GetSIMDImplName())
	} else {
		log.Printf("[native] Native libraries disabled (set FETCHML_NATIVE_LIBS=1 to enable)")
	}
}
// ============================================================================
// Dataset Hash (dirOverallSHA256Hex)
// ============================================================================
// dirOverallSHA256Hex selects implementation based on toggle.
// Native version uses mmap + SIMD SHA256 + parallel processing.
func dirOverallSHA256Hex(root string) (string, error) {
	if UseNativeLibs {
		return dirOverallSHA256HexNative(root)
	}
	return dirOverallSHA256HexGo(root)
}
// dirOverallSHA256HexNative calls C++ implementation.
// Single CGo crossing for entire directory: fh_init creates a native context
// (0 = auto-detect thread count), fh_hash_directory_combined walks and hashes
// the whole tree, and fh_last_error supplies a diagnostic when the call fails.
// All C allocations (context, path string, result string) are released via
// defers; C.GoString copies the result before the deferred free runs.
func dirOverallSHA256HexNative(root string) (string, error) {
	ctx := C.fh_init(0) // 0 = auto-detect threads
	if ctx == nil {
		return "", errors.New("failed to initialize native hash context")
	}
	defer C.fh_cleanup(ctx)
	croot := C.CString(root)
	defer C.free(unsafe.Pointer(croot))
	result := C.fh_hash_directory_combined(ctx, croot)
	if result == nil {
		// Prefer the native side's error message when one is available.
		err := C.fh_last_error(ctx)
		if err != nil {
			return "", errors.New(C.GoString(err))
		}
		return "", errors.New("native hash failed")
	}
	defer C.fh_free_string(result)
	return C.GoString(result), nil
}
// HashFilesBatchNative batch hashes files using native library.
// Amortizes CGo overhead across multiple files: paths are marshalled once and
// a single fh_hash_batch call hashes the whole batch. Returns one digest per
// input path, in input order. Returns an empty (non-nil) slice for no paths.
func HashFilesBatchNative(paths []string) ([]string, error) {
	if len(paths) == 0 {
		return []string{}, nil
	}
	ctx := C.fh_init(0)
	if ctx == nil {
		return nil, errors.New("failed to initialize native hash context")
	}
	defer C.fh_cleanup(ctx)
	// Convert Go strings to C strings; all are freed when this function returns.
	cPaths := make([]*C.char, len(paths))
	for i, p := range paths {
		cPaths[i] = C.CString(p)
	}
	defer func() {
		for _, p := range cPaths {
			C.free(unsafe.Pointer(p))
		}
	}()
	// Allocate output buffers (65 chars: 64 hex + null). The native side
	// writes a NUL-terminated hex digest into each buffer.
	outHashes := make([]*C.char, len(paths))
	for i := range outHashes {
		outHashes[i] = (*C.char)(C.malloc(65))
	}
	defer func() {
		for _, p := range outHashes {
			C.free(unsafe.Pointer(p))
		}
	}()
	// Single CGo call for entire batch.
	rc := C.fh_hash_batch(ctx, &cPaths[0], C.uint32_t(len(paths)), &outHashes[0])
	if rc != 0 {
		err := C.fh_last_error(ctx)
		if err != nil {
			return nil, errors.New(C.GoString(err))
		}
		return nil, errors.New("batch hash failed")
	}
	// Convert results to Go strings (copies; C buffers freed by the defer).
	hashes := make([]string, len(paths))
	for i, h := range outHashes {
		hashes[i] = C.GoString(h)
	}
	return hashes, nil
}
// GetSIMDImplName returns the native SHA256 implementation name.
// Thin CGo wrapper; the string is copied into Go memory by C.GoString.
func GetSIMDImplName() string {
	return C.GoString(C.fh_get_simd_impl_name())
}
// HasSIMDSHA256 returns true if SIMD SHA256 is available.
// The native side reports 1 for available, anything else for unavailable.
func HasSIMDSHA256() bool {
	return C.fh_has_simd_sha256() == 1
}
// ============================================================================
// Artifact Scanner
// ============================================================================
// ScanArtifactsNative uses C++ fast directory traversal.
// as_create(nil, 0) relies on the native side's built-in exclude patterns.
// The returned C array is walked with manual pointer arithmetic (stride =
// sizeof(as_artifact_t)) and copied into Go-owned manifest structs before
// as_free_result releases the native memory.
func ScanArtifactsNative(runDir string) (*manifest.Artifacts, error) {
	scanner := C.as_create(nil, 0)
	if scanner == nil {
		return nil, errors.New("failed to create native scanner")
	}
	defer C.as_destroy(scanner)
	cRunDir := C.CString(runDir)
	defer C.free(unsafe.Pointer(cRunDir))
	result := C.as_scan_directory(scanner, cRunDir)
	if result == nil {
		err := C.as_last_error(scanner)
		if err != nil {
			return nil, errors.New(C.GoString(err))
		}
		return nil, errors.New("native scan failed")
	}
	defer C.as_free_result(result)
	// Convert C result to Go Artifacts.
	files := make([]manifest.ArtifactFile, result.count)
	for i := 0; i < int(result.count); i++ {
		// Index element i of the C array: base + i*sizeof(element).
		art := (*C.as_artifact_t)(unsafe.Pointer(uintptr(unsafe.Pointer(result.artifacts)) + uintptr(i)*unsafe.Sizeof(C.as_artifact_t{})))
		files[i] = manifest.ArtifactFile{
			Path:      C.GoString(&art.path[0]),
			SizeBytes: int64(art.size_bytes),
			Modified:  time.Unix(int64(art.mtime), 0),
		}
	}
	return &manifest.Artifacts{
		DiscoveryTime:  time.Now(), // Native doesn't return this directly
		Files:          files,
		TotalSizeBytes: int64(result.total_size),
	}, nil
}
// ============================================================================
// Streaming I/O (Tar.gz extraction)
// ============================================================================
// ExtractTarGzNative uses C++ parallel decompression.
// Creates a native extractor (0 = auto-detect threads), extracts archivePath
// into dstDir in a single CGo call, and maps a non-zero return code to a Go
// error (preferring the native side's last-error message when present).
func ExtractTarGzNative(archivePath, dstDir string) error {
	ex := C.sio_create_extractor(0) // 0 = auto-detect threads
	if ex == nil {
		return errors.New("failed to create native extractor")
	}
	defer C.sio_destroy_extractor(ex)
	cArchive := C.CString(archivePath)
	defer C.free(unsafe.Pointer(cArchive))
	cDst := C.CString(dstDir)
	defer C.free(unsafe.Pointer(cDst))
	rc := C.sio_extract_tar_gz(ex, cArchive, cDst)
	if rc != 0 {
		err := C.sio_last_error(ex)
		if err != nil {
			return errors.New(C.GoString(err))
		}
		return errors.New("native extraction failed")
	}
	return nil
}
// ============================================================================
// Queue Index (for future filesystem queue integration)
// ============================================================================
// QueueIndexNative provides access to native binary queue index.
// Wraps an opaque C handle; callers must Close() to release it.
type QueueIndexNative struct {
	handle *C.qi_index_t // nil after Close; guarded by the methods below
}
// OpenQueueIndexNative opens a native queue index rooted at queueDir.
// The returned handle owns native resources; callers must call Close().
func OpenQueueIndexNative(queueDir string) (*QueueIndexNative, error) {
	cDir := C.CString(queueDir)
	defer C.free(unsafe.Pointer(cDir))
	handle := C.qi_open(cDir)
	if handle == nil {
		return nil, errors.New("failed to open native queue index")
	}
	return &QueueIndexNative{handle: handle}, nil
}
// Close closes the native queue index and releases its C-side resources.
// Idempotent: the handle is nilled out so repeated calls are no-ops.
// NOTE(review): not safe for concurrent use with other methods — confirm
// callers serialize access.
func (qi *QueueIndexNative) Close() {
	if qi.handle != nil {
		C.qi_close(qi.handle)
		qi.handle = nil
	}
}
// AddTasks adds tasks to the native index in a single CGo call.
// Task ID and JobName are copied into fixed-size C buffers (64 and 128 bytes,
// NUL-terminated, truncated if longer). created_at is stamped here with the
// current wall-clock time in nanoseconds.
func (qi *QueueIndexNative) AddTasks(tasks []*queue.Task) error {
	if qi.handle == nil {
		return errors.New("index not open")
	}
	// Guard: &cTasks[0] below panics ("index out of range") on an empty
	// slice. An empty batch is a successful no-op.
	if len(tasks) == 0 {
		return nil
	}
	// Convert Go tasks to C tasks.
	cTasks := make([]C.qi_task_t, len(tasks))
	for i, t := range tasks {
		copyStringToC(t.ID, cTasks[i].id[:], 64)
		copyStringToC(t.JobName, cTasks[i].job_name[:], 128)
		cTasks[i].priority = C.int64_t(t.Priority)
		cTasks[i].created_at = C.int64_t(time.Now().UnixNano())
	}
	rc := C.qi_add_tasks(qi.handle, &cTasks[0], C.uint32_t(len(tasks)))
	if rc != 0 {
		// Prefer the native side's error message when one is available.
		if err := C.qi_last_error(qi.handle); err != nil {
			return errors.New(C.GoString(err))
		}
		return errors.New("failed to add tasks")
	}
	return nil
}
// copyStringToC copies a Go string into a fixed-size C char array,
// truncating to at most maxLen-1 bytes and always NUL-terminating.
// maxLen should match the C array length; the copy is additionally clamped
// to len(dst) so a mismatched maxLen can never write out of bounds, and a
// non-positive effective length is a no-op.
func copyStringToC(src string, dst []C.char, maxLen int) {
	if maxLen > len(dst) {
		maxLen = len(dst) // never trust maxLen beyond the actual buffer
	}
	if maxLen <= 0 {
		return // nothing to write, not even the terminator
	}
	n := len(src)
	if n > maxLen-1 {
		n = maxLen - 1
	}
	for i := 0; i < n; i++ {
		dst[i] = C.char(src[i])
	}
	dst[n] = 0
}

76
native/CMakeLists.txt Normal file
View file

@ -0,0 +1,76 @@
cmake_minimum_required(VERSION 3.20)
project(fetchml_native VERSION 0.1.0 LANGUAGES CXX C)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_C_STANDARD 11)
set(CMAKE_C_STANDARD_REQUIRED ON)

# Build options
option(BUILD_SHARED_LIBS "Build shared libraries" ON)
option(ENABLE_ASAN "Enable AddressSanitizer" OFF)
option(ENABLE_TSAN "Enable ThreadSanitizer" OFF)
# -march=native tunes code for the *build* machine only. Default ON preserves
# the previous behaviour; turn OFF when the resulting .so/.dylib files will be
# shipped to other hosts (the Makefile copies them into bin/ for deployment).
option(ENABLE_NATIVE_ARCH "Optimize for the build machine's CPU (-march=native)" ON)

# Position independent code for shared libraries
set(CMAKE_POSITION_INDEPENDENT_CODE ON)

# Compiler flags
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
    set(_march_flag "")
    if(ENABLE_NATIVE_ARCH)
        set(_march_flag "-march=native")
    endif()
    set(CMAKE_CXX_FLAGS_RELEASE "-O3 ${_march_flag} -DNDEBUG -fomit-frame-pointer")
    set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g -fno-omit-frame-pointer")
    set(CMAKE_C_FLAGS_RELEASE "-O3 ${_march_flag} -DNDEBUG -fomit-frame-pointer")
    set(CMAKE_C_FLAGS_DEBUG "-O0 -g -fno-omit-frame-pointer")

    # Warnings
    add_compile_options(-Wall -Wextra -Wpedantic)

    if(ENABLE_ASAN)
        add_compile_options(-fsanitize=address -fno-omit-frame-pointer)
        add_link_options(-fsanitize=address)
    endif()
    if(ENABLE_TSAN)
        add_compile_options(-fsanitize=thread)
        add_link_options(-fsanitize=thread)
    endif()
endif()

if(APPLE)
    # macOS universal binary support (optional - comment out if targeting single arch)
    # set(CMAKE_OSX_ARCHITECTURES "x86_64;arm64")
endif()

# Find threading library
find_package(Threads REQUIRED)

# Include directories
include_directories(${CMAKE_CURRENT_SOURCE_DIR})

# Libraries will be added as subdirectories
add_subdirectory(queue_index)
add_subdirectory(dataset_hash)
add_subdirectory(artifact_scanner)
add_subdirectory(streaming_io)

# Combined target for building all libraries
add_custom_target(all_native_libs DEPENDS
    queue_index
    dataset_hash
    artifact_scanner
    streaming_io
)

# Install configuration
install(TARGETS queue_index dataset_hash artifact_scanner streaming_io
    LIBRARY DESTINATION lib
    ARCHIVE DESTINATION lib
)

# Install headers
install(FILES
    queue_index/queue_index.h
    dataset_hash/dataset_hash.h
    artifact_scanner/artifact_scanner.h
    streaming_io/streaming_io.h
    DESTINATION include/fetchml
)

View file

@ -0,0 +1,24 @@
# Shared library for fast run-directory artifact scanning.
add_library(artifact_scanner SHARED
    artifact_scanner.cpp
)
# Header is found next to the sources at build time, under include/fetchml
# once installed.
target_include_directories(artifact_scanner PUBLIC
    $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
    $<INSTALL_INTERFACE:include/fetchml>
)
target_compile_features(artifact_scanner PUBLIC cxx_std_20)
# Place the built library directly in the build root so the Go side's
# -L/-rpath flags (pointing at native/build) can find it.
set_target_properties(artifact_scanner PROPERTIES
    VERSION ${PROJECT_VERSION}
    SOVERSION ${PROJECT_VERSION_MAJOR}
    PUBLIC_HEADER "artifact_scanner.h"
    LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
    ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
)
install(TARGETS artifact_scanner
    LIBRARY DESTINATION lib
    ARCHIVE DESTINATION lib
    PUBLIC_HEADER DESTINATION include/fetchml
)

View file

@ -0,0 +1,163 @@
#include "artifact_scanner.h"

// C++ standard library
#include <algorithm>   // std::sort (used in as_scan_directory)
#include <chrono>
#include <cstring>
#include <filesystem>
#include <string>      // std::string members of as_scanner
#include <vector>

// POSIX
#include <dirent.h>
#include <fcntl.h>
#include <fnmatch.h>
#include <sys/stat.h>
#include <unistd.h>

namespace fs = std::filesystem;
// Backing state for the opaque as_scanner_t handle exposed in the header.
struct as_scanner {
    std::vector<std::string> exclude_patterns; // fnmatch-style globs, relative paths
    std::string last_error;                    // message for as_last_error()
    uint64_t scan_count = 0;                   // entries visited, including excluded ones
};
// Returns true when `path` (relative, '/'-separated) matches any of the
// scanner's exclude globs. FNM_PATHNAME keeps '*' from crossing '/'.
static bool should_exclude(as_scanner_t* scanner, const char* path) {
    const auto& patterns = scanner->exclude_patterns;
    for (size_t i = 0; i < patterns.size(); ++i) {
        if (fnmatch(patterns[i].c_str(), path, FNM_PATHNAME) == 0) {
            return true;
        }
    }
    return false;
}
// Platform-optimized directory traversal
// Uses simple but efficient approach: batch readdir + minimal stat calls
#ifdef __linux__
// On Linux, we could use getdents64 for even better performance
// But standard readdir is fine for now and more portable
#endif
// Creates a scanner. Caller-supplied exclude globs (may be NULL even with a
// non-zero count — e.g. the Go side passes (nil, 0)) are copied first, then
// the built-in excludes for run metadata and inputs are appended.
as_scanner_t* as_create(const char** exclude_patterns, size_t pattern_count) {
    auto* scanner = new as_scanner_t;
    // Guard the array pointer itself; the original only checked each element.
    if (exclude_patterns != nullptr) {
        for (size_t i = 0; i < pattern_count; ++i) {
            if (exclude_patterns[i]) {
                scanner->exclude_patterns.push_back(exclude_patterns[i]);
            }
        }
    }
    // Default excludes: run bookkeeping files and input trees are not artifacts.
    scanner->exclude_patterns.push_back("run_manifest.json");
    scanner->exclude_patterns.push_back("output.log");
    scanner->exclude_patterns.push_back("code/*");
    scanner->exclude_patterns.push_back("snapshot/*");
    return scanner;
}
// Frees a scanner created by as_create. Safe on nullptr (delete of null is a no-op).
void as_destroy(as_scanner_t* scanner) {
    delete scanner;
}
// Appends one exclude glob; ignores null scanner or null pattern.
void as_add_exclude(as_scanner_t* scanner, const char* pattern) {
    if (scanner == nullptr || pattern == nullptr) {
        return;
    }
    scanner->exclude_patterns.push_back(pattern);
}
// Recursively scans run_dir for regular files, returning them sorted by
// relative path with sizes, mtimes and permission bits. Returns NULL and sets
// last_error on failure. Caller frees the result with as_free_result().
as_result_t* as_scan_directory(as_scanner_t* scanner, const char* run_dir) {
    if (!scanner || !run_dir) return nullptr;
    auto start_time = std::chrono::steady_clock::now();
    as_result_t* result = new as_result_t;
    result->artifacts = nullptr;
    result->count = 0;
    result->total_size = 0;
    result->discovery_time_ms = 0;
    std::vector<as_artifact_t> artifacts;
    artifacts.reserve(128); // pre-allocate to avoid early reallocations
    try {
        fs::path root(run_dir);
        // skip_permission_denied avoids throwing on unreadable subdirectories.
        auto options = fs::directory_options::skip_permission_denied;
        for (const auto& entry : fs::recursive_directory_iterator(root, options)) {
            scanner->scan_count++; // counts every visited entry, excluded or not
            if (!entry.is_regular_file()) {
                continue;
            }
            // Relative path is what exclude globs are matched against.
            fs::path rel_path = fs::relative(entry.path(), root);
            std::string rel_str = rel_path.string();
            if (should_exclude(scanner, rel_str.c_str())) {
                continue;
            }
            as_artifact_t artifact;
            // NOTE: paths longer than the fixed C-ABI field (path[256]) are
            // silently truncated here.
            std::strncpy(artifact.path, rel_str.c_str(), sizeof(artifact.path) - 1);
            artifact.path[sizeof(artifact.path) - 1] = '\0';
            auto status = entry.status();
            artifact.size_bytes = entry.file_size();
            // file_time_type has its own epoch; convert through system_clock
            // to an approximate Unix timestamp.
            auto mtime = fs::last_write_time(entry);
            auto sctp = std::chrono::time_point_cast<std::chrono::system_clock::duration>(
                mtime - fs::file_time_type::clock::now() + std::chrono::system_clock::now()
            );
            artifact.mtime = std::chrono::system_clock::to_time_t(sctp);
            artifact.mode = static_cast<uint32_t>(status.permissions());
            artifacts.push_back(artifact);
            result->total_size += artifact.size_bytes;
        }
    } catch (const std::exception& e) {
        scanner->last_error = e.what();
        delete result;
        return nullptr;
    } catch (...) {
        // filesystem errors derive from std::exception, but don't let anything
        // else escape through the C ABI.
        scanner->last_error = "unknown error during directory scan";
        delete result;
        return nullptr;
    }
    // Deterministic output order regardless of traversal order.
    std::sort(artifacts.begin(), artifacts.end(), [](const as_artifact_t& a, const as_artifact_t& b) {
        return std::strcmp(a.path, b.path) < 0;
    });
    result->count = artifacts.size();
    if (result->count > 0) {
        result->artifacts = new as_artifact_t[result->count];
        std::memcpy(result->artifacts, artifacts.data(), result->count * sizeof(as_artifact_t));
    }
    auto end_time = std::chrono::steady_clock::now();
    result->discovery_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
    return result;
}
// Releases a result returned by as_scan_directory. Null-safe.
void as_free_result(as_result_t* result) {
    if (result == nullptr) {
        return;
    }
    delete[] result->artifacts;
    delete result;
}
// Returns the last error message, or NULL when there is none. The pointer
// stays valid until the scanner's next failing call or destruction.
const char* as_last_error(as_scanner_t* scanner) {
    if (scanner != nullptr && !scanner->last_error.empty()) {
        return scanner->last_error.c_str();
    }
    return nullptr;
}
// Total directory entries visited (including excluded ones); 0 for NULL.
uint64_t as_get_scan_count(as_scanner_t* scanner) {
    if (scanner == nullptr) {
        return 0;
    }
    return scanner->scan_count;
}

View file

@ -0,0 +1,54 @@
#ifndef ARTIFACT_SCANNER_H
#define ARTIFACT_SCANNER_H
// C API for fast recursive artifact discovery in a run directory.
// All functions are callable from C (and Go via cgo).
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
// Opaque handle for scanner
typedef struct as_scanner as_scanner_t;
// Artifact structure (fixed layout — part of the C ABI shared with the Go side)
typedef struct as_artifact {
    char path[256];       // Relative path from run directory (truncated if longer)
    int64_t size_bytes;   // File size
    int64_t mtime;        // Modification time (Unix timestamp)
    uint32_t mode;        // File permissions
} as_artifact_t;
// Scan result
typedef struct as_result {
    as_artifact_t* artifacts;   // Array of artifacts, sorted by path
    size_t count;               // Number of artifacts
    int64_t total_size;         // Sum of all sizes
    int64_t discovery_time_ms;  // Scan duration
} as_result_t;
// Scanner operations.
// exclude_patterns may be NULL with pattern_count 0; built-in excludes for
// run metadata are always added.
as_scanner_t* as_create(const char** exclude_patterns, size_t pattern_count);
void as_destroy(as_scanner_t* scanner);
// Add exclude patterns (fnmatch-style globs matched against relative paths)
void as_add_exclude(as_scanner_t* scanner, const char* pattern);
// Scan directory recursively for regular files.
// Current implementation traverses via std::filesystem::recursive_directory_iterator.
as_result_t* as_scan_directory(as_scanner_t* scanner, const char* run_dir);
// Memory management: frees a result returned by as_scan_directory
void as_free_result(as_result_t* result);
// Error handling: message for the last failing call, or NULL
const char* as_last_error(as_scanner_t* scanner);
// Utility
uint64_t as_get_scan_count(as_scanner_t* scanner); // Total files scanned (including excluded)
#ifdef __cplusplus
}
#endif
#endif // ARTIFACT_SCANNER_H

View file

@ -0,0 +1,28 @@
# Shared library for mmap-based parallel SHA-256 dataset hashing.
add_library(dataset_hash SHARED
    dataset_hash.cpp
)
target_include_directories(dataset_hash PUBLIC
    $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
    $<INSTALL_INTERFACE:include/fetchml>
)
# Internal thread pool needs the platform threading library.
target_link_libraries(dataset_hash PRIVATE
    Threads::Threads
)
target_compile_features(dataset_hash PUBLIC cxx_std_20)
# Output directly into the build root so the Go side's -L/-rpath flags
# (pointing at native/build) can find the library.
set_target_properties(dataset_hash PROPERTIES
    VERSION ${PROJECT_VERSION}
    SOVERSION ${PROJECT_VERSION_MAJOR}
    PUBLIC_HEADER "dataset_hash.h"
    LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
    ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
)
install(TARGETS dataset_hash
    LIBRARY DESTINATION lib
    ARCHIVE DESTINATION lib
    PUBLIC_HEADER DESTINATION include/fetchml
)

View file

@ -0,0 +1,517 @@
#include "dataset_hash.h"

// C++ standard library
#include <algorithm>
#include <atomic>              // std::atomic (fh_hash_directory)
#include <cerrno>
#include <condition_variable>  // std::condition_variable (ThreadPool)
#include <cstring>
#include <filesystem>
#include <functional>          // std::function (ThreadPool tasks)
#include <memory>              // std::unique_ptr (fh_context)
#include <mutex>
#include <queue>               // std::queue (ThreadPool task queue)
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>

// POSIX
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// Platform-specific includes for SIMD
#if defined(__x86_64__) || defined(_M_X64)
#include <cpuid.h>
#include <immintrin.h>
#define HAS_X86_SIMD
#elif defined(__aarch64__) || defined(_M_ARM64)
#include <arm_neon.h>
#define HAS_ARM_SIMD
#endif
namespace fs = std::filesystem;
// SHA256 constants
static const uint32_t K[64] = {
0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2
};
// SIMD implementation detection
// Which SHA-256 transform backend to dispatch to at runtime.
enum class Sha256Impl {
    GENERIC,       // portable scalar implementation
    SHA_NI,        // Intel SHA extensions (x86_64)
    ARMV8_CRYPTO   // ARMv8 crypto extensions (aarch64)
};
// Picks the best available backend for this CPU.
// x86_64: CPUID leaf 7, EBX bit 29 advertises the SHA extensions.
// aarch64: returns ARMV8_CRYPTO unconditionally — NOTE(review): no runtime
// feature check is done here; harmless today because the ARMv8 transform
// below currently falls back to the generic implementation anyway.
static Sha256Impl detect_best_impl() {
#if defined(HAS_ARM_SIMD)
    return Sha256Impl::ARMV8_CRYPTO;
#elif defined(HAS_X86_SIMD)
    unsigned int eax, ebx, ecx, edx;
    if (__get_cpuid(7, &eax, &ebx, &ecx, &edx)) {
        if (ebx & (1 << 29)) { // SHA bit
            return Sha256Impl::SHA_NI;
        }
    }
    return Sha256Impl::GENERIC;
#else
    return Sha256Impl::GENERIC;
#endif
}
// Generic SHA256 implementation
// Portable scalar SHA-256 compression function (FIPS 180-4). Processes one
// 64-byte block, updating the 8-word state in place. Rotations are written
// as shift-or pairs (x >> n | x << (32-n)).
class Sha256Generic {
public:
    static void transform(uint32_t* state, const uint8_t* data) {
        uint32_t W[64];
        uint32_t a, b, c, d, e, f, g, h;
        // Prepare message schedule: first 16 words big-endian from the block...
        for (int i = 0; i < 16; ++i) {
            W[i] = (data[i * 4] << 24) | (data[i * 4 + 1] << 16) |
                   (data[i * 4 + 2] << 8) | data[i * 4 + 3];
        }
        // ...then expand to 64 words with the sigma0/sigma1 schedule functions.
        for (int i = 16; i < 64; ++i) {
            uint32_t s0 = (W[i-15] >> 7 | W[i-15] << 25) ^
                          (W[i-15] >> 18 | W[i-15] << 14) ^
                          (W[i-15] >> 3);
            uint32_t s1 = (W[i-2] >> 17 | W[i-2] << 15) ^
                          (W[i-2] >> 19 | W[i-2] << 13) ^
                          (W[i-2] >> 10);
            W[i] = W[i-16] + s0 + W[i-7] + s1;
        }
        // Initialize working variables from the current state
        a = state[0]; b = state[1]; c = state[2]; d = state[3];
        e = state[4]; f = state[5]; g = state[6]; h = state[7];
        // Main loop: 64 rounds of Ch/Maj with round constants K[i]
        for (int i = 0; i < 64; ++i) {
            uint32_t S1 = (e >> 6 | e << 26) ^ (e >> 11 | e << 21) ^ (e >> 25 | e << 7);
            uint32_t ch = (e & f) ^ ((~e) & g);
            uint32_t temp1 = h + S1 + ch + K[i] + W[i];
            uint32_t S0 = (a >> 2 | a << 30) ^ (a >> 13 | a << 19) ^ (a >> 22 | a << 10);
            uint32_t maj = (a & b) ^ (a & c) ^ (b & c);
            uint32_t temp2 = S0 + maj;
            h = g; g = f; f = e; e = d + temp1;
            d = c; c = b; b = a; a = temp1 + temp2;
        }
        // Update state (Davies–Meyer feed-forward addition)
        state[0] += a; state[1] += b; state[2] += c; state[3] += d;
        state[4] += e; state[5] += f; state[6] += g; state[7] += h;
    }
};
// Intel SHA-NI implementation (placeholder - actual implementation needs inline asm)
// Currently identical to the generic path; kept so the dispatch structure is
// in place once the intrinsics version lands.
#if defined(HAS_X86_SIMD)
class Sha256SHA_NI {
public:
    static void transform(uint32_t* state, const uint8_t* data) {
        // For now, fall back to generic (full SHA-NI impl is complex)
        // TODO: Implement with _mm_sha256msg1_epu32, _mm_sha256msg2_epu32, etc.
        Sha256Generic::transform(state, data);
    }
};
#endif
// ARMv8 crypto implementation (placeholder - actual implementation needs intrinsics)
// Currently identical to the generic path; kept so the dispatch structure is
// in place once the intrinsics version lands.
#if defined(HAS_ARM_SIMD)
class Sha256ARMv8 {
public:
    static void transform(uint32_t* state, const uint8_t* data) {
        // For now, fall back to generic (full ARMv8 impl needs sha256su0, sha256su1, sha256h, sha256h2)
        // TODO: Implement with vsha256su0q_u32, vsha256su1q_u32, vsha256hq_u32, vsha256h2q_u32
        Sha256Generic::transform(state, data);
    }
};
#endif
// SHA256 hasher class
// Incremental SHA-256: update() absorbs input of any length in 64-byte
// blocks, finalize() applies padding plus the length word and emits the
// 32-byte digest. Dispatches the compression function to the best backend
// detected at construction time.
class Sha256Hasher {
    uint32_t state[8];   // H0..H7 working state
    uint8_t buffer[64];  // staging for a partial block
    size_t buffer_len;   // bytes in buffer; always < 64 between update() calls
    uint64_t total_len;  // total bytes absorbed, for the length padding
    Sha256Impl impl;     // backend chosen by detect_best_impl()
public:
    Sha256Hasher() : buffer_len(0), total_len(0) {
        // Initial hash values (FIPS 180-4, section 5.3.3)
        state[0] = 0x6a09e667;
        state[1] = 0xbb67ae85;
        state[2] = 0x3c6ef372;
        state[3] = 0xa54ff53a;
        state[4] = 0x510e527f;
        state[5] = 0x9b05688c;
        state[6] = 0x1f83d9ab;
        state[7] = 0x5be0cd19;
        impl = detect_best_impl();
    }
    // Absorbs len bytes. May be called any number of times before finalize().
    void update(const uint8_t* data, size_t len) {
        total_len += len;
        // Fill buffer if there's pending data from a previous call
        if (buffer_len > 0) {
            size_t to_copy = std::min(len, 64 - buffer_len);
            std::memcpy(buffer + buffer_len, data, to_copy);
            buffer_len += to_copy;
            data += to_copy;
            len -= to_copy;
            if (buffer_len == 64) {
                transform(buffer);
                buffer_len = 0;
            }
        }
        // Process full blocks directly from the caller's memory (no copy)
        while (len >= 64) {
            transform(data);
            data += 64;
            len -= 64;
        }
        // Store remaining data in buffer for the next update/finalize
        if (len > 0) {
            std::memcpy(buffer, data, len);
            buffer_len = len;
        }
    }
    // Pads, appends the 64-bit bit-length, and writes the 32-byte digest to
    // `out`. Must be called exactly once; the hasher is not reusable after.
    void finalize(uint8_t* out) {
        // Padding: 0x80 marker, zeros, then length in the final 8 bytes.
        uint64_t bit_len = total_len * 8;
        buffer[buffer_len++] = 0x80;
        if (buffer_len > 56) {
            // Not enough room for the length word: pad out and flush one block.
            while (buffer_len < 64) buffer[buffer_len++] = 0;
            transform(buffer);
            buffer_len = 0;
        }
        while (buffer_len < 56) buffer[buffer_len++] = 0;
        // Append length (big-endian)
        for (int i = 7; i >= 0; --i) {
            buffer[56 + (7 - i)] = (bit_len >> (i * 8)) & 0xff;
        }
        transform(buffer);
        // Output (big-endian)
        for (int i = 0; i < 8; ++i) {
            out[i * 4] = (state[i] >> 24) & 0xff;
            out[i * 4 + 1] = (state[i] >> 16) & 0xff;
            out[i * 4 + 2] = (state[i] >> 8) & 0xff;
            out[i * 4 + 3] = state[i] & 0xff;
        }
    }
    // Renders a 32-byte digest as 64 lowercase hex characters.
    static std::string bytes_to_hex(const uint8_t* data) {
        static const char hex[] = "0123456789abcdef";
        std::string result;
        result.reserve(64);
        for (int i = 0; i < 32; ++i) {
            result += hex[(data[i] >> 4) & 0xf];
            result += hex[data[i] & 0xf];
        }
        return result;
    }
private:
    // Dispatch one 64-byte block to the selected backend.
    void transform(const uint8_t* data) {
        switch (impl) {
#if defined(HAS_X86_SIMD)
        case Sha256Impl::SHA_NI:
            Sha256SHA_NI::transform(state, data);
            break;
#endif
#if defined(HAS_ARM_SIMD)
        case Sha256Impl::ARMV8_CRYPTO:
            Sha256ARMv8::transform(state, data);
            break;
#endif
        default:
            Sha256Generic::transform(state, data);
        }
    }
};
// Thread pool for parallel hashing
// Fixed-size pool with a mutex/condvar-protected FIFO task queue. Workers
// drain remaining tasks on shutdown; the destructor joins all threads.
// NOTE(review): enqueue() after destruction has begun is unsupported.
class ThreadPool {
    std::vector<std::thread> workers;          // worker threads, joined in dtor
    std::queue<std::function<void()>> tasks;   // pending work, FIFO
    std::mutex queue_mutex;                    // guards tasks and stop
    std::condition_variable condition;         // signals new work / shutdown
    bool stop = false;                         // set under queue_mutex in dtor
public:
    // Spawns num_threads workers that loop: wait for work, run it.
    ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back([this] {
                for (;;) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] { return stop || !tasks.empty(); });
                        // Exit only when stopping AND the queue is drained.
                        if (stop && tasks.empty()) return;
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    // Run outside the lock so other workers can dequeue.
                    task();
                }
            });
        }
    }
    // Signals shutdown and joins every worker; queued tasks finish first.
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (auto& worker : workers) {
            worker.join();
        }
    }
    // Enqueues one task and wakes a single waiting worker.
    void enqueue(std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            tasks.emplace(std::move(task));
        }
        condition.notify_one();
    }
};
// Context structure
// Backing state for the opaque fh_context_t handle: owns the thread pool
// used for parallel directory hashing, the last error message, and the
// read-fallback buffer size.
struct fh_context {
    std::unique_ptr<ThreadPool> pool; // joined/destroyed by fh_cleanup
    uint32_t num_threads;             // pool size chosen in fh_init
    std::string last_error;           // message surfaced via fh_last_error
    size_t buffer_size = 64 * 1024;   // 64KB default for the read() fallback
};
// Hash a single file's bytes with SHA-256.
// Prefers mmap (one mapping, zero read syscalls); falls back to buffered
// read() when mmap fails (e.g. special files). Returns the lowercase hex
// digest, or "" on any I/O error.
static std::string hash_file_mmap(const char* path, size_t buffer_size) {
    int fd = open(path, O_RDONLY);
    if (fd < 0) {
        return "";
    }
    struct stat st;
    if (fstat(fd, &st) < 0) {
        close(fd);
        return "";
    }
    Sha256Hasher hasher;
    if (st.st_size == 0) {
        // Empty file: digest of zero bytes
        uint8_t result[32];
        hasher.finalize(result);
        close(fd);
        return Sha256Hasher::bytes_to_hex(result);
    }
    // Memory map the file
    void* mapped = mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    if (mapped == MAP_FAILED) {
        // Buffered fallback. Retry on EINTR; any other read error aborts —
        // the previous code treated read() == -1 as EOF and silently
        // returned the hash of a truncated stream.
        std::vector<uint8_t> buffer(buffer_size);
        for (;;) {
            ssize_t n = read(fd, buffer.data(), buffer.size());
            if (n > 0) {
                hasher.update(buffer.data(), static_cast<size_t>(n));
            } else if (n == 0) {
                break; // EOF
            } else if (errno == EINTR) {
                continue; // interrupted by a signal — retry
            } else {
                close(fd);
                return "";
            }
        }
    } else {
        // Process the whole file straight from the mapping
        hasher.update(static_cast<uint8_t*>(mapped), st.st_size);
        munmap(mapped, st.st_size);
    }
    close(fd);
    uint8_t result[32];
    hasher.finalize(result);
    return Sha256Hasher::bytes_to_hex(result);
}
// C API Implementation
// Create a hashing context. num_threads == 0 selects an automatic worker
// count: hardware_concurrency(), defaulting to 4 when unknown, capped at 8.
fh_context_t* fh_init(uint32_t num_threads) {
    uint32_t workers = num_threads;
    if (workers == 0) {
        workers = std::thread::hardware_concurrency();
        if (workers == 0) {
            workers = 4;  // hardware_concurrency() may legitimately report 0
        }
        if (workers > 8) {
            workers = 8;  // cap the auto-detected count
        }
    }
    auto* ctx = new fh_context_t;
    ctx->num_threads = workers;
    ctx->pool = std::make_unique<ThreadPool>(workers);
    return ctx;
}
// Destroy a context created by fh_init. The ThreadPool destructor joins all
// worker threads first. Safe to call with nullptr.
void fh_cleanup(fh_context_t* ctx) {
    delete ctx;
}
// Hash one file; returns a heap-allocated hex digest the caller releases with
// fh_free_string(), or nullptr on failure (details via fh_last_error).
char* fh_hash_file(fh_context_t* ctx, const char* path) {
    if (ctx == nullptr || path == nullptr) {
        return nullptr;
    }
    const std::string digest = hash_file_mmap(path, ctx->buffer_size);
    if (digest.empty()) {
        ctx->last_error = "Failed to hash file: " + std::string(path);
        return nullptr;
    }
    char* out = new char[digest.size() + 1];
    std::strcpy(out, digest.c_str());
    return out;
}
// Hash every regular file under `path` (recursively) in parallel, then hash
// the concatenation of the per-file hex digests (in sorted-path order) into
// one combined digest. Returns a heap string (free with fh_free_string) or
// nullptr on error (see fh_last_error).
char* fh_hash_directory(fh_context_t* ctx, const char* path) {
    if (!ctx || !path) return nullptr;
    // Collect all regular files.
    std::vector<std::string> files;
    try {
        for (const auto& entry : fs::recursive_directory_iterator(path)) {
            if (entry.is_regular_file()) {
                files.push_back(entry.path().string());
            }
        }
    } catch (...) {
        ctx->last_error = "Failed to scan directory";
        return nullptr;
    }
    if (files.empty()) {
        // Empty directory: digest of zero bytes.
        Sha256Hasher hasher;
        uint8_t result[32];
        hasher.finalize(result);
        std::string hash = Sha256Hasher::bytes_to_hex(result);
        char* out = new char[hash.size() + 1];
        std::strcpy(out, hash.c_str());
        return out;
    }
    // Sort so the combined digest is deterministic across runs.
    std::sort(files.begin(), files.end());
    // Hash all files on the shared pool.
    std::vector<std::string> hashes(files.size());
    // Atomic flag instead of the previous bool+mutex: matches fh_hash_batch
    // and removes the unsynchronized read of has_error after the wait loop.
    std::atomic<bool> has_error(false);
    std::atomic<size_t> completed(0);
    for (size_t i = 0; i < files.size(); ++i) {
        ctx->pool->enqueue([&, i]() {
            hashes[i] = hash_file_mmap(files[i].c_str(), ctx->buffer_size);
            if (hashes[i].empty()) {
                has_error = true;
            }
            completed++;
        });
    }
    // Wait for completion (simple polling; hashing tasks are long-lived).
    while (completed.load() < files.size()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    if (has_error.load()) {
        ctx->last_error = "Failed to hash some files";
        return nullptr;
    }
    // Combine the per-file digests.
    Sha256Hasher final_hasher;
    for (const auto& h : hashes) {
        final_hasher.update(reinterpret_cast<const uint8_t*>(h.data()), h.size());
    }
    uint8_t result[32];
    final_hasher.finalize(result);
    std::string final_hash = Sha256Hasher::bytes_to_hex(result);
    char* out = new char[final_hash.size() + 1];
    std::strcpy(out, final_hash.c_str());
    return out;
}
// Hash `count` files in parallel, writing each 64-char hex digest plus NUL
// into the caller-provided out_hashes[i] buffers (each must be >= 65 bytes).
// Returns 0 when every file hashed, -1 on any failure or bad arguments.
int fh_hash_batch(fh_context_t* ctx, const char** paths, uint32_t count, char** out_hashes) {
    if (ctx == nullptr || paths == nullptr || out_hashes == nullptr || count == 0) {
        return -1;
    }
    std::atomic<size_t> done(0);
    std::atomic<bool> failed(false);
    for (uint32_t i = 0; i < count; ++i) {
        ctx->pool->enqueue([&, i]() {
            const std::string digest = hash_file_mmap(paths[i], ctx->buffer_size);
            if (digest.empty()) {
                failed = true;
                std::strcpy(out_hashes[i], "");
            } else {
                std::strncpy(out_hashes[i], digest.c_str(), 64);
                out_hashes[i][64] = '\0';  // strncpy does not NUL-terminate here
            }
            done++;
        });
    }
    // Poll until every task has reported in.
    while (done.load() < count) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return failed ? -1 : 0;
}
// Convenience alias for fh_hash_directory: single combined digest, same
// ownership rules (caller frees with fh_free_string).
char* fh_hash_directory_combined(fh_context_t* ctx, const char* dir_path) {
    // Same as fh_hash_directory
    return fh_hash_directory(ctx, dir_path);
}
// Release a string returned by fh_hash_file / fh_hash_directory /
// fh_hash_directory_combined. Safe to call with nullptr.
void fh_free_string(char* str) {
    delete[] str;
}
// Return the context's last error message, or nullptr when none is recorded.
// The pointer stays valid until the next fh_* call that mutates the context.
const char* fh_last_error(fh_context_t* ctx) {
    if (ctx == nullptr) {
        return nullptr;
    }
    return ctx->last_error.empty() ? nullptr : ctx->last_error.c_str();
}
int fh_has_simd_sha256(void) {
Sha256Impl impl = detect_best_impl();
return (impl == Sha256Impl::SHA_NI || impl == Sha256Impl::ARMV8_CRYPTO) ? 1 : 0;
}
const char* fh_get_simd_impl_name(void) {
Sha256Impl impl = detect_best_impl();
switch (impl) {
#if defined(HAS_X86_SIMD)
case Sha256Impl::SHA_NI:
return "SHA-NI";
#endif
#if defined(HAS_ARM_SIMD)
case Sha256Impl::ARMV8_CRYPTO:
return "ARMv8";
#endif
default:
return "generic";
}
}

View file

@ -0,0 +1,77 @@
#ifndef DATASET_HASH_H
#define DATASET_HASH_H
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
// C API for fast SHA-256 hashing of files and directories (mmap + SIMD where
// available), with batch entry points that amortize per-call CGo overhead.
// Opaque handle for hash context
typedef struct fh_context fh_context_t;
// Initialize hash context with thread pool
// num_threads: 0 = auto-detect (use number of CPU cores, capped at 8)
fh_context_t* fh_init(uint32_t num_threads);
// Cleanup context (joins worker threads; safe to call with NULL)
void fh_cleanup(fh_context_t* ctx);
// Hash a single file (mmap + SIMD SHA256)
// Returns: hex string (caller frees with fh_free_string), NULL on error
// Note: For batch operations, use fh_hash_directory_batch to amortize CGo overhead
char* fh_hash_file(fh_context_t* ctx, const char* path);
// Hash entire directory (parallel file hashing with combined result)
// Uses worker pool internally, returns single combined hash
// Returns: hex string (caller frees with fh_free_string), NULL on error
char* fh_hash_directory(fh_context_t* ctx, const char* path);
// Batch hash multiple files (single CGo call for entire batch)
// paths: array of file paths
// count: number of paths
// out_hashes: pre-allocated array of 65-char buffers (64 hex + null terminator)
// Returns: 0 on success, -1 on error
int fh_hash_batch(fh_context_t* ctx, const char** paths, uint32_t count, char** out_hashes);
// Hash directory with batch output (get individual file hashes)
// out_hashes: pre-allocated array of 65-char buffers
// out_paths: optional array of path buffers (can be NULL)
// max_results: size of output arrays
// out_count: actual number of files hashed
// Returns: 0 on success, -1 on error
// NOTE(review): no definition for this function is visible in the
// accompanying .cpp — confirm it is implemented before linking against it.
int fh_hash_directory_batch(
    fh_context_t* ctx,
    const char* dir_path,
    char** out_hashes,
    char** out_paths,
    uint32_t max_results,
    uint32_t* out_count
);
// Simple combined hash (single CGo call, single result)
// Best for: quick directory hash verification
// Returns: hex string (caller frees with fh_free_string)
char* fh_hash_directory_combined(fh_context_t* ctx, const char* dir_path);
// Free string returned by library (safe with NULL)
void fh_free_string(char* str);
// Error handling: fh_last_error returns NULL when no error is recorded.
// NOTE(review): fh_clear_error has no visible definition in the .cpp — confirm.
const char* fh_last_error(fh_context_t* ctx);
void fh_clear_error(fh_context_t* ctx);
// Configuration (read-buffer size for the non-mmap fallback path)
// NOTE(review): no visible definitions for these two in the .cpp — confirm.
void fh_set_buffer_size(fh_context_t* ctx, size_t buffer_size);
size_t fh_get_buffer_size(fh_context_t* ctx);
// SIMD detection
int fh_has_simd_sha256(void); // Returns 1 if SIMD SHA256 available, 0 otherwise
const char* fh_get_simd_impl_name(void); // Returns "SHA-NI", "ARMv8", or "generic"
#ifdef __cplusplus
}
#endif
#endif // DATASET_HASH_H

View file

@ -0,0 +1,31 @@
add_library(queue_index SHARED
    queue_index.cpp
)
target_include_directories(queue_index PUBLIC
    $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
    $<INSTALL_INTERFACE:include/fetchml>
)
target_link_libraries(queue_index PRIVATE
    Threads::Threads
)
# Require C++20
target_compile_features(queue_index PUBLIC cxx_std_20)
# Set library properties. Output dirs written without a trailing slash,
# matching the sibling streaming_io target (the previous value had a stray
# trailing '/' on LIBRARY_OUTPUT_DIRECTORY).
set_target_properties(queue_index PROPERTIES
    VERSION ${PROJECT_VERSION}
    SOVERSION ${PROJECT_VERSION_MAJOR}
    PUBLIC_HEADER "queue_index.h"
    LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
    ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
)
# Install
install(TARGETS queue_index
    LIBRARY DESTINATION lib
    ARCHIVE DESTINATION lib
    PUBLIC_HEADER DESTINATION include/fetchml
)

View file

@ -0,0 +1,427 @@
#include "queue_index.h"
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <fcntl.h>
#include <filesystem>
#include <fstream>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <vector>
// Binary index file format (written/read by write_index/read_index below):
//   magic:   "FQI1" (4 bytes)
//   version: uint64_t (currently 1)
//   count:   uint64_t number of entries
//   entries: `count` raw fixed-size qi_task_t records
namespace fs = std::filesystem;
constexpr char INDEX_MAGIC[] = "FQI1";
constexpr uint64_t CURRENT_VERSION = 1;
// Bump allocator over a fixed 256KB thread-local buffer, intended to keep the
// hot path free of heap allocations. Returns nullptr once exhausted.
class ArenaAllocator {
    static constexpr size_t BUFFER_SIZE = 256 * 1024; // 256KB
    alignas(64) char buffer_[BUFFER_SIZE];
    size_t offset_ = 0;
    bool in_use_ = false;  // NOTE(review): toggled by begin()/end() but never read

public:
    // Carve `size` bytes out of the buffer, rounding the current offset up to
    // `align` (assumed to be a power of two). nullptr when the arena is full.
    void* allocate(size_t size, size_t align = 8) {
        const size_t start = (offset_ + align - 1) & ~(align - 1);
        if (start + size > BUFFER_SIZE) {
            return nullptr; // Arena exhausted
        }
        offset_ = start + size;
        return buffer_ + start;
    }
    // Discard all allocations (the memory is reused, not freed).
    void reset() { offset_ = 0; }
    // Mark the start of a hot-path section and clear the arena.
    void begin() { in_use_ = true; reset(); }
    // Mark the end of a hot-path section.
    void end() { in_use_ = false; }
};
// One arena per thread so hot-path operations never contend.
thread_local ArenaAllocator g_arena;
// One task plus index-local bookkeeping.
struct IndexEntry {
    qi_task_t task;
    uint64_t offset; // File offset for direct access (always written as 0 in the visible code)
    bool dirty;      // Modified since last save
};
// Opaque handle behind the qi_* C API: one on-disk queue directory plus its
// in-memory mirror (entry list + priority heap) and synchronization state.
struct qi_index {
    std::string queue_dir;   // root queue directory passed to qi_open
    std::string index_path;  // <queue_dir>/pending/.queue.bin
    std::string data_dir;    // <queue_dir>/pending/entries (one JSON file per task)
    // In-memory data structures
    std::vector<IndexEntry> entries;
    // Priority queue (max-heap by priority, then min-heap by created_at)
    std::vector<size_t> heap; // Indices into entries
    // Thread safety: shared lock for reads, unique lock for mutation
    mutable std::shared_mutex mutex;
    // Error state (see qi_last_error / qi_clear_error)
    std::string last_error;
    // Stats
    uint64_t version = 0;  // bumped on every successful write_index()
    int64_t mtime = 0;     // wall-clock time of the last write_index()
    // Memory-mapped file
    // NOTE(review): these mmap fields are never used by the visible
    // implementation — confirm before relying on (or removing) them.
    void* mmap_ptr = nullptr;
    size_t mmap_size = 0;
    int mmap_fd = -1;
};
// Heap comparator: higher priority first, then earlier created_at
struct HeapComparator {
const std::vector<IndexEntry>* entries;
bool operator()(size_t a, size_t b) const {
const auto& ta = (*entries)[a].task;
const auto& tb = (*entries)[b].task;
if (ta.priority != tb.priority) {
return ta.priority < tb.priority; // Max-heap: higher priority first
}
return ta.created_at > tb.created_at; // Min-heap: earlier first
}
};
// Record an error message on the index (no-op for a null index). The caller
// is expected to hold the index lock or be on a single-threaded path.
static void set_error(qi_index_t* idx, const char* msg) {
    if (idx) {
        idx->last_error = msg;
    }
}
// Create `path` (with any missing parents). Returns true on success or when
// the directory already exists, false on any filesystem error.
static bool ensure_dir(const char* path) {
    try {
        fs::create_directories(path);
    } catch (...) {
        return false;
    }
    return true;
}
// Rebuild the priority heap from scratch: gather the indices of all entries
// still in "queued" state, then heapify them in one pass.
static void rebuild_heap(qi_index_t* idx) {
    idx->heap.clear();
    for (size_t i = 0; i < idx->entries.size(); ++i) {
        if (std::strcmp(idx->entries[i].task.status, "queued") == 0) {
            idx->heap.push_back(i);
        }
    }
    HeapComparator comp{&idx->entries};
    std::make_heap(idx->heap.begin(), idx->heap.end(), comp);
}
// Persist the in-memory entries atomically: write a temp file next to the
// index, fsync it, then rename() over the real path. Returns 0/-1.
// Unlike the previous version, every write() is checked and the temp file is
// removed on failure, so a short write can no longer leave a truncated file
// that a later rename would install as the live index.
static int write_index(qi_index_t* idx) {
    const std::string tmp_path = idx->index_path + ".tmp";
    int fd = open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0640);
    if (fd < 0) {
        set_error(idx, "Failed to open index file for writing");
        return -1;
    }
    // Write exactly `len` bytes, retrying on EINTR; false on hard error.
    auto write_all = [fd](const void* buf, size_t len) -> bool {
        const char* p = static_cast<const char*>(buf);
        while (len > 0) {
            ssize_t n = write(fd, p, len);
            if (n < 0) {
                if (errno == EINTR) continue;
                return false;
            }
            p += n;
            len -= static_cast<size_t>(n);
        }
        return true;
    };
    // Header: magic, version, entry count.
    bool ok = write_all(INDEX_MAGIC, 4);
    const uint64_t version = CURRENT_VERSION;
    ok = ok && write_all(&version, sizeof(version));
    const uint64_t count = idx->entries.size();
    ok = ok && write_all(&count, sizeof(count));
    // Fixed-size task records.
    for (const auto& entry : idx->entries) {
        if (!ok) break;
        ok = write_all(&entry.task, sizeof(qi_task_t));
    }
    // Flush to stable storage before the rename makes the file visible.
    if (ok && fsync(fd) != 0) {
        ok = false;
    }
    close(fd);
    if (!ok) {
        unlink(tmp_path.c_str());
        set_error(idx, "Failed to write index file");
        return -1;
    }
    // Atomic replace of the previous index.
    if (rename(tmp_path.c_str(), idx->index_path.c_str()) != 0) {
        set_error(idx, "Failed to rename index file");
        return -1;
    }
    idx->version++;
    idx->mtime = time(nullptr);
    return 0;
}
// Load the on-disk index into idx->entries and rebuild the heap.
// Returns 0 on success (including "no index file yet"), -1 on a corrupt or
// unreadable file. Unlike the previous version, the stored format version is
// validated, so a future-format file is rejected instead of misparsed.
static int read_index(qi_index_t* idx) {
    int fd = open(idx->index_path.c_str(), O_RDONLY);
    if (fd < 0) {
        if (errno == ENOENT) {
            // No existing index - that's ok
            return 0;
        }
        set_error(idx, "Failed to open index file for reading");
        return -1;
    }
    // Header: magic, version, count.
    char magic[4];
    if (read(fd, magic, 4) != 4 || std::memcmp(magic, INDEX_MAGIC, 4) != 0) {
        close(fd);
        set_error(idx, "Invalid index file magic");
        return -1;
    }
    uint64_t version;
    if (read(fd, &version, sizeof(version)) != sizeof(version)) {
        close(fd);
        set_error(idx, "Failed to read index version");
        return -1;
    }
    if (version != CURRENT_VERSION) {
        close(fd);
        set_error(idx, "Unsupported index version");
        return -1;
    }
    uint64_t count;
    if (read(fd, &count, sizeof(count)) != sizeof(count)) {
        close(fd);
        set_error(idx, "Failed to read entry count");
        return -1;
    }
    // Read entries. Cap the up-front reserve so a corrupt count cannot force
    // a huge allocation; the read loop still fails cleanly on a short file.
    idx->entries.clear();
    idx->entries.reserve(static_cast<size_t>(std::min<uint64_t>(count, 1u << 20)));
    for (uint64_t i = 0; i < count; ++i) {
        IndexEntry entry;
        if (read(fd, &entry.task, sizeof(qi_task_t)) != sizeof(qi_task_t)) {
            close(fd);
            set_error(idx, "Failed to read entry");
            return -1;
        }
        entry.offset = 0;
        entry.dirty = false;
        idx->entries.push_back(entry);
    }
    close(fd);
    rebuild_heap(idx);
    return 0;
}
// Rebuild the index by scanning the data directory for *.json task files,
// then persist the result. Only the task id (file stem) and a "queued"
// status are recovered; all other fields are ZERO-initialized — the previous
// version left them uninitialized, persisting garbage priority/timestamps.
static int scan_data_directory(qi_index_t* idx) {
    idx->entries.clear();
    try {
        for (const auto& entry : fs::directory_iterator(idx->data_dir)) {
            if (!entry.is_regular_file()) continue;
            auto path = entry.path();
            if (path.extension() != ".json") continue;
            // TODO(review): parse the full task JSON here; until then a
            // rebuild resets priority/created_at/retries to 0.
            std::string filename = path.stem().string();
            IndexEntry ie{};  // value-init: zeroes every task field
            std::strncpy(ie.task.id, filename.c_str(), sizeof(ie.task.id) - 1);
            ie.task.id[sizeof(ie.task.id) - 1] = '\0';
            std::strcpy(ie.task.status, "queued");
            ie.offset = 0;
            ie.dirty = false;
            idx->entries.push_back(ie);
        }
    } catch (...) {
        set_error(idx, "Failed to scan data directory");
        return -1;
    }
    rebuild_heap(idx);
    return write_index(idx);
}
// C API Implementation
// Open (or create) a queue index rooted at queue_dir. Loads the binary index
// when present; when it is missing OR unreadable/corrupt, the data directory
// is scanned to rebuild it. Returns nullptr on unrecoverable failure.
qi_index_t* qi_open(const char* queue_dir) {
    if (!queue_dir) return nullptr;
    auto* idx = new qi_index_t;
    idx->queue_dir = queue_dir;
    idx->index_path = (fs::path(queue_dir) / "pending" / ".queue.bin").string();
    idx->data_dir = (fs::path(queue_dir) / "pending" / "entries").string();
    // Ensure directories exist
    if (!ensure_dir(idx->data_dir.c_str())) {
        delete idx;
        return nullptr;
    }
    // Rebuild from the data directory when the index file is absent as well
    // as when it is corrupt: previously an absent index produced an empty
    // queue even though task files existed on disk (read_index returns 0 on
    // ENOENT without scanning).
    std::error_code ec;
    bool need_scan = !fs::exists(idx->index_path, ec);
    if (!need_scan && read_index(idx) != 0) {
        need_scan = true;
    }
    if (need_scan && scan_data_directory(idx) != 0) {
        delete idx;
        return nullptr;
    }
    return idx;
}
// Flush pending changes to disk, then destroy the index handle.
// The lock must be released BEFORE `delete idx`: the previous version still
// held idx->mutex while `delete` destroyed it (and the unique_lock then
// unlocked a destroyed mutex) — undefined behavior.
void qi_close(qi_index_t* idx) {
    if (!idx) return;
    {
        // Sync any pending changes under the lock.
        std::unique_lock lock(idx->mutex);
        write_index(idx);
    }
    delete idx;
}
// Append `count` tasks to the index, pushing "queued" ones onto the priority
// heap, then persist. Returns write_index's result (0/-1); -1 on bad args.
int qi_add_tasks(qi_index_t* idx, const qi_task_t* tasks, uint32_t count) {
    if (idx == nullptr || tasks == nullptr || count == 0) {
        return -1;
    }
    std::unique_lock lock(idx->mutex);
    g_arena.begin();  // hot-path arena section (no allocations are made today)
    HeapComparator comp{&idx->entries};
    for (uint32_t i = 0; i < count; ++i) {
        IndexEntry added;
        added.task = tasks[i];
        added.offset = 0;
        added.dirty = true;
        idx->entries.push_back(added);
        // Only runnable ("queued") tasks participate in the heap.
        if (std::strcmp(tasks[i].status, "queued") == 0) {
            idx->heap.push_back(idx->entries.size() - 1);
            std::push_heap(idx->heap.begin(), idx->heap.end(), comp);
        }
    }
    g_arena.end();
    return write_index(idx);
}
// Pop up to max_count highest-priority "queued" tasks, mark them "running"
// in the index, and persist. The copies written to out_tasks are taken
// BEFORE the status flip, so callers receive them with status "queued" —
// NOTE(review): confirm callers expect that rather than "running".
// Returns 0 on success (out_count may be 0), -1 on bad arguments.
int qi_get_next_batch(qi_index_t* idx, qi_task_t* out_tasks, uint32_t max_count, uint32_t* out_count) {
    if (!idx || !out_tasks || max_count == 0 || !out_count) return -1;
    std::unique_lock lock(idx->mutex);
    *out_count = 0;
    HeapComparator comp{&idx->entries};
    while (*out_count < max_count && !idx->heap.empty()) {
        // Pop highest priority task
        std::pop_heap(idx->heap.begin(), idx->heap.end(), comp);
        size_t entry_idx = idx->heap.back();
        idx->heap.pop_back();
        // Copy to output (still carries the pre-transition "queued" status)
        out_tasks[*out_count] = idx->entries[entry_idx].task;
        // Mark as running
        std::strcpy(idx->entries[entry_idx].task.status, "running");
        idx->entries[entry_idx].dirty = true;
        (*out_count)++;
    }
    if (*out_count > 0) {
        write_index(idx); // persist the queued->running transitions
    }
    return 0;
}
// Look up a task by exact id under a shared (read) lock. Copies it into
// *out_task and returns 0, or returns -1 when absent / on bad arguments.
int qi_get_task_by_id(qi_index_t* idx, const char* task_id, qi_task_t* out_task) {
    if (idx == nullptr || task_id == nullptr || out_task == nullptr) {
        return -1;
    }
    std::shared_lock lock(idx->mutex);
    auto it = std::find_if(idx->entries.begin(), idx->entries.end(),
                           [task_id](const IndexEntry& e) {
                               return std::strcmp(e.task.id, task_id) == 0;
                           });
    if (it == idx->entries.end()) {
        return -1;  // not found
    }
    *out_task = it->task;
    return 0;
}
// Snapshot every task into a newly allocated array (*out_tasks); the caller
// releases it with qi_free_task_array. *out_tasks is nullptr when the index
// is empty. Returns 0 on success, -1 on bad arguments.
int qi_get_all_tasks(qi_index_t* idx, qi_task_t** out_tasks, size_t* count) {
    if (idx == nullptr || out_tasks == nullptr || count == nullptr) {
        return -1;
    }
    std::shared_lock lock(idx->mutex);
    const size_t total = idx->entries.size();
    *count = total;
    if (total == 0) {
        *out_tasks = nullptr;
        return 0;
    }
    qi_task_t* snapshot = new qi_task_t[total];
    for (size_t i = 0; i < total; ++i) {
        snapshot[i] = idx->entries[i].task;
    }
    *out_tasks = snapshot;
    return 0;
}
// Release an array returned by qi_get_all_tasks. Safe with nullptr.
void qi_free_task_array(qi_task_t* tasks) {
    delete[] tasks;
}
// Return the last recorded error message, or nullptr when none. The pointer
// is owned by the index and invalidated by the next qi_* call that sets or
// clears the error.
const char* qi_last_error(qi_index_t* idx) {
    if (!idx || idx->last_error.empty()) return nullptr;
    return idx->last_error.c_str();
}
// Reset the stored error message (no-op for a null index).
void qi_clear_error(qi_index_t* idx) {
    if (idx) {
        idx->last_error.clear();
    }
}
// Monotonic counter incremented on each successful persist (write_index);
// lets callers detect index changes cheaply. 0 for a null index.
uint64_t qi_get_index_version(qi_index_t* idx) {
    if (!idx) return 0;
    std::shared_lock lock(idx->mutex);
    return idx->version;
}
// Count tasks, optionally filtered by exact status string. A null or empty
// status counts everything. Returns 0 for a null index.
size_t qi_get_task_count(qi_index_t* idx, const char* status) {
    if (idx == nullptr) {
        return 0;
    }
    std::shared_lock lock(idx->mutex);
    if (status == nullptr || *status == '\0') {
        return idx->entries.size();
    }
    return static_cast<size_t>(std::count_if(
        idx->entries.begin(), idx->entries.end(),
        [status](const IndexEntry& e) {
            return std::strcmp(e.task.status, status) == 0;
        }));
}

View file

@ -0,0 +1,65 @@
#ifndef QUEUE_INDEX_H
#define QUEUE_INDEX_H
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
// C API for a persistent, priority-ordered task queue index, designed for
// batch access from Go via CGo.
// Opaque handle for queue index
typedef struct qi_index qi_index_t;
// Task structure - matches Go queue.Task fields
// Fixed-size for binary format (no dynamic allocation in hot path). Records
// are written to disk verbatim, so any layout change needs a version bump.
typedef struct qi_task {
    char id[64];            // Task ID
    char job_name[128];     // Job name
    int64_t priority;       // Higher = more important
    int64_t created_at;     // Unix timestamp (nanoseconds)
    int64_t next_retry;     // Unix timestamp (nanoseconds), 0 if none
    char status[16];        // "queued", "running", "finished", "failed"
    uint32_t retries;       // Current retry count
} qi_task_t;
// Index operations
qi_index_t* qi_open(const char* queue_dir);
void qi_close(qi_index_t* idx);
// Batch operations (amortize CGo overhead)
// NOTE(review): qi_update_tasks and qi_remove_tasks have no visible
// definitions in the accompanying .cpp — confirm they are implemented.
int qi_add_tasks(qi_index_t* idx, const qi_task_t* tasks, uint32_t count);
int qi_update_tasks(qi_index_t* idx, const qi_task_t* tasks, uint32_t count);
int qi_remove_tasks(qi_index_t* idx, const char** task_ids, uint32_t count);
// Priority queue operations
// NOTE(review): qi_peek_next has no visible definition — confirm.
int qi_get_next_batch(qi_index_t* idx, qi_task_t* out_tasks, uint32_t max_count, uint32_t* out_count);
int qi_peek_next(qi_index_t* idx, qi_task_t* out_task);
// Query operations
// NOTE(review): qi_get_tasks_by_status has no visible definition — confirm.
int qi_get_task_by_id(qi_index_t* idx, const char* task_id, qi_task_t* out_task);
int qi_get_all_tasks(qi_index_t* idx, qi_task_t** out_tasks, size_t* count);
int qi_get_tasks_by_status(qi_index_t* idx, const char* status, qi_task_t** out_tasks, size_t* count);
// Index maintenance
// NOTE(review): no visible definitions for either — confirm.
int qi_rebuild_index(qi_index_t* idx);
int qi_compact_index(qi_index_t* idx);
// Memory management
// NOTE(review): qi_free_string_array has no visible definition — confirm.
void qi_free_task_array(qi_task_t* tasks);
void qi_free_string_array(char** strings, size_t count);
// Error handling
const char* qi_last_error(qi_index_t* idx);
void qi_clear_error(qi_index_t* idx);
// Utility
// NOTE(review): qi_get_index_mtime has no visible definition — confirm.
uint64_t qi_get_index_version(qi_index_t* idx);
int64_t qi_get_index_mtime(qi_index_t* idx);
size_t qi_get_task_count(qi_index_t* idx, const char* status);
#ifdef __cplusplus
}
#endif
#endif // QUEUE_INDEX_H

View file

@ -0,0 +1,32 @@
# Shared library for streaming tar.gz extraction (zlib-backed).
add_library(streaming_io SHARED
    streaming_io.cpp
)
target_include_directories(streaming_io PUBLIC
    $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
    $<INSTALL_INTERFACE:include/fetchml>
)
# Find zlib for gzip decompression
find_package(ZLIB REQUIRED)
target_link_libraries(streaming_io PRIVATE
    ZLIB::ZLIB
    Threads::Threads
)
# Require C++20 (matches the sibling queue_index target)
target_compile_features(streaming_io PUBLIC cxx_std_20)
set_target_properties(streaming_io PROPERTIES
    VERSION ${PROJECT_VERSION}
    SOVERSION ${PROJECT_VERSION_MAJOR}
    PUBLIC_HEADER "streaming_io.h"
    LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
    ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
)
install(TARGETS streaming_io
    LIBRARY DESTINATION lib
    ARCHIVE DESTINATION lib
    PUBLIC_HEADER DESTINATION include/fetchml
)

View file

@ -0,0 +1,281 @@
#include "streaming_io.h"
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <fcntl.h>
#include <filesystem>
#include <mutex>
#include <thread>
#include <vector>
#include <zlib.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
// Simplified tar parsing (USTAR format)
// For production, use libarchive or similar
namespace fs = std::filesystem;
// POSIX USTAR header record. All numeric fields are ASCII octal.
// The trailing `padding` pads the struct to the tar block size of 512 bytes:
// the parser reads and skips headers via sizeof(TarHeader), and the unpadded
// struct (500 bytes) left every subsequent position 12 bytes short, so all
// member offsets computed in sio_extract_tar_gz drifted per entry.
struct TarHeader {
    char name[100];     // member path (combined with `prefix` when set)
    char mode[8];       // permission bits, octal
    char uid[8];
    char gid[8];
    char size[12];      // member size in bytes, octal
    char mtime[12];
    char checksum[8];
    char typeflag;      // '0' or NUL = regular file
    char linkname[100];
    char magic[6];      // "ustar" for USTAR archives
    char version[2];
    char uname[32];
    char gname[32];
    char devmajor[8];
    char devminor[8];
    char prefix[155];   // path prefix, joined to `name` with '/'
    char padding[12];   // pad the record to a full 512-byte tar block
};
// Parse an ASCII octal field from a tar header, stopping at a NUL or after
// `len` characters. Non-octal characters (spaces, etc.) are skipped rather
// than terminating the parse — deliberately lenient, matching common headers.
static uint64_t parse_octal(const char* str, size_t len) {
    uint64_t value = 0;
    for (size_t i = 0; i < len; ++i) {
        const char c = str[i];
        if (c == '\0') {
            break;
        }
        if ('0' <= c && c <= '7') {
            value = (value << 3) | static_cast<uint64_t>(c - '0');
        }
    }
    return value;
}
// One regular-file member queued for extraction during pass 2.
struct ExtractTask {
    std::string path;   // destination path under dst_dir
    uint64_t offset;    // start of file data in the UNCOMPRESSED tar stream
    uint64_t size;      // file size in bytes (from the tar header)
    uint32_t mode;      // permission bits applied to the created file
};
// Extractor state shared across extraction calls: callbacks, error string,
// and byte counters, each guarded by its own mutex.
struct sio_extractor {
    uint32_t num_threads;             // configured count (extraction is currently sequential)
    sio_progress_cb progress_cb = nullptr;
    sio_error_cb error_cb = nullptr;  // NOTE(review): stored but never invoked in the visible code
    void* user_data = nullptr;        // shared by both callbacks; last setter wins
    std::string last_error;
    std::mutex error_mutex;           // guards last_error
    uint64_t bytes_extracted = 0;
    uint64_t bytes_written = 0;
    std::mutex stats_mutex;           // guards both byte counters
};
// Allocate an extractor. num_threads == 0 picks an automatic count:
// hardware_concurrency() (4 when unknown), capped at 8.
sio_extractor_t* sio_create_extractor(uint32_t num_threads) {
    uint32_t workers = num_threads;
    if (workers == 0) {
        workers = std::thread::hardware_concurrency();
        if (workers == 0) {
            workers = 4;
        }
        if (workers > 8) {
            workers = 8;
        }
    }
    auto* ex = new sio_extractor_t;
    ex->num_threads = workers;
    return ex;
}
// Free an extractor created by sio_create_extractor. Safe with nullptr.
void sio_destroy_extractor(sio_extractor_t* ex) {
    delete ex;
}
// Install the per-chunk progress callback.
// NOTE(review): user_data is a single slot shared with the error callback —
// whichever sio_set_*_cb runs last wins; confirm this is intended.
void sio_set_progress_cb(sio_extractor_t* ex, sio_progress_cb cb, void* user_data) {
    if (ex) {
        ex->progress_cb = cb;
        ex->user_data = user_data;
    }
}
// Install the error callback.
// NOTE(review): error_cb is stored but never invoked by the visible
// implementation, and user_data here overwrites the progress callback's slot.
void sio_set_error_cb(sio_extractor_t* ex, sio_error_cb cb, void* user_data) {
    if (ex) {
        ex->error_cb = cb;
        ex->user_data = user_data;
    }
}
// Record an error message under the error mutex (no-op for a null extractor);
// safe to call from extraction worker paths.
static void set_error(sio_extractor_t* ex, const char* msg) {
    if (ex) {
        std::lock_guard<std::mutex> lock(ex->error_mutex);
        ex->last_error = msg;
    }
}
// Extract one member's data region from a gzipped tar to dst_path.
// `offset`/`size` address the UNCOMPRESSED stream (gzseek positions are
// uncompressed offsets). Creates parent directories as needed, updates the
// extractor's byte counters, and fires the progress callback per 64KB chunk.
// Returns 0 on success, -1 on any failure (message recorded via set_error).
static int extract_file(const char* archive_path, uint64_t offset, uint64_t size,
                       const char* dst_path, uint32_t mode, sio_extractor_t* ex) {
    // Open archive
    gzFile gz = gzopen(archive_path, "rb");
    if (!gz) {
        set_error(ex, "Failed to open archive");
        return -1;
    }
    // Seek to offset (gzseek decompresses up to the target, so this is
    // O(offset) work per file — slow but necessary with plain gzip)
    if (gzseek(gz, offset, SEEK_SET) < 0) {
        gzclose(gz);
        set_error(ex, "Failed to seek in archive");
        return -1;
    }
    // Ensure destination directory exists
    fs::path dst(dst_path);
    fs::create_directories(dst.parent_path());
    // Open output file
    int fd = open(dst_path, O_WRONLY | O_CREAT | O_TRUNC, mode);
    if (fd < 0) {
        gzclose(gz);
        set_error(ex, "Failed to create output file");
        return -1;
    }
    // Copy `size` bytes in fixed-size chunks.
    std::vector<uint8_t> buffer(64 * 1024); // 64KB buffer
    uint64_t remaining = size;
    uint64_t extracted = 0;
    while (remaining > 0) {
        size_t to_read = std::min<size_t>(buffer.size(), remaining);
        int n = gzread(gz, buffer.data(), to_read);
        if (n <= 0) {
            close(fd);
            gzclose(gz);
            set_error(ex, "Failed to read from archive");
            return -1;
        }
        ssize_t written = write(fd, buffer.data(), n);
        // NOTE(review): a partial write (written < n) is treated as fatal
        // rather than retried — confirm that is acceptable for the targets.
        if (written != n) {
            close(fd);
            gzclose(gz);
            set_error(ex, "Failed to write output file");
            return -1;
        }
        remaining -= n;
        extracted += n;
        // Update stats
        if (ex) {
            std::lock_guard<std::mutex> lock(ex->stats_mutex);
            ex->bytes_extracted += n;
            ex->bytes_written += written;
        }
        // Progress callback
        if (ex && ex->progress_cb) {
            ex->progress_cb(dst_path, extracted, size, ex->user_data);
        }
    }
    close(fd);
    gzclose(gz);
    return 0;
}
// Extract a gzipped USTAR archive into dst_dir.
// Pass 1 scans headers to build a task list (offsets are positions in the
// UNCOMPRESSED stream); pass 2 extracts each regular file sequentially.
// Unlike the previous version, member paths are sanitized: absolute paths
// and paths containing ".." components are skipped, so a hostile archive can
// no longer write outside dst_dir (classic tar path-traversal).
// Returns 0 on success, -1 on error (see sio_last_error).
int sio_extract_tar_gz(sio_extractor_t* ex, const char* archive_path, const char* dst_dir) {
    if (!ex || !archive_path || !dst_dir) return -1;
    gzFile gz = gzopen(archive_path, "rb");
    if (!gz) {
        set_error(ex, "Failed to open archive");
        return -1;
    }
    // True when the member path cannot escape the destination directory.
    auto is_safe_member = [](const fs::path& member) {
        if (member.is_absolute()) return false;
        for (const auto& part : member.lexically_normal()) {
            if (part == "..") return false;
        }
        return true;
    };
    // Pass 1: read and parse tar headers.
    std::vector<ExtractTask> tasks;
    uint64_t offset = 0;
    while (true) {
        TarHeader header;
        int n = gzread(gz, &header, sizeof(TarHeader));
        if (n != sizeof(TarHeader)) {
            break; // end of archive or read error
        }
        // An all-zero block marks end-of-archive.
        bool empty = true;
        for (size_t i = 0; i < sizeof(TarHeader); ++i) {
            if (reinterpret_cast<uint8_t*>(&header)[i] != 0) {
                empty = false;
                break;
            }
        }
        if (empty) {
            break;
        }
        // Parse header fields (ASCII octal).
        uint64_t file_size = parse_octal(header.size, 12);
        uint32_t mode = parse_octal(header.mode, 8);
        // USTAR splits long paths across prefix + name.
        std::string path;
        if (header.prefix[0]) {
            path = std::string(header.prefix) + "/" + header.name;
        } else {
            path = header.name;
        }
        // Only regular files are extracted (typeflag '0' or NUL); unsafe
        // member names are dropped but their data region is still skipped.
        if (header.typeflag == '0' || header.typeflag == '\0') {
            if (is_safe_member(fs::path(path))) {
                ExtractTask task;
                task.path = (fs::path(dst_dir) / path).string();
                task.offset = offset + sizeof(TarHeader);
                task.size = file_size;
                task.mode = mode;
                tasks.push_back(task);
            }
        }
        // Member data is padded to a whole number of 512-byte blocks.
        uint64_t skip_size = (file_size + 511) & ~511;
        if (gzseek(gz, skip_size, SEEK_CUR) < 0) {
            break;
        }
        offset += sizeof(TarHeader) + skip_size;
    }
    gzclose(gz);
    if (tasks.empty()) {
        set_error(ex, "No files found in archive");
        return -1;
    }
    // Pass 2: sequential extraction (parallelizing requires independently
    // decompressible members, which plain gzip does not provide).
    for (const auto& task : tasks) {
        if (extract_file(archive_path, task.offset, task.size,
                         task.path.c_str(), task.mode, ex) != 0) {
            return -1;
        }
    }
    return 0;
}
// Return the last error message, or nullptr when none is set.
// NOTE(review): the returned pointer is used by the caller after the mutex is
// released and may dangle if another thread sets a new error concurrently —
// confirm callers only read it after extraction completes.
const char* sio_last_error(sio_extractor_t* ex) {
    if (!ex) return nullptr;
    std::lock_guard<std::mutex> lock(ex->error_mutex);
    return ex->last_error.empty() ? nullptr : ex->last_error.c_str();
}
// Cumulative bytes read out of archives on this handle (across all calls).
uint64_t sio_get_bytes_extracted(sio_extractor_t* ex) {
    if (!ex) return 0;
    std::lock_guard<std::mutex> lock(ex->stats_mutex);
    return ex->bytes_extracted;
}
// Cumulative bytes written to extracted files on this handle.
uint64_t sio_get_bytes_written(sio_extractor_t* ex) {
    if (!ex) return 0;
    std::lock_guard<std::mutex> lock(ex->stats_mutex);
    return ex->bytes_written;
}

View file

@ -0,0 +1,48 @@
#ifndef STREAMING_IO_H
#define STREAMING_IO_H
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
// C API for extracting gzipped tar archives with progress reporting.
// Opaque handle for extractor
typedef struct sio_extractor sio_extractor_t;
// Progress callback (invoked per extracted chunk of each file)
typedef void (*sio_progress_cb)(const char* path, uint64_t bytes_done, uint64_t bytes_total, void* user_data);
// Error callback
// NOTE(review): stored by the implementation but never invoked — confirm.
typedef void (*sio_error_cb)(const char* path, const char* error, void* user_data);
// Extractor operations
// num_threads: 0 = auto-detect (capped at 8); extraction is currently sequential
sio_extractor_t* sio_create_extractor(uint32_t num_threads);
void sio_destroy_extractor(sio_extractor_t* ex);
// Set callbacks (both share one user_data slot; the last setter wins)
void sio_set_progress_cb(sio_extractor_t* ex, sio_progress_cb cb, void* user_data);
void sio_set_error_cb(sio_extractor_t* ex, sio_error_cb cb, void* user_data);
// Extract tar.gz
// NOTE(review): the visible implementation extracts sequentially with zlib;
// the previously advertised mmap / parallel decompression / O_DIRECT path is
// not implemented — keep docs and behavior in sync.
// Returns: 0 on success, -1 on error
int sio_extract_tar_gz(sio_extractor_t* ex, const char* archive_path, const char* dst_dir);
// Extract with hash verification
// NOTE(review): no visible definition in the accompanying .cpp — confirm.
int sio_extract_with_verification(sio_extractor_t* ex, const char* archive_path,
                                  const char* dst_dir, const char* expected_sha256);
// Get last error (NULL when none recorded)
const char* sio_last_error(sio_extractor_t* ex);
// Utility: cumulative byte counters across all extractions on this handle
uint64_t sio_get_bytes_extracted(sio_extractor_t* ex);
uint64_t sio_get_bytes_written(sio_extractor_t* ex);
#ifdef __cplusplus
}
#endif
#endif // STREAMING_IO_H

View file

@ -102,4 +102,16 @@ func BenchmarkDirOverallSHA256HexLarge(b *testing.B) {
}
}
})
b.Run("Native", func(b *testing.B) {
// This requires FETCHML_NATIVE_LIBS=1 to actually use native
// Otherwise falls back to Go implementation
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, err := worker.DirOverallSHA256Hex(tmpDir)
if err != nil {
b.Fatal(err)
}
}
})
}

View file

@ -0,0 +1,65 @@
package benchmarks
import (
	"os"
	"strconv"
	"testing"

	"github.com/jfraeys/fetch_ml/internal/worker"
)
// TestGoNativeLeakStress runs 1000 hash iterations through the Go->C++
// integration to surface native-side memory leaks or crashes under churn.
func TestGoNativeLeakStress(t *testing.T) {
	tmpDir := t.TempDir()
	// Create ten 1MB files with deterministic, differing contents.
	for i := 0; i < 10; i++ {
		content := make([]byte, 1024*1024) // 1MB each
		for j := range content {
			content[j] = byte(i * j)
		}
		if err := os.WriteFile(tmpDir+"/test_"+string(rune('a'+i))+".dat", content, 0644); err != nil {
			t.Fatal(err)
		}
	}
	// Run 1000 hash operations through the Go wrapper; each must yield a
	// 64-character hex digest.
	for i := 0; i < 1000; i++ {
		hash, err := worker.DirOverallSHA256Hex(tmpDir)
		if err != nil {
			t.Fatalf("Hash %d failed: %v", i, err)
		}
		if len(hash) != 64 {
			t.Fatalf("Hash %d: expected 64 chars, got %d", i, len(hash))
		}
		if i%100 == 0 {
			t.Logf("Completed %d iterations", i)
		}
	}
	t.Logf("Completed 1000 iterations through Go->C++ integration")
}
// TestGoNativeArtifactScanLeak exercises the artifact scanner repeatedly to
// surface native-side leaks or crashes under churn.
func TestGoNativeArtifactScanLeak(t *testing.T) {
	tmpDir := t.TempDir()
	// Create 50 distinct test files. The previous version derived names from
	// 'a'+i%26, so only 26 unique files existed (the rest overwrote earlier
	// ones); numeric suffixes guarantee 50 distinct paths.
	for i := 0; i < 50; i++ {
		if err := os.WriteFile(tmpDir+"/file_"+strconv.Itoa(i)+".txt", []byte("data"), 0644); err != nil {
			t.Fatal(err)
		}
	}
	// Run 100 scans; failures are logged rather than fatal because the
	// native path may legitimately be disabled in this environment.
	for i := 0; i < 100; i++ {
		_, err := worker.ScanArtifactsNative(tmpDir)
		if err != nil {
			t.Logf("Scan %d: %v (may be expected if native disabled)", i, err)
		}
		if i%25 == 0 {
			t.Logf("Completed %d scans", i)
		}
	}
}

View file

@ -0,0 +1,34 @@
package benchmarks
import (
"os"
"testing"
"github.com/jfraeys/fetch_ml/internal/worker"
)
// TestNativeIntegration verifies the native hashing libraries are callable
// end-to-end: SIMD detection, then a single-file directory hash.
func TestNativeIntegration(t *testing.T) {
	// SIMD detection is informational only — both results are valid here.
	simdName := worker.GetSIMDImplName()
	t.Logf("SIMD implementation: %s", simdName)
	simdAvailable := worker.HasSIMDSHA256()
	t.Logf("SIMD available: %v", simdAvailable)
	// Create test directory with one known file.
	tmpDir := t.TempDir()
	if err := os.WriteFile(tmpDir+"/test.txt", []byte("hello world"), 0644); err != nil {
		t.Fatal(err)
	}
	// Hashing must succeed and yield a 64-char hex SHA-256 digest.
	hash, err := worker.DirOverallSHA256Hex(tmpDir)
	if err != nil {
		t.Fatalf("Hash failed: %v", err)
	}
	if len(hash) != 64 {
		t.Fatalf("Expected 64 char hash, got %d: %s", len(hash), hash)
	}
	t.Logf("Hash result: %s", hash)
}