// Package plugins provides framework-specific extensions to the worker.
// This implements the prolog/epilog model where plugins hook into task lifecycle events.
package plugins

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/jfraeys/fetch_ml/internal/jupyter"
	"github.com/jfraeys/fetch_ml/internal/queue"
)

// JupyterManager is the set of Jupyter service-management operations that
// plugins depend on. Within this file, RunJupyterTask calls every method
// except ListServices.
type JupyterManager interface {
	// StartService is invoked for the "start" action with a request carrying
	// the resolved service name; the returned service is embedded in the
	// task's JSON output.
	StartService(ctx context.Context, req *jupyter.StartRequest) (*jupyter.JupyterService, error)
	// StopService is invoked for the "stop" action with the value of the
	// "jupyter_service_id" metadata entry.
	StopService(ctx context.Context, serviceID string) error
	// RemoveService is invoked for the "remove" action. purge is true only
	// when the "jupyter_purge" metadata entry is exactly "true".
	// NOTE(review): what purge actually deletes is defined by the
	// implementation and is not visible here — confirm before relying on it.
	RemoveService(ctx context.Context, serviceID string, purge bool) error
	// RestoreWorkspace is invoked for the "restore" action with the workspace
	// name; the returned string is reported to the caller as "service_id".
	RestoreWorkspace(ctx context.Context, name string) (string, error)
	// ListServices is not called anywhere in this file.
	// NOTE(review): presumably returns the services currently known to the
	// manager — confirm against the implementation.
	ListServices() []*jupyter.JupyterService
	// ListInstalledPackages is invoked for the "list_packages" action with
	// the resolved service name; the result is embedded in the JSON output.
	ListInstalledPackages(ctx context.Context, serviceName string) ([]jupyter.InstalledPackage, error)
}

// TaskRunner executes framework-specific tasks
// This is the minimal interface needed by plugins to execute tasks
type TaskRunner interface {
	// GetJupyterManager returns the manager used to service Jupyter tasks.
	// RunJupyterTask treats a nil result as "jupyter manager not configured".
	GetJupyterManager() JupyterManager
}

// RunJupyterTask runs a Jupyter-related task.
// It handles start, stop, remove, restore, and list_packages actions.
// This is a plugin function that extends the core worker with Jupyter support.
func RunJupyterTask(ctx context.Context, runner TaskRunner, task *queue.Task) ([]byte, error) { jm := runner.GetJupyterManager() if jm == nil { return nil, fmt.Errorf("jupyter manager not configured") } action := task.Metadata["jupyter_action"] if action == "" { action = "start" // Default action } switch action { case "start": name := task.Metadata["jupyter_name"] if name == "" { name = task.Metadata["jupyter_workspace"] } if name == "" { // Extract from jobName if format is "jupyter-" if len(task.JobName) > 8 && task.JobName[:8] == "jupyter-" { name = task.JobName[8:] } } if name == "" { return nil, fmt.Errorf("missing jupyter_name or jupyter_workspace in task metadata") } req := &jupyter.StartRequest{Name: name} service, err := jm.StartService(ctx, req) if err != nil { return nil, fmt.Errorf("failed to start jupyter service: %w", err) } output := map[string]any{ "type": "start", "service": service, } return json.Marshal(output) case "stop": serviceID := task.Metadata["jupyter_service_id"] if serviceID == "" { return nil, fmt.Errorf("missing jupyter_service_id in task metadata") } if err := jm.StopService(ctx, serviceID); err != nil { return nil, fmt.Errorf("failed to stop jupyter service: %w", err) } return json.Marshal(map[string]string{"type": "stop", "status": "stopped"}) case "remove": serviceID := task.Metadata["jupyter_service_id"] if serviceID == "" { return nil, fmt.Errorf("missing jupyter_service_id in task metadata") } purge := task.Metadata["jupyter_purge"] == "true" if err := jm.RemoveService(ctx, serviceID, purge); err != nil { return nil, fmt.Errorf("failed to remove jupyter service: %w", err) } return json.Marshal(map[string]string{"type": "remove", "status": "removed"}) case "restore": name := task.Metadata["jupyter_name"] if name == "" { name = task.Metadata["jupyter_workspace"] } if name == "" { return nil, fmt.Errorf("missing jupyter_name or jupyter_workspace in task metadata") } serviceID, err := jm.RestoreWorkspace(ctx, name) if err != nil { 
return nil, fmt.Errorf("failed to restore jupyter workspace: %w", err) } return json.Marshal(map[string]string{"type": "restore", "service_id": serviceID}) case "list_packages": serviceName := task.Metadata["jupyter_name"] if serviceName == "" { // Extract from jobName if format is "jupyter-packages-" if len(task.JobName) > 16 && task.JobName[:16] == "jupyter-packages-" { serviceName = task.JobName[16:] } } if serviceName == "" { return nil, fmt.Errorf("missing jupyter_name in task metadata") } packages, err := jm.ListInstalledPackages(ctx, serviceName) if err != nil { return nil, fmt.Errorf("failed to list installed packages: %w", err) } output := map[string]any{ "type": "list_packages", "packages": packages, } return json.Marshal(output) default: return nil, fmt.Errorf("unknown jupyter action: %s", action) } }