package queue import ( "encoding/json" "fmt" "time" "github.com/jfraeys/fetch_ml/internal/scheduler" ) // SchedulerConn defines the interface for scheduler connection type SchedulerConn interface { Send(msg scheduler.Message) } // SchedulerBackend implements queue.Backend by communicating with a scheduler over WebSocket type SchedulerBackend struct { conn SchedulerConn pendingTasks chan *Task prewarmHint *Task localSlots scheduler.SlotStatus } // NewSchedulerBackend creates a new scheduler backend func NewSchedulerBackend(conn SchedulerConn) *SchedulerBackend { return &SchedulerBackend{ conn: conn, pendingTasks: make(chan *Task, 1), } } // GetNextTaskWithLeaseBlocking implements queue.Backend func (sb *SchedulerBackend) GetNextTaskWithLeaseBlocking( workerID string, leaseDuration time.Duration, blockTimeout time.Duration, ) (*Task, error) { // Signal readiness with current slot status sb.conn.Send(scheduler.Message{ Type: scheduler.MsgReadyForWork, Payload: mustMarshal(scheduler.ReadyPayload{ WorkerID: workerID, Slots: sb.localSlots, Reason: "polling", }), }) // Wait for scheduler push or timeout select { case task := <-sb.pendingTasks: return task, nil case <-time.After(blockTimeout): return nil, nil // RunLoop retries — same behaviour as empty queue } } // OnJobAssign is called when the scheduler pushes a job assignment func (sb *SchedulerBackend) OnJobAssign(spec *scheduler.JobSpec) { task := &Task{ ID: spec.ID, JobName: "distributed-job", Priority: 1, Status: "assigned", } select { case sb.pendingTasks <- task: default: // Channel full, drop (shouldn't happen with buffer of 1) } } // UpdateTask implements queue.Backend - forwards state changes to scheduler func (sb *SchedulerBackend) UpdateTask(task *Task) error { // Notify scheduler of state change result := scheduler.JobResultPayload{ TaskID: task.ID, State: task.Status, } if task.Error != "" { result.Error = task.Error } sb.conn.Send(scheduler.Message{ Type: scheduler.MsgJobResult, Payload: mustMarshal(result), }) return nil } // PeekNextTask implements queue.Backend - returns last prewarm hint func (sb *SchedulerBackend) PeekNextTask() (*Task, error) { if sb.prewarmHint == nil { return nil, nil } return sb.prewarmHint, nil } // OnPrewarmHint stores a prewarm hint from the scheduler func (sb *SchedulerBackend) OnPrewarmHint(hint scheduler.PrewarmHintPayload) { sb.prewarmHint = &Task{ ID: hint.TaskID, Status: "prewarm_hint", Metadata: map[string]string{ "snapshot_id": hint.SnapshotID, "snapshot_sha": hint.SnapshotSHA, }, } } // UpdateSlots updates the local slot status for readiness signaling func (sb *SchedulerBackend) UpdateSlots(running, max int) { sb.localSlots = scheduler.SlotStatus{ BatchTotal: max, BatchInUse: running, } } // Required queue.Backend methods (stub implementations for now) func (sb *SchedulerBackend) AddTask(task *Task) error { return fmt.Errorf("AddTask not supported in distributed mode - submit to scheduler instead") } func (sb *SchedulerBackend) GetNextTask() (*Task, error) { return nil, fmt.Errorf("GetNextTask not supported - use GetNextTaskWithLeaseBlocking") } func (sb *SchedulerBackend) GetNextTaskWithLease(workerID string, leaseDuration time.Duration) (*Task, error) { return sb.GetNextTaskWithLeaseBlocking(workerID, leaseDuration, 0) } func (sb *SchedulerBackend) RenewLease(taskID string, workerID string, leaseDuration time.Duration) error { // Distributed mode: scheduler manages leases return nil } func (sb *SchedulerBackend) ReleaseLease(taskID string, workerID string) error { // Distributed mode: scheduler manages leases return nil } func (sb *SchedulerBackend) RetryTask(task *Task) error { sb.conn.Send(scheduler.Message{ Type: scheduler.MsgJobResult, Payload: mustMarshal(scheduler.JobResultPayload{TaskID: task.ID, State: "retry"}), }) return nil } func (sb *SchedulerBackend) MoveToDeadLetterQueue(task *Task, reason string) error { return nil // Scheduler handles this } func (sb *SchedulerBackend) GetTask(taskID string) (*Task, error) { return nil, fmt.Errorf("GetTask not supported in distributed mode") } func (sb *SchedulerBackend) GetAllTasks() ([]*Task, error) { return nil, fmt.Errorf("GetAllTasks not supported in distributed mode") } func (sb *SchedulerBackend) GetTaskByName(jobName string) (*Task, error) { return nil, fmt.Errorf("GetTaskByName not supported in distributed mode") } func (sb *SchedulerBackend) CancelTask(taskID string) error { return fmt.Errorf("CancelTask not supported - use scheduler API") } func (sb *SchedulerBackend) UpdateTaskWithMetrics(task *Task, action string) error { return sb.UpdateTask(task) } func (sb *SchedulerBackend) RecordMetric(jobName, metric string, value float64) error { return nil // Metrics handled by scheduler } func (sb *SchedulerBackend) Heartbeat(workerID string) error { return nil // Heartbeat handled by SchedulerConn } func (sb *SchedulerBackend) QueueDepth() (int64, error) { return 0, nil // Queue depth managed by scheduler } func (sb *SchedulerBackend) SetWorkerPrewarmState(state PrewarmState) error { return nil } func (sb *SchedulerBackend) ClearWorkerPrewarmState(workerID string) error { return nil } func (sb *SchedulerBackend) GetWorkerPrewarmState(workerID string) (*PrewarmState, error) { return nil, nil } func (sb *SchedulerBackend) GetAllWorkerPrewarmStates() ([]PrewarmState, error) { return nil, nil } func (sb *SchedulerBackend) SignalPrewarmGC() error { return nil } func (sb *SchedulerBackend) PrewarmGCRequestValue() (string, error) { return "", nil } func (sb *SchedulerBackend) Close() error { return nil } // Conn returns the underlying scheduler connection for heartbeat func (sb *SchedulerBackend) Conn() *scheduler.SchedulerConn { if conn, ok := sb.conn.(*scheduler.SchedulerConn); ok { return conn } return nil } func mustMarshal(v any) []byte { b, _ := json.Marshal(v) return b } // Compile-time check - ensure SchedulerBackend implements the interface var _ Backend = (*SchedulerBackend)(nil)