// Package helpers provides shared utilities for WebSocket handlers. package helpers import ( "context" "fmt" "time" "github.com/jfraeys/fetch_ml/internal/experiment" "github.com/jfraeys/fetch_ml/internal/logging" "github.com/jfraeys/fetch_ml/internal/queue" "github.com/jfraeys/fetch_ml/internal/storage" "github.com/jfraeys/fetch_ml/internal/telemetry" ) // ExperimentSetupResult contains the result of experiment setup operations type ExperimentSetupResult struct { Err error Manifest *experiment.Manifest CommitIDStr string } // RunExperimentSetup performs the common experiment setup operations: // create experiment dir, write metadata, ensure minimal files, generate manifest. // Returns the commitID string and any error that occurred. func RunExperimentSetup( logger *logging.Logger, expMgr *experiment.Manager, commitID []byte, jobName string, userName string, ) (string, error) { commitIDStr := fmt.Sprintf("%x", commitID) if _, err := telemetry.ExecWithMetrics( logger, "experiment.create", 50*time.Millisecond, func() (string, error) { return "", expMgr.CreateExperiment(commitIDStr) }, ); err != nil { logger.Error("failed to create experiment directory", "error", err) return "", fmt.Errorf("failed to create experiment directory: %w", err) } meta := &experiment.Metadata{ CommitID: commitIDStr, JobName: jobName, User: userName, Timestamp: time.Now().Unix(), } if _, err := telemetry.ExecWithMetrics( logger, "experiment.write_metadata", 50*time.Millisecond, func() (string, error) { return "", expMgr.WriteMetadata(meta) }, ); err != nil { logger.Error("failed to save experiment metadata", "error", err) return "", fmt.Errorf("failed to save experiment metadata: %w", err) } if _, err := telemetry.ExecWithMetrics( logger, "experiment.ensure_minimal_files", 50*time.Millisecond, func() (string, error) { return "", EnsureMinimalExperimentFiles(expMgr, commitIDStr) }, ); err != nil { logger.Error("failed to ensure minimal experiment files", "error", err) return "", fmt.Errorf("failed to initialize experiment files: %w", err) } if _, err := telemetry.ExecWithMetrics( logger, "experiment.generate_manifest", 100*time.Millisecond, func() (string, error) { manifest, err := expMgr.GenerateManifest(commitIDStr) if err != nil { return "", fmt.Errorf("failed to generate manifest: %w", err) } return "", expMgr.WriteManifest(manifest) }, ); err != nil { logger.Error("failed to generate/write manifest", "error", err) return "", fmt.Errorf("failed to generate content integrity manifest: %w", err) } return commitIDStr, nil } // RunExperimentSetupWithoutManifest performs experiment setup without manifest generation. // Used for jobs with args/note where manifest generation is deferred. func RunExperimentSetupWithoutManifest( logger *logging.Logger, expMgr *experiment.Manager, commitID []byte, jobName string, userName string, ) (string, error) { commitIDStr := fmt.Sprintf("%x", commitID) if _, err := telemetry.ExecWithMetrics( logger, "experiment.create", 50*time.Millisecond, func() (string, error) { return "", expMgr.CreateExperiment(commitIDStr) }, ); err != nil { logger.Error("failed to create experiment directory", "error", err) return "", fmt.Errorf("failed to create experiment directory: %w", err) } meta := &experiment.Metadata{ CommitID: commitIDStr, JobName: jobName, User: userName, Timestamp: time.Now().Unix(), } if _, err := telemetry.ExecWithMetrics( logger, "experiment.write_metadata", 50*time.Millisecond, func() (string, error) { return "", expMgr.WriteMetadata(meta) }, ); err != nil { logger.Error("failed to save experiment metadata", "error", err) return "", fmt.Errorf("failed to save experiment metadata: %w", err) } if _, err := telemetry.ExecWithMetrics( logger, "experiment.ensure_minimal_files", 50*time.Millisecond, func() (string, error) { return "", EnsureMinimalExperimentFiles(expMgr, commitIDStr) }, ); err != nil { logger.Error("failed to ensure minimal experiment files", "error", err) return "", fmt.Errorf("failed to initialize experiment files: %w", err) } return commitIDStr, nil } // UpsertExperimentDBAsync upserts experiment data to the database asynchronously. // This is a fire-and-forget operation that runs in a goroutine. func UpsertExperimentDBAsync( logger *logging.Logger, db *storage.DB, commitIDStr string, jobName string, userName string, ) { if db == nil { return } go func() { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() exp := &storage.Experiment{ID: commitIDStr, Name: jobName, Status: "pending", UserID: userName} if _, err := telemetry.ExecWithMetrics(logger, "db.experiments.upsert", 50*time.Millisecond, func() (string, error) { return "", db.UpsertExperiment(ctx, exp) }); err != nil { logger.Error("failed to upsert experiment row", "error", err) } }() } // TaskEnqueueResult contains the result of task enqueueing type TaskEnqueueResult struct { Err error TaskID string } // BuildTaskMetadata creates the standard task metadata map. func BuildTaskMetadata( commitIDStr, datasetID, paramsHash string, prov map[string]string, ) map[string]string { meta := map[string]string{ "commit_id": commitIDStr, "dataset_id": datasetID, "params_hash": paramsHash, } for k, v := range prov { if v != "" { meta[k] = v } } return meta } // BuildSnapshotTaskMetadata creates task metadata for snapshot jobs. func BuildSnapshotTaskMetadata( commitIDStr, snapshotSHA string, prov map[string]string, ) map[string]string { meta := map[string]string{ "commit_id": commitIDStr, "snapshot_sha256": snapshotSHA, } for k, v := range prov { if v != "" { meta[k] = v } } return meta } // ApplyResourceRequest applies resource request to a task. func ApplyResourceRequest(task *queue.Task, resources *ResourceRequest) { if resources != nil { task.CPU = resources.CPU task.MemoryGB = resources.MemoryGB task.GPU = resources.GPU task.GPUMemory = resources.GPUMemory } }