- Add protocol buffer optimizations (internal/api/protocol.go) - Add filesystem queue backend (internal/queue/filesystem_queue.go) - Add run manifest support (internal/manifest/run_manifest.go) - Worker and jupyter task refinements - Exported test wrappers for benchmarking
226 lines
6.3 KiB
Go
226 lines
6.3 KiB
Go
package manifest
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jfraeys/fetch_ml/internal/fileutil"
|
|
)
|
|
|
|
// runManifestFilename is the base name of the manifest file; ManifestPath
// joins it onto a directory to form the full path.
const runManifestFilename = "run_manifest.json"
|
|
|
|
// Annotation is a timestamped note attached to a run manifest.
type Annotation struct {
	// Timestamp is encoded under the "timestamp" key.
	Timestamp time.Time `json:"timestamp"`
	// Author is optional and omitted from the JSON encoding when empty.
	Author string `json:"author,omitempty"`
	// Note is the annotation text.
	Note string `json:"note"`
}

// UnmarshalJSON decodes an annotation, accepting the timestamp under either
// the "timestamp" key or the alternate "ts" key. When both keys are present,
// "timestamp" takes precedence.
//
// The receiver is replaced as a whole on success: a payload that carries no
// timestamp yields a zero Timestamp instead of keeping whatever value the
// receiver held before. This matters because encoding/json may decode into
// reused slice elements. Following the json.Unmarshaler convention, a JSON
// null is a no-op. On a decode error the receiver is left unmodified and the
// error from encoding/json is returned as-is.
func (a *Annotation) UnmarshalJSON(data []byte) error {
	// By convention, Unmarshalers treat a literal null as "leave unchanged".
	if string(data) == "null" {
		return nil
	}

	// Wire form: both timestamp keys are pointers so that "absent" can be
	// told apart from an explicitly encoded zero time.
	var w struct {
		Timestamp *time.Time `json:"timestamp,omitempty"`
		TS        *time.Time `json:"ts,omitempty"`
		Author    string     `json:"author,omitempty"`
		Note      string     `json:"note"`
	}
	if err := json.Unmarshal(data, &w); err != nil {
		return err
	}

	decoded := Annotation{Author: w.Author, Note: w.Note}
	switch {
	case w.Timestamp != nil:
		decoded.Timestamp = *w.Timestamp
	case w.TS != nil:
		decoded.Timestamp = *w.TS
	}

	// Assign in one step so no field of a previously used receiver survives.
	*a = decoded
	return nil
}
|
|
|
|
// Narrative holds the free-text description attached to a run. Every field
// is optional: empty strings and an empty tag list are omitted from the JSON
// encoding.
type Narrative struct {
	// Free-text fields.
	Hypothesis      string `json:"hypothesis,omitempty"`
	Context         string `json:"context,omitempty"`
	Intent          string `json:"intent,omitempty"`
	ExpectedOutcome string `json:"expected_outcome,omitempty"`

	// Fields relating this run to others.
	ParentRun       string `json:"parent_run,omitempty"`
	ExperimentGroup string `json:"experiment_group,omitempty"`

	// Tags is a list of labels for the run.
	Tags []string `json:"tags,omitempty"`
}
|
|
|
|
// NarrativePatch is a partial update for a Narrative. Each field is a
// pointer: nil means "leave the current value alone", while a non-nil value
// (including an empty string or empty list) replaces it. See
// RunManifest.ApplyNarrativePatch for how the values are applied.
type NarrativePatch struct {
	// Free-text fields.
	Hypothesis      *string `json:"hypothesis,omitempty"`
	Context         *string `json:"context,omitempty"`
	Intent          *string `json:"intent,omitempty"`
	ExpectedOutcome *string `json:"expected_outcome,omitempty"`

	// Fields relating this run to others.
	ParentRun       *string `json:"parent_run,omitempty"`
	ExperimentGroup *string `json:"experiment_group,omitempty"`

	// Tags, when non-nil, replaces the whole tag list.
	Tags *[]string `json:"tags,omitempty"`
}
|
|
|
|
// ArtifactFile describes a single file recorded in a run's artifact listing.
// All three fields are always present in the JSON encoding.
type ArtifactFile struct {
	// Path identifies the file.
	Path string `json:"path"`
	// SizeBytes is the file size in bytes.
	SizeBytes int64 `json:"size_bytes"`
	// Modified is the modification time recorded for the file.
	Modified time.Time `json:"modified"`
}
|
|
|
|
type Artifacts struct {
|
|
DiscoveryTime time.Time `json:"discovery_time"`
|
|
Files []ArtifactFile `json:"files,omitempty"`
|
|
TotalSizeBytes int64 `json:"total_size_bytes,omitempty"`
|
|
}
|
|
|
|
// RunManifest is a best-effort, self-contained provenance record for a run.
|
|
// It is written to <run_dir>/run_manifest.json.
|
|
type RunManifest struct {
|
|
RunID string `json:"run_id"`
|
|
TaskID string `json:"task_id"`
|
|
JobName string `json:"job_name"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
StartedAt time.Time `json:"started_at,omitempty"`
|
|
EndedAt time.Time `json:"ended_at,omitempty"`
|
|
|
|
Annotations []Annotation `json:"annotations,omitempty"`
|
|
Narrative *Narrative `json:"narrative,omitempty"`
|
|
Artifacts *Artifacts `json:"artifacts,omitempty"`
|
|
|
|
CommitID string `json:"commit_id,omitempty"`
|
|
ExperimentManifestSHA string `json:"experiment_manifest_sha,omitempty"`
|
|
DepsManifestName string `json:"deps_manifest_name,omitempty"`
|
|
DepsManifestSHA string `json:"deps_manifest_sha,omitempty"`
|
|
TrainScriptPath string `json:"train_script_path,omitempty"`
|
|
|
|
WorkerVersion string `json:"worker_version,omitempty"`
|
|
PodmanImage string `json:"podman_image,omitempty"`
|
|
ImageDigest string `json:"image_digest,omitempty"`
|
|
|
|
SnapshotID string `json:"snapshot_id,omitempty"`
|
|
SnapshotSHA256 string `json:"snapshot_sha256,omitempty"`
|
|
|
|
Command string `json:"command,omitempty"`
|
|
Args string `json:"args,omitempty"`
|
|
ExitCode *int `json:"exit_code,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
|
|
StagingDurationMS int64 `json:"staging_duration_ms,omitempty"`
|
|
ExecutionDurationMS int64 `json:"execution_duration_ms,omitempty"`
|
|
FinalizeDurationMS int64 `json:"finalize_duration_ms,omitempty"`
|
|
TotalDurationMS int64 `json:"total_duration_ms,omitempty"`
|
|
|
|
GPUDevices []string `json:"gpu_devices,omitempty"`
|
|
WorkerHost string `json:"worker_host,omitempty"`
|
|
Metadata map[string]string `json:"metadata,omitempty"`
|
|
}
|
|
|
|
func NewRunManifest(runID, taskID, jobName string, createdAt time.Time) *RunManifest {
|
|
m := &RunManifest{
|
|
RunID: runID,
|
|
TaskID: taskID,
|
|
JobName: jobName,
|
|
CreatedAt: createdAt,
|
|
Metadata: make(map[string]string),
|
|
}
|
|
return m
|
|
}
|
|
|
|
func ManifestPath(dir string) string {
|
|
return filepath.Join(dir, runManifestFilename)
|
|
}
|
|
|
|
func (m *RunManifest) WriteToDir(dir string) error {
|
|
if m == nil {
|
|
return fmt.Errorf("run manifest is nil")
|
|
}
|
|
data, err := json.MarshalIndent(m, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("marshal run manifest: %w", err)
|
|
}
|
|
if err := fileutil.SecureFileWrite(ManifestPath(dir), data, 0640); err != nil {
|
|
return fmt.Errorf("write run manifest: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func LoadFromDir(dir string) (*RunManifest, error) {
|
|
data, err := fileutil.SecureFileRead(ManifestPath(dir))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read run manifest: %w", err)
|
|
}
|
|
var m RunManifest
|
|
if err := json.Unmarshal(data, &m); err != nil {
|
|
return nil, fmt.Errorf("parse run manifest: %w", err)
|
|
}
|
|
return &m, nil
|
|
}
|
|
|
|
func (m *RunManifest) MarkStarted(t time.Time) {
|
|
m.StartedAt = t
|
|
}
|
|
|
|
func (m *RunManifest) MarkFinished(t time.Time, exitCode *int, execErr error) {
|
|
m.EndedAt = t
|
|
m.ExitCode = exitCode
|
|
if execErr != nil {
|
|
m.Error = execErr.Error()
|
|
} else {
|
|
m.Error = ""
|
|
}
|
|
if !m.StartedAt.IsZero() {
|
|
m.TotalDurationMS = m.EndedAt.Sub(m.StartedAt).Milliseconds()
|
|
}
|
|
}
|
|
|
|
func (m *RunManifest) AddAnnotation(ts time.Time, author, note string) {
|
|
if m == nil {
|
|
return
|
|
}
|
|
n := strings.TrimSpace(note)
|
|
if n == "" {
|
|
return
|
|
}
|
|
a := Annotation{
|
|
Timestamp: ts,
|
|
Author: strings.TrimSpace(author),
|
|
Note: n,
|
|
}
|
|
m.Annotations = append(m.Annotations, a)
|
|
}
|
|
|
|
func (m *RunManifest) ApplyNarrativePatch(p NarrativePatch) {
|
|
if m == nil {
|
|
return
|
|
}
|
|
if m.Narrative == nil {
|
|
m.Narrative = &Narrative{}
|
|
}
|
|
if p.Hypothesis != nil {
|
|
m.Narrative.Hypothesis = strings.TrimSpace(*p.Hypothesis)
|
|
}
|
|
if p.Context != nil {
|
|
m.Narrative.Context = strings.TrimSpace(*p.Context)
|
|
}
|
|
if p.Intent != nil {
|
|
m.Narrative.Intent = strings.TrimSpace(*p.Intent)
|
|
}
|
|
if p.ExpectedOutcome != nil {
|
|
m.Narrative.ExpectedOutcome = strings.TrimSpace(*p.ExpectedOutcome)
|
|
}
|
|
if p.ParentRun != nil {
|
|
m.Narrative.ParentRun = strings.TrimSpace(*p.ParentRun)
|
|
}
|
|
if p.ExperimentGroup != nil {
|
|
m.Narrative.ExperimentGroup = strings.TrimSpace(*p.ExperimentGroup)
|
|
}
|
|
if p.Tags != nil {
|
|
clean := make([]string, 0, len(*p.Tags))
|
|
for _, t := range *p.Tags {
|
|
t = strings.TrimSpace(t)
|
|
if t == "" {
|
|
continue
|
|
}
|
|
clean = append(clean, t)
|
|
}
|
|
m.Narrative.Tags = clean
|
|
}
|
|
}
|