fetch_ml/internal/worker/native_bridge.go
Jeremie Fraeys d408a60eb1
Some checks failed
Documentation / build-and-publish (push) Waiting to run
Test / test (push) Waiting to run
Checkout test / test (push) Successful in 5s
CI with Native Libraries / test-native (push) Has been cancelled
CI with Native Libraries / build-release (push) Has been cancelled
ci: push all workflow updates
2026-02-12 13:28:15 -05:00

276 lines
7.5 KiB
Go

package worker
// #cgo LDFLAGS: -L${SRCDIR}/../../native/build -Wl,-rpath,${SRCDIR}/../../native/build -lqueue_index -ldataset_hash -lartifact_scanner -lstreaming_io
// #include "../../native/queue_index/queue_index.h"
// #include "../../native/dataset_hash/dataset_hash.h"
// #include "../../native/artifact_scanner/artifact_scanner.h"
// #include "../../native/streaming_io/streaming_io.h"
// #include <stdlib.h>
import "C"
import (
"errors"
"log"
"os"
"time"
"unsafe"
"github.com/jfraeys/fetch_ml/internal/manifest"
"github.com/jfraeys/fetch_ml/internal/queue"
)
// UseNativeLibs controls whether to use C++ implementations.
// Set FETCHML_NATIVE_LIBS=1 to enable native libraries.
var UseNativeLibs = os.Getenv("FETCHML_NATIVE_LIBS") == "1"
func init() {
if UseNativeLibs {
log.Printf("[native] Native libraries enabled (SIMD: %s)", GetSIMDImplName())
} else {
log.Printf("[native] Native libraries disabled (set FETCHML_NATIVE_LIBS=1 to enable)")
}
}
// ============================================================================
// Dataset Hash (dirOverallSHA256Hex)
// ============================================================================
// dirOverallSHA256Hex selects implementation based on toggle.
// Native version uses mmap + SIMD SHA256 + parallel processing.
func dirOverallSHA256Hex(root string) (string, error) {
if !UseNativeLibs {
return dirOverallSHA256HexGo(root)
}
return dirOverallSHA256HexNative(root)
}
// dirOverallSHA256HexNative calls C++ implementation.
// Single CGo crossing for entire directory.
func dirOverallSHA256HexNative(root string) (string, error) {
ctx := C.fh_init(0) // 0 = auto-detect threads
if ctx == nil {
return "", errors.New("failed to initialize native hash context")
}
defer C.fh_cleanup(ctx)
croot := C.CString(root)
defer C.free(unsafe.Pointer(croot))
result := C.fh_hash_directory_combined(ctx, croot)
if result == nil {
err := C.fh_last_error(ctx)
if err != nil {
return "", errors.New(C.GoString(err))
}
return "", errors.New("native hash failed")
}
defer C.fh_free_string(result)
return C.GoString(result), nil
}
// HashFilesBatchNative batch hashes files using native library.
// Amortizes CGo overhead across multiple files.
func HashFilesBatchNative(paths []string) ([]string, error) {
if len(paths) == 0 {
return []string{}, nil
}
ctx := C.fh_init(0)
if ctx == nil {
return nil, errors.New("failed to initialize native hash context")
}
defer C.fh_cleanup(ctx)
// Convert Go strings to C strings
cPaths := make([]*C.char, len(paths))
for i, p := range paths {
cPaths[i] = C.CString(p)
}
defer func() {
for _, p := range cPaths {
C.free(unsafe.Pointer(p))
}
}()
// Allocate output buffers (65 chars: 64 hex + null)
outHashes := make([]*C.char, len(paths))
for i := range outHashes {
outHashes[i] = (*C.char)(C.malloc(65))
}
defer func() {
for _, p := range outHashes {
C.free(unsafe.Pointer(p))
}
}()
// Single CGo call for entire batch
rc := C.fh_hash_batch(ctx, &cPaths[0], C.uint32_t(len(paths)), &outHashes[0])
if rc != 0 {
err := C.fh_last_error(ctx)
if err != nil {
return nil, errors.New(C.GoString(err))
}
return nil, errors.New("batch hash failed")
}
// Convert results to Go strings
hashes := make([]string, len(paths))
for i, h := range outHashes {
hashes[i] = C.GoString(h)
}
return hashes, nil
}
// GetSIMDImplName returns the native SHA256 implementation name.
func GetSIMDImplName() string {
return C.GoString(C.fh_get_simd_impl_name())
}
// HasSIMDSHA256 returns true if SIMD SHA256 is available.
func HasSIMDSHA256() bool {
return C.fh_has_simd_sha256() == 1
}
// ============================================================================
// Artifact Scanner
// ============================================================================
// ScanArtifactsNative uses C++ fast directory traversal.
func ScanArtifactsNative(runDir string) (*manifest.Artifacts, error) {
scanner := C.as_create(nil, 0)
if scanner == nil {
return nil, errors.New("failed to create native scanner")
}
defer C.as_destroy(scanner)
cRunDir := C.CString(runDir)
defer C.free(unsafe.Pointer(cRunDir))
result := C.as_scan_directory(scanner, cRunDir)
if result == nil {
err := C.as_last_error(scanner)
if err != nil {
return nil, errors.New(C.GoString(err))
}
return nil, errors.New("native scan failed")
}
defer C.as_free_result(result)
// Convert C result to Go Artifacts
files := make([]manifest.ArtifactFile, result.count)
for i := 0; i < int(result.count); i++ {
art := (*C.as_artifact_t)(unsafe.Pointer(uintptr(unsafe.Pointer(result.artifacts)) + uintptr(i)*unsafe.Sizeof(C.as_artifact_t{})))
files[i] = manifest.ArtifactFile{
Path: C.GoString(&art.path[0]),
SizeBytes: int64(art.size_bytes),
Modified: time.Unix(int64(art.mtime), 0),
}
}
return &manifest.Artifacts{
DiscoveryTime: time.Now(), // Native doesn't return this directly
Files: files,
TotalSizeBytes: int64(result.total_size),
}, nil
}
// ============================================================================
// Streaming I/O (Tar.gz extraction)
// ============================================================================
// ExtractTarGzNative uses C++ parallel decompression.
func ExtractTarGzNative(archivePath, dstDir string) error {
ex := C.sio_create_extractor(0) // 0 = auto-detect threads
if ex == nil {
return errors.New("failed to create native extractor")
}
defer C.sio_destroy_extractor(ex)
cArchive := C.CString(archivePath)
defer C.free(unsafe.Pointer(cArchive))
cDst := C.CString(dstDir)
defer C.free(unsafe.Pointer(cDst))
rc := C.sio_extract_tar_gz(ex, cArchive, cDst)
if rc != 0 {
err := C.sio_last_error(ex)
if err != nil {
return errors.New(C.GoString(err))
}
return errors.New("native extraction failed")
}
return nil
}
// ============================================================================
// Queue Index (for future filesystem queue integration)
// ============================================================================
// QueueIndexNative provides access to native binary queue index.
type QueueIndexNative struct {
handle *C.qi_index_t
}
// OpenQueueIndexNative opens a native queue index.
func OpenQueueIndexNative(queueDir string) (*QueueIndexNative, error) {
cDir := C.CString(queueDir)
defer C.free(unsafe.Pointer(cDir))
handle := C.qi_open(cDir)
if handle == nil {
return nil, errors.New("failed to open native queue index")
}
return &QueueIndexNative{handle: handle}, nil
}
// Close closes the native queue index.
func (qi *QueueIndexNative) Close() {
if qi.handle != nil {
C.qi_close(qi.handle)
qi.handle = nil
}
}
// AddTasks adds tasks to the native index.
func (qi *QueueIndexNative) AddTasks(tasks []*queue.Task) error {
if qi.handle == nil {
return errors.New("index not open")
}
// Convert Go tasks to C tasks
cTasks := make([]C.qi_task_t, len(tasks))
for i, t := range tasks {
copyStringToC(t.ID, cTasks[i].id[:], 64)
copyStringToC(t.JobName, cTasks[i].job_name[:], 128)
cTasks[i].priority = C.int64_t(t.Priority)
cTasks[i].created_at = C.int64_t(time.Now().UnixNano())
}
rc := C.qi_add_tasks(qi.handle, &cTasks[0], C.uint32_t(len(tasks)))
if rc != 0 {
err := C.qi_last_error(qi.handle)
if err != nil {
return errors.New(C.GoString(err))
}
return errors.New("failed to add tasks")
}
return nil
}
// Helper function to copy Go string to fixed-size C char array
func copyStringToC(src string, dst []C.char, maxLen int) {
n := len(src)
if n > maxLen-1 {
n = maxLen - 1
}
for i := 0; i < n; i++ {
dst[i] = C.char(src[i])
}
dst[n] = 0
}