New error handling:
- Add internal/api/errors/errors.go with structured API error types
- Standardize error codes across all API endpoints
- Add user-facing error messages vs internal error details separation

Handler improvements:
- jupyter/handlers.go: better workspace lifecycle and error handling
- plugins/handlers.go: plugin management with validation
- groups/handlers.go: group CRUD with capability metadata
- jobs/handlers.go: job submission and monitoring improvements
- datasets/handlers.go: dataset upload/download with progress
- validate/handlers.go: manifest validation with detailed errors
- audit/handlers.go: audit log querying with filters

Server configuration:
- server_config.go: refined config loading with validation
- server_gen.go: improved code generation for OpenAPI specs
185 lines
5.4 KiB
Go
185 lines
5.4 KiB
Go
// Package datasets provides WebSocket handlers for dataset-related operations
|
|
package datasets
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/jfraeys/fetch_ml/internal/api/errors"
|
|
"github.com/jfraeys/fetch_ml/internal/auth"
|
|
"github.com/jfraeys/fetch_ml/internal/logging"
|
|
"github.com/jfraeys/fetch_ml/internal/storage"
|
|
)
|
|
|
|
// Handler provides dataset-related WebSocket handlers
|
|
type Handler struct {
|
|
logger *logging.Logger
|
|
db *storage.DB
|
|
dataDir string
|
|
}
|
|
|
|
// NewHandler creates a new datasets handler
|
|
func NewHandler(
|
|
logger *logging.Logger,
|
|
db *storage.DB,
|
|
dataDir string,
|
|
) *Handler {
|
|
return &Handler{
|
|
logger: logger,
|
|
db: db,
|
|
dataDir: dataDir,
|
|
}
|
|
}
|
|
|
|
// sendErrorPacket sends an error response packet to the client
|
|
func sendErrorPacket(conn *websocket.Conn, code string, message string) error {
|
|
return errors.SendErrorPacket(conn, code, message, "")
|
|
}
|
|
|
|
// sendSuccessPacket sends a success response packet
|
|
func (h *Handler) sendSuccessPacket(conn *websocket.Conn, data map[string]any) error {
|
|
return errors.SendSuccessPacket(conn, data)
|
|
}
|
|
|
|
// sendDataPacket sends a data response packet
|
|
func (h *Handler) sendDataPacket(conn *websocket.Conn, dataType string, payload []byte) error {
|
|
return conn.WriteJSON(map[string]any{
|
|
"type": dataType,
|
|
"payload": string(payload),
|
|
})
|
|
}
|
|
|
|
// HandleDatasetList handles listing datasets
|
|
// Protocol: [api_key_hash:16]
|
|
func (h *Handler) HandleDatasetList(conn *websocket.Conn, payload []byte, user *auth.User) error {
|
|
h.logger.Info("listing datasets", "user", user.Name)
|
|
|
|
var datasets []*storage.Dataset
|
|
if h.db != nil {
|
|
var err error
|
|
datasets, err = h.db.ListDatasets(context.Background(), 100)
|
|
if err != nil {
|
|
h.logger.Warn("failed to list datasets from db", "error", err)
|
|
datasets = []*storage.Dataset{}
|
|
}
|
|
}
|
|
|
|
data, _ := json.Marshal(datasets)
|
|
return h.sendDataPacket(conn, "datasets", data)
|
|
}
|
|
|
|
// HandleDatasetRegister handles registering a new dataset
|
|
// Protocol: [api_key_hash:16][name_len:1][name:var][path_len:2][path:var]
|
|
func (h *Handler) HandleDatasetRegister(
|
|
conn *websocket.Conn, payload []byte, user *auth.User,
|
|
) error {
|
|
if len(payload) < 16+1+2 {
|
|
return sendErrorPacket(conn, errors.CodeInvalidRequest, "register dataset payload too short")
|
|
}
|
|
|
|
offset := 16
|
|
|
|
nameLen := int(payload[offset])
|
|
offset++
|
|
if nameLen <= 0 || len(payload) < offset+nameLen+2 {
|
|
return sendErrorPacket(conn, errors.CodeInvalidRequest, "invalid name length")
|
|
}
|
|
name := string(payload[offset : offset+nameLen])
|
|
offset += nameLen
|
|
|
|
pathLen := int(binary.BigEndian.Uint16(payload[offset : offset+2]))
|
|
offset += 2
|
|
if pathLen < 0 || len(payload) < offset+pathLen {
|
|
return sendErrorPacket(conn, errors.CodeInvalidRequest, "invalid path length")
|
|
}
|
|
path := string(payload[offset : offset+pathLen])
|
|
|
|
h.logger.Info("registering dataset", "name", name, "path", path, "user", user.Name)
|
|
|
|
// Save to database if available
|
|
if h.db != nil {
|
|
ds := &storage.Dataset{
|
|
Name: name,
|
|
URL: path,
|
|
}
|
|
if err := h.db.UpsertDataset(context.Background(), ds); err != nil {
|
|
h.logger.Warn("failed to save dataset to db", "error", err, "name", name)
|
|
}
|
|
}
|
|
|
|
return h.sendSuccessPacket(conn, map[string]any{
|
|
"name": name,
|
|
"path": path,
|
|
"user": user.Name,
|
|
"time": time.Now().UTC(),
|
|
})
|
|
}
|
|
|
|
// HandleDatasetInfo handles getting dataset info
|
|
// Protocol: [api_key_hash:16][dataset_id_len:1][dataset_id:var]
|
|
func (h *Handler) HandleDatasetInfo(conn *websocket.Conn, payload []byte, user *auth.User) error {
|
|
if len(payload) < 16+1 {
|
|
return sendErrorPacket(conn, errors.CodeInvalidRequest, "dataset info payload too short")
|
|
}
|
|
|
|
offset := 16
|
|
|
|
datasetIDLen := int(payload[offset])
|
|
offset++
|
|
if datasetIDLen <= 0 || len(payload) < offset+datasetIDLen {
|
|
return sendErrorPacket(conn, errors.CodeInvalidRequest, "invalid dataset ID length")
|
|
}
|
|
datasetID := string(payload[offset : offset+datasetIDLen])
|
|
|
|
h.logger.Info("getting dataset info", "dataset_id", datasetID, "user", user.Name)
|
|
|
|
// Query database if available
|
|
if h.db != nil {
|
|
ds, err := h.db.GetDataset(context.Background(), datasetID)
|
|
if err == nil && ds != nil {
|
|
data, _ := json.Marshal(ds)
|
|
return h.sendDataPacket(conn, "dataset_info", data)
|
|
}
|
|
if err != nil {
|
|
h.logger.Warn("failed to get dataset from db", "error", err, "name", datasetID)
|
|
}
|
|
}
|
|
|
|
return h.sendDataPacket(conn, "dataset_info", []byte("{}"))
|
|
}
|
|
|
|
// HandleDatasetSearch handles searching datasets
|
|
// Protocol: [api_key_hash:16][query_len:2][query:var]
|
|
func (h *Handler) HandleDatasetSearch(conn *websocket.Conn, payload []byte, user *auth.User) error {
|
|
if len(payload) < 16+2 {
|
|
return sendErrorPacket(conn, errors.CodeInvalidRequest, "dataset search payload too short")
|
|
}
|
|
|
|
offset := 16
|
|
|
|
queryLen := int(binary.BigEndian.Uint16(payload[offset : offset+2]))
|
|
offset += 2
|
|
if queryLen < 0 || len(payload) < offset+queryLen {
|
|
return sendErrorPacket(conn, errors.CodeInvalidRequest, "invalid query length")
|
|
}
|
|
query := string(payload[offset : offset+queryLen])
|
|
|
|
h.logger.Info("searching datasets", "query", query, "user", user.Name)
|
|
|
|
// Search database if available
|
|
var datasets []*storage.Dataset
|
|
if h.db != nil {
|
|
var err error
|
|
datasets, err = h.db.SearchDatasets(context.Background(), query, 100)
|
|
if err != nil {
|
|
h.logger.Warn("failed to search datasets in db", "error", err, "query", query)
|
|
datasets = []*storage.Dataset{}
|
|
}
|
|
}
|
|
|
|
data, _ := json.Marshal(datasets)
|
|
return h.sendDataPacket(conn, "datasets", data)
|
|
}
|