From c0eeeda940d294b16eb7708022e957449c1e57f8 Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Mon, 5 Jan 2026 12:37:34 -0500 Subject: [PATCH] feat(experiment): improve experiment lifecycle and update first-experiment guide --- docs/src/first-experiment.md | 44 +++--- internal/experiment/manager.go | 259 ++++++++++++++++++++++++++++++++- 2 files changed, 276 insertions(+), 27 deletions(-) diff --git a/docs/src/first-experiment.md b/docs/src/first-experiment.md index 5ee427d..3c609fa 100644 --- a/docs/src/first-experiment.md +++ b/docs/src/first-experiment.md @@ -57,7 +57,7 @@ if __name__ == '__main__': ```bash # Submit experiment -curl -X POST http://localhost:9101/api/v1/jobs \ +curl -X POST http://localhost:8080/api/v1/jobs \ -H "Content-Type: application/json" \ -H "X-API-Key: your-api-key" \ -d '{ @@ -76,30 +76,30 @@ curl -X POST http://localhost:9101/api/v1/jobs \ ```bash # Check job status curl -H "X-API-Key: your-api-key" \ - http://localhost:9101/api/v1/jobs/first-experiment + http://localhost:8080/api/v1/jobs/first-experiment # List all jobs curl -H "X-API-Key: your-api-key" \ - http://localhost:9101/api/v1/jobs + http://localhost:8080/api/v1/jobs # Get job metrics curl -H "X-API-Key: your-api-key" \ - http://localhost:9101/api/v1/jobs/first-experiment/metrics + http://localhost:8080/api/v1/jobs/first-experiment/metrics ``` ### 4. 
Use CLI ```bash # Submit with CLI -cd cli && zig build dev -./cli/zig-out/dev/ml submit \ +cd cli && zig build --release=fast +./cli/zig-out/bin/ml submit \ --name "cli-experiment" \ --args "--epochs 15 --lr 0.005" \ - --server http://localhost:9101 + --server http://localhost:8080 # Monitor with CLI -./cli/zig-out/dev/ml list-jobs --server http://localhost:9101 -./cli/zig-out/dev/ml job-status cli-experiment --server http://localhost:9101 +./cli/zig-out/bin/ml list-jobs --server http://localhost:8080 +./cli/zig-out/bin/ml job-status cli-experiment --server http://localhost:8080 ``` ## Advanced Experiment @@ -109,7 +109,7 @@ cd cli && zig build dev ```bash # Submit multiple experiments for lr in 0.001 0.01 0.1; do - curl -X POST http://localhost:9101/api/v1/jobs \ + curl -X POST http://localhost:8080/api/v1/jobs \ -H "Content-Type: application/json" \ -H "X-API-Key: your-api-key" \ -d "{ @@ -124,7 +124,7 @@ done ```bash # Submit batch job -curl -X POST http://localhost:9101/api/v1/jobs \ +curl -X POST http://localhost:8080/api/v1/jobs \ -H "Content-Type: application/json" \ -H "X-API-Key: your-api-key" \ -d '{ @@ -142,11 +142,11 @@ curl -X POST http://localhost:9101/api/v1/jobs \ ```bash # Download results curl -H "X-API-Key: your-api-key" \ - http://localhost:9101/api/v1/jobs/first-experiment/results + http://localhost:8080/api/v1/jobs/first-experiment/results # View job details curl -H "X-API-Key: your-api-key" \ - http://localhost:9101/api/v1/jobs/first-experiment | jq . + http://localhost:8080/api/v1/jobs/first-experiment | jq . 
``` ### Result Format @@ -197,10 +197,10 @@ curl -H "X-API-Key: your-api-key" \ ```bash # Check failed jobs curl -H "X-API-Key: your-api-key" \ - "http://localhost:9101/api/v1/jobs?status=failed" + "http://localhost:8080/api/v1/jobs?status=failed" # Retry failed job -curl -X POST http://localhost:9101/api/v1/jobs \ +curl -X POST http://localhost:8080/api/v1/jobs \ -H "Content-Type: application/json" \ -H "X-API-Key: your-api-key" \ -d '{ @@ -210,19 +210,19 @@ curl -X POST http://localhost:9101/api/v1/jobs \ }' ``` -## ## Related Documentation +## Related Documentation -- [Development Setup (see [Development Setup](development-setup.md))](development-setup.md) - Local development environment -- [Testing Guide (see [Testing Guide](testing.md))](testing.md) - Test your experiments -- [Production Deployment (see [Deployment](deployment.md))](deployment.md) - Scale to production -- [Monitoring](production-monitoring.md) - Track experiment performance +- [Quick Start](quick-start.md) - Local development environment and dev stack +- [Testing Guide](testing.md) - Test your experiments +- [Deployment](deployment.md) - Scale to production +- [Performance Monitoring](performance-monitoring.md) - Track experiment performance ## Troubleshooting **Job stuck in pending?** -- Check worker status: `curl /api/v1/workers` +- Check worker status: `curl http://localhost:8080/api/v1/workers` - Verify resources: `docker stats` -- Check logs: `docker-compose logs api-server` +- Check logs: `docker logs ml-experiments-api` **Job failed?** - Check error message: `curl /api/v1/jobs/job-id` diff --git a/internal/experiment/manager.go b/internal/experiment/manager.go index a3a7cf0..444fa9d 100644 --- a/internal/experiment/manager.go +++ b/internal/experiment/manager.go @@ -2,16 +2,30 @@ package experiment import ( + "crypto/sha256" "encoding/binary" + "encoding/hex" + "encoding/json" "fmt" + "io" "math" "os" "path/filepath" + "strings" "time" + "github.com/jfraeys/fetch_ml/internal/container" 
"github.com/jfraeys/fetch_ml/internal/fileutil" ) +// Manifest represents a content integrity manifest for experiment files +type Manifest struct { + CommitID string `json:"commit_id"` + Files map[string]string `json:"files"` // relative path -> sha256 hex + OverallSHA string `json:"overall_sha"` // sha256 of concatenated file hashes + Timestamp int64 `json:"timestamp"` +} + // Metadata represents experiment metadata stored in meta.bin type Metadata struct { CommitID string @@ -32,6 +46,13 @@ func NewManager(basePath string) *Manager { } } +func (m *Manager) BasePath() string { + if m == nil { + return "" + } + return m.basePath +} + // Initialize ensures the experiment directory exists func (m *Manager) Initialize() error { if err := os.MkdirAll(m.basePath, 0750); err != nil { @@ -78,7 +99,8 @@ func (m *Manager) WriteMetadata(meta *Metadata) error { path := m.GetMetadataPath(meta.CommitID) // Binary format: - // [version:1][timestamp:8][commit_id_len:1][commit_id:var][job_name_len:1][job_name:var][user_len:1][user:var] + // [version:1][timestamp:8][commit_id_len:1][commit_id:var][job_name_len:1][job_name:var] + // [user_len:1][user:var] buf := make([]byte, 0, 256) @@ -168,6 +190,9 @@ func (m *Manager) ListExperiments() ([]string, error) { var commitIDs []string for _, entry := range entries { if entry.IsDir() { + if entry.Name() == "archive" { + continue + } commitIDs = append(commitIDs, entry.Name()) } } @@ -175,6 +200,37 @@ func (m *Manager) ListExperiments() ([]string, error) { return commitIDs, nil } +func (m *Manager) archiveExperiment(commitID string) (string, error) { + if m == nil { + return "", fmt.Errorf("missing manager") + } + commitID = strings.TrimSpace(commitID) + if err := container.ValidateJobName(commitID); err != nil { + return "", fmt.Errorf("invalid commit id: %w", err) + } + + src := m.GetExperimentPath(commitID) + info, err := os.Stat(src) + if err != nil { + return "", err + } + if !info.IsDir() { + return "", fmt.Errorf("experiment path is 
not a directory") + } + + stamp := time.Now().UTC().Format("20060102-150405") + archiveRoot := filepath.Join(m.basePath, "archive", stamp) + if err := os.MkdirAll(archiveRoot, 0750); err != nil { + return "", err + } + + dst := filepath.Join(archiveRoot, commitID) + if err := os.Rename(src, dst); err != nil { + return "", err + } + return dst, nil +} + // PruneExperiments removes old experiments based on retention policy func (m *Manager) PruneExperiments(keepCount int, olderThanDays int) ([]string, error) { commitIDs, err := m.ListExperiments() @@ -225,9 +281,7 @@ func (m *Manager) PruneExperiments(keepCount int, olderThanDays int) ([]string, } if shouldPrune { - expPath := m.GetExperimentPath(exp.commitID) - if err := os.RemoveAll(expPath); err != nil { - // Log but continue + if _, err := m.archiveExperiment(exp.commitID); err != nil { continue } pruned = append(pruned, exp.commitID) @@ -254,9 +308,13 @@ func (m *Manager) GetMetricsPath(commitID string) string { func (m *Manager) LogMetric(commitID string, name string, value float64, step int) error { path := m.GetMetricsPath(commitID) + // Ensure the experiment directory exists + if err := os.MkdirAll(m.GetExperimentPath(commitID), 0750); err != nil { + return fmt.Errorf("failed to create experiment directory: %w", err) + } + // Binary format for each metric: // [timestamp:8][step:4][value:8][name_len:1][name:var] - buf := make([]byte, 0, 64) // Timestamp @@ -345,3 +403,194 @@ func (m *Manager) GetMetrics(commitID string) ([]Metric, error) { return metrics, nil } + +// GetManifestPath returns the path to the manifest file for an experiment +func (m *Manager) GetManifestPath(commitID string) string { + return filepath.Join(m.GetExperimentPath(commitID), "manifest.json") +} + +// GenerateManifest creates a content integrity manifest for all files in the experiment directory +func (m *Manager) GenerateManifest(commitID string) (*Manifest, error) { + filesPath := m.GetFilesPath(commitID) + + // Check if files 
directory exists + if _, err := os.Stat(filesPath); os.IsNotExist(err) { + return nil, fmt.Errorf("files directory does not exist: %s", filesPath) + } + + manifest := &Manifest{ + CommitID: commitID, + Files: make(map[string]string), + Timestamp: time.Now().Unix(), + } + + // Walk the files directory and hash each file + err := filepath.Walk(filesPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + // Skip directories + if info.IsDir() { + return nil + } + + // Get relative path from files directory + relPath, err := filepath.Rel(filesPath, path) + if err != nil { + return fmt.Errorf("failed to get relative path for %s: %w", path, err) + } + + // Calculate SHA256 of file + hash, err := m.hashFile(path) + if err != nil { + return fmt.Errorf("failed to hash file %s: %w", path, err) + } + + manifest.Files[relPath] = hash + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to walk files directory: %w", err) + } + + // Calculate overall SHA256 of concatenated file hashes (sorted by path for determinism) + manifest.OverallSHA = m.calculateOverallSHA(manifest.Files) + + return manifest, nil +} + +// WriteManifest persists the manifest to disk +func (m *Manager) WriteManifest(manifest *Manifest) error { + path := m.GetManifestPath(manifest.CommitID) + + data, err := json.MarshalIndent(manifest, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal manifest: %w", err) + } + + if err := fileutil.SecureFileWrite(path, data, 0640); err != nil { + return fmt.Errorf("failed to write manifest file: %w", err) + } + + return nil +} + +// ReadManifest loads the manifest from disk +func (m *Manager) ReadManifest(commitID string) (*Manifest, error) { + path := m.GetManifestPath(commitID) + + data, err := fileutil.SecureFileRead(path) + if err != nil { + return nil, fmt.Errorf("failed to read manifest file: %w", err) + } + + var manifest Manifest + if err := json.Unmarshal(data, &manifest); err != nil { + 
return nil, fmt.Errorf("failed to unmarshal manifest: %w", err) + } + + return &manifest, nil +} + +// ValidateManifest verifies that the current files match the stored manifest +func (m *Manager) ValidateManifest(commitID string) error { + // Read stored manifest + stored, err := m.ReadManifest(commitID) + if err != nil { + return fmt.Errorf("failed to read stored manifest: %w", err) + } + + // Generate manifest from current files + current, err := m.GenerateManifest(commitID) + if err != nil { + return fmt.Errorf("failed to generate current manifest: %w", err) + } + + // Verify commit ID matches + if stored.CommitID != current.CommitID { + return fmt.Errorf("commit ID mismatch: stored=%s, current=%s", stored.CommitID, current.CommitID) + } + + // Verify overall SHA matches + if stored.OverallSHA != current.OverallSHA { + return fmt.Errorf( + "overall integrity checksum mismatch: stored=%s, current=%s", + stored.OverallSHA, + current.OverallSHA, + ) + } + + // Verify file count matches + if len(stored.Files) != len(current.Files) { + return fmt.Errorf( + "file count mismatch: stored=%d, current=%d", + len(stored.Files), + len(current.Files), + ) + } + + // Verify each file hash matches + for relPath, storedHash := range stored.Files { + currentHash, exists := current.Files[relPath] + if !exists { + return fmt.Errorf("file missing in current manifest: %s", relPath) + } + if storedHash != currentHash { + return fmt.Errorf( + "file hash mismatch for %s: stored=%s, current=%s", + relPath, + storedHash, + currentHash, + ) + } + } + + return nil +} + +// hashFile calculates SHA256 hash of a file +func (m *Manager) hashFile(path string) (string, error) { + file, err := os.Open(path) + if err != nil { + return "", err + } + defer file.Close() + + hasher := sha256.New() + if _, err := io.Copy(hasher, file); err != nil { + return "", err + } + + return hex.EncodeToString(hasher.Sum(nil)), nil +} + +// calculateOverallSHA calculates deterministic SHA256 of all file hashes 
+func (m *Manager) calculateOverallSHA(files map[string]string) string {
+	// Gather the map keys (relative paths); map iteration order is unspecified.
+	keys := make([]string, 0, len(files))
+	for name := range files {
+		keys = append(keys, name)
+	}
+
+	// Insertion sort into ascending byte-wise order, so the same set of files
+	// is always visited in the same sequence.
+	for end := 1; end < len(keys); end++ {
+		for k := end; k > 0 && keys[k-1] > keys[k]; k-- {
+			keys[k-1], keys[k] = keys[k], keys[k-1]
+		}
+	}
+
+	// Stream each file's hex digest into the hasher in path order. Only the
+	// digests are hashed; the paths merely fix the order.
+	digest := sha256.New()
+	for _, name := range keys {
+		// hash.Hash.Write is documented never to return an error.
+		_, _ = io.WriteString(digest, files[name])
+	}
+
+	// An empty map yields the SHA256 of empty input.
+	// The result is the lowercase hex encoding of the final sum.
+	return hex.EncodeToString(digest.Sum(nil))
+}