fetch_ml/internal/scheduler/state.go
Jeremie Fraeys 43e6446587
feat(scheduler): implement multi-tenant job scheduler with gang scheduling
Add new scheduler component for distributed ML workload orchestration:
- Hub-based coordination for multi-worker clusters
- Pacing controller for rate limiting job submissions
- Priority queue with preemption support
- Port allocator for dynamic service discovery
- Protocol handlers for worker-scheduler communication
- Service manager with OS-specific implementations
- Connection management and state persistence
- Template system for service deployment

Includes comprehensive test suite:
- Unit tests for all core components
- Integration tests for distributed scenarios
- Benchmark tests for performance validation
- Mock fixtures for isolated testing

Refs: scheduler-architecture.md
2026-02-26 12:03:23 -05:00

156 lines
3.7 KiB
Go

package scheduler
import (
"bufio"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// StateEvent is one record of the scheduler's persisted state log.
//
// Each event is serialized to JSON using the field tags below. Field order is
// significant for the on-disk representation because encoding/json emits
// fields in declaration order.
type StateEvent struct {
	// Type identifies the kind of state change; serialized as "type".
	Type StateEventType `json:"type"`

	// Timestamp is serialized as "ts". StateStore.Append fills it in with
	// the current time when it is left as the zero value.
	Timestamp time.Time `json:"ts"`

	// TaskID is serialized as "task_id" and is always present in the
	// output, even when empty.
	TaskID string `json:"task_id"`

	// WorkerID is serialized as "worker_id" and omitted when empty.
	WorkerID string `json:"worker_id,omitempty"`

	// Payload carries event-specific data as raw, already-encoded JSON. It
	// is serialized as "payload" and omitted when empty.
	Payload json.RawMessage `json:"payload,omitempty"`
}
// StateEventType names the kind of state change a StateEvent records. Its
// string value is what gets written to the "type" field of the persisted
// event.
type StateEventType string

// Event types written to the state log. The string values are part of the
// on-disk format and must not change. The names suggest the job lifecycle
// transitions they stand for; the code that emits them lives outside this
// file.
const (
	// Job entered the queue / was handed to a worker / was accepted by it.
	EventJobEnqueued StateEventType = "job_enqueued"
	EventJobAssigned StateEventType = "job_assigned"
	EventJobAccepted StateEventType = "job_accepted"

	// Job reached a terminal outcome.
	EventJobCompleted StateEventType = "job_completed"
	EventJobFailed    StateEventType = "job_failed"

	// Job was put back on the queue or cancelled.
	EventJobRequeued  StateEventType = "job_requeued"
	EventJobCancelled StateEventType = "job_cancelled"
)
// StateStore is an append-only, file-backed log of StateEvent records.
//
// A StateStore contains a mutex and must not be copied after first use; pass
// it around by pointer.
type StateStore struct {
	// path is the location of the log file. It is set at construction and
	// used whenever the file has to be reopened or renamed.
	path string

	// mu is held by every method that reads or replaces file.
	mu sync.Mutex
	// file is the handle events are appended to.
	file *os.File
}
// NewStateStore opens (creating it if necessary) the state log at path and
// returns a store that appends to it.
//
// The parent directory of path is created with mode 0755 when missing, and
// the log file itself is created with mode 0644. An error is returned if
// either the directory or the file cannot be created/opened.
func NewStateStore(path string) (*StateStore, error) {
	dir := filepath.Dir(path)
	if mkErr := os.MkdirAll(dir, 0755); mkErr != nil {
		return nil, fmt.Errorf("create state directory: %w", mkErr)
	}

	// O_APPEND makes every write land at the current end of the file.
	flags := os.O_APPEND | os.O_CREATE | os.O_WRONLY
	logFile, openErr := os.OpenFile(path, flags, 0644)
	if openErr != nil {
		return nil, fmt.Errorf("open state file: %w", openErr)
	}

	store := &StateStore{path: path, file: logFile}
	return store, nil
}
// Append serializes event as a single JSON line and appends it to the log.
//
// If event.Timestamp is the zero value it is set to the current time before
// encoding (the caller's copy is not modified, since event is passed by
// value). An error is returned if the event cannot be marshalled or the write
// fails.
func (s *StateStore) Append(event StateEvent) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if event.Timestamp.IsZero() {
		event.Timestamp = time.Now()
	}

	data, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("marshal event: %w", err)
	}

	// Emit the record and its terminating newline in one Write call. Writing
	// them separately could leave an unterminated record behind if the
	// second write failed (or the process died in between), which would
	// then fuse with the next record and make both unparseable on replay.
	data = append(data, '\n')
	if _, err := s.file.Write(data); err != nil {
		return fmt.Errorf("write event: %w", err)
	}
	return nil
}
// Replay reads the state log from the beginning and returns every event that
// decodes successfully, in file order.
//
// Lines that are not valid JSON for a StateEvent are skipped silently; they
// are not reported or logged. If the log file no longer exists, it is
// recreated for appending and Replay returns (nil, nil). A line longer than
// the scanner limit below, or any read error, aborts the replay with an
// error.
//
// The append handle is left open throughout, so a failed replay never leaves
// the store unable to accept further Append calls.
func (s *StateStore) Replay() ([]StateEvent, error) {
	// Upper bound on a single encoded event. bufio.Scanner's default limit
	// is 64 KiB, which an event carrying a large Payload can exceed.
	const maxLineBytes = 16 << 20

	s.mu.Lock()
	defer s.mu.Unlock()

	// A separate read-only handle has its own offset starting at zero, so
	// there is no need to close and reopen the append handle to read from
	// the beginning.
	file, err := os.Open(s.path)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, fmt.Errorf("open state file for replay: %w", err)
		}
		// The log is gone from disk. Recreate it and switch the append
		// handle over, so later appends are written to the visible path.
		fresh, openErr := os.OpenFile(s.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
		if openErr != nil {
			return nil, fmt.Errorf("reopen state file: %w", openErr)
		}
		// Best-effort close of the stale handle; its error is irrelevant
		// because the file it referred to no longer exists at s.path.
		_ = s.file.Close()
		s.file = fresh
		return nil, nil
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLineBytes)

	var events []StateEvent
	for scanner.Scan() {
		var event StateEvent
		if err := json.Unmarshal(scanner.Bytes(), &event); err != nil {
			// Skip malformed lines (e.g. a torn final record).
			continue
		}
		events = append(events, event)
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scan state file: %w", err)
	}
	return events, nil
}
// Close closes the underlying log file while holding the store's mutex and
// returns whatever error the file's Close reports.
func (s *StateStore) Close() error {
	s.mu.Lock()
	closeErr := s.file.Close()
	s.mu.Unlock()
	return closeErr
}
// Rotate moves the current log aside and starts a fresh, empty one at the
// original path (for backup/truncation).
//
// The backup is named "<path>.<YYYYMMDD_HHMMSS>.bak". If a file with that
// name already exists (for example when rotating twice within one second), a
// numeric component is inserted, "<path>.<YYYYMMDD_HHMMSS>.<n>.bak", so an
// earlier backup is not overwritten by the rename.
//
// On success the path of the backup file is returned. If the rename fails,
// the log at the original path is reopened so the store remains usable, and
// the rename error is returned. If the live log cannot be reopened, an error
// is returned and later appends will fail until the store is replaced.
func (s *StateStore) Rotate() (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	stamp := s.path + "." + time.Now().Format("20060102_150405")
	backupPath := stamp + ".bak"
	// Keep picking a new name while the candidate exists. Any Lstat error
	// (including "not exist") ends the search; a real problem with the
	// path is then reported by the rename below.
	for n := 1; ; n++ {
		if _, statErr := os.Lstat(backupPath); statErr != nil {
			break
		}
		backupPath = fmt.Sprintf("%s.%d.bak", stamp, n)
	}

	// The handle is closed before renaming because renaming an open file is
	// not permitted on every platform.
	if err := s.file.Close(); err != nil {
		return "", fmt.Errorf("close state file: %w", err)
	}

	renameErr := os.Rename(s.path, backupPath)

	// Reopen the live log whether or not the rename worked, so that a failed
	// rotation does not leave the store holding a closed handle.
	file, openErr := os.OpenFile(s.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if openErr == nil {
		s.file = file
	}

	if renameErr != nil {
		return "", fmt.Errorf("rotate state file: %w", renameErr)
	}
	if openErr != nil {
		return "", fmt.Errorf("create new state file: %w", openErr)
	}
	return backupPath, nil
}