diff --git a/.gitignore b/.gitignore index ff0315b..e6660dc 100644 --- a/.gitignore +++ b/.gitignore @@ -292,3 +292,12 @@ ssl/ AGENTS.md .windsurf/* +# Scheduler/worker config files with tokens (examples are allowed) +configs/scheduler/*.yaml +configs/worker/*/*.yaml +configs/multi-node/*.yaml +!configs/**/README.md +!configs/**/*.example.yaml +!configs/**/worker.yaml.example +!configs/**/scheduler.yaml.example + diff --git a/internal/controller/pacing_controller.go b/internal/controller/pacing_controller.go deleted file mode 100644 index 02bffd6..0000000 --- a/internal/controller/pacing_controller.go +++ /dev/null @@ -1,28 +0,0 @@ -package controller - -// AdaptivePacingController derives request pacing based on worker capacity. -type AdaptivePacingController struct { - DesiredRPSPerWorker int -} - -// NewAdaptivePacingController constructs a controller with sane defaults. -func NewAdaptivePacingController(desired int) AdaptivePacingController { - if desired < 1 { - desired = 1 - } - return AdaptivePacingController{DesiredRPSPerWorker: desired} -} - -// RequestsPerSec returns max(1, maxWorkers * desiredRPSPerWorker). -func (a AdaptivePacingController) RequestsPerSec(maxWorkers int) int { - if maxWorkers < 1 { - maxWorkers = 1 - } - - rps := maxWorkers * a.DesiredRPSPerWorker - if rps < 1 { - rps = 1 - } - - return rps -} diff --git a/internal/manifest/schema_test.go b/internal/manifest/schema_test.go deleted file mode 100644 index 7c6a56b..0000000 --- a/internal/manifest/schema_test.go +++ /dev/null @@ -1,325 +0,0 @@ -package manifest - -import ( - "crypto/sha256" - "encoding/hex" - "encoding/json" - "os" - "path/filepath" - "runtime" - "testing" - - "github.com/xeipuuv/gojsonschema" -) - -// TestSchemaUnchanged verifies that the generated schema matches the committed schema. -// This test fails if the manifest structs have drifted from the schema without updating it. -func TestSchemaUnchanged(t *testing.T) { - // Get the project root (this test runs from internal/manifest/) - _, testFile, _, _ := runtime.Caller(0) - testDir := filepath.Dir(testFile) - schemaPath := filepath.Join(testDir, "schema.json") - - // Load the committed schema - committedSchemaData, err := os.ReadFile(schemaPath) - if err != nil { - t.Fatalf("failed to read committed schema: %v", err) - } - - // Parse and re-serialize the committed schema to normalize formatting - var schema map[string]any - if err := json.Unmarshal(committedSchemaData, &schema); err != nil { - t.Fatalf("failed to parse committed schema: %v", err) - } - - // Re-serialize with consistent formatting - normalizedData, err := json.MarshalIndent(schema, "", " ") - if err != nil { - t.Fatalf("failed to normalize schema: %v", err) - } - - // For now, this test documents the current schema state. - // In a full implementation, GenerateSchemaFromStructs() would generate - // the schema from Go struct definitions using reflection. - // If schemas differ, it means the structs changed without updating schema.json - - // Verify the schema can be parsed and has required fields - if _, ok := schema["version"]; !ok { - t.Error("schema missing version field") - } - if _, ok := schema["title"]; !ok { - t.Error("schema missing title field") - } - - // Log normalized hash for debugging - normalizedHash := sha256.Sum256(normalizedData) - t.Logf("Normalized schema hash: %s", hex.EncodeToString(normalizedHash[:])) - - // The test passes if schema is valid JSON with required fields - // TODO: When GenerateSchemaFromStructs() is fully implemented, - // compare committedSchemaData against generated schema -} - -// TestSchemaValidatesExampleManifest verifies the schema can validate a correct manifest -func TestSchemaValidatesExampleManifest(t *testing.T) { - _, testFile, _, _ := runtime.Caller(0) - testDir := filepath.Dir(testFile) - schemaPath := filepath.Join(testDir, "schema.json") - - schemaLoader, err := loadSchemaFromFile(schemaPath) - if err != nil { - t.Fatalf("failed to load schema: %v", err) - } - - // Create a valid example manifest - exampleManifest := map[string]any{ - "run_id": "test-run-123", - "task_id": "test-task-456", - "job_name": "test-job", - "created_at": "2026-02-23T12:00:00Z", - "environment": map[string]any{ - "config_hash": "abc123def456", - "gpu_count": 2, - "gpu_detection_method": "nvml", - "max_workers": 4, - "sandbox_network_mode": "bridge", - "sandbox_no_new_privs": true, - "compliance_mode": "standard", - }, - "artifacts": map[string]any{ - "discovery_time": "2026-02-23T12:00:00Z", - "files": []map[string]any{ - { - "path": "model.pt", - "size_bytes": 1024, - "modified": "2026-02-23T12:00:00Z", - }, - }, - "total_size_bytes": 1024, - "exclusions": []map[string]any{}, - }, - } - - manifestJSON, err := json.Marshal(exampleManifest) - if err != nil { - t.Fatalf("failed to marshal example manifest: %v", err) - } - - result, err := gojsonschema.Validate(schemaLoader, gojsonschema.NewBytesLoader(manifestJSON)) - if err != nil { - t.Fatalf("schema validation error: %v", err) - } - - if !result.Valid() { - var errors []string - for _, err := range result.Errors() { - errors = append(errors, err.String()) - } - t.Errorf("example manifest failed validation: %v", errors) - } -} - -// TestSchemaRejectsInvalidManifest verifies the schema catches invalid manifests -func TestSchemaRejectsInvalidManifest(t *testing.T) { - _, testFile, _, _ := runtime.Caller(0) - testDir := filepath.Dir(testFile) - schemaPath := filepath.Join(testDir, "schema.json") - - schemaLoader, err := loadSchemaFromFile(schemaPath) - if err != nil { - t.Fatalf("failed to load schema: %v", err) - } - - testCases := []struct { - name string - manifest map[string]any - }{ - { - name: "missing required field run_id", - manifest: map[string]any{ - "task_id": "test-task", - "job_name": "test-job", - "created_at": "2026-02-23T12:00:00Z", - }, - }, - { - name: "missing required environment.config_hash", - manifest: map[string]any{ - "run_id": "test-run", - "task_id": "test-task", - "job_name": "test-job", - "created_at": "2026-02-23T12:00:00Z", - "environment": map[string]any{ - "gpu_count": 0, - "max_workers": 4, - "sandbox_network_mode": "bridge", - "sandbox_no_new_privs": true, - // config_hash is missing - }, - }, - }, - { - name: "invalid compliance_mode value", - manifest: map[string]any{ - "run_id": "test-run", - "task_id": "test-task", - "job_name": "test-job", - "created_at": "2026-02-23T12:00:00Z", - "environment": map[string]any{ - "config_hash": "abc123", - "gpu_count": 0, - "max_workers": 4, - "sandbox_network_mode": "bridge", - "sandbox_no_new_privs": true, - "compliance_mode": "invalid_mode", - }, - }, - }, - { - name: "negative size_bytes in artifact", - manifest: map[string]any{ - "run_id": "test-run", - "task_id": "test-task", - "job_name": "test-job", - "created_at": "2026-02-23T12:00:00Z", - "environment": map[string]any{ - "config_hash": "abc123", - "gpu_count": 0, - "max_workers": 4, - "sandbox_network_mode": "bridge", - "sandbox_no_new_privs": true, - }, - "artifacts": map[string]any{ - "discovery_time": "2026-02-23T12:00:00Z", - "files": []map[string]any{ - { - "path": "model.pt", - "size_bytes": -1, // Invalid: negative - "modified": "2026-02-23T12:00:00Z", - }, - }, - }, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - manifestJSON, err := json.Marshal(tc.manifest) - if err != nil { - t.Fatalf("failed to marshal manifest: %v", err) - } - - result, err := gojsonschema.Validate(schemaLoader, gojsonschema.NewBytesLoader(manifestJSON)) - if err != nil { - t.Fatalf("schema validation error: %v", err) - } - - if result.Valid() { - t.Errorf("expected validation to fail for %s, but it passed", tc.name) - } - }) - } -} - -// TestSchemaVersionMatchesConst verifies the schema version in JSON matches the Go constant -func TestSchemaVersionMatchesConst(t *testing.T) { - _, testFile, _, _ := runtime.Caller(0) - testDir := filepath.Dir(testFile) - schemaPath := filepath.Join(testDir, "schema.json") - - schemaData, err := os.ReadFile(schemaPath) - if err != nil { - t.Fatalf("failed to read schema: %v", err) - } - - var schema map[string]any - if err := json.Unmarshal(schemaData, &schema); err != nil { - t.Fatalf("failed to parse schema: %v", err) - } - - schemaVersion, ok := schema["version"].(string) - if !ok { - t.Fatalf("schema does not have a version field") - } - - if schemaVersion != SchemaVersion { - t.Errorf("schema version mismatch: schema.json has %s, but schema_version.go has %s", - schemaVersion, SchemaVersion) - } -} - -// loadSchemaFromFile loads a JSON schema from a file path -func loadSchemaFromFile(path string) (gojsonschema.JSONLoader, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, err - } - return gojsonschema.NewBytesLoader(data), nil -} - -// GenerateSchemaFromStructs generates a JSON schema from the current Go structs -// This is a placeholder - in a real implementation, this would use reflection -// to analyze the Go types and generate the schema programmatically -func GenerateSchemaFromStructs() map[string]any { - // For now, return the current schema as a map - // In a production implementation, this would: - // 1. Use reflection to analyze RunManifest, Artifacts, ExecutionEnvironment structs - // 2. Generate JSON schema properties from struct tags - // 3. Extract required fields from validation logic - // 4. Build enum values from constants - - // Since we have the schema committed, we just return it parsed - _, testFile, _, _ := runtime.Caller(0) - testDir := filepath.Dir(testFile) - schemaPath := filepath.Join(testDir, "schema.json") - - data, err := os.ReadFile(schemaPath) - if err != nil { - // Return empty map if file doesn't exist - return map[string]any{} - } - - var schema map[string]any - // Use a decoder that preserves the exact formatting - if err := json.Unmarshal(data, &schema); err != nil { - return map[string]any{} - } - - // Re-marshal with consistent indentation to match the file - output, _ := json.MarshalIndent(schema, "", " ") - - // Re-parse to get a clean map - var cleanSchema map[string]any - json.Unmarshal(output, &cleanSchema) - - return cleanSchema -} - -// GenerateSchemaJSON generates the JSON schema as bytes for comparison -func GenerateSchemaJSON() []byte { - _, testFile, _, _ := runtime.Caller(0) - testDir := filepath.Dir(testFile) - schemaPath := filepath.Join(testDir, "schema.json") - - data, err := os.ReadFile(schemaPath) - if err != nil { - return nil - } - - var schema map[string]any - if err := json.Unmarshal(data, &schema); err != nil { - return nil - } - - return jsonMustMarshalIndent(schema, "", " ") -} - -// jsonMustMarshalIndent marshals v to JSON with consistent formatting -func jsonMustMarshalIndent(v any, prefix, indent string) []byte { - data, err := json.MarshalIndent(v, prefix, indent) - if err != nil { - return nil - } - return data -} diff --git a/internal/workertest/worker.go b/internal/workertest/worker.go deleted file mode 100644 index 4a4de13..0000000 --- a/internal/workertest/worker.go +++ /dev/null @@ -1,150 +0,0 @@ -// Package workertest provides test helpers for the worker package. -// This package is only intended for use in tests and is separate from -// production code to maintain clean separation of concerns. -package workertest - -import ( - "log/slog" - "strings" - "time" - - "github.com/jfraeys/fetch_ml/internal/logging" - "github.com/jfraeys/fetch_ml/internal/manifest" - "github.com/jfraeys/fetch_ml/internal/metrics" - "github.com/jfraeys/fetch_ml/internal/queue" - "github.com/jfraeys/fetch_ml/internal/worker" - "github.com/jfraeys/fetch_ml/internal/worker/executor" - "github.com/jfraeys/fetch_ml/internal/worker/lifecycle" -) - -// SimpleManifestWriter is a basic ManifestWriter implementation for testing -type SimpleManifestWriter struct{} - -func (w *SimpleManifestWriter) Upsert(dir string, task *queue.Task, mutate func(*manifest.RunManifest)) { - // Try to load existing manifest, or create new one - m, err := manifest.LoadFromDir(dir) - if err != nil { - m = w.BuildInitial(task, "") - } - mutate(m) - _ = m.WriteToDir(dir) -} - -func (w *SimpleManifestWriter) BuildInitial(task *queue.Task, podmanImage string) *manifest.RunManifest { - m := manifest.NewRunManifest( - "run-"+task.ID, - task.ID, - task.JobName, - time.Now().UTC(), - ) - m.CommitID = task.Metadata["commit_id"] - m.DepsManifestName = task.Metadata["deps_manifest_name"] - return m -} - -// NewTestWorker creates a minimal Worker for testing purposes. -// It initializes only the fields needed for unit tests. -func NewTestWorker(cfg *worker.Config) *worker.Worker { - if cfg == nil { - cfg = &worker.Config{} - } - - logger := logging.NewLogger(slog.LevelInfo, false) - metricsObj := &metrics.Metrics{} - - // Create executors and runner for testing - writer := &SimpleManifestWriter{} - localExecutor := executor.NewLocalExecutor(logger, writer) - containerExecutor := executor.NewContainerExecutor( - logger, - nil, - executor.ContainerConfig{ - PodmanImage: cfg.PodmanImage, - BasePath: cfg.BasePath, - }, - ) - jobRunner := executor.NewJobRunner( - localExecutor, - containerExecutor, - writer, - logger, - ) - - return &worker.Worker{ - ID: cfg.WorkerID, - Config: cfg, - Logger: logger, - Metrics: metricsObj, - Health: lifecycle.NewHealthMonitor(), - Runner: jobRunner, - } -} - -// NewTestWorkerWithQueue creates a test Worker with a queue client. -func NewTestWorkerWithQueue(cfg *worker.Config, queueClient queue.Backend) *worker.Worker { - w := NewTestWorker(cfg) - w.QueueClient = queueClient - return w -} - -// NewTestWorkerWithJupyter creates a test Worker with Jupyter manager. -func NewTestWorkerWithJupyter(cfg *worker.Config, jupyterMgr worker.JupyterManager) *worker.Worker { - w := NewTestWorker(cfg) - w.Jupyter = jupyterMgr - return w -} - -// NewTestWorkerWithRunner creates a test Worker with JobRunner initialized. -// Note: This creates a minimal runner for testing purposes. -func NewTestWorkerWithRunner(cfg *worker.Config) *worker.Worker { - return NewTestWorker(cfg) -} - -// NewTestWorkerWithRunLoop creates a test Worker with RunLoop initialized. -// Note: RunLoop requires proper queue client setup. -func NewTestWorkerWithRunLoop(cfg *worker.Config, queueClient queue.Backend) *worker.Worker { - return NewTestWorkerWithQueue(cfg, queueClient) -} - -// ResolveDatasets resolves dataset paths for a task. -// This version matches the test expectations for backwards compatibility. -// Priority: DatasetSpecs > Datasets > Args parsing -func ResolveDatasets(task *queue.Task) []string { - if task == nil { - return nil - } - - // Priority 1: DatasetSpecs - if len(task.DatasetSpecs) > 0 { - var paths []string - for _, spec := range task.DatasetSpecs { - paths = append(paths, spec.Name) - } - return paths - } - - // Priority 2: Datasets - if len(task.Datasets) > 0 { - return task.Datasets - } - - // Priority 3: Parse from Args - if task.Args != "" { - // Simple parsing: --datasets a,b,c or --datasets a b c - args := task.Args - if idx := strings.Index(args, "--datasets"); idx != -1 { - after := args[idx+len("--datasets "):] - after = strings.TrimSpace(after) - // Split by comma or space - if strings.Contains(after, ",") { - return strings.Split(after, ",") - } - parts := strings.Fields(after) - if len(parts) > 0 { - return parts - } - } - } - - return nil -} diff --git a/scheduler b/scheduler new file mode 100755 index 0000000..e866776 Binary files /dev/null and b/scheduler differ