diff --git a/internal/queue/errors.go b/internal/queue/errors.go index aa0b030..d983a86 100644 --- a/internal/queue/errors.go +++ b/internal/queue/errors.go @@ -4,10 +4,155 @@ package queue import ( "errors" "fmt" + "os" "strings" + "syscall" ) -// ErrorCategory represents the type of error encountered +// FailureClass represents the classification of a task failure +// Used to determine appropriate retry policy and user guidance +type FailureClass string + +const ( + FailureInfrastructure FailureClass = "infrastructure" // OOM kill, SIGKILL, node failure + FailureCode FailureClass = "code" // non-zero exit, exception, assertion + FailureData FailureClass = "data" // hash mismatch, dataset unreachable + FailureResource FailureClass = "resource" // GPU OOM, disk full, timeout + FailureUnknown FailureClass = "unknown" // cannot classify +) + +// ClassifyFailure determines the failure class from exit signals, codes, and log output +func ClassifyFailure(exitCode int, signal os.Signal, logTail string) FailureClass { + logLower := strings.ToLower(logTail) + + // Killed by OS — infrastructure failure + if signal == syscall.SIGKILL { + return FailureInfrastructure + } + + // CUDA OOM or GPU resource issues + if strings.Contains(logLower, "cuda out of memory") || + strings.Contains(logLower, "cuda error") || + strings.Contains(logLower, "gpu oom") { + return FailureResource + } + + // General OOM (non-GPU) — infrastructure + if strings.Contains(logLower, "out of memory") || + strings.Contains(logLower, "oom") || + strings.Contains(logLower, "cannot allocate memory") { + return FailureInfrastructure + } + + // Dataset hash check failed — data failure + if strings.Contains(logLower, "hash mismatch") || + strings.Contains(logLower, "checksum failed") || + strings.Contains(logLower, "dataset not found") || + strings.Contains(logLower, "dataset unreachable") { + return FailureData + } + + // Disk/resource exhaustion + if strings.Contains(logLower, "no space left") || + strings.Contains(logLower, "disk full") || + strings.Contains(logLower, "disk quota exceeded") { + return FailureResource + } + + // Timeout — resource (time budget exceeded) + if strings.Contains(logLower, "timeout") || + strings.Contains(logLower, "deadline exceeded") || + strings.Contains(logLower, "context deadline") { + return FailureResource + } + + // Network issues — infrastructure + if strings.Contains(logLower, "connection refused") || + strings.Contains(logLower, "connection reset") || + strings.Contains(logLower, "no route to host") || + strings.Contains(logLower, "network unreachable") { + return FailureInfrastructure + } + + // Non-zero exit without specific signal — code failure + if exitCode != 0 { + return FailureCode + } + + return FailureUnknown +} + +// FailureInfo contains complete failure context for the manifest +type FailureInfo struct { + Class FailureClass `json:"class"` + ExitCode int `json:"exit_code,omitempty"` + Signal string `json:"signal,omitempty"` + LogTail string `json:"log_tail,omitempty"` + Suggestion string `json:"suggestion,omitempty"` + AutoRetried bool `json:"auto_retried,omitempty"` + RetryCount int `json:"retry_count,omitempty"` + RetryCap int `json:"retry_cap,omitempty"` + ClassifiedAt string `json:"classified_at,omitempty"` + Context map[string]string `json:"context,omitempty"` +} + +// GetFailureSuggestion returns user guidance based on failure class +func GetFailureSuggestion(class FailureClass, logTail string) string { + switch class { + case FailureInfrastructure: + return "Infrastructure failure (node died, OOM kill). Auto-retry in progress." + case FailureCode: + return "Code error in training script. Fix before resubmitting." + case FailureData: + return "Data verification failed. Check dataset accessibility and hashes." + case FailureResource: + if strings.Contains(strings.ToLower(logTail), "cuda") { + return "GPU OOM. Increase --gpu-memory or use smaller batch size." + } + if strings.Contains(strings.ToLower(logTail), "disk") { + return "Disk full. Clean up storage or request more space." + } + return "Resource exhausted. Try with larger allocation or reduced load." + default: + return "Unknown failure. Review logs and contact support if persistent." + } +} + +// ShouldAutoRetry determines if a failure class should auto-retry +// infrastructure: 3 retries transparent +// resource: 1 retry with backoff +// unknown: 1 retry (conservative - was retryable in old system) +// others: never auto-retry +func ShouldAutoRetry(class FailureClass, retryCount int) bool { + switch class { + case FailureInfrastructure: + return retryCount < 3 + case FailureResource: + return retryCount < 1 + case FailureUnknown: + // Unknown failures get 1 retry attempt (conservative, matches old behavior) + return retryCount < 1 + default: + // code, data failures never auto-retry + return false + } +} + +// RetryDelayForClass returns appropriate backoff for the failure class +func RetryDelayForClass(class FailureClass, retryCount int) int { + switch class { + case FailureInfrastructure: + // Immediate retry for infrastructure + return 0 + case FailureResource: + // Short backoff for resource issues + return 30 + default: + return 0 + } +} + +// ErrorCategory represents the type of error encountered (DEPRECATED: use FailureClass) type ErrorCategory string // Error categories for task classification and retry logic @@ -51,124 +196,53 @@ func NewTaskError(category ErrorCategory, message string, cause error) *TaskErro } } -// ClassifyError categorizes an error for retry logic +// ClassifyError categorizes an error for retry logic (DEPRECATED: use classifyFailure) +// This function now delegates to the more accurate classifyFailure func ClassifyError(err error) ErrorCategory { if err == nil { return ErrorUnknown } - // Check if already classified + // Check if already classified as TaskError var taskErr *TaskError if errors.As(err, &taskErr) { return taskErr.Category } - errStr := strings.ToLower(err.Error()) + // Delegate to new FailureClass classification + failureClass := ClassifyFailure(0, nil, err.Error()) - // Network errors (retryable) - networkIndicators := []string{ - "connection refused", - "connection reset", - "connection timeout", - "no route to host", - "network unreachable", - "temporary failure", - "dns", - "dial tcp", - "i/o timeout", + // Map FailureClass back to ErrorCategory for backward compatibility + switch failureClass { + case FailureInfrastructure: + return ErrorNetwork + case FailureCode: + return ErrorPermanent + case FailureData: + return ErrorValidation + case FailureResource: + return ErrorResource + default: + return ErrorUnknown } - for _, indicator := range networkIndicators { - if strings.Contains(errStr, indicator) { - return ErrorNetwork - } - } - - // Resource errors (retryable after delay) - resourceIndicators := []string{ - "out of memory", - "oom", - "no space left", - "disk full", - "resource temporarily unavailable", - "too many open files", - "cannot allocate memory", - } - for _, indicator := range resourceIndicators { - if strings.Contains(errStr, indicator) { - return ErrorResource - } - } - - // Rate limiting (retryable with backoff) - rateLimitIndicators := []string{ - "rate limit", - "too many requests", - "throttle", - "quota exceeded", - "429", - } - for _, indicator := range rateLimitIndicators { - if strings.Contains(errStr, indicator) { - return ErrorRateLimit - } - } - - // Timeout errors (retryable) - timeoutIndicators := []string{ - "timeout", - "deadline exceeded", - "context deadline", - } - for _, indicator := range timeoutIndicators { - if strings.Contains(errStr, indicator) { - return ErrorTimeout - } - } - - // Authentication errors (not retryable) - authIndicators := []string{ - "unauthorized", - "forbidden", - "authentication failed", - "invalid credentials", - "access denied", - "401", - "403", - } - for _, indicator := range authIndicators { - if strings.Contains(errStr, indicator) { - return ErrorAuth - } - } - - // Validation errors (not retryable) - validationIndicators := []string{ - "invalid input", - "validation failed", - "bad request", - "malformed", - "400", - } - for _, indicator := range validationIndicators { - if strings.Contains(errStr, indicator) { - return ErrorValidation - } - } - - // Default to unknown - return ErrorUnknown } // IsRetryable determines if an error category should be retried +// Now delegates to ShouldAutoRetry with FailureClass mapping func IsRetryable(category ErrorCategory) bool { + // Map ErrorCategory to FailureClass + var failureClass FailureClass switch category { - case ErrorNetwork, ErrorResource, ErrorRateLimit, ErrorTimeout, ErrorUnknown: - return true + case ErrorNetwork: + failureClass = FailureInfrastructure + case ErrorResource, ErrorTimeout: + failureClass = FailureResource case ErrorAuth, ErrorValidation, ErrorPermanent: - return false + failureClass = FailureCode default: - return false + failureClass = FailureUnknown } + return ShouldAutoRetry(failureClass, 0) } // GetUserMessage returns a user-friendly error message with suggestions @@ -195,32 +269,17 @@ func GetUserMessage(category ErrorCategory, err error) string { } // RetryDelay calculates the retry delay based on error category and retry count +// Now delegates to RetryDelayForClass with FailureClass mapping func RetryDelay(category ErrorCategory, retryCount int) int { + // Map ErrorCategory to FailureClass + var failureClass FailureClass switch category { - case ErrorRateLimit: - // Longer backoff for rate limits - return intMin(300, 10*(1< +import "C" + +import ( + "errors" + "os" + "time" + "unsafe" +) + +// UseNativeQueue controls whether to use C++ binary queue index. +// Set FETCHML_NATIVE_LIBS=1 to enable native libraries. +var UseNativeQueue = os.Getenv("FETCHML_NATIVE_LIBS") == "1" + +// NativeQueue wraps the C++ binary queue index for high-performance +// task operations. It replaces JSON-based filesystem operations with +// a memory-mapped binary format. +type NativeQueue struct { + handle *C.qi_index_t + root string +} + +// NewNativeQueue creates a new native binary queue index. +// Falls back to standard FilesystemQueue if native libs unavailable. +func NewNativeQueue(root string) (Backend, error) { + if !UseNativeQueue { + return NewFilesystemQueue(root) + } + + croot := C.CString(root) + defer C.free(unsafe.Pointer(croot)) + + handle := C.qi_open(croot) + if handle == nil { + return nil, errors.New("failed to open native queue index") + } + + return &NativeQueue{ + handle: handle, + root: root, + }, nil +} + +// Close closes the native queue index and syncs to disk. +func (q *NativeQueue) Close() error { + if q.handle != nil { + C.qi_close(q.handle) + q.handle = nil + } + return nil +} + +// AddTask adds a task to the native queue index. +func (q *NativeQueue) AddTask(task *Task) error { + if q.handle == nil { + return errors.New("queue not open") + } + + cTask := taskToC(task) + + rc := C.qi_add_tasks(q.handle, &cTask, 1) + if rc < 0 { + err := C.qi_last_error(q.handle) + if err != nil { + return errors.New(C.GoString(err)) + } + return errors.New("failed to add task") + } + + TasksQueued.Inc() + if depth, derr := q.QueueDepth(); derr == nil { + UpdateQueueDepth(depth) + } + return nil +} + +// GetNextTask retrieves and claims the highest priority task. +func (q *NativeQueue) GetNextTask() (*Task, error) { + return q.claimNext("", 0, false) +} + +// PeekNextTask returns the highest priority task without claiming. +func (q *NativeQueue) PeekNextTask() (*Task, error) { + return q.claimNext("", 0, true) +} + +// GetNextTaskWithLease retrieves a task with a lease for a specific worker. +func (q *NativeQueue) GetNextTaskWithLease(workerID string, leaseDuration time.Duration) (*Task, error) { + return q.claimNext(workerID, leaseDuration, false) +} + +// claimNext retrieves the next task from the priority queue. +func (q *NativeQueue) claimNext(workerID string, leaseDuration time.Duration, peek bool) (*Task, error) { + if q.handle == nil { + return nil, errors.New("queue not open") + } + + var cTask C.qi_task_t + + if peek { + rc := C.qi_peek_next(q.handle, &cTask) + if rc != 0 { + return nil, nil // No tasks available + } + } else { + var count C.uint32_t + rc := C.qi_get_next_batch(q.handle, &cTask, 1, &count) + if rc != 0 || count == 0 { + return nil, nil // No tasks available + } + } + + task := cToTask(&cTask) + + if !peek && workerID != "" { + task.LeasedBy = workerID + exp := time.Now().UTC().Add(leaseDuration) + task.LeaseExpiry = &exp + } + + if depth, derr := q.QueueDepth(); derr == nil { + UpdateQueueDepth(depth) + } + + return task, nil +} + +// GetTask retrieves a task by ID from the native index. +func (q *NativeQueue) GetTask(taskID string) (*Task, error) { + if q.handle == nil { + return nil, errors.New("queue not open") + } + + cID := C.CString(taskID) + defer C.free(unsafe.Pointer(cID)) + + var cTask C.qi_task_t + rc := C.qi_get_task_by_id(q.handle, cID, &cTask) + if rc != 0 { + return nil, os.ErrNotExist + } + + return cToTask(&cTask), nil +} + +// GetAllTasks retrieves all tasks from the native index. +func (q *NativeQueue) GetAllTasks() ([]*Task, error) { + if q.handle == nil { + return nil, errors.New("queue not open") + } + + var cTasks *C.qi_task_t + var count C.size_t + + rc := C.qi_get_all_tasks(q.handle, &cTasks, &count) + if rc != 0 { + err := C.qi_last_error(q.handle) + if err != nil { + return nil, errors.New(C.GoString(err)) + } + return nil, errors.New("failed to get tasks") + } + + if count == 0 { + return []*Task{}, nil + } + + tasks := make([]*Task, count) + for i := 0; i < int(count); i++ { + // Access array element using pointer arithmetic + taskPtr := (*C.qi_task_t)(unsafe.Pointer(uintptr(unsafe.Pointer(cTasks)) + uintptr(i)*unsafe.Sizeof(C.qi_task_t{}))) + tasks[i] = cToTask(taskPtr) + } + + C.qi_free_task_array(cTasks) + return tasks, nil +} + +// UpdateTask updates a task in the native index. +func (q *NativeQueue) UpdateTask(task *Task) error { + if q.handle == nil { + return errors.New("queue not open") + } + + cTask := taskToC(task) + + rc := C.qi_update_tasks(q.handle, &cTask, 1) + if rc != 0 { + err := C.qi_last_error(q.handle) + if err != nil { + return errors.New(C.GoString(err)) + } + return errors.New("failed to update task") + } + + if depth, derr := q.QueueDepth(); derr == nil { + UpdateQueueDepth(depth) + } + return nil +} + +// UpdateTaskWithMetrics updates a task with metrics (same as UpdateTask for native). +func (q *NativeQueue) UpdateTaskWithMetrics(task *Task, _ string) error { + return q.UpdateTask(task) +} + +// CancelTask cancels a task by updating its status. +func (q *NativeQueue) CancelTask(taskID string) error { + task, err := q.GetTask(taskID) + if err != nil { + return err + } + task.Status = "cancelled" + now := time.Now().UTC() + task.EndedAt = &now + return q.UpdateTask(task) +} + +// GetTaskByName retrieves the most recent task with the given job name. +func (q *NativeQueue) GetTaskByName(jobName string) (*Task, error) { + tasks, err := q.GetAllTasks() + if err != nil { + return nil, err + } + + var best *Task + for _, t := range tasks { + if t == nil || t.JobName != jobName { + continue + } + if best == nil || t.CreatedAt.After(best.CreatedAt) { + best = t + } + } + + if best == nil { + return nil, os.ErrNotExist + } + return best, nil +} + +// QueueDepth returns the number of queued tasks. +func (q *NativeQueue) QueueDepth() (int64, error) { + if q.handle == nil { + return 0, errors.New("queue not open") + } + cStatus := C.CString("queued") + defer C.free(unsafe.Pointer(cStatus)) + count := C.qi_get_task_count(q.handle, cStatus) + return int64(count), nil +} + +// Helper functions for C struct conversion + +func taskToC(task *Task) C.qi_task_t { + var cTask C.qi_task_t + + copyStringToCBuffer(task.ID, cTask.id[:], 64) + copyStringToCBuffer(task.JobName, cTask.job_name[:], 128) + + cTask.priority = C.int64_t(task.Priority) + cTask.created_at = C.int64_t(task.CreatedAt.UnixNano()) + + if task.NextRetry != nil { + cTask.next_retry = C.int64_t(task.NextRetry.UnixNano()) + } else { + cTask.next_retry = 0 + } + + copyStringToCBuffer(task.Status, cTask.status[:], 16) + cTask.retries = C.uint32_t(task.RetryCount) + + return cTask +} + +func cToTask(cTask *C.qi_task_t) *Task { + return &Task{ + ID: C.GoString(&cTask.id[0]), + JobName: C.GoString(&cTask.job_name[0]), + Priority: int64(cTask.priority), + CreatedAt: time.Unix(0, int64(cTask.created_at)), + Status: C.GoString(&cTask.status[0]), + RetryCount: int(cTask.retries), + } +} + +func copyStringToCBuffer(src string, dst []C.char, maxLen int) { + n := len(src) + if n > maxLen-1 { + n = maxLen - 1 + } + for i := 0; i < n; i++ { + dst[i] = C.char(src[i]) + } + dst[n] = 0 +} + +// Stub implementations for queue.Backend interface + +func (q *NativeQueue) GetNextTaskWithLeaseBlocking(workerID string, leaseDuration, blockTimeout time.Duration) (*Task, error) { + return nil, errors.New("blocking get not implemented for native queue") +} + +func (q *NativeQueue) RetryTask(task *Task) error { + if q.handle == nil { + return errors.New("queue not open") + } + + cID := C.CString(task.ID) + defer C.free(unsafe.Pointer(cID)) + + var nextRetry int64 + if task.NextRetry != nil { + nextRetry = task.NextRetry.UnixNano() + } + + rc := C.qi_retry_task(q.handle, cID, C.int64_t(nextRetry), C.uint32_t(task.MaxRetries)) + if rc != 0 { + err := C.qi_last_error(q.handle) + if err != nil { + return errors.New(C.GoString(err)) + } + return errors.New("failed to retry task") + } + + RecordTaskRetry(task.JobName, ErrorUnknown) + return nil +} + +func (q *NativeQueue) MoveToDeadLetterQueue(task *Task, reason string) error { + if q.handle == nil { + return errors.New("queue not open") + } + + cID := C.CString(task.ID) + defer C.free(unsafe.Pointer(cID)) + + cReason := C.CString(reason) + defer C.free(unsafe.Pointer(cReason)) + + rc := C.qi_move_to_dlq(q.handle, cID, cReason) + if rc != 0 { + err := C.qi_last_error(q.handle) + if err != nil { + return errors.New(C.GoString(err)) + } + return errors.New("failed to move task to DLQ") + } + + RecordDLQAddition(reason) + return nil +} + +func (q *NativeQueue) RenewLease(taskID, workerID string, leaseDuration time.Duration) error { + if q.handle == nil { + return errors.New("queue not open") + } + + cID := C.CString(taskID) + defer C.free(unsafe.Pointer(cID)) + + cWorker := C.CString(workerID) + defer C.free(unsafe.Pointer(cWorker)) + + expiry := time.Now().Add(leaseDuration).UnixNano() + + rc := C.qi_renew_lease(q.handle, cID, cWorker, C.int64_t(expiry)) + if rc != 0 { + err := C.qi_last_error(q.handle) + if err != nil { + return errors.New(C.GoString(err)) + } + return errors.New("failed to renew lease") + } + + RecordLeaseRenewal(workerID) + return nil +} + +func (q *NativeQueue) ReleaseLease(taskID, workerID string) error { + if q.handle == nil { + return errors.New("queue not open") + } + + cID := C.CString(taskID) + defer C.free(unsafe.Pointer(cID)) + + cWorker := C.CString(workerID) + defer C.free(unsafe.Pointer(cWorker)) + + rc := C.qi_release_lease(q.handle, cID, cWorker) + if rc != 0 { + err := C.qi_last_error(q.handle) + if err != nil { + return errors.New(C.GoString(err)) + } + return errors.New("failed to release lease") + } + + return nil +} + +func (q *NativeQueue) RecordMetric(_, _ string, _ float64) error { + return nil +} + +func (q *NativeQueue) Heartbeat(_ string) error { + return nil +} + +func (q *NativeQueue) SetWorkerPrewarmState(_ PrewarmState) error { return nil } +func (q *NativeQueue) ClearWorkerPrewarmState(_ string) error { return nil } +func (q *NativeQueue) GetWorkerPrewarmState(_ string) (*PrewarmState, error) { return nil, nil } +func (q *NativeQueue) GetAllWorkerPrewarmStates() ([]PrewarmState, error) { return nil, nil } +func (q *NativeQueue) SignalPrewarmGC() error { return nil } +func (q *NativeQueue) PrewarmGCRequestValue() (string, error) { return "", nil } diff --git a/internal/queue/native_queue_stub.go b/internal/queue/native_queue_stub.go new file mode 100644 index 0000000..fd50319 --- /dev/null +++ b/internal/queue/native_queue_stub.go @@ -0,0 +1,17 @@ +//go:build !native_libs +// +build !native_libs + +package queue + +import "errors" + +// UseNativeQueue is always false without native_libs build tag. +var UseNativeQueue = false + +// NativeQueue is not available without native_libs build tag. +type NativeQueue struct{} + +// NewNativeQueue returns an error without native_libs build tag. +func NewNativeQueue(root string) (Backend, error) { + return nil, errors.New("native queue requires native_libs build tag") +} diff --git a/internal/queue/task.go b/internal/queue/task.go index 9bcad75..8106380 100644 --- a/internal/queue/task.go +++ b/internal/queue/task.go @@ -57,10 +57,27 @@ type Task struct { LastError string `json:"last_error,omitempty"` // Last error encountered NextRetry *time.Time `json:"next_retry,omitempty"` // When to retry next (exponential backoff) + // Attempt tracking - complete history of all execution attempts + Attempts []Attempt `json:"attempts,omitempty"` + // Optional tracking configuration for this task Tracking *TrackingConfig `json:"tracking,omitempty"` } +// Attempt represents a single execution attempt of a task +type Attempt struct { + Attempt int `json:"attempt"` // Attempt number (1-indexed) + StartedAt time.Time `json:"started_at"` // When attempt started + EndedAt *time.Time `json:"ended_at,omitempty"` // When attempt ended (if completed) + WorkerID string `json:"worker_id,omitempty"` // Which worker ran this attempt + Status string `json:"status"` // running, completed, failed + FailureClass FailureClass `json:"failure_class,omitempty"` // Failure classification (if failed) + ExitCode int `json:"exit_code,omitempty"` // Process exit code + Signal string `json:"signal,omitempty"` // Termination signal (if any) + Error string `json:"error,omitempty"` // Error message (if failed) + LogTail string `json:"log_tail,omitempty"` // Last N lines of log output +} + // TrackingConfig specifies experiment tracking tools to enable for a task. type TrackingConfig struct { MLflow *MLflowTrackingConfig `json:"mlflow,omitempty"` diff --git a/internal/worker/config.go b/internal/worker/config.go index 6e1fda7..041e488 100644 --- a/internal/worker/config.go +++ b/internal/worker/config.go @@ -253,7 +253,12 @@ func (c *Config) Validate() error { // Convert relative paths to absolute c.BasePath = config.ExpandPath(c.BasePath) if !filepath.IsAbs(c.BasePath) { - c.BasePath = filepath.Join(config.DefaultBasePath, c.BasePath) + // Resolve relative to current working directory, not DefaultBasePath + cwd, err := os.Getwd() + if err != nil { + return fmt.Errorf("failed to get current directory: %w", err) + } + c.BasePath = filepath.Join(cwd, c.BasePath) } } diff --git a/internal/worker/hash_selector.go b/internal/worker/hash_selector.go new file mode 100644 index 0000000..b735985 --- /dev/null +++ b/internal/worker/hash_selector.go @@ -0,0 +1,16 @@ +package worker + +// UseNativeLibs controls whether to use C++ implementations. +// Set FETCHML_NATIVE_LIBS=1 to enable native libraries. +// This is defined here so it's available regardless of build tags. +var UseNativeLibs = false + +// dirOverallSHA256Hex selects implementation based on toggle. +// This file has no CGo imports so it compiles even when CGO is disabled. +// The actual implementations are in native_bridge.go (native) and data_integrity.go (Go). +func dirOverallSHA256Hex(root string) (string, error) { + if !UseNativeLibs { + return dirOverallSHA256HexGo(root) + } + return dirOverallSHA256HexNative(root) +} diff --git a/internal/worker/native_bridge_libs.go b/internal/worker/native_bridge_libs.go new file mode 100644 index 0000000..faac945 --- /dev/null +++ b/internal/worker/native_bridge_libs.go @@ -0,0 +1,48 @@ +//go:build cgo && native_libs +// +build cgo,native_libs + +package worker + +// #cgo LDFLAGS: -L${SRCDIR}/../../native/build -Wl,-rpath,${SRCDIR}/../../native/build -ldataset_hash +// #include "../../native/dataset_hash/dataset_hash.h" +// #include +import "C" + +import ( + "errors" + "unsafe" +) + +// dirOverallSHA256HexNative implementation with native library. +func dirOverallSHA256HexNative(root string) (string, error) { + ctx := C.fh_init(0) // 0 = auto-detect threads + if ctx == nil { + return "", errors.New("failed to initialize native hash context") + } + defer C.fh_cleanup(ctx) + + croot := C.CString(root) + defer C.free(unsafe.Pointer(croot)) + + result := C.fh_hash_directory_combined(ctx, croot) + if result == nil { + err := C.fh_last_error(ctx) + if err != nil { + return "", errors.New(C.GoString(err)) + } + return "", errors.New("native hash failed") + } + defer C.fh_free_string(result) + + return C.GoString(result), nil +} + +// GetSIMDImplName returns the native SHA256 implementation name. +func GetSIMDImplName() string { + return C.GoString(C.fh_get_simd_impl_name()) +} + +// HasSIMDSHA256 returns true if SIMD SHA256 is available. +func HasSIMDSHA256() bool { + return C.fh_has_simd_sha256() == 1 +} diff --git a/internal/worker/native_bridge_nocgo.go b/internal/worker/native_bridge_nocgo.go new file mode 100644 index 0000000..10e4be1 --- /dev/null +++ b/internal/worker/native_bridge_nocgo.go @@ -0,0 +1,40 @@ +//go:build !cgo +// +build !cgo + +package worker + +import ( + "errors" + + "github.com/jfraeys/fetch_ml/internal/manifest" +) + +// dirOverallSHA256HexNative is not available without CGO. +func dirOverallSHA256HexNative(root string) (string, error) { + return "", errors.New("native hash requires CGO") +} + +// HashFilesBatchNative is not available without CGO. +func HashFilesBatchNative(paths []string) ([]string, error) { + return nil, errors.New("native batch hash requires CGO") +} + +// GetSIMDImplName returns "disabled" without CGO. +func GetSIMDImplName() string { + return "disabled (no CGO)" +} + +// HasSIMDSHA256 returns false without CGO. +func HasSIMDSHA256() bool { + return false +} + +// ScanArtifactsNative is disabled without CGO. +func ScanArtifactsNative(runDir string) (*manifest.Artifacts, error) { + return nil, errors.New("native artifact scanner requires CGO") +} + +// ExtractTarGzNative is disabled without CGO. +func ExtractTarGzNative(archivePath, dstDir string) error { + return errors.New("native tar.gz extractor requires CGO") +} diff --git a/internal/worker/runloop.go b/internal/worker/runloop.go index 3ced976..dbf72fc 100644 --- a/internal/worker/runloop.go +++ b/internal/worker/runloop.go @@ -340,12 +340,21 @@ func (w *Worker) executeTaskWithLease(task *queue.Task) { go w.heartbeatLoop(heartbeatCtx, task.ID) - // Update task status + // Update task status and record attempt start task.Status = "running" now := time.Now() task.StartedAt = &now task.WorkerID = w.id + // Record this attempt in the task history + attempt := queue.Attempt{ + Attempt: task.RetryCount + 1, // 1-indexed attempt number + StartedAt: now, + WorkerID: w.id, + Status: "running", + } + task.Attempts = append(task.Attempts, attempt) + // Phase 1 provenance capture: best-effort metadata enrichment before persisting the running state. w.recordTaskProvenance(taskCtx, task) @@ -411,13 +420,28 @@ func (w *Worker) executeTaskWithLease(task *queue.Task) { execErr = w.runJob(taskCtx, task, cudaVisible) }() - // Finalize task + // Finalize task and record attempt completion endTime := time.Now() task.EndedAt = &endTime + // Update the last attempt with completion info + if len(task.Attempts) > 0 { + lastIdx := len(task.Attempts) - 1 + task.Attempts[lastIdx].EndedAt = &endTime + } + if execErr != nil { task.Error = execErr.Error() + // Update last attempt with failure info + if len(task.Attempts) > 0 { + lastIdx := len(task.Attempts) - 1 + task.Attempts[lastIdx].Status = "failed" + task.Attempts[lastIdx].Error = execErr.Error() + task.Attempts[lastIdx].FailureClass = queue.ClassifyFailure(0, nil, execErr.Error()) + // TODO: Capture exit code and signal from actual execution + } + // Check if transient error (network, timeout, etc) if isTransientError(execErr) && task.RetryCount < task.MaxRetries { w.logger.Warn("task failed with transient error, will retry", @@ -431,6 +455,11 @@ func (w *Worker) executeTaskWithLease(task *queue.Task) { } } else { task.Status = "completed" + // Update last attempt with success + if len(task.Attempts) > 0 { + lastIdx := len(task.Attempts) - 1 + task.Attempts[lastIdx].Status = "completed" + } _ = w.queue.UpdateTaskWithMetrics(task, "final") }