fetch_ml/internal/api/jupyter/handlers.go
Jeremie Fraeys 23e5f3d1dc
refactor(api): internal refactoring for TUI and worker modules
- Refactor internal/worker and internal/queue packages
- Update cmd/tui for monitoring interface
- Update test configurations
2026-02-20 15:51:23 -05:00

256 lines
8 KiB
Go

// Package jupyter provides WebSocket handlers for Jupyter-related operations
package jupyter
import (
"encoding/binary"
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/jfraeys/fetch_ml/internal/auth"
"github.com/jfraeys/fetch_ml/internal/container"
"github.com/jfraeys/fetch_ml/internal/jupyter"
"github.com/jfraeys/fetch_ml/internal/logging"
)
// Handler provides Jupyter-related WebSocket handlers
type Handler struct {
logger *logging.Logger
jupyterMgr *jupyter.ServiceManager
authConfig *auth.Config
}
// NewHandler creates a new Jupyter handler
func NewHandler(
logger *logging.Logger,
jupyterMgr *jupyter.ServiceManager,
authConfig *auth.Config,
) *Handler {
return &Handler{
logger: logger,
jupyterMgr: jupyterMgr,
authConfig: authConfig,
}
}
// Error codes
const (
ErrorCodeInvalidRequest = 0x01
ErrorCodeAuthenticationFailed = 0x02
ErrorCodePermissionDenied = 0x03
ErrorCodeResourceNotFound = 0x04
ErrorCodeServiceUnavailable = 0x33
)
// Permissions
const (
PermJupyterManage = "jupyter:manage"
PermJupyterRead = "jupyter:read"
)
// sendErrorPacket sends an error response packet to the client
func (h *Handler) sendErrorPacket(conn *websocket.Conn, code byte, message, details string) error {
err := map[string]interface{}{
"error": true,
"code": code,
"message": message,
"details": details,
}
return conn.WriteJSON(err)
}
// sendSuccessPacket sends a success response packet
func (h *Handler) sendSuccessPacket(conn *websocket.Conn, data map[string]interface{}) error {
return conn.WriteJSON(data)
}
// HandleStartJupyter handles starting a Jupyter service
// Protocol: [api_key_hash:16][workspace_len:1][workspace:var][config_len:2][config:var]
func (h *Handler) HandleStartJupyter(conn *websocket.Conn, payload []byte, user *auth.User) error {
if len(payload) < 16+1+2 {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "start jupyter payload too short", "")
}
offset := 16
workspaceLen := int(payload[offset])
offset += 1
if workspaceLen <= 0 || len(payload) < offset+workspaceLen+2 {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "invalid workspace length", "")
}
workspace := string(payload[offset : offset+workspaceLen])
offset += workspaceLen
configLen := int(binary.BigEndian.Uint16(payload[offset : offset+2]))
offset += 2
if configLen < 0 || len(payload) < offset+configLen {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "invalid config length", "")
}
if err := container.ValidateJobName(workspace); err != nil {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "invalid workspace name", err.Error())
}
h.logger.Info("starting jupyter service", "workspace", workspace, "user", user.Name)
// Start Jupyter service
if h.jupyterMgr == nil {
return h.sendErrorPacket(conn, ErrorCodeServiceUnavailable, "Jupyter service manager not available", "")
}
return h.sendSuccessPacket(conn, map[string]interface{}{
"success": true,
"workspace": workspace,
"timestamp": time.Now().UTC(),
})
}
// HandleStopJupyter handles stopping a Jupyter service
// Protocol: [api_key_hash:16][service_id_len:1][service_id:var]
func (h *Handler) HandleStopJupyter(conn *websocket.Conn, payload []byte, user *auth.User) error {
if len(payload) < 16+1 {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "stop jupyter payload too short", "")
}
offset := 16
serviceIDLen := int(payload[offset])
offset += 1
if serviceIDLen <= 0 || len(payload) < offset+serviceIDLen {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "invalid service ID length", "")
}
serviceID := string(payload[offset : offset+serviceIDLen])
h.logger.Info("stopping jupyter service", "service_id", serviceID, "user", user.Name)
if h.jupyterMgr == nil {
return h.sendErrorPacket(conn, ErrorCodeServiceUnavailable, "Jupyter service manager not available", "")
}
return h.sendSuccessPacket(conn, map[string]interface{}{
"success": true,
"service_id": serviceID,
"timestamp": time.Now().UTC(),
})
}
// HandleListJupyter handles listing Jupyter services
// Protocol: [api_key_hash:16]
func (h *Handler) HandleListJupyter(conn *websocket.Conn, payload []byte, user *auth.User) error {
h.logger.Info("listing jupyter services", "user", user.Name)
if h.jupyterMgr == nil {
return h.sendSuccessPacket(conn, map[string]interface{}{
"success": true,
"services": []interface{}{},
"count": 0,
})
}
services := h.jupyterMgr.ListServices()
return h.sendSuccessPacket(conn, map[string]interface{}{
"success": true,
"services": services,
"count": len(services),
})
}
// HandleListJupyterPackages handles listing packages in a Jupyter service
// Protocol: [api_key_hash:16][service_name_len:1][service_name:var]
func (h *Handler) HandleListJupyterPackages(conn *websocket.Conn, payload []byte, user *auth.User) error {
if len(payload) < 16+1 {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "list packages payload too short", "")
}
offset := 16
serviceNameLen := int(payload[offset])
offset += 1
if serviceNameLen <= 0 || len(payload) < offset+serviceNameLen {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "invalid service name length", "")
}
serviceName := string(payload[offset : offset+serviceNameLen])
h.logger.Info("listing jupyter packages", "service", serviceName, "user", user.Name)
return h.sendSuccessPacket(conn, map[string]interface{}{
"success": true,
"service_name": serviceName,
"packages": []interface{}{},
"count": 0,
})
}
// HandleRemoveJupyter handles removing a Jupyter service
// Protocol: [api_key_hash:16][service_id_len:1][service_id:var][purge:1]
func (h *Handler) HandleRemoveJupyter(conn *websocket.Conn, payload []byte, user *auth.User) error {
if len(payload) < 16+1+1 {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "remove jupyter payload too short", "")
}
offset := 16
serviceIDLen := int(payload[offset])
offset += 1
if serviceIDLen <= 0 || len(payload) < offset+serviceIDLen+1 {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "invalid service ID length", "")
}
serviceID := string(payload[offset : offset+serviceIDLen])
offset += serviceIDLen
purge := payload[offset] != 0
h.logger.Info("removing jupyter service", "service_id", serviceID, "purge", purge, "user", user.Name)
if h.jupyterMgr == nil {
return h.sendErrorPacket(conn, ErrorCodeServiceUnavailable, "Jupyter service manager not available", "")
}
return h.sendSuccessPacket(conn, map[string]interface{}{
"success": true,
"service_id": serviceID,
"purged": purge,
})
}
// HandleRestoreJupyter handles restoring a Jupyter workspace
// Protocol: [api_key_hash:16][workspace_len:1][workspace:var]
func (h *Handler) HandleRestoreJupyter(conn *websocket.Conn, payload []byte, user *auth.User) error {
if len(payload) < 16+1 {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "restore jupyter payload too short", "")
}
offset := 16
workspaceLen := int(payload[offset])
offset += 1
if workspaceLen <= 0 || len(payload) < offset+workspaceLen {
return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "invalid workspace length", "")
}
workspace := string(payload[offset : offset+workspaceLen])
h.logger.Info("restoring jupyter workspace", "workspace", workspace, "user", user.Name)
if h.jupyterMgr == nil {
return h.sendErrorPacket(conn, ErrorCodeServiceUnavailable, "Jupyter service manager not available", "")
}
return h.sendSuccessPacket(conn, map[string]interface{}{
"success": true,
"workspace": workspace,
"restored": true,
})
}
// HTTP Handlers for REST API
// ListServicesHTTP handles HTTP requests for listing Jupyter services
func (h *Handler) ListServicesHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not implemented", http.StatusNotImplemented)
}
// StartServiceHTTP handles HTTP requests for starting Jupyter service
func (h *Handler) StartServiceHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not implemented", http.StatusNotImplemented)
}