// Package lifecycle provides task lifecycle management with explicit state transitions.
package lifecycle

import (
	"fmt"
	"slices"
	"time"

	"github.com/jfraeys/fetch_ml/internal/audit"
	"github.com/jfraeys/fetch_ml/internal/domain"
)

// TaskState represents the current state of a task in its lifecycle.
// These states form a finite state machine; the permitted moves between
// them are listed in ValidTransitions.
type TaskState string

// Task lifecycle states. The string values are what gets written to a task's
// Status field and into audit metadata, so they must remain stable.
const (
	// StateQueued indicates the task is waiting to be picked up by a worker.
	StateQueued TaskState = "queued"
	// StatePreparing indicates the task is being prepared (workspace setup, data staging).
	StatePreparing TaskState = "preparing"
	// StateRunning indicates the task is currently executing in a container.
	StateRunning TaskState = "running"
	// StateCollecting indicates the task has finished execution and results are being collected.
	StateCollecting TaskState = "collecting"
	// StateCompleted indicates the task finished successfully.
	// It has no outgoing transitions in ValidTransitions.
	StateCompleted TaskState = "completed"
	// StateFailed indicates the task failed during execution.
	// It has no outgoing transitions in ValidTransitions.
	StateFailed TaskState = "failed"
	// StateCancelled indicates the task was cancelled by user or system.
	// It has no outgoing transitions in ValidTransitions.
	StateCancelled TaskState = "cancelled"
	// StateOrphaned indicates the worker was lost - task will be silently re-queued.
	// Its only outgoing transition in ValidTransitions is back to StateQueued.
	StateOrphaned TaskState = "orphaned"
	// StateServing indicates a service job (Jupyter/vLLM) is up and ready.
	// It is entered from StateRunning.
	StateServing TaskState = "serving"
	// StateStopping indicates a service job is in graceful shutdown.
	// From here a task can only move to StateCompleted or StateFailed.
	StateStopping TaskState = "stopping"
)

// ValidTransitions defines the allowed state transitions.
// The key is the "from" state, the value is a list of valid "to" states.
// This enforces that state transitions follow the expected lifecycle.
var ValidTransitions = map[TaskState][]TaskState{
	// Forward path of a job, with failure/cancellation exits at every step.
	StateQueued:     {StatePreparing, StateFailed, StateCancelled},
	StatePreparing:  {StateRunning, StateFailed, StateCancelled},
	StateRunning:    {StateCollecting, StateFailed, StateOrphaned, StateServing, StateCancelled},
	StateCollecting: {StateCompleted, StateFailed, StateCancelled},

	// Service jobs: serving -> stopping -> completed/failed.
	StateServing:  {StateStopping, StateFailed, StateOrphaned, StateCancelled},
	StateStopping: {StateCompleted, StateFailed},

	// An orphaned task can only go back to the queue.
	StateOrphaned: {StateQueued},

	// Terminal states: present as keys, but with nothing to move to.
	StateCompleted: {},
	StateFailed:    {},
	StateCancelled: {},
}

// StateTransitionError reports an attempted move between two states that
// ValidTransitions does not permit.
type StateTransitionError struct {
	// From is the state the task was in when the transition was requested.
	From TaskState
	// To is the state that was requested.
	To TaskState
}

// Error implements the error interface, naming both ends of the rejected move.
func (e StateTransitionError) Error() string {
	const format = "invalid state transition: %s -> %s"
	return fmt.Sprintf(format, e.From, e.To)
}

// StateManager applies task state transitions and records them through an
// optional audit logger.
type StateManager struct {
	// auditor receives one event per successful transition; may be nil.
	auditor *audit.Logger
	// enabled is true when the manager was constructed with a non-nil auditor.
	enabled bool
}

// NewStateManager builds a StateManager around auditor. Passing nil is
// allowed and yields a manager with auditing switched off.
func NewStateManager(auditor *audit.Logger) *StateManager {
	mgr := new(StateManager)
	mgr.auditor = auditor
	mgr.enabled = auditor != nil
	return mgr
}

// Transition moves task into the requested state.
// The move is checked against ValidTransitions, an audit event is emitted when
// an auditor is configured, and task.Status is then set to the new state.
// A move that is not permitted yields a StateTransitionError and leaves the
// task unchanged.
func (sm *StateManager) Transition(task *domain.Task, to TaskState) error { from := TaskState(task.Status) // Validate the transition if err := sm.validateTransition(from, to); err != nil { return err } // Audit the transition before updating if sm.enabled && sm.auditor != nil { sm.auditor.Log(audit.Event{ EventType: audit.EventJobStarted, Timestamp: time.Now(), Resource: task.ID, Action: "task_state_change", Success: true, Metadata: map[string]any{ "job_name": task.JobName, "old_state": string(from), "new_state": string(to), }, }) } // Update task state task.Status = string(to) return nil } // validateTransition checks if a transition from one state to another is valid. func (sm *StateManager) validateTransition(from, to TaskState) error { // Check if "from" state is valid allowed, ok := ValidTransitions[from] if !ok { return StateTransitionError{From: from, To: to} } // Check if "to" state is in the allowed list if slices.Contains(allowed, to) { return nil } return StateTransitionError{From: from, To: to} } // IsTerminalState returns true if the state is terminal (no further transitions allowed). func IsTerminalState(state TaskState) bool { return state == StateCompleted || state == StateFailed || state == StateCancelled } // CanTransition returns true if a transition from -> to is valid. func CanTransition(from, to TaskState) bool { allowed, ok := ValidTransitions[from] if !ok { return false } return slices.Contains(allowed, to) }