Extend worker capabilities with new execution plugins and security features: - Jupyter plugin for notebook-based ML experiments - vLLM plugin for LLM inference workloads - Cross-platform process isolation (Unix/Windows) - Network policy enforcement with platform-specific implementations - Service manager integration for lifecycle management - Scheduler backend integration for queue coordination Update lifecycle management: - Enhanced runloop with state transitions - Service manager integration for plugin coordination - Improved state persistence and recovery Add test coverage: - Unit tests for Jupyter and vLLM plugins - Updated worker execution tests
142 lines
4.6 KiB
Go
142 lines
4.6 KiB
Go
// Package lifecycle provides task lifecycle management with explicit state transitions.
|
|
package lifecycle
|
|
|
|
import (
|
|
"fmt"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/jfraeys/fetch_ml/internal/audit"
|
|
"github.com/jfraeys/fetch_ml/internal/domain"
|
|
)
|
|
|
|
// TaskState is the lifecycle state of a task. StateManager.Transition reads and
// writes it as the string stored in domain.Task.Status, and ValidTransitions
// lists which states may follow which.
type TaskState string

// Pipeline states for an ordinary batch task, in the order a successful run
// normally visits them.
const (
	// StateQueued indicates the task is waiting to be picked up by a worker.
	StateQueued TaskState = "queued"
	// StatePreparing indicates the task is being prepared (workspace setup, data staging).
	StatePreparing TaskState = "preparing"
	// StateRunning indicates the task is currently executing in a container.
	StateRunning TaskState = "running"
	// StateCollecting indicates execution has finished and results are being collected.
	StateCollecting TaskState = "collecting"
)

// Final states: ValidTransitions allows no moves out of these.
const (
	// StateCompleted indicates the task finished successfully.
	StateCompleted TaskState = "completed"
	// StateFailed indicates the task failed during execution.
	StateFailed TaskState = "failed"
	// StateCancelled indicates the task was cancelled by a user or by the system.
	StateCancelled TaskState = "cancelled"
)

// Recovery state.
const (
	// StateOrphaned indicates the worker was lost; the task is silently re-queued.
	StateOrphaned TaskState = "orphaned"
)

// Service-job states (Jupyter/vLLM).
const (
	// StateServing indicates a service job is up and ready.
	StateServing TaskState = "serving"
	// StateStopping indicates a service job is shutting down gracefully.
	StateStopping TaskState = "stopping"
)
|
|
|
|
// ValidTransitions defines the allowed state transitions.
|
|
// The key is the "from" state, the value is a list of valid "to" states.
|
|
// This enforces that state transitions follow the expected lifecycle.
|
|
var ValidTransitions = map[TaskState][]TaskState{
|
|
StateQueued: {StatePreparing, StateFailed, StateCancelled},
|
|
StatePreparing: {StateRunning, StateFailed, StateCancelled},
|
|
StateRunning: {StateCollecting, StateFailed, StateOrphaned, StateServing, StateCancelled},
|
|
StateCollecting: {StateCompleted, StateFailed, StateCancelled},
|
|
StateCompleted: {},
|
|
StateFailed: {},
|
|
StateCancelled: {},
|
|
StateOrphaned: {StateQueued},
|
|
StateServing: {StateStopping, StateFailed, StateOrphaned, StateCancelled},
|
|
StateStopping: {StateCompleted, StateFailed},
|
|
}
|
|
|
|
// StateTransitionError is returned when an invalid state transition is attempted.
|
|
type StateTransitionError struct {
|
|
From TaskState
|
|
To TaskState
|
|
}
|
|
|
|
func (e StateTransitionError) Error() string {
|
|
return fmt.Sprintf("invalid state transition: %s -> %s", e.From, e.To)
|
|
}
|
|
|
|
// StateManager manages task state transitions with audit logging.
|
|
type StateManager struct {
|
|
auditor *audit.Logger
|
|
enabled bool
|
|
}
|
|
|
|
// NewStateManager creates a new state manager with the given audit logger.
|
|
func NewStateManager(auditor *audit.Logger) *StateManager {
|
|
return &StateManager{
|
|
enabled: auditor != nil,
|
|
auditor: auditor,
|
|
}
|
|
}
|
|
|
|
// Transition attempts to transition a task from its current state to a new state.
|
|
// It validates the transition, updates the task status, and logs the event.
|
|
// Returns StateTransitionError if the transition is not valid.
|
|
func (sm *StateManager) Transition(task *domain.Task, to TaskState) error {
|
|
from := TaskState(task.Status)
|
|
|
|
// Validate the transition
|
|
if err := sm.validateTransition(from, to); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Audit the transition before updating
|
|
if sm.enabled && sm.auditor != nil {
|
|
sm.auditor.Log(audit.Event{
|
|
EventType: audit.EventJobStarted,
|
|
Timestamp: time.Now(),
|
|
Resource: task.ID,
|
|
Action: "task_state_change",
|
|
Success: true,
|
|
Metadata: map[string]interface{}{
|
|
"job_name": task.JobName,
|
|
"old_state": string(from),
|
|
"new_state": string(to),
|
|
},
|
|
})
|
|
}
|
|
|
|
// Update task state
|
|
task.Status = string(to)
|
|
|
|
return nil
|
|
}
|
|
|
|
// validateTransition checks if a transition from one state to another is valid.
|
|
func (sm *StateManager) validateTransition(from, to TaskState) error {
|
|
// Check if "from" state is valid
|
|
allowed, ok := ValidTransitions[from]
|
|
if !ok {
|
|
return StateTransitionError{From: from, To: to}
|
|
}
|
|
|
|
// Check if "to" state is in the allowed list
|
|
if slices.Contains(allowed, to) {
|
|
return nil
|
|
}
|
|
|
|
return StateTransitionError{From: from, To: to}
|
|
}
|
|
|
|
// IsTerminalState returns true if the state is terminal (no further transitions allowed).
|
|
func IsTerminalState(state TaskState) bool {
|
|
return state == StateCompleted || state == StateFailed || state == StateCancelled
|
|
}
|
|
|
|
// CanTransition returns true if a transition from -> to is valid.
|
|
func CanTransition(from, to TaskState) bool {
|
|
allowed, ok := ValidTransitions[from]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
return slices.Contains(allowed, to)
|
|
}
|