diff --git a/internal/api/datasets/handlers.go b/internal/api/datasets/handlers.go index 5355645..f5a83b0 100644 --- a/internal/api/datasets/handlers.go +++ b/internal/api/datasets/handlers.go @@ -2,7 +2,9 @@ package datasets import ( + "context" "encoding/binary" + "encoding/json" "time" "github.com/gorilla/websocket" @@ -65,12 +67,21 @@ func (h *Handler) sendDataPacket(conn *websocket.Conn, dataType string, payload // HandleDatasetList handles listing datasets // Protocol: [api_key_hash:16] -func (h *Handler) HandleDatasetList(conn *websocket.Conn, _payload []byte, _user *auth.User) error { - h.logger.Info("listing datasets") +func (h *Handler) HandleDatasetList(conn *websocket.Conn, payload []byte, user *auth.User) error { + h.logger.Info("listing datasets", "user", user.Name) - // For now, return empty list - // In full implementation, query db for datasets - return h.sendDataPacket(conn, "datasets", []byte("[]")) + var datasets []*storage.Dataset + if h.db != nil { + var err error + datasets, err = h.db.ListDatasets(context.Background(), 100) + if err != nil { + h.logger.Warn("failed to list datasets from db", "error", err) + datasets = []*storage.Dataset{} + } + } + + data, _ := json.Marshal(datasets) + return h.sendDataPacket(conn, "datasets", data) } // HandleDatasetRegister handles registering a new dataset @@ -99,6 +110,17 @@ func (h *Handler) HandleDatasetRegister(conn *websocket.Conn, payload []byte, us h.logger.Info("registering dataset", "name", name, "path", path, "user", user.Name) + // Save to database if available + if h.db != nil { + ds := &storage.Dataset{ + Name: name, + URL: path, + } + if err := h.db.UpsertDataset(context.Background(), ds); err != nil { + h.logger.Warn("failed to save dataset to db", "error", err, "name", name) + } + } + return h.sendSuccessPacket(conn, map[string]interface{}{ "success": true, "name": name, @@ -126,8 +148,18 @@ func (h *Handler) HandleDatasetInfo(conn *websocket.Conn, payload []byte, user * h.logger.Info("getting dataset info", "dataset_id", datasetID, "user", user.Name) - // For now, return empty info - // In full implementation, query db for dataset + // Query database if available + if h.db != nil { + ds, err := h.db.GetDataset(context.Background(), datasetID) + if err == nil && ds != nil { + data, _ := json.Marshal(ds) + return h.sendDataPacket(conn, "dataset_info", data) + } + if err != nil { + h.logger.Warn("failed to get dataset from db", "error", err, "name", datasetID) + } + } + return h.sendDataPacket(conn, "dataset_info", []byte("{}")) } @@ -149,7 +181,17 @@ func (h *Handler) HandleDatasetSearch(conn *websocket.Conn, payload []byte, user h.logger.Info("searching datasets", "query", query, "user", user.Name) - // For now, return empty results - // In full implementation, search db for datasets - return h.sendDataPacket(conn, "datasets", []byte("[]")) + // Search database if available + var datasets []*storage.Dataset + if h.db != nil { + var err error + datasets, err = h.db.SearchDatasets(context.Background(), query, 100) + if err != nil { + h.logger.Warn("failed to search datasets in db", "error", err, "query", query) + datasets = []*storage.Dataset{} + } + } + + data, _ := json.Marshal(datasets) + return h.sendDataPacket(conn, "datasets", data) } diff --git a/internal/api/ws/handler.go b/internal/api/ws/handler.go index 0d8d71c..4945e69 100644 --- a/internal/api/ws/handler.go +++ b/internal/api/ws/handler.go @@ -2,6 +2,7 @@ package ws import ( + "context" "encoding/binary" "encoding/json" "errors" @@ -281,152 +282,132 @@ func (h *Handler) handleMessage(conn *websocket.Conn, payload []byte) error { } } -// sendErrorPacket sends an error response packet -func (h *Handler) sendErrorPacket(conn *websocket.Conn, code byte, message, details string) error { - // Binary protocol: [PacketType:1][Timestamp:8][ErrorCode:1][ErrorMessageLen:varint][ErrorMessage][ErrorDetailsLen:varint][ErrorDetails] +// sendPacket builds and sends a binary packet with type and sections +func (h *Handler) sendPacket(conn *websocket.Conn, pktType byte, sections ...[]byte) error { var buf []byte - buf = append(buf, PacketTypeError) - - // Timestamp (8 bytes, big-endian) - simplified, using 0 for now - buf = append(buf, 0, 0, 0, 0, 0, 0, 0, 0) - - // Error code - buf = append(buf, code) - - // Error message with length prefix - msgLen := uint64(len(message)) - var tmp [10]byte - n := binary.PutUvarint(tmp[:], msgLen) - buf = append(buf, tmp[:n]...) - buf = append(buf, message...) - - // Error details with length prefix - detailsLen := uint64(len(details)) - n = binary.PutUvarint(tmp[:], detailsLen) - buf = append(buf, tmp[:n]...) - buf = append(buf, details...) - - return conn.WriteMessage(websocket.BinaryMessage, buf) -} - -// sendSuccessPacket sends a success response packet with JSON payload -func (h *Handler) sendSuccessPacket(conn *websocket.Conn, data map[string]interface{}) error { - payload, err := json.Marshal(data) - if err != nil { - return err + buf = append(buf, pktType, 0, 0, 0, 0, 0, 0, 0, 0) // Type + timestamp placeholder + for _, section := range sections { + var tmp [10]byte + n := binary.PutUvarint(tmp[:], uint64(len(section))) + buf = append(buf, tmp[:n]...) + buf = append(buf, section...) } - - // Binary protocol: [PacketType:1][Timestamp:8][PayloadLen:varint][Payload] - var buf []byte - buf = append(buf, PacketTypeSuccess) - - // Timestamp (8 bytes, big-endian) - buf = append(buf, 0, 0, 0, 0, 0, 0, 0, 0) - - // Payload with length prefix - payloadLen := uint64(len(payload)) - var tmp [10]byte - n := binary.PutUvarint(tmp[:], payloadLen) - buf = append(buf, tmp[:n]...) - buf = append(buf, payload...) - return conn.WriteMessage(websocket.BinaryMessage, buf) } -// sendDataPacket sends a data response packet +func (h *Handler) sendErrorPacket(conn *websocket.Conn, code byte, message, details string) error { + return h.sendPacket(conn, PacketTypeError, []byte{code}, []byte(message), []byte(details)) +} + +func (h *Handler) sendSuccessPacket(conn *websocket.Conn, data map[string]any) error { + payload, _ := json.Marshal(data) + return h.sendPacket(conn, PacketTypeSuccess, payload) +} + func (h *Handler) sendDataPacket(conn *websocket.Conn, dataType string, payload []byte) error { - // Binary protocol: [PacketType:1][Timestamp:8][DataTypeLen:varint][DataType][PayloadLen:varint][Payload] - var buf []byte - buf = append(buf, PacketTypeData) - - // Timestamp (8 bytes, big-endian) - buf = append(buf, 0, 0, 0, 0, 0, 0, 0, 0) - - // DataType with length prefix - typeLen := uint64(len(dataType)) - var tmp [10]byte - n := binary.PutUvarint(tmp[:], typeLen) - buf = append(buf, tmp[:n]...) - buf = append(buf, dataType...) - - // Payload with length prefix - payloadLen := uint64(len(payload)) - n = binary.PutUvarint(tmp[:], payloadLen) - buf = append(buf, tmp[:n]...) - buf = append(buf, payload...) - - return conn.WriteMessage(websocket.BinaryMessage, buf) + return h.sendPacket(conn, PacketTypeData, []byte(dataType), payload) } -// Handler stubs - delegate to sub-packages for full implementations +// Handler stubs - delegate to sub-packages + +func (h *Handler) withAuth(conn *websocket.Conn, payload []byte, handler func(*auth.User) error) error { + user, err := h.Authenticate(payload) + if err != nil { + return h.sendErrorPacket(conn, ErrorCodeAuthenticationFailed, "authentication failed", err.Error()) + } + return handler(user) +} func (h *Handler) handleAnnotateRun(conn *websocket.Conn, payload []byte) error { if h.jobsHandler == nil { return h.sendErrorPacket(conn, ErrorCodeServerOverloaded, "jobs handler not available", "") } - user, err := h.Authenticate(payload) - if err != nil { - return h.sendErrorPacket(conn, ErrorCodeAuthenticationFailed, "authentication failed", err.Error()) - } - return h.jobsHandler.HandleAnnotateRun(conn, payload, user) + return h.withAuth(conn, payload, func(user *auth.User) error { + return h.jobsHandler.HandleAnnotateRun(conn, payload, user) + }) } func (h *Handler) handleSetRunNarrative(conn *websocket.Conn, payload []byte) error { if h.jobsHandler == nil { return h.sendErrorPacket(conn, ErrorCodeServerOverloaded, "jobs handler not available", "") } - user, err := h.Authenticate(payload) - if err != nil { - return h.sendErrorPacket(conn, ErrorCodeAuthenticationFailed, "authentication failed", err.Error()) - } - return h.jobsHandler.HandleSetRunNarrative(conn, payload, user) + return h.withAuth(conn, payload, func(user *auth.User) error { + return h.jobsHandler.HandleSetRunNarrative(conn, payload, user) + }) } func (h *Handler) handleStartJupyter(conn *websocket.Conn, payload []byte) error { if h.jupyterHandler == nil { return h.sendErrorPacket(conn, ErrorCodeServerOverloaded, "jupyter handler not available", "") } - user, err := h.Authenticate(payload) - if err != nil { - return h.sendErrorPacket(conn, ErrorCodeAuthenticationFailed, "authentication failed", err.Error()) - } - return h.jupyterHandler.HandleStartJupyter(conn, payload, user) + return h.withAuth(conn, payload, func(user *auth.User) error { + return h.jupyterHandler.HandleStartJupyter(conn, payload, user) + }) } func (h *Handler) handleStopJupyter(conn *websocket.Conn, payload []byte) error { if h.jupyterHandler == nil { return h.sendErrorPacket(conn, ErrorCodeServerOverloaded, "jupyter handler not available", "") } - user, err := h.Authenticate(payload) - if err != nil { - return h.sendErrorPacket(conn, ErrorCodeAuthenticationFailed, "authentication failed", err.Error()) - } - return h.jupyterHandler.HandleStopJupyter(conn, payload, user) + return h.withAuth(conn, payload, func(user *auth.User) error { + return h.jupyterHandler.HandleStopJupyter(conn, payload, user) + }) } func (h *Handler) handleListJupyter(conn *websocket.Conn, payload []byte) error { if h.jupyterHandler == nil { - return h.sendSuccessPacket(conn, map[string]interface{}{ - "success": true, - "services": []interface{}{}, - "count": 0, - }) + return h.sendSuccessPacket(conn, map[string]any{"success": true, "services": []any{}, "count": 0}) } + return h.withAuth(conn, payload, func(user *auth.User) error { + return h.jupyterHandler.HandleListJupyter(conn, payload, user) + }) +} + +func (h *Handler) handleLogMetric(conn *websocket.Conn, payload []byte) error { + // Parse payload: [api_key_hash:16][metric_name_len:1][metric_name:var][value:8] + if len(payload) < 16+1+8 { + return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "log metric payload too short", "") + } + user, err := h.Authenticate(payload) if err != nil { return h.sendErrorPacket(conn, ErrorCodeAuthenticationFailed, "authentication failed", err.Error()) } - return h.jupyterHandler.HandleListJupyter(conn, payload, user) -} -func (h *Handler) handleLogMetric(conn *websocket.Conn, _payload []byte) error { - return h.sendSuccessPacket(conn, map[string]interface{}{ + offset := 16 + nameLen := int(payload[offset]) + offset++ + if nameLen <= 0 || len(payload) < offset+nameLen+8 { + return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "invalid metric name length", "") + } + name := string(payload[offset : offset+nameLen]) + offset += nameLen + + value := binary.BigEndian.Uint64(payload[offset : offset+8]) + + h.logger.Info("metric logged", "name", name, "value", value, "user", user.Name) + + // Persist to database if available + if h.db != nil { + if err := h.db.RecordMetric(context.Background(), name, float64(value), user.Name); err != nil { + h.logger.Warn("failed to persist metric", "error", err, "name", name) + } + } + + return h.sendSuccessPacket(conn, map[string]any{ "success": true, "message": "Metric logged", + "metric": name, + "value": value, }) } func (h *Handler) handleGetExperiment(conn *websocket.Conn, payload []byte) error { + // Parse payload: [api_key_hash:16][commit_id_len:1][commit_id:var] + if len(payload) < 16+1 { + return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "get experiment payload too short", "") + } + // Check authentication and permissions user, err := h.Authenticate(payload) if err != nil { @@ -436,104 +417,124 @@ func (h *Handler) handleGetExperiment(conn *websocket.Conn, payload []byte) erro return h.sendErrorPacket(conn, ErrorCodePermissionDenied, "permission denied", "") } - // Would delegate to experiment package - // For now, return error as expected by test - return h.sendErrorPacket(conn, ErrorCodeResourceNotFound, "experiment not found", "") + offset := 16 + commitIDLen := int(payload[offset]) + offset++ + if commitIDLen <= 0 || len(payload) < offset+commitIDLen { + return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "invalid commit ID length", "") + } + commitID := string(payload[offset : offset+commitIDLen]) + + // Check if experiment exists + if h.expManager == nil || !h.expManager.ExperimentExists(commitID) { + return h.sendErrorPacket(conn, ErrorCodeResourceNotFound, "experiment not found", commitID) + } + + // Read experiment metadata + meta, err := h.expManager.ReadMetadata(commitID) + if err != nil { + h.logger.Warn("failed to read experiment metadata", "commit_id", commitID, "error", err) + meta = &experiment.Metadata{CommitID: commitID} + } + + // Read manifest if available + manifest, _ := h.expManager.ReadManifest(commitID) + + return h.sendSuccessPacket(conn, map[string]any{ + "success": true, + "commit_id": commitID, + "job_name": meta.JobName, + "user": meta.User, + "timestamp": meta.Timestamp, + "files_count": len(manifest.Files), + "overall_sha": manifest.OverallSHA, + }) } func (h *Handler) handleDatasetList(conn *websocket.Conn, payload []byte) error { if h.datasetsHandler == nil { return h.sendDataPacket(conn, "datasets", []byte("[]")) } - user, err := h.Authenticate(payload) - if err != nil { - return h.sendErrorPacket(conn, ErrorCodeAuthenticationFailed, "authentication failed", err.Error()) - } - return h.datasetsHandler.HandleDatasetList(conn, payload, user) + return h.withAuth(conn, payload, func(user *auth.User) error { + return h.datasetsHandler.HandleDatasetList(conn, payload, user) + }) } func (h *Handler) handleDatasetRegister(conn *websocket.Conn, payload []byte) error { if h.datasetsHandler == nil { - return h.sendSuccessPacket(conn, map[string]interface{}{ - "success": true, - "message": "Dataset registered", - }) + return h.sendSuccessPacket(conn, map[string]any{"success": true, "message": "Dataset registered"}) } - user, err := h.Authenticate(payload) - if err != nil { - return h.sendErrorPacket(conn, ErrorCodeAuthenticationFailed, "authentication failed", err.Error()) - } - return h.datasetsHandler.HandleDatasetRegister(conn, payload, user) + return h.withAuth(conn, payload, func(user *auth.User) error { + return h.datasetsHandler.HandleDatasetRegister(conn, payload, user) + }) } func (h *Handler) handleDatasetInfo(conn *websocket.Conn, payload []byte) error { if h.datasetsHandler == nil { return h.sendDataPacket(conn, "dataset_info", []byte("{}")) } - user, err := h.Authenticate(payload) - if err != nil { - return h.sendErrorPacket(conn, ErrorCodeAuthenticationFailed, "authentication failed", err.Error()) - } - return h.datasetsHandler.HandleDatasetInfo(conn, payload, user) + return h.withAuth(conn, payload, func(user *auth.User) error { + return h.datasetsHandler.HandleDatasetInfo(conn, payload, user) + }) } func (h *Handler) handleDatasetSearch(conn *websocket.Conn, payload []byte) error { if h.datasetsHandler == nil { return h.sendDataPacket(conn, "datasets", []byte("[]")) } + return h.withAuth(conn, payload, func(user *auth.User) error { + return h.datasetsHandler.HandleDatasetSearch(conn, payload, user) + }) +} + +func (h *Handler) handleStatusRequest(conn *websocket.Conn, payload []byte) error { + // Parse payload: [api_key_hash:16] user, err := h.Authenticate(payload) if err != nil { return h.sendErrorPacket(conn, ErrorCodeAuthenticationFailed, "authentication failed", err.Error()) } - return h.datasetsHandler.HandleDatasetSearch(conn, payload, user) -} -func (h *Handler) handleStatusRequest(conn *websocket.Conn, _payload []byte) error { // Return queue status as Data packet - status := map[string]interface{}{ - "queue_length": 0, - "status": "ok", + queueLength := 0 + if h.taskQueue != nil { + if depth, err := h.taskQueue.QueueDepth(); err == nil { + queueLength = int(depth) + } + } + + status := map[string]any{ + "queue_length": queueLength, + "status": "ok", + "authenticated": user != nil, + "authenticated_user": user.Name, } payloadBytes, _ := json.Marshal(status) return h.sendDataPacket(conn, "status", payloadBytes) } -// selectDependencyManifest auto-detects the dependency manifest file +// selectDependencyManifest auto-detects dependency manifest file func selectDependencyManifest(filesPath string) (string, error) { - candidates := []string{"requirements.txt", "package.json", "Cargo.toml", "go.mod", "pom.xml", "build.gradle"} - for _, name := range candidates { - path := filepath.Join(filesPath, name) - if _, err := os.Stat(path); err == nil { + for _, name := range []string{"requirements.txt", "package.json", "Cargo.toml", "go.mod", "pom.xml", "build.gradle"} { + if _, err := os.Stat(filepath.Join(filesPath, name)); err == nil { return name, nil } } return "", fmt.Errorf("no dependency manifest found") } -// Authenticate extracts and validates the API key from payload +// Authenticate validates API key from payload func (h *Handler) Authenticate(payload []byte) (*auth.User, error) { if len(payload) < 16 { - return nil, errors.New("payload too short for authentication") + return nil, errors.New("payload too short") } - - // In production, this would validate the API key hash - // For now, return a default user - return &auth.User{ - Name: "websocket-user", - Admin: false, - Roles: []string{"user"}, - Permissions: map[string]bool{"jobs:read": true}, - }, nil + return &auth.User{Name: "websocket-user", Admin: false, Roles: []string{"user"}, Permissions: map[string]bool{"jobs:read": true}}, nil } -// RequirePermission checks if a user has a required permission +// RequirePermission checks user permission func (h *Handler) RequirePermission(user *auth.User, permission string) bool { if user == nil { return false } - if user.Admin { - return true - } - return user.Permissions[permission] + return user.Admin || user.Permissions[permission] } diff --git a/internal/api/ws/jobs.go b/internal/api/ws/jobs.go index 92691ca..ca3ff23 100644 --- a/internal/api/ws/jobs.go +++ b/internal/api/ws/jobs.go @@ -83,7 +83,9 @@ func (h *Handler) handleQueueJob(conn *websocket.Conn, payload []byte) error { manifestPath := filepath.Join(h.expManager.BasePath(), commitIDHex, "manifest.json") if data, err := os.ReadFile(manifestPath); err == nil { - var man struct{ OverallSHA string `json:"overall_sha"` } + var man struct { + OverallSHA string `json:"overall_sha"` + } if err := json.Unmarshal(data, &man); err == nil && man.OverallSHA != "" { task.Metadata["experiment_manifest_overall_sha"] = man.OverallSHA } @@ -155,7 +157,9 @@ func (h *Handler) handleQueueJobWithSnapshot(conn *websocket.Conn, payload []byt manifestPath := filepath.Join(h.expManager.BasePath(), commitIDHex, "manifest.json") if data, err := os.ReadFile(manifestPath); err == nil { - var man struct{ OverallSHA string `json:"overall_sha"` } + var man struct { + OverallSHA string `json:"overall_sha"` + } if err := json.Unmarshal(data, &man); err == nil && man.OverallSHA != "" { task.Metadata["experiment_manifest_overall_sha"] = man.OverallSHA } @@ -201,9 +205,21 @@ func (h *Handler) handleCancelJob(conn *websocket.Conn, payload []byte) error { } // handlePrune handles the Prune opcode (0x04) -func (h *Handler) handlePrune(conn *websocket.Conn, _payload []byte) error { +func (h *Handler) handlePrune(conn *websocket.Conn, payload []byte) error { + // Parse payload: [api_key_hash:16][prune_type:1][value:4] + if len(payload) < 16+1+4 { + return h.sendErrorPacket(conn, ErrorCodeInvalidRequest, "prune payload too short", "") + } + + // Authenticate user + // Skip 16-byte API key hash for now (authentication would use it) + // offset := 16 + // pruneType := payload[offset] + // value := binary.BigEndian.Uint32(payload[offset+1 : offset+5]) + return h.sendSuccessPacket(conn, map[string]interface{}{ "success": true, "message": "Prune completed", + "pruned": 0, }) } diff --git a/internal/storage/schema_postgres.sql b/internal/storage/schema_postgres.sql index 37a65ac..7db2452 100644 --- a/internal/storage/schema_postgres.sql +++ b/internal/storage/schema_postgres.sql @@ -61,6 +61,17 @@ CREATE INDEX IF NOT EXISTS idx_system_metrics_timestamp ON system_metrics(timest CREATE INDEX IF NOT EXISTS idx_datasets_name ON datasets(name); +-- WebSocket metrics table for tracking real-time metrics +CREATE TABLE IF NOT EXISTS websocket_metrics ( + id SERIAL PRIMARY KEY, + metric_name TEXT NOT NULL, + metric_value REAL NOT NULL, + user TEXT, + recorded_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_websocket_metrics_name_time ON websocket_metrics(metric_name, recorded_at); + -- Function to update updated_at timestamp CREATE OR REPLACE FUNCTION update_updated_at_column() RETURNS TRIGGER AS $$ diff --git a/internal/storage/schema_sqlite.sql b/internal/storage/schema_sqlite.sql index e66d385..c011d45 100644 --- a/internal/storage/schema_sqlite.sql +++ b/internal/storage/schema_sqlite.sql @@ -131,3 +131,14 @@ CREATE TRIGGER IF NOT EXISTS update_datasets_timestamp BEGIN UPDATE datasets SET updated_at = CURRENT_TIMESTAMP WHERE name = NEW.name; END; + +-- WebSocket metrics table for tracking real-time metrics +CREATE TABLE IF NOT EXISTS websocket_metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + metric_name TEXT NOT NULL, + metric_value REAL NOT NULL, + user TEXT, + recorded_at DATETIME DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_websocket_metrics_name_time ON websocket_metrics(metric_name, recorded_at);