fetch_ml/internal/scheduler/protocol.go
Jeremie Fraeys 57787e1e7b
feat(scheduler): implement capability-based routing and hub v2
Add comprehensive capability routing system to scheduler hub:
- Capability-aware worker matching with requirement/offer negotiation
- Hub v2 protocol with structured message types and heartbeat management
- Worker capability advertisement and dynamic routing decisions
- Orphan recovery for disconnected workers with state reconciliation
- Template-based job scheduling with capability constraints

Add extensive test coverage:
- Unit tests for capability routing logic and heartbeat mechanics
- Unit tests for orphan recovery scenarios
- E2E tests for capability routing across multiple workers
- Hub capabilities integration tests
- Scheduler fixture helpers for test setup

Protocol improvements:
- Define structured protocol messages for hub-worker communication
- Add capability matching algorithm with scoring
- Implement graceful worker disconnection handling
2026-03-12 12:00:05 -04:00

227 lines
6.8 KiB
Go

package scheduler
import (
"encoding/json"
"time"
)
// Message is the envelope for every frame exchanged between the scheduler
// hub and its peers (workers and CLI clients). Payload is kept as raw JSON
// so decoding of the type-specific body can be deferred until Type has been
// inspected; Error carries a human-readable failure description when a
// request could not be honored.
type Message struct {
	Type    MessageType     `json:"type"`
	Payload json.RawMessage `json:"payload,omitempty"`
	Error   string          `json:"error,omitempty"`
}
// MessageType discriminates the kind of protocol Message being carried.
type MessageType string

// Core hub<->worker message types, grouped by sender.
const (
	// Worker → Scheduler
	MsgRegister       MessageType = "register"
	MsgHeartbeat      MessageType = "heartbeat" // slots only, every 10s
	MsgReadyForWork   MessageType = "ready_for_work"
	MsgJobAccepted    MessageType = "job_accepted"
	MsgJobResult      MessageType = "job_result"
	MsgServiceHealth  MessageType = "service_health"
	MsgMetricsRequest MessageType = "metrics_request" // WSS metrics request

	// Scheduler → Worker
	MsgJobAssign       MessageType = "job_assign"
	MsgNoWork          MessageType = "no_work" // nothing available right now
	MsgJobCancel       MessageType = "job_cancel"
	MsgPrewarmHint     MessageType = "prewarm_hint"
	MsgAck             MessageType = "ack"
	MsgMetricsResponse MessageType = "metrics_response" // WSS metrics response
)
// HeartbeatPayload — liveness and slot status combined, no CPU/mem load.
// Sent by a worker as the body of a MsgHeartbeat frame. Capability is
// included on every beat so the scheduler can pick up dynamic capability
// changes without requiring re-registration.
type HeartbeatPayload struct {
	WorkerID   string             `json:"worker_id"`
	Slots      SlotStatus         `json:"slots"`
	Capability WorkerCapabilities `json:"capability"` // Dynamic capability updates
}
// ReadyPayload is the body of a MsgReadyForWork frame: a worker announces
// it has free capacity (Slots) and why it is asking for work (Reason).
type ReadyPayload struct {
	WorkerID string     `json:"worker_id"`
	Slots    SlotStatus `json:"slots"`
	Reason   string     `json:"reason"`
}
// JobResultPayload is the body of a MsgJobResult frame reporting a task's
// terminal outcome: its final State, process ExitCode, and an optional
// error description.
type JobResultPayload struct {
	TaskID   string `json:"task_id"`
	State    string `json:"state"`
	ExitCode int    `json:"exit_code"`
	Error    string `json:"error,omitempty"`
}
// PrewarmHintPayload is the body of a MsgPrewarmHint frame: it tells a
// worker which snapshot an upcoming task will need so the worker can fetch
// it ahead of the actual assignment. SnapshotSHA, when present, lets the
// worker verify the snapshot contents.
type PrewarmHintPayload struct {
	TaskID      string `json:"task_id"`
	SnapshotID  string `json:"snapshot_id"`
	SnapshotSHA string `json:"snapshot_sha,omitempty"`
}
// WorkerRegistration is the body of a MsgRegister frame. ActiveTasks lists
// tasks the worker is already running at (re)connect time, which allows the
// scheduler to reconcile state after a disconnection instead of treating
// those tasks as lost.
type WorkerRegistration struct {
	ID           string             `json:"id"`
	Capabilities WorkerCapabilities `json:"capabilities"`
	ActiveTasks  []ActiveTaskReport `json:"active_tasks"`
}
// ActiveTaskReport describes one task a worker was already running when it
// registered, used for orphan-recovery reconciliation.
type ActiveTaskReport struct {
	TaskID string `json:"task_id"`
	State  string `json:"state"`
	// NOTE(review): the "omitzero" json tag option requires Go 1.24+
	// encoding/json — confirm the module's minimum toolchain version.
	StartedAt time.Time `json:"started_at,omitzero"`
}
// SlotStatus reports a worker's slot occupancy, split between batch and
// service pools: the configured totals and how many of each are in use.
type SlotStatus struct {
	BatchTotal   int `json:"batch_total"`
	BatchInUse   int `json:"batch_in_use"`
	ServiceTotal int `json:"service_total"`
	ServiceInUse int `json:"service_in_use"`
}

// BatchAvailable returns the number of batch slots not currently in use.
func (s SlotStatus) BatchAvailable() int {
	free := s.BatchTotal - s.BatchInUse
	return free
}

// ServiceAvailable returns the number of service slots not currently in use.
func (s SlotStatus) ServiceAvailable() int {
	free := s.ServiceTotal - s.ServiceInUse
	return free
}
// GPUBackend identifies the GPU runtime a worker exposes.
type GPUBackend string

// Supported GPU backends; BackendCPU marks a worker with no GPU.
const (
	BackendNVIDIA GPUBackend = "nvidia"
	BackendMetal  GPUBackend = "metal"
	BackendVulkan GPUBackend = "vulkan"
	BackendCPU    GPUBackend = "cpu"
)
// WorkerCapabilities is a worker's hardware advertisement, used by the
// scheduler to match JobSpec requirements (backend, GPU count/type, VRAM,
// CPU, memory) against what the worker offers. GPUInfo carries the raw
// detection details behind the summarized fields.
type WorkerCapabilities struct {
	GPUBackend GPUBackend       `json:"gpu_backend"`
	GPUCount   int              `json:"gpu_count"`
	GPUType    string           `json:"gpu_type"`
	VRAMGB     float64          `json:"vram_gb"`
	CPUCount   int              `json:"cpu_count"`
	MemoryGB   float64          `json:"memory_gb"`
	Hostname   string           `json:"hostname"`
	GPUInfo    GPUDetectionInfo `json:"gpu_info"`
}
// GPUDetectionInfo holds raw GPU probe results from the worker host:
// device names, driver identifier, and total memory.
// NOTE(review): MemTotal's unit (bytes vs MiB) is not established here —
// confirm against the detection code that populates it.
type GPUDetectionInfo struct {
	GPUType  string   `json:"gpu_type"`
	Count    int      `json:"count"`
	Devices  []string `json:"devices,omitempty"`
	Driver   string   `json:"driver,omitempty"`
	MemTotal uint64   `json:"mem_total,omitempty"`
}
// JobTier classifies a job's workload category; the tier selects its
// grace period in TierGracePeriods.
type JobTier string

const (
	TierDataProcessing JobTier = "data_processing"
	TierEvaluation     JobTier = "evaluation"
	TierTraining       JobTier = "training"
	TierFineTuning     JobTier = "fine_tuning"
)
// TierGracePeriods maps each job tier to its grace period: cheap
// data-processing jobs get 30s while long-running training and fine-tuning
// jobs get 10m. NOTE(review): what the grace period governs (shutdown?
// orphan recovery?) is not visible in this file — confirm at the call site.
var TierGracePeriods = map[JobTier]time.Duration{
	TierDataProcessing: 30 * time.Second,
	TierEvaluation:     2 * time.Minute,
	TierTraining:       10 * time.Minute,
	TierFineTuning:     10 * time.Minute,
}
// JobSpec fully describes a schedulable job: its identity, resource
// requirements used for capability matching (GPU backend/type/count, VRAM,
// CPU), execution details (command, env, prolog/epilog hooks), and optional
// snapshot and health-check configuration.
type JobSpec struct {
	ID       string  `json:"id"`
	Type     JobType `json:"type"`               // "batch" | "service"
	JobTier  JobTier `json:"job_tier,omitempty"` // default: TierDataProcessing
	SlotPool string  `json:"slot_pool"`
	UserID   string  `json:"user_id,omitempty"` // NEW: for per-user quota tracking

	// Resource requirements matched against WorkerCapabilities.
	GPUCount    int     `json:"gpu_count"`
	GPUType     string  `json:"gpu_type,omitempty"`
	GPUBackend  string  `json:"gpu_backend,omitempty"` // empty = any
	MinVRAMGB   float64 `json:"min_vram_gb,omitempty"`
	MinCPUCores int     `json:"min_cpu_cores,omitempty"`
	NodeCount   int     `json:"node_count"`

	// MaxRuntimeHours is the maximum wall-clock time for this job.
	// 0 = default (24h), capped at 168h (7d) by the scheduler.
	MaxRuntimeHours int `json:"max_runtime_hours,omitempty"`

	// Execution: Prolog/Epilog run before/after Command.
	Command []string          `json:"command"`
	Env     map[string]string `json:"env"`
	Prolog  []string          `json:"prolog,omitempty"`
	Epilog  []string          `json:"epilog,omitempty"`

	// Optional snapshot to stage (see PrewarmHintPayload) and, for service
	// jobs, the health-check configuration.
	SnapshotID  string            `json:"snapshot_id,omitempty"`
	SnapshotSHA string            `json:"snapshot_sha,omitempty"`
	HealthCheck *HealthCheck      `json:"health_check,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}
// JobType distinguishes one-shot batch jobs from long-running services.
type JobType string

const (
	JobTypeBatch   JobType = "batch"
	JobTypeService JobType = "service"
)
// HealthCheck configures liveness/readiness probing for a service job:
// the two endpoints to hit and the probe interval in seconds.
type HealthCheck struct {
	LivenessEndpoint  string `json:"liveness"`
	ReadinessEndpoint string `json:"readiness"`
	IntervalSecs      int    `json:"interval_secs"`
}
// ServiceHealthPayload is the body of a MsgServiceHealth frame: a worker
// reports whether a service task's health check is passing, with an
// optional diagnostic message.
type ServiceHealthPayload struct {
	TaskID  string `json:"task_id"`
	Healthy bool   `json:"healthy"`
	Message string `json:"message,omitempty"`
}
// JobAssignPayload is sent from scheduler to worker when assigning a task.
// RemainingTime is the wall-clock budget left for the job; note that
// encoding/json marshals a time.Duration as its integer nanosecond count.
type JobAssignPayload struct {
	Spec          JobSpec       `json:"spec"`
	RemainingTime time.Duration `json:"remaining_time"` // Wall-clock budget left
}
// CLI message types for worker visibility: request/response pairs that let
// a CLI client list connected workers or inspect a single worker.
const (
	// CLI → Scheduler
	MsgWorkerListRequest MessageType = "worker_list_request"
	MsgWorkerShowRequest MessageType = "worker_show_request"

	// Scheduler → CLI
	MsgWorkerListResponse MessageType = "worker_list_response"
	MsgWorkerShowResponse MessageType = "worker_show_response"
)
// WorkerListRequest is the body of a MsgWorkerListRequest frame; an empty
// Backend means no filtering.
type WorkerListRequest struct {
	Backend string `json:"backend,omitempty"` // filter by backend
}
// WorkerListResponse is the body of a MsgWorkerListResponse frame.
type WorkerListResponse struct {
	Workers []WorkerInfo `json:"workers"`
}
// WorkerInfo is a CLI-facing summary of one connected worker: its hardware
// profile, current status, and activity counters.
type WorkerInfo struct {
	ID            string     `json:"id"`
	Backend       GPUBackend `json:"backend"`
	GPUCount      int        `json:"gpu_count"`
	VRAMGB        float64    `json:"vram_gb"`
	CPUCount      int        `json:"cpu_count"`
	Status        string     `json:"status"` // ready, busy, offline
	ActiveJobs    int        `json:"active_jobs"`
	TotalSlots    int        `json:"total_slots"`
	LastHeartbeat time.Time  `json:"last_heartbeat"`
}
// WorkerShowRequest is the body of a MsgWorkerShowRequest frame, naming
// the worker to inspect.
type WorkerShowRequest struct {
	WorkerID string `json:"worker_id"`
}
// WorkerShowResponse is the body of a MsgWorkerShowResponse frame: the
// worker's summary plus the jobs currently on it.
type WorkerShowResponse struct {
	Worker WorkerInfo   `json:"worker"`
	Jobs   []JobSummary `json:"jobs"`
}
// JobSummary is a CLI-facing one-line view of a job running on a worker.
type JobSummary struct {
	TaskID    string    `json:"task_id"`
	JobName   string    `json:"job_name"`
	Status    string    `json:"status"`
	StartedAt time.Time `json:"started_at"`
}