Add comprehensive capability routing system to scheduler hub: - Capability-aware worker matching with requirement/offer negotiation - Hub v2 protocol with structured message types and heartbeat management - Worker capability advertisement and dynamic routing decisions - Orphan recovery for disconnected workers with state reconciliation - Template-based job scheduling with capability constraints Add extensive test coverage: - Unit tests for capability routing logic and heartbeat mechanics - Unit tests for orphan recovery scenarios - E2E tests for capability routing across multiple workers - Hub capabilities integration tests - Scheduler fixture helpers for test setup Protocol improvements: - Define structured protocol messages for hub-worker communication - Add capability matching algorithm with scoring - Implement graceful worker disconnection handling
227 lines
6.8 KiB
Go
227 lines
6.8 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"encoding/json"
|
|
"time"
|
|
)
|
|
|
|
type Message struct {
|
|
Type MessageType `json:"type"`
|
|
Payload json.RawMessage `json:"payload,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
type MessageType string
|
|
|
|
const (
|
|
// Worker → Scheduler
|
|
MsgRegister MessageType = "register"
|
|
MsgHeartbeat MessageType = "heartbeat" // slots only, every 10s
|
|
MsgReadyForWork MessageType = "ready_for_work"
|
|
MsgJobAccepted MessageType = "job_accepted"
|
|
MsgJobResult MessageType = "job_result"
|
|
MsgServiceHealth MessageType = "service_health"
|
|
MsgMetricsRequest MessageType = "metrics_request" // WSS metrics request
|
|
|
|
// Scheduler → Worker
|
|
MsgJobAssign MessageType = "job_assign"
|
|
MsgNoWork MessageType = "no_work" // nothing available right now
|
|
MsgJobCancel MessageType = "job_cancel"
|
|
MsgPrewarmHint MessageType = "prewarm_hint"
|
|
MsgAck MessageType = "ack"
|
|
MsgMetricsResponse MessageType = "metrics_response" // WSS metrics response
|
|
)
|
|
|
|
// Heartbeat — liveness and slot status combined, no CPU/mem load
|
|
type HeartbeatPayload struct {
|
|
WorkerID string `json:"worker_id"`
|
|
Slots SlotStatus `json:"slots"`
|
|
Capability WorkerCapabilities `json:"capability"` // Dynamic capability updates
|
|
}
|
|
|
|
type ReadyPayload struct {
|
|
WorkerID string `json:"worker_id"`
|
|
Slots SlotStatus `json:"slots"`
|
|
Reason string `json:"reason"`
|
|
}
|
|
|
|
type JobResultPayload struct {
|
|
TaskID string `json:"task_id"`
|
|
State string `json:"state"`
|
|
ExitCode int `json:"exit_code"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
type PrewarmHintPayload struct {
|
|
TaskID string `json:"task_id"`
|
|
SnapshotID string `json:"snapshot_id"`
|
|
SnapshotSHA string `json:"snapshot_sha,omitempty"`
|
|
}
|
|
|
|
type WorkerRegistration struct {
|
|
ID string `json:"id"`
|
|
Capabilities WorkerCapabilities `json:"capabilities"`
|
|
ActiveTasks []ActiveTaskReport `json:"active_tasks"`
|
|
}
|
|
|
|
type ActiveTaskReport struct {
|
|
TaskID string `json:"task_id"`
|
|
State string `json:"state"`
|
|
StartedAt time.Time `json:"started_at,omitzero"`
|
|
}
|
|
|
|
type SlotStatus struct {
|
|
BatchTotal int `json:"batch_total"`
|
|
BatchInUse int `json:"batch_in_use"`
|
|
ServiceTotal int `json:"service_total"`
|
|
ServiceInUse int `json:"service_in_use"`
|
|
}
|
|
|
|
func (s SlotStatus) BatchAvailable() int { return s.BatchTotal - s.BatchInUse }
|
|
func (s SlotStatus) ServiceAvailable() int { return s.ServiceTotal - s.ServiceInUse }
|
|
|
|
type GPUBackend string
|
|
|
|
const (
|
|
BackendNVIDIA GPUBackend = "nvidia"
|
|
BackendMetal GPUBackend = "metal"
|
|
BackendVulkan GPUBackend = "vulkan"
|
|
BackendCPU GPUBackend = "cpu"
|
|
)
|
|
|
|
type WorkerCapabilities struct {
|
|
GPUBackend GPUBackend `json:"gpu_backend"`
|
|
GPUCount int `json:"gpu_count"`
|
|
GPUType string `json:"gpu_type"`
|
|
VRAMGB float64 `json:"vram_gb"`
|
|
CPUCount int `json:"cpu_count"`
|
|
MemoryGB float64 `json:"memory_gb"`
|
|
Hostname string `json:"hostname"`
|
|
GPUInfo GPUDetectionInfo `json:"gpu_info"`
|
|
}
|
|
|
|
type GPUDetectionInfo struct {
|
|
GPUType string `json:"gpu_type"`
|
|
Count int `json:"count"`
|
|
Devices []string `json:"devices,omitempty"`
|
|
Driver string `json:"driver,omitempty"`
|
|
MemTotal uint64 `json:"mem_total,omitempty"`
|
|
}
|
|
|
|
type JobTier string
|
|
|
|
const (
|
|
TierDataProcessing JobTier = "data_processing"
|
|
TierEvaluation JobTier = "evaluation"
|
|
TierTraining JobTier = "training"
|
|
TierFineTuning JobTier = "fine_tuning"
|
|
)
|
|
|
|
var TierGracePeriods = map[JobTier]time.Duration{
|
|
TierDataProcessing: 30 * time.Second,
|
|
TierEvaluation: 2 * time.Minute,
|
|
TierTraining: 10 * time.Minute,
|
|
TierFineTuning: 10 * time.Minute,
|
|
}
|
|
|
|
type JobSpec struct {
|
|
ID string `json:"id"`
|
|
Type JobType `json:"type"` // "batch" | "service"
|
|
JobTier JobTier `json:"job_tier,omitempty"` // default: TierDataProcessing
|
|
SlotPool string `json:"slot_pool"`
|
|
UserID string `json:"user_id,omitempty"` // NEW: for per-user quota tracking
|
|
|
|
GPUCount int `json:"gpu_count"`
|
|
GPUType string `json:"gpu_type,omitempty"`
|
|
GPUBackend string `json:"gpu_backend,omitempty"` // empty = any
|
|
MinVRAMGB float64 `json:"min_vram_gb,omitempty"`
|
|
MinCPUCores int `json:"min_cpu_cores,omitempty"`
|
|
NodeCount int `json:"node_count"`
|
|
|
|
// MaxRuntimeHours is the maximum wall-clock time for this job.
|
|
// 0 = default (24h), capped at 168h (7d) by the scheduler.
|
|
MaxRuntimeHours int `json:"max_runtime_hours,omitempty"`
|
|
|
|
Command []string `json:"command"`
|
|
Env map[string]string `json:"env"`
|
|
|
|
Prolog []string `json:"prolog,omitempty"`
|
|
Epilog []string `json:"epilog,omitempty"`
|
|
|
|
SnapshotID string `json:"snapshot_id,omitempty"`
|
|
SnapshotSHA string `json:"snapshot_sha,omitempty"`
|
|
HealthCheck *HealthCheck `json:"health_check,omitempty"`
|
|
Metadata map[string]string `json:"metadata,omitempty"`
|
|
}
|
|
|
|
type JobType string
|
|
|
|
const (
|
|
JobTypeBatch JobType = "batch"
|
|
JobTypeService JobType = "service"
|
|
)
|
|
|
|
type HealthCheck struct {
|
|
LivenessEndpoint string `json:"liveness"`
|
|
ReadinessEndpoint string `json:"readiness"`
|
|
IntervalSecs int `json:"interval_secs"`
|
|
}
|
|
|
|
type ServiceHealthPayload struct {
|
|
TaskID string `json:"task_id"`
|
|
Healthy bool `json:"healthy"`
|
|
Message string `json:"message,omitempty"`
|
|
}
|
|
|
|
// JobAssignPayload is sent from scheduler to worker when assigning a task.
|
|
type JobAssignPayload struct {
|
|
Spec JobSpec `json:"spec"`
|
|
RemainingTime time.Duration `json:"remaining_time"` // Wall-clock budget left
|
|
}
|
|
|
|
// CLI message types for worker visibility
|
|
const (
|
|
// CLI → Scheduler
|
|
MsgWorkerListRequest MessageType = "worker_list_request"
|
|
MsgWorkerShowRequest MessageType = "worker_show_request"
|
|
|
|
// Scheduler → CLI
|
|
MsgWorkerListResponse MessageType = "worker_list_response"
|
|
MsgWorkerShowResponse MessageType = "worker_show_response"
|
|
)
|
|
|
|
type WorkerListRequest struct {
|
|
Backend string `json:"backend,omitempty"` // filter by backend
|
|
}
|
|
|
|
type WorkerListResponse struct {
|
|
Workers []WorkerInfo `json:"workers"`
|
|
}
|
|
|
|
type WorkerInfo struct {
|
|
ID string `json:"id"`
|
|
Backend GPUBackend `json:"backend"`
|
|
GPUCount int `json:"gpu_count"`
|
|
VRAMGB float64 `json:"vram_gb"`
|
|
CPUCount int `json:"cpu_count"`
|
|
Status string `json:"status"` // ready, busy, offline
|
|
ActiveJobs int `json:"active_jobs"`
|
|
TotalSlots int `json:"total_slots"`
|
|
LastHeartbeat time.Time `json:"last_heartbeat"`
|
|
}
|
|
|
|
type WorkerShowRequest struct {
|
|
WorkerID string `json:"worker_id"`
|
|
}
|
|
|
|
type WorkerShowResponse struct {
|
|
Worker WorkerInfo `json:"worker"`
|
|
Jobs []JobSummary `json:"jobs"`
|
|
}
|
|
|
|
type JobSummary struct {
|
|
TaskID string `json:"task_id"`
|
|
JobName string `json:"job_name"`
|
|
Status string `json:"status"`
|
|
StartedAt time.Time `json:"started_at"`
|
|
}
|