package scheduler import ( "bufio" "encoding/json" "fmt" "os" "path/filepath" "sync" "time" ) // StateEvent represents a state change event for persistence type StateEvent struct { Type StateEventType `json:"type"` Timestamp time.Time `json:"ts"` TaskID string `json:"task_id"` WorkerID string `json:"worker_id,omitempty"` Payload json.RawMessage `json:"payload,omitempty"` } type StateEventType string const ( EventJobEnqueued StateEventType = "job_enqueued" EventJobAssigned StateEventType = "job_assigned" EventJobAccepted StateEventType = "job_accepted" EventJobCompleted StateEventType = "job_completed" EventJobFailed StateEventType = "job_failed" EventJobRequeued StateEventType = "job_requeued" EventJobCancelled StateEventType = "job_cancelled" ) // StateStore provides append-only persistence for scheduler state type StateStore struct { path string mu sync.Mutex file *os.File } // NewStateStore creates a new state store at the given path func NewStateStore(path string) (*StateStore, error) { if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { return nil, fmt.Errorf("create state directory: %w", err) } file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return nil, fmt.Errorf("open state file: %w", err) } return &StateStore{ path: path, file: file, }, nil } // Append writes a state event to the log func (s *StateStore) Append(event StateEvent) error { s.mu.Lock() defer s.mu.Unlock() if event.Timestamp.IsZero() { event.Timestamp = time.Now() } data, err := json.Marshal(event) if err != nil { return fmt.Errorf("marshal event: %w", err) } if _, err := s.file.Write(data); err != nil { return fmt.Errorf("write event: %w", err) } if _, err := s.file.WriteString("\n"); err != nil { return fmt.Errorf("write newline: %w", err) } return nil } // Replay reads all events from the state log func (s *StateStore) Replay() ([]StateEvent, error) { s.mu.Lock() defer s.mu.Unlock() // Close and reopen to ensure we read from the beginning if err := s.file.Close(); err != nil { return nil, fmt.Errorf("close state file: %w", err) } file, err := os.Open(s.path) if err != nil { if os.IsNotExist(err) { // Recreate the file for appending s.file, _ = os.OpenFile(s.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) return nil, nil } return nil, fmt.Errorf("open state file for replay: %w", err) } defer file.Close() var events []StateEvent scanner := bufio.NewScanner(file) for scanner.Scan() { var event StateEvent if err := json.Unmarshal(scanner.Bytes(), &event); err != nil { // Skip malformed lines but log them continue } events = append(events, event) } if err := scanner.Err(); err != nil { return nil, fmt.Errorf("scan state file: %w", err) } // Reopen for appending s.file, err = os.OpenFile(s.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return nil, fmt.Errorf("reopen state file: %w", err) } return events, nil } // Close closes the state store func (s *StateStore) Close() error { s.mu.Lock() defer s.mu.Unlock() return s.file.Close() } // Rotate rotates the state file (for backup/truncation) func (s *StateStore) Rotate() (string, error) { s.mu.Lock() defer s.mu.Unlock() backupPath := s.path + "." + time.Now().Format("20060102_150405") + ".bak" if err := s.file.Close(); err != nil { return "", fmt.Errorf("close state file: %w", err) } if err := os.Rename(s.path, backupPath); err != nil { return "", fmt.Errorf("rotate state file: %w", err) } file, err := os.OpenFile(s.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return "", fmt.Errorf("create new state file: %w", err) } s.file = file return backupPath, nil }