package worker // #cgo LDFLAGS: -L${SRCDIR}/../../native/build -Wl,-rpath,${SRCDIR}/../../native/build -lqueue_index -ldataset_hash -lartifact_scanner -lstreaming_io // #include "../../native/queue_index/queue_index.h" // #include "../../native/dataset_hash/dataset_hash.h" // #include "../../native/artifact_scanner/artifact_scanner.h" // #include "../../native/streaming_io/streaming_io.h" // #include import "C" import ( "errors" "log" "os" "time" "unsafe" "github.com/jfraeys/fetch_ml/internal/manifest" "github.com/jfraeys/fetch_ml/internal/queue" ) // UseNativeLibs controls whether to use C++ implementations. // Set FETCHML_NATIVE_LIBS=1 to enable native libraries. var UseNativeLibs = os.Getenv("FETCHML_NATIVE_LIBS") == "1" func init() { if UseNativeLibs { log.Printf("[native] Native libraries enabled (SIMD: %s)", GetSIMDImplName()) } else { log.Printf("[native] Native libraries disabled (set FETCHML_NATIVE_LIBS=1 to enable)") } } // ============================================================================ // Dataset Hash (dirOverallSHA256Hex) // ============================================================================ // dirOverallSHA256Hex selects implementation based on toggle. // Native version uses mmap + SIMD SHA256 + parallel processing. func dirOverallSHA256Hex(root string) (string, error) { if !UseNativeLibs { return dirOverallSHA256HexGo(root) } return dirOverallSHA256HexNative(root) } // dirOverallSHA256HexNative calls C++ implementation. // Single CGo crossing for entire directory. func dirOverallSHA256HexNative(root string) (string, error) { ctx := C.fh_init(0) // 0 = auto-detect threads if ctx == nil { return "", errors.New("failed to initialize native hash context") } defer C.fh_cleanup(ctx) croot := C.CString(root) defer C.free(unsafe.Pointer(croot)) result := C.fh_hash_directory_combined(ctx, croot) if result == nil { err := C.fh_last_error(ctx) if err != nil { return "", errors.New(C.GoString(err)) } return "", errors.New("native hash failed") } defer C.fh_free_string(result) return C.GoString(result), nil } // HashFilesBatchNative batch hashes files using native library. // Amortizes CGo overhead across multiple files. func HashFilesBatchNative(paths []string) ([]string, error) { if len(paths) == 0 { return []string{}, nil } ctx := C.fh_init(0) if ctx == nil { return nil, errors.New("failed to initialize native hash context") } defer C.fh_cleanup(ctx) // Convert Go strings to C strings cPaths := make([]*C.char, len(paths)) for i, p := range paths { cPaths[i] = C.CString(p) } defer func() { for _, p := range cPaths { C.free(unsafe.Pointer(p)) } }() // Allocate output buffers (65 chars: 64 hex + null) outHashes := make([]*C.char, len(paths)) for i := range outHashes { outHashes[i] = (*C.char)(C.malloc(65)) } defer func() { for _, p := range outHashes { C.free(unsafe.Pointer(p)) } }() // Single CGo call for entire batch rc := C.fh_hash_batch(ctx, &cPaths[0], C.uint32_t(len(paths)), &outHashes[0]) if rc != 0 { err := C.fh_last_error(ctx) if err != nil { return nil, errors.New(C.GoString(err)) } return nil, errors.New("batch hash failed") } // Convert results to Go strings hashes := make([]string, len(paths)) for i, h := range outHashes { hashes[i] = C.GoString(h) } return hashes, nil } // GetSIMDImplName returns the native SHA256 implementation name. func GetSIMDImplName() string { return C.GoString(C.fh_get_simd_impl_name()) } // HasSIMDSHA256 returns true if SIMD SHA256 is available. func HasSIMDSHA256() bool { return C.fh_has_simd_sha256() == 1 } // ============================================================================ // Artifact Scanner // ============================================================================ // ScanArtifactsNative uses C++ fast directory traversal. func ScanArtifactsNative(runDir string) (*manifest.Artifacts, error) { scanner := C.as_create(nil, 0) if scanner == nil { return nil, errors.New("failed to create native scanner") } defer C.as_destroy(scanner) cRunDir := C.CString(runDir) defer C.free(unsafe.Pointer(cRunDir)) result := C.as_scan_directory(scanner, cRunDir) if result == nil { err := C.as_last_error(scanner) if err != nil { return nil, errors.New(C.GoString(err)) } return nil, errors.New("native scan failed") } defer C.as_free_result(result) // Convert C result to Go Artifacts files := make([]manifest.ArtifactFile, result.count) for i := 0; i < int(result.count); i++ { art := (*C.as_artifact_t)(unsafe.Pointer(uintptr(unsafe.Pointer(result.artifacts)) + uintptr(i)*unsafe.Sizeof(C.as_artifact_t{}))) files[i] = manifest.ArtifactFile{ Path: C.GoString(&art.path[0]), SizeBytes: int64(art.size_bytes), Modified: time.Unix(int64(art.mtime), 0), } } return &manifest.Artifacts{ DiscoveryTime: time.Now(), // Native doesn't return this directly Files: files, TotalSizeBytes: int64(result.total_size), }, nil } // ============================================================================ // Streaming I/O (Tar.gz extraction) // ============================================================================ // ExtractTarGzNative uses C++ parallel decompression. func ExtractTarGzNative(archivePath, dstDir string) error { ex := C.sio_create_extractor(0) // 0 = auto-detect threads if ex == nil { return errors.New("failed to create native extractor") } defer C.sio_destroy_extractor(ex) cArchive := C.CString(archivePath) defer C.free(unsafe.Pointer(cArchive)) cDst := C.CString(dstDir) defer C.free(unsafe.Pointer(cDst)) rc := C.sio_extract_tar_gz(ex, cArchive, cDst) if rc != 0 { err := C.sio_last_error(ex) if err != nil { return errors.New(C.GoString(err)) } return errors.New("native extraction failed") } return nil } // ============================================================================ // Queue Index (for future filesystem queue integration) // ============================================================================ // QueueIndexNative provides access to native binary queue index. type QueueIndexNative struct { handle *C.qi_index_t } // OpenQueueIndexNative opens a native queue index. func OpenQueueIndexNative(queueDir string) (*QueueIndexNative, error) { cDir := C.CString(queueDir) defer C.free(unsafe.Pointer(cDir)) handle := C.qi_open(cDir) if handle == nil { return nil, errors.New("failed to open native queue index") } return &QueueIndexNative{handle: handle}, nil } // Close closes the native queue index. func (qi *QueueIndexNative) Close() { if qi.handle != nil { C.qi_close(qi.handle) qi.handle = nil } } // AddTasks adds tasks to the native index. func (qi *QueueIndexNative) AddTasks(tasks []*queue.Task) error { if qi.handle == nil { return errors.New("index not open") } // Convert Go tasks to C tasks cTasks := make([]C.qi_task_t, len(tasks)) for i, t := range tasks { copyStringToC(t.ID, cTasks[i].id[:], 64) copyStringToC(t.JobName, cTasks[i].job_name[:], 128) cTasks[i].priority = C.int64_t(t.Priority) cTasks[i].created_at = C.int64_t(time.Now().UnixNano()) } rc := C.qi_add_tasks(qi.handle, &cTasks[0], C.uint32_t(len(tasks))) if rc != 0 { err := C.qi_last_error(qi.handle) if err != nil { return errors.New(C.GoString(err)) } return errors.New("failed to add tasks") } return nil } // Helper function to copy Go string to fixed-size C char array func copyStringToC(src string, dst []C.char, maxLen int) { n := len(src) if n > maxLen-1 { n = maxLen - 1 } for i := 0; i < n; i++ { dst[i] = C.char(src[i]) } dst[n] = 0 }