From 96a8e139d56efe18535117016b70967537bc0308 Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Wed, 18 Feb 2026 12:45:59 -0500 Subject: [PATCH] refactor(internal): update native bridge and queue integration - Improve native queue integration in protocol layer - Update native bridge library loading - Clean up queue native implementation --- internal/api/protocol.go | 34 +++++++++++++-------------- internal/queue/native_queue.go | 4 +++- internal/worker/native_bridge_libs.go | 12 ++++++++++ 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/internal/api/protocol.go b/internal/api/protocol.go index fb6d7e6..5411015 100644 --- a/internal/api/protocol.go +++ b/internal/api/protocol.go @@ -248,22 +248,20 @@ func serializePacketToBuffer(p *ResponsePacket, buf []byte) ([]byte, error) { return buf, nil } -// appendString writes a string with varint length prefix +// appendString writes a string with fixed 16-bit length prefix func appendString(buf []byte, s string) []byte { - length := uint64(len(s)) - var tmp [binary.MaxVarintLen64]byte - n := binary.PutUvarint(tmp[:], length) - buf = append(buf, tmp[:n]...) - return append(buf, s...) + length := uint16(len(s)) + buf = append(buf, byte(length>>8), byte(length)) + buf = append(buf, s...) + return buf } -// appendBytes writes bytes with varint length prefix +// appendBytes writes bytes with fixed 32-bit length prefix func appendBytes(buf []byte, b []byte) []byte { - length := uint64(len(b)) - var tmp [binary.MaxVarintLen64]byte - n := binary.PutUvarint(tmp[:], length) - buf = append(buf, tmp[:n]...) - return append(buf, b...) + length := uint32(len(b)) + buf = append(buf, byte(length>>24), byte(length>>16), byte(length>>8), byte(length)) + buf = append(buf, b...) + return buf } func appendUint32(buf []byte, value uint32) []byte { @@ -277,17 +275,17 @@ func (p *ResponsePacket) estimatedSize() int { switch p.PacketType { case PacketTypeSuccess: - return base + binary.MaxVarintLen64 + len(p.SuccessMessage) + return base + 2 + len(p.SuccessMessage) case PacketTypeError: - return base + 1 + 2*binary.MaxVarintLen64 + len(p.ErrorMessage) + len(p.ErrorDetails) + return base + 1 + 2 + len(p.ErrorMessage) + 2 + len(p.ErrorDetails) case PacketTypeProgress: - return base + 1 + 4 + 4 + binary.MaxVarintLen64 + len(p.ProgressMessage) + return base + 1 + 4 + 4 + 2 + len(p.ProgressMessage) case PacketTypeStatus: - return base + binary.MaxVarintLen64 + len(p.StatusData) + return base + 2 + len(p.StatusData) case PacketTypeData: - return base + binary.MaxVarintLen64 + len(p.DataType) + binary.MaxVarintLen64 + len(p.DataPayload) + return base + 2 + len(p.DataType) + 4 + len(p.DataPayload) case PacketTypeLog: - return base + 1 + binary.MaxVarintLen64 + len(p.LogMessage) + return base + 1 + 2 + len(p.LogMessage) default: return base } diff --git a/internal/queue/native_queue.go b/internal/queue/native_queue.go index 984fa6d..4c08144 100644 --- a/internal/queue/native_queue.go +++ b/internal/queue/native_queue.go @@ -13,6 +13,8 @@ import ( "os" "time" "unsafe" + + "github.com/jfraeys/fetch_ml/internal/domain" ) // UseNativeQueue controls whether to use C++ binary queue index. @@ -330,7 +332,7 @@ func (q *NativeQueue) RetryTask(task *Task) error { return errors.New("failed to retry task") } - RecordTaskRetry(task.JobName, ErrorUnknown) + RecordTaskRetry(task.JobName, domain.FailureUnknown) return nil } diff --git a/internal/worker/native_bridge_libs.go b/internal/worker/native_bridge_libs.go index faac945..6e5cae2 100644 --- a/internal/worker/native_bridge_libs.go +++ b/internal/worker/native_bridge_libs.go @@ -11,6 +11,8 @@ import "C" import ( "errors" "unsafe" + + "github.com/jfraeys/fetch_ml/internal/manifest" ) // dirOverallSHA256HexNative implementation with native library. @@ -46,3 +48,13 @@ func GetSIMDImplName() string { func HasSIMDSHA256() bool { return C.fh_has_simd_sha256() == 1 } + +// ScanArtifactsNative falls back to Go implementation. +func ScanArtifactsNative(runDir string) (*manifest.Artifacts, error) { + return ScanArtifacts(runDir) +} + +// ExtractTarGzNative falls back to Go implementation. +func ExtractTarGzNative(archivePath, dstDir string) error { + return ExtractTarGz(archivePath, dstDir) +}