diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
new file mode 100644
index 0000000..39edd32
--- /dev/null
+++ b/docker-compose.dev.yml
@@ -0,0 +1,55 @@
+# Developer-focused Docker Compose for local development
+# Simplified from deployments/docker-compose.dev.yml for quick local dev
+
+services:
+  redis:
+    image: redis:7-alpine
+    ports:
+      - "6379:6379"
+    volumes:
+      - redis_data:/data
+    command: redis-server --appendonly yes
+
+  api-server:
+    build:
+      context: .
+      dockerfile: build/docker/simple.Dockerfile
+    ports:
+      - "9101:9101"
+    volumes:
+      - .:/workspace
+      - ./data/dev/logs:/logs
+      - ./data/dev/experiments:/data/experiments
+      - ./data/dev/active:/data/active
+    environment:
+      - LOG_LEVEL=debug
+      - LOG_FORMAT=text
+      - ENV=development
+    depends_on:
+      - redis
+    command: ["/bin/sh", "-c", "mkdir -p /data/experiments /data/active && exec /usr/local/bin/api-server -config /app/configs/api/dev.yaml"]
+
+  worker:
+    build:
+      context: .
+      dockerfile: build/docker/simple.Dockerfile
+    ports:
+      - "9102:9102"
+    volumes:
+      - .:/workspace
+      - ./data/dev/logs:/logs
+      - ./data/dev/active:/data/active
+      - ./data/dev/experiments:/data/experiments
+      # NOTE: mounting the Docker socket grants the worker root-equivalent
+      # control of the host daemon — acceptable for local dev only.
+      - /var/run/docker.sock:/var/run/docker.sock
+    environment:
+      - LOG_LEVEL=debug
+      - LOG_FORMAT=text
+      - ENV=development
+    depends_on:
+      - redis
+      - api-server
+    command: ["/usr/local/bin/worker", "-config", "/app/configs/worker.yaml"]
+
+volumes:
+  redis_data:
diff --git a/internal/manifest/validator.go b/internal/manifest/validator.go
new file mode 100644
index 0000000..6e00347
--- /dev/null
+++ b/internal/manifest/validator.go
@@ -0,0 +1,159 @@
+package manifest
+
+import (
+	"errors"
+	"fmt"
+)
+
+// ErrIncompleteManifest is returned when a required manifest field is missing.
+var ErrIncompleteManifest = errors.New("incomplete manifest")
+
+// Validator validates that a RunManifest is complete before execution.
+type Validator struct {
+	requiredFields []string
+}
+
+// NewValidator creates a new manifest validator with default required fields.
+func NewValidator() *Validator {
+	return &Validator{
+		requiredFields: []string{
+			"commit_id",
+			"deps_manifest_sha256",
+		},
+	}
+}
+
+// NewValidatorWithFields creates a validator with custom required fields.
+// The field list is copied so the validator is unaffected if the caller
+// later mutates the slice.
+func NewValidatorWithFields(fields []string) *Validator {
+	return &Validator{
+		requiredFields: append([]string(nil), fields...),
+	}
+}
+
+// ValidationError contains details about a validation failure.
+type ValidationError struct {
+	Field   string `json:"field"`
+	Message string `json:"message"`
+}
+
+// Error returns the error string.
+func (e ValidationError) Error() string {
+	return fmt.Sprintf("validation error for field '%s': %s", e.Field, e.Message)
+}
+
+// Validate checks that the manifest has all required fields.
+// The returned error lists every missing field and wraps
+// ErrIncompleteManifest, so callers can test it with errors.Is.
+func (v *Validator) Validate(m *RunManifest) error {
+	if m == nil {
+		return fmt.Errorf("manifest is nil: %w", ErrIncompleteManifest)
+	}
+
+	var validationErrors []ValidationError
+
+	for _, field := range v.requiredFields {
+		if err := v.validateField(m, field); err != nil {
+			validationErrors = append(validationErrors, *err)
+		}
+	}
+
+	if len(validationErrors) > 0 {
+		// Build one message listing every failure. No trailing newline so
+		// the wrapped sentinel reads cleanly at the end of the string.
+		msg := "manifest validation failed:"
+		for _, err := range validationErrors {
+			msg += "\n - " + err.Error()
+		}
+		return fmt.Errorf("%s: %w", msg, ErrIncompleteManifest)
+	}
+
+	return nil
+}
+
+// ValidateStrict fails if ANY optional fields commonly used for provenance are missing.
+// This is for high-assurance environments.
+func (v *Validator) ValidateStrict(m *RunManifest) error {
+	if err := v.Validate(m); err != nil {
+		return err
+	}
+
+	// Additional strict checks beyond the required-field set.
+	var strictErrors []ValidationError
+
+	if m.WorkerVersion == "" {
+		strictErrors = append(strictErrors, ValidationError{
+			Field:   "worker_version",
+			Message: "required for strict provenance",
+		})
+	}
+
+	if m.PodmanImage == "" {
+		strictErrors = append(strictErrors, ValidationError{
+			Field:   "podman_image",
+			Message: "required for strict provenance",
+		})
+	}
+
+	if len(strictErrors) > 0 {
+		// Same format as Validate: no trailing newline before the sentinel.
+		msg := "strict manifest validation failed:"
+		for _, err := range strictErrors {
+			msg += "\n - " + err.Error()
+		}
+		return fmt.Errorf("%s: %w", msg, ErrIncompleteManifest)
+	}
+
+	return nil
+}
+
+// validateField checks a single required field and returns details of the
+// first problem found, or nil when the field is satisfied.
+func (v *Validator) validateField(m *RunManifest, field string) *ValidationError {
+	switch field {
+	case "commit_id":
+		if m.CommitID == "" {
+			return &ValidationError{
+				Field:   field,
+				Message: "commit_id is required for code provenance",
+			}
+		}
+	case "deps_manifest_sha256":
+		if m.DepsManifestSHA == "" {
+			return &ValidationError{
+				Field:   field,
+				Message: "deps_manifest_sha256 is required for dependency provenance",
+			}
+		}
+	case "run_id":
+		if m.RunID == "" {
+			return &ValidationError{
+				Field:   field,
+				Message: "run_id is required",
+			}
+		}
+	case "task_id":
+		if m.TaskID == "" {
+			return &ValidationError{
+				Field:   field,
+				Message: "task_id is required",
+			}
+		}
+	case "job_name":
+		if m.JobName == "" {
+			return &ValidationError{
+				Field:   field,
+				Message: "job_name is required",
+			}
+		}
+	case "snapshot_sha256":
+		// Conditional: the hash is only required when a snapshot is referenced.
+		if m.SnapshotID != "" && m.SnapshotSHA256 == "" {
+			return &ValidationError{
+				Field:   field,
+				Message: "snapshot_sha256 is required when snapshot_id is provided",
+			}
+		}
+	}
+
+	// Unknown field names are deliberately ignored rather than treated as
+	// failures, since NewValidatorWithFields accepts arbitrary names.
+	return nil
+}
+
+// IsValidationError checks if an error is a manifest validation error.
+func IsValidationError(err error) bool {
+	return errors.Is(err, ErrIncompleteManifest)
+}
diff --git a/internal/queue/queue_spec_test.go b/internal/queue/queue_spec_test.go
new file mode 100644
index 0000000..67067c5
--- /dev/null
+++ b/internal/queue/queue_spec_test.go
@@ -0,0 +1,188 @@
+package queue
+
+import (
+	"testing"
+	"time"
+)
+
+// TestTaskPrioritizationSpec documents the scheduler's priority and FIFO behavior.
+// These tests serve as executable specifications for the queue system.
+func TestTaskPrioritizationSpec(t *testing.T) {
+	tests := []struct {
+		name     string
+		tasks    []Task
+		expected []string // IDs in expected execution order
+	}{
+		{
+			name: "higher priority runs first",
+			tasks: []Task{
+				{ID: "low", Priority: 1, CreatedAt: time.Unix(100, 0)},
+				{ID: "high", Priority: 10, CreatedAt: time.Unix(100, 0)},
+			},
+			expected: []string{"high", "low"},
+		},
+		{
+			name: "FIFO for same priority",
+			tasks: []Task{
+				{ID: "first", Priority: 5, CreatedAt: time.Unix(100, 0)},
+				{ID: "second", Priority: 5, CreatedAt: time.Unix(200, 0)},
+			},
+			expected: []string{"first", "second"},
+		},
+		{
+			// Priority dominates creation time; FIFO only breaks ties.
+			name: "mixed priorities and creation times",
+			tasks: []Task{
+				{ID: "medium-early", Priority: 5, CreatedAt: time.Unix(100, 0)},
+				{ID: "high-late", Priority: 10, CreatedAt: time.Unix(300, 0)},
+				{ID: "low-early", Priority: 1, CreatedAt: time.Unix(50, 0)},
+			},
+			expected: []string{"high-late", "medium-early", "low-early"},
+		},
+		{
+			name: "negative priority is lowest",
+			tasks: []Task{
+				{ID: "negative", Priority: -1, CreatedAt: time.Unix(100, 0)},
+				{ID: "positive", Priority: 1, CreatedAt: time.Unix(100, 0)},
+			},
+			expected: []string{"positive", "negative"},
+		},
+		{
+			name: "zero priority is default",
+			tasks: []Task{
+				{ID: "zero", Priority: 0, CreatedAt: time.Unix(100, 0)},
+				{ID: "positive", Priority: 1, CreatedAt: time.Unix(100, 0)},
+			},
+			expected: []string{"positive", "zero"},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create a queue and add tasks
+			tmpDir := t.TempDir()
+			q, err := NewFilesystemQueue(tmpDir)
+			if err != nil {
+				t.Fatalf("failed to create queue: %v", err)
+			}
+			defer q.Close()
+
+			// Add all tasks
+			for _, task := range tt.tasks {
+				task := task // capture range variable
+				if err := q.AddTask(&task); err != nil {
+					t.Fatalf("failed to add task %s: %v", task.ID, err)
+				}
+			}
+
+			// Get tasks in order and verify
+			var actual []string
+			for i := 0; i < len(tt.tasks); i++ {
+				task, err := q.GetNextTask()
+				if err != nil {
+					t.Fatalf("failed to get task at position %d: %v", i, err)
+				}
+				if task == nil {
+					t.Fatalf("expected task at position %d, got nil", i)
+				}
+				actual = append(actual, task.ID)
+			}
+
+			// Verify order
+			if len(actual) != len(tt.expected) {
+				t.Errorf("expected %d tasks, got %d", len(tt.expected), len(actual))
+			}
+			for i, expectedID := range tt.expected {
+				if i >= len(actual) {
+					break
+				}
+				if actual[i] != expectedID {
+					t.Errorf("position %d: expected %s, got %s", i, expectedID, actual[i])
+				}
+			}
+		})
+	}
+}
+
+// TestQueueSpec_ClaimAndComplete documents the claim-complete lifecycle
+func TestQueueSpec_ClaimAndComplete(t *testing.T) {
+	tmpDir := t.TempDir()
+	q, err := NewFilesystemQueue(tmpDir)
+	if err != nil {
+		t.Fatalf("failed to create queue: %v", err)
+	}
+	defer q.Close()
+
+	// Add a task
+	task := &Task{
+		ID:        "task-1",
+		JobName:   "test-job",
+		Priority:  5,
+		CreatedAt: time.Now(),
+	}
+	if err := q.AddTask(task); err != nil {
+		t.Fatalf("failed to add task: %v", err)
+	}
+
+	// Get the task
+	claimed, err := q.GetNextTask()
+	if err != nil {
+		t.Fatalf("failed to get task: %v", err)
+	}
+	if claimed == nil {
+		t.Fatal("expected to claim a task, got nil")
+	}
+	if claimed.ID != task.ID {
+		t.Errorf("expected task %s, got %s", task.ID, claimed.ID)
+	}
+
+	// Verify task is no longer in queue
+	tasks, err := q.GetAllTasks()
+	if err != nil {
+		t.Fatalf("failed to get tasks: %v", err)
+	}
+	for _, tsk := range tasks {
+		if tsk.ID == task.ID {
+			t.Error("claimed task should not be in queue")
+		}
+	}
+}
+
+// TestQueueSpec_TaskPriorityOrdering documents numeric priority ordering
+func TestQueueSpec_TaskPriorityOrdering(t *testing.T) {
+	tmpDir := t.TempDir()
+	q, err := NewFilesystemQueue(tmpDir)
+	if err != nil {
+		t.Fatalf("failed to create queue: %v", err)
+	}
+	defer q.Close()
+
+	// Add tasks with various priorities
+	priorities := []int64{100, 50, 200, 1, 75}
+	for i, p := range priorities {
+		task := &Task{
+			ID:        "task-" + string(rune('a'+i)),
+			JobName:   "job-" + string(rune('a'+i)),
+			Priority:  p,
+			CreatedAt: time.Now(),
+		}
+		if err := q.AddTask(task); err != nil {
+			t.Fatalf("failed to add task: %v", err)
+		}
+	}
+
+	// Expected order: 200, 100, 75, 50, 1 (descending)
+	expected := []string{"task-c", "task-a", "task-e", "task-b", "task-d"}
+
+	for i, expID := range expected {
+		task, err := q.GetNextTask()
+		if err != nil {
+			t.Fatalf("position %d: failed to get task: %v", i, err)
+		}
+		if task == nil {
+			t.Fatalf("position %d: expected task %s, got nil", i, expID)
+		}
+		if task.ID != expID {
+			t.Errorf("position %d: expected %s, got %s", i, expID, task.ID)
+		}
+	}
+}
diff --git a/internal/worker/executor/runner.go b/internal/worker/executor/runner.go
index 9ed2caf..4e308f4 100644
--- a/internal/worker/executor/runner.go
+++ b/internal/worker/executor/runner.go
@@ -80,6 +80,37 @@ func (r *JobRunner) Run(
 	// 3. Select executor
 	executor := r.selectExecutor(mode, localMode)
 
+	// 3.5 Validate manifest completeness before execution
+	if r.writer != nil {
+		// Load current manifest and validate
+		if m, err := manifest.LoadFromDir(outputDir); err == nil {
+			validator := manifest.NewValidator()
+			if err := validator.Validate(m); err != nil {
+				r.logger.Error("manifest validation failed - execution blocked",
+					"task", task.ID,
+					"error", err)
+				return &errtypes.TaskExecutionError{
+					TaskID:      task.ID,
+					JobName:     task.JobName,
+					Phase:       "validation",
+					Message:     "manifest incomplete - execution blocked",
+					Err:         err,
+					Recoverable: false, // Can't retry - manifest is missing data
+				}
+			}
+		} else {
+			// The manifest may legitimately not exist yet — it is first
+			// written by the step-4 Upsert below — so a load failure must
+			// not block the run. Log it so a corrupt or unreadable manifest
+			// cannot silently bypass validation.
+			// NOTE(review): confirm whether validation should instead run
+			// after the pre-execution Upsert, once required fields are set.
+			r.logger.Warn("manifest validation skipped - manifest not loadable",
+				"task", task.ID,
+				"error", err)
+		}
+	}
+
 	// 4. Pre-execution manifest update
 	if r.writer != nil {
 		r.writer.Upsert(outputDir, task, func(m *manifest.RunManifest) {
diff --git a/podman/hermetic.dockerfile b/podman/hermetic.dockerfile
new file mode 100644
index 0000000..a940ca7
--- /dev/null
+++ b/podman/hermetic.dockerfile
@@ -0,0 +1,40 @@
+# Hermetic Dockerfile - Reproducible builds with pinned dependencies
+# Tag image with: deps-<deps_manifest_sha256>  (NOTE(review): suffix was truncated in the original; confirm the intended tagging scheme)
+# Example: docker build -t mylab/pytorch:deps-abc123 .
+
+FROM pytorch/pytorch:2.0.1-cuda11.8-cudnn8-runtime
+
+# Build-time metadata for the provenance labels below. Without these ARG
+# declarations, ${BUILD_DATE} and ${GIT_COMMIT} expand to empty strings.
+# Pass them with: docker build --build-arg BUILD_DATE=... --build-arg GIT_COMMIT=...
+ARG BUILD_DATE
+ARG GIT_COMMIT
+
+# Pin system dependencies to specific versions for reproducibility
+# These versions are frozen - update only after testing
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    libblas3=3.9.0-1build1 \
+    liblapack3=3.9.0-1build1 \
+    libcudnn8=8.6.0.163-1+cuda11.8 \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install conda environment with pinned packages
+# NOTE(review): COPY fails the build when deps_manifest.json is absent, so
+# the existence guard below never sees a missing file. Also confirm that
+# `conda env update -f` accepts this JSON manifest (it normally expects an
+# environment YAML file).
+COPY deps_manifest.json /tmp/deps_manifest.json
+
+# If using conda environment file
+RUN if [ -f /tmp/deps_manifest.json ]; then \
+        conda env update -n base -f /tmp/deps_manifest.json; \
+    fi
+
+# If using requirements.txt with hashes. The build must FAIL when hash
+# verification fails: swallowing the error (previously `|| echo ...`) would
+# silently produce a non-hermetic image, defeating this file's purpose.
+COPY requirements.txt /tmp/requirements.txt
+RUN pip install --require-hashes --no-deps -r /tmp/requirements.txt
+
+# Verify installation
+RUN python -c "import torch; print(f'PyTorch: {torch.__version__}')" \
+    && python -c "import numpy; print(f'NumPy: {numpy.__version__}')"
+
+# Labels for provenance
+LABEL org.opencontainers.image.title="Hermetic ML Environment" \
+      org.opencontainers.image.description="Reproducible ML training environment" \
+      org.fetchml.deps_manifest="/tmp/deps_manifest.json" \
+      org.fetchml.build_date="${BUILD_DATE}" \
+      org.fetchml.git_commit="${GIT_COMMIT}"
+
+WORKDIR /workspace
+CMD ["python", "--version"]