# Scheduler Architecture

The FetchML Scheduler manages distributed job scheduling across workers via WebSocket connections.

## Overview

The scheduler consists of:

- **SchedulerHub**: Core scheduling engine (`internal/scheduler/hub.go`)
- **PriorityQueue**: Heap-based job queues for batch and service jobs
- **WorkerConn**: WebSocket connection handling per worker
- **StateStore**: Persistent state for crash recovery
- **ServiceManager**: Long-running service lifecycle management

## Key Components

### SchedulerHub

```go
type SchedulerHub struct {
	workers           map[string]*WorkerConn    // Active worker connections
	readyWorkers      map[string]*WorkerConn    // Workers ready for jobs
	batchQueue        *PriorityQueue            // Batch job queue
	serviceQueue      *PriorityQueue            // Service job queue
	reservations      map[string]*Reservation   // Job reservations
	multiNodePending  map[string]*MultiNodeJob  // Multi-node gang allocations
	pendingAcceptance map[string]*JobAssignment // Jobs awaiting acceptance
	state             *StateStore               // Persistent state
}
```

### Job Types

| Type | Description | Scheduling |
|------|-------------|------------|
| **Batch** | Finite training jobs | FIFO with priority aging |
| **Service** | Long-running inference | Dedicated slots, health checks |
| **Multi-node** | Distributed training | Gang allocation across workers |

## Protocol

### Unified WSS Protocol

All communication uses a single WebSocket Secure (WSS) endpoint:

- Workers connect to `wss://scheduler:port/ws/worker`
- Metrics clients connect with a `metrics-`-prefixed token

### Message Types

```go
const (
	// Worker → Scheduler
	MsgRegister       = "register"
	MsgHeartbeat      = "heartbeat"
	MsgReadyForWork   = "ready_for_work"
	MsgJobAccepted    = "job_accepted"
	MsgJobResult      = "job_result"
	MsgServiceHealth  = "service_health"
	MsgMetricsRequest = "metrics_request" // Metrics over WSS

	// Scheduler → Worker
	MsgJobAssign       = "job_assign"
	MsgNoWork          = "no_work"
	MsgJobCancel       = "job_cancel"
	MsgPrewarmHint     = "prewarm_hint"
	MsgAck             = "ack"
	MsgMetricsResponse = 
"metrics_response" // Metrics over WSS
)
```

### Metrics Over WSS

Metrics are retrieved via WSS using a special client token:

```go
// Connect with a metrics token
conn, err := scheduler.DialWSS("scheduler:8443", "ca.crt", "metrics-scraper-1")
if err != nil {
	log.Fatal(err)
}
defer conn.Close()

// Request metrics
conn.WriteJSON(scheduler.Message{
	Type: scheduler.MsgMetricsRequest,
})

// Receive metrics
var msg scheduler.Message
conn.ReadJSON(&msg)
// msg.Type == MsgMetricsResponse
// msg.Payload contains the metrics map
```

**Metrics payload:**

```json
{
  "workers_connected": 5,
  "queue_depth_batch": 12,
  "queue_depth_service": 3,
  "jobs_completed": 142,
  "jobs_failed": 2,
  "jobs_cancelled": 0,
  "worker_slots": {
    "worker-1": {"batch_total": 4, "batch_in_use": 2, ...}
  }
}
```

## Features

### Priority Aging

Prevents starvation by increasing the priority of long-waiting jobs:

```go
effective_priority = base_priority + (wait_time * aging_rate)
```

### Gang Allocation

Multi-node jobs are allocated atomically across workers:

1. Job submitted with `NodeCount > 1`
2. Scheduler waits for the required number of workers
3. All nodes assigned simultaneously
4. Timeout handling for partial allocations

### Starvation Prevention

Tracks job wait times and triggers priority boosts:

```go
if wait_time > starvation_threshold {
	effective_priority += boost_amount
}
```

### Worker Mode Switching

Workers can switch between batch and service modes:

- Batch mode: processes training jobs
- Service mode: runs long-lived inference services

## Testing

### Test Infrastructure

All tests use shared fixtures in `tests/fixtures/`:

- `SchedulerTestFixture`: Common setup/teardown
- `MockWorker`: Simulated worker connections

### Test Categories

| Category | Count | Files |
|----------|-------|-------|
| Unit | 17+ | `tests/unit/scheduler/` |
| Integration | 6 | `tests/integration/scheduler/` |
| E2E | 6 | `tests/e2e/scheduler/` |

### Running Tests

```bash
make test             # All tests
make test-unit        # Unit tests only
make test-integration # Integration tests only
go test ./tests/e2e/... 
# E2E tests
```

## State Persistence

The scheduler persists state for crash recovery:

- Job queue state
- Task assignments
- Worker registrations
- Lease timestamps

State is replayed on startup via `StateStore.Replay()`.

## Service Templates

The scheduler provides built-in service templates for common ML workloads:

### Available Templates

| Template | Description | Default Port Range |
|----------|-------------|-------------------|
| **JupyterLab** | Interactive Jupyter environment | 8000-9000 |
| **Jupyter Notebook** | Classic Jupyter notebooks | 8000-9000 |
| **vLLM** | OpenAI-compatible LLM inference server | 8000-9000 |

### Port Allocation

Dynamic port management for service instances:

```go
type PortAllocator struct {
	startPort int               // Default: 8000
	endPort   int               // Default: 9000
	allocated map[int]time.Time // Port -> allocation time
}
```

**Features:**

- Automatic port selection from the configured range
- TTL-based port reclamation
- Thread-safe concurrent allocations
- Exhaustion handling with clear error messages

### Template Variables

Service templates support dynamic variable substitution:

| Variable | Description | Example |
|----------|-------------|---------|
| `{{SERVICE_PORT}}` | Allocated port for the service | `8080` |
| `{{WORKER_ID}}` | ID of the assigned worker | `worker-1` |
| `{{TASK_ID}}` | Unique task identifier | `task-abc123` |
| `{{SECRET:xxx}}` | Secret reference from keychain | `api-key-value` |
| `{{MODEL_NAME}}` | ML model name (vLLM) | `llama-2-7b` |
| `{{GPU_COUNT}}` | Number of GPUs allocated | `2` |
| `{{GPU_DEVICES}}` | Specific GPU device IDs | `0,1` |
| `{{MODEL_CACHE}}` | Path to model cache directory | `/models` |
| `{{WORKSPACE}}` | Working directory path | `/workspace` |

## API Methods

```go
// SubmitJob submits a job to the scheduler
func (h *SchedulerHub) SubmitJob(spec JobSpec) error

// GetTask retrieves a task by ID
func (h *SchedulerHub) GetTask(taskID string) *Task

// Addr returns the scheduler's listen address
func (h 
*SchedulerHub) Addr() string

// Start begins the scheduler
func (h *SchedulerHub) Start() error

// Stop shuts down the scheduler
func (h *SchedulerHub) Stop()
```

## Audit Integration

The scheduler integrates with the audit logging system for security and compliance:

### Audit Logger Integration

```go
type SchedulerHub struct {
	// ... other fields ...
	auditor *audit.Logger // Security audit logger
}
```

**Initialization:**

```go
auditor := audit.NewLogger(audit.Config{
	LogPath: "/var/log/fetch_ml/scheduler_audit.log",
	Enabled: true,
})
hub, err := scheduler.NewHub(config, auditor)
```

### Audit Events

The scheduler logs the following audit events:

| Event | Description | Fields Logged |
|-------|-------------|---------------|
| `job_submitted` | New job queued | job_id, user_id, job_type, gpu_count |
| `job_assigned` | Job assigned to worker | job_id, worker_id, assignment_time |
| `job_accepted` | Worker accepted job | job_id, worker_id, acceptance_time |
| `job_completed` | Job finished successfully | job_id, worker_id, duration |
| `job_failed` | Job failed | job_id, worker_id, error_code |
| `job_cancelled` | Job cancelled | job_id, cancelled_by, reason |
| `worker_registered` | Worker connected | worker_id, capabilities, timestamp |
| `worker_disconnected` | Worker disconnected | worker_id, duration_connected |
| `quota_exceeded` | GPU quota violation | user_id, plugin_name, requested, limit |

### Tamper-Evident Logging

Audit logs use chain hashing for integrity:

- Each event includes the SHA-256 hash of the previous event
- Chain verification detects log tampering
- Separate log file from operational logs

### Configuration

```go
type HubConfig struct {
	BindAddr                string  // Listen address
	CertFile                string  // TLS certificate
	KeyFile                 string  // TLS key
	StateDir                string  // State persistence dir
	DefaultBatchSlots       int     // Default batch slots per worker
	DefaultServiceSlots     int     // Default service slots per worker
	StarvationThresholdMins float64 // Starvation detection threshold
	PriorityAgingRate       float64           // Priority increase rate
	GangAllocTimeoutSecs    int               // Multi-node allocation timeout
	AcceptanceTimeoutSecs   int               // Job acceptance timeout
	WorkerTokens            map[string]string // Authentication tokens
	PluginQuota             PluginQuotaConfig // Plugin GPU quota configuration
}
```

## Cross-Platform Support

Process management is abstracted for Unix/Windows:

- `service_manager_unix.go`: POSIX process groups
- `service_manager_windows.go`: Windows job objects

## See Also

- **[Architecture Overview](architecture.md)** - High-level system architecture
- **[Security Guide](security.md)** - Audit logging and security features
- **[Configuration Reference](configuration-reference.md)** - Plugin GPU quotas and scheduler config
- **[Jupyter Workflow](jupyter-workflow.md)** - Jupyter service integration with scheduler
- **[vLLM Workflow](vllm-workflow.md)** - vLLM service integration with scheduler
- **[Testing Guide](testing.md)** - Testing the scheduler
- **`internal/scheduler/hub.go`** - Core implementation
- **`tests/fixtures/scheduler_fixture.go`** - Test infrastructure