package tests

import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"github.com/gorilla/websocket"
)

const (
	logsDebugComposeFile = "docker-compose.logs-debug.yml"
	apiPort              = "9102" // Use docker-compose port
	apiHost              = "localhost"

	// logsDebugProjectName is the docker-compose project name (-p) shared by
	// stack startup, teardown, and the `docker ps` name filter used while
	// waiting for health.
	logsDebugProjectName = "logsdebug-e2e"
)

// TestLogsDebugE2E tests the logs and debug WebSocket API end-to-end.
//
// It is opt-in (FETCH_ML_E2E_LOGS_DEBUG=1) because each subtest builds and
// starts a Docker Compose stack. It is skipped when the compose file, the
// docker binary, or the docker-compose binary is unavailable.
func TestLogsDebugE2E(t *testing.T) {
	if os.Getenv("FETCH_ML_E2E_LOGS_DEBUG") != "1" {
		t.Skip("Skipping LogsDebugE2E (set FETCH_ML_E2E_LOGS_DEBUG=1 to enable)")
	}

	composeFile := filepath.Join(".", logsDebugComposeFile)
	if _, err := os.Stat(composeFile); os.IsNotExist(err) {
		t.Skipf("Docker compose file not found: %s", composeFile)
	}

	// Ensure Docker is available
	if _, err := exec.LookPath("docker"); err != nil {
		t.Skip("Docker not found in PATH")
	}
	if _, err := exec.LookPath("docker-compose"); err != nil {
		t.Skip("docker-compose not found in PATH")
	}

	t.Run("SetupAndLogsAPI", func(t *testing.T) {
		testLogsAPIWithDockerCompose(t, composeFile)
	})
	t.Run("SetupAndDebugAPI", func(t *testing.T) {
		testDebugAPIWithDockerCompose(t, composeFile)
	})
}

// startLogsDebugStack brings up the compose stack for one subtest and blocks
// until its containers report healthy, plus a short grace period for the
// WebSocket endpoint. purpose is used only in the startup log line.
//
// Teardown is registered with t.Cleanup *before* the result of `up` is
// checked, so a partially started stack is still removed when startup or the
// health wait fails (the previous defer-after-success pattern leaked it).
func startLogsDebugStack(t *testing.T, composeFile, purpose string) {
	t.Helper()

	// Cleanup any existing containers (e.g. from an aborted earlier run).
	cleanupDockerCompose(t, composeFile)

	t.Logf("Starting Docker Compose services for %s test...", purpose)
	t.Cleanup(func() { cleanupDockerCompose(t, composeFile) })

	upCmd := exec.CommandContext(context.Background(), "docker-compose",
		"-f", composeFile, "-p", logsDebugProjectName, "up", "-d", "--build")
	if upOutput, err := upCmd.CombinedOutput(); err != nil {
		t.Fatalf("Failed to start Docker Compose: %v\nOutput: %s", err, string(upOutput))
	}

	t.Log("Waiting for services to be healthy...")
	if !waitForServicesHealthy(t, logsDebugProjectName, 90*time.Second) {
		t.Fatal("Services failed to become healthy")
	}

	// Add delay to ensure server is fully ready for WebSocket connections
	t.Log("Waiting additional 3 seconds for WebSocket to be ready...")
	time.Sleep(3 * time.Second)
}

// testLogsAPIWithDockerCompose starts the compose stack and exercises the
// get_logs and stream_logs WebSocket operations against it.
func testLogsAPIWithDockerCompose(t *testing.T, composeFile string) {
	startLogsDebugStack(t, composeFile, "logs API")

	testTargetID := "ab"
	testGetLogsAPI(t, testTargetID)
	testStreamLogsAPI(t, testTargetID)
}

// testDebugAPIWithDockerCompose starts the compose stack and exercises the
// attach_debug WebSocket operation with each supported debug type.
func testDebugAPIWithDockerCompose(t *testing.T, composeFile string) {
	startLogsDebugStack(t, composeFile, "debug API")

	testTargetID := "test-job-456"
	testAttachDebugAPI(t, testTargetID, "interactive")
	testAttachDebugAPI(t, testTargetID, "gdb")
	testAttachDebugAPI(t, testTargetID, "pdb")
}

// connectWebSocketWithRetry dials wsURL up to maxRetries times, sleeping
// 500ms, 1s, 1.5s, ... (linear backoff) between attempts.
//
// It returns the connection and handshake response on success. On failure it
// returns the last response (which may be nil) and the last dial error.
// maxRetries must be at least 1.
func connectWebSocketWithRetry(t *testing.T, wsURL string, maxRetries int) (*websocket.Conn, *http.Response, error) {
	t.Helper()
	if maxRetries < 1 {
		return nil, nil, fmt.Errorf("maxRetries must be at least 1, got %d", maxRetries)
	}

	// websocket.DefaultDialer is a shared *Dialer; copy it so setting the
	// timeout does not mutate package-global state used by other code.
	dialer := *websocket.DefaultDialer
	dialer.HandshakeTimeout = 10 * time.Second

	var (
		resp *http.Response
		err  error
	)
	for attempt := 1; attempt <= maxRetries; attempt++ {
		var conn *websocket.Conn
		conn, resp, err = dialer.Dial(wsURL, nil)
		if err == nil {
			return conn, resp, nil
		}

		// Log response details for debugging
		if resp != nil && resp.Body != nil {
			body, _ := io.ReadAll(resp.Body) // best-effort: only used for the log line
			resp.Body.Close()
			t.Logf("WebSocket connection attempt %d/%d failed: status=%d, body=%s",
				attempt, maxRetries, resp.StatusCode, string(body))
		} else {
			t.Logf("WebSocket connection attempt %d/%d failed: %v", attempt, maxRetries, err)
		}

		if attempt < maxRetries {
			delay := time.Duration(attempt) * 500 * time.Millisecond
			t.Logf("Waiting %v before retry...", delay)
			time.Sleep(delay)
		}
	}
	return nil, resp, err
}

// buildLogsDebugRequest encodes a request frame with the layout used by the
// logs/debug operations in this file:
//
//	[opcode:1][api_key_hash:16][target_id_len:1][target_id:var][trailer:var]
//
// The API key hash is zero-filled because these tests run without auth.
// trailer is appended verbatim with no length prefix; pass nil when the
// opcode takes none. It returns an error if targetID does not fit the
// single-byte length field.
func buildLogsDebugRequest(opcode byte, targetID string, trailer []byte) ([]byte, error) {
	if len(targetID) > 255 {
		return nil, fmt.Errorf("target id too long for 1-byte length prefix: %d bytes", len(targetID))
	}
	msg := make([]byte, 0, 1+16+1+len(targetID)+len(trailer))
	msg = append(msg, opcode)
	msg = append(msg, make([]byte, 16)...) // api_key_hash (zero-filled, no auth)
	msg = append(msg, byte(len(targetID)))
	msg = append(msg, targetID...)
	msg = append(msg, trailer...)
	return msg, nil
}

// testGetLogsAPI checks HTTP reachability, then sends a get_logs (0x20)
// request over the WebSocket. It expects a data packet whose JSON payload
// echoes targetID and carries non-empty logs.
func testGetLogsAPI(t *testing.T, targetID string) {
	t.Logf("Testing get_logs API with target: %s", targetID)

	// Bound every plain HTTP probe so a wedged server fails fast instead of
	// hanging the test (http.Get has no timeout).
	client := &http.Client{Timeout: 5 * time.Second}

	// First verify HTTP connectivity
	healthURL := fmt.Sprintf("http://%s:%s/health", apiHost, apiPort)
	resp, err := client.Get(healthURL)
	if err != nil {
		t.Fatalf("HTTP health check failed: %v", err)
	}
	resp.Body.Close()
	t.Logf("HTTP health check passed: status=%d", resp.StatusCode)

	// Test WebSocket endpoint with HTTP GET (should return 400 or upgrade required)
	wsHTTPURL := fmt.Sprintf("http://%s:%s/ws", apiHost, apiPort)
	resp, err = client.Get(wsHTTPURL)
	if err != nil {
		t.Logf("HTTP GET to /ws failed: %v", err)
	} else {
		body, _ := io.ReadAll(resp.Body) // diagnostic only
		resp.Body.Close()
		t.Logf("HTTP GET /ws: status=%d, body=%s", resp.StatusCode, string(body))
	}

	wsURL := fmt.Sprintf("ws://%s:%s/ws", apiHost, apiPort)

	// Connect to WebSocket with retry
	conn, resp, err := connectWebSocketWithRetry(t, wsURL, 5)
	if resp != nil && resp.Body != nil {
		defer resp.Body.Close()
	}
	if err != nil {
		t.Fatalf("Failed to connect to WebSocket after retries: %v", err)
	}
	defer conn.Close()

	opcode := byte(0x20) // OpcodeGetLogs
	message, err := buildLogsDebugRequest(opcode, targetID, nil)
	if err != nil {
		t.Fatalf("Failed to build get_logs message: %v", err)
	}

	t.Logf("Sending message: opcode=0x%02x, len=%d, payload_len=%d", opcode, len(message), len(message)-1)

	// Send message
	if err = conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
		t.Fatalf("Failed to send get_logs message: %v", err)
	}

	// Read response
	if err = conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
		t.Fatalf("Failed to set read deadline: %v", err)
	}
	messageType, response, err := conn.ReadMessage()
	if err != nil {
		t.Fatalf("Failed to read get_logs response: %v", err)
	}

	t.Logf("Received response type %d, length %d bytes", messageType, len(response))

	// Parse response
	if len(response) < 1 {
		t.Fatal("Empty response received")
	}

	// Parse the packet using the protocol
	packet, err := parseResponsePacket(response)
	if err != nil {
		t.Fatalf("Failed to parse response packet: %v", err)
	}

	// Verify response is a data packet (0x04 = PacketTypeData)
	if packet.PacketType != 0x04 {
		t.Errorf("Expected data packet (0x04), got 0x%02x", packet.PacketType)
	}

	// Parse JSON payload
	var logsResponse struct {
		TargetID   string `json:"target_id"`
		Logs       string `json:"logs"`
		Truncated  bool   `json:"truncated"`
		TotalLines int    `json:"total_lines"`
	}
	if err := json.Unmarshal(packet.Payload, &logsResponse); err != nil {
		t.Fatalf("Failed to parse logs JSON response: %v", err)
	}

	// Verify response fields
	if logsResponse.TargetID != targetID {
		t.Errorf("Expected target_id %s, got %s", targetID, logsResponse.TargetID)
	}
	if logsResponse.Logs == "" {
		t.Error("Expected non-empty logs content")
	}

	t.Logf("Successfully received logs for target %s (%d lines, truncated=%v)",
		logsResponse.TargetID, logsResponse.TotalLines, logsResponse.Truncated)
}

// testStreamLogsAPI tests the stream_logs WebSocket endpoint (opcode 0x21).
// It expects a data packet whose JSON payload echoes targetID with
// streaming=true.
func testStreamLogsAPI(t *testing.T, targetID string) {
	t.Logf("Testing stream_logs API with target: %s", targetID)

	wsURL := fmt.Sprintf("ws://%s:%s/ws", apiHost, apiPort)

	// Connect to WebSocket with retry
	conn, resp, err := connectWebSocketWithRetry(t, wsURL, 5)
	if resp != nil && resp.Body != nil {
		defer resp.Body.Close()
	}
	if err != nil {
		t.Fatalf("Failed to connect to WebSocket after retries: %v", err)
	}
	defer conn.Close()

	message, err := buildLogsDebugRequest(0x21, targetID, nil) // OpcodeStreamLogs
	if err != nil {
		t.Fatalf("Failed to build stream_logs message: %v", err)
	}

	// Send message
	if err = conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
		t.Fatalf("Failed to send stream_logs message: %v", err)
	}

	// Read response
	if err = conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
		t.Fatalf("Failed to set read deadline: %v", err)
	}
	messageType, response, err := conn.ReadMessage()
	if err != nil {
		t.Fatalf("Failed to read stream_logs response: %v", err)
	}

	t.Logf("Received stream response type %d, length %d bytes", messageType, len(response))

	// Parse response
	packet, err := parseResponsePacket(response)
	if err != nil {
		t.Fatalf("Failed to parse response packet: %v", err)
	}

	// Verify response
	if packet.PacketType != 0x04 { // PacketTypeData
		t.Errorf("Expected data packet (0x04), got 0x%02x", packet.PacketType)
	}

	var streamResponse struct {
		TargetID  string `json:"target_id"`
		Streaming bool   `json:"streaming"`
		Message   string `json:"message"`
	}
	if err := json.Unmarshal(packet.Payload, &streamResponse); err != nil {
		t.Fatalf("Failed to parse stream JSON response: %v", err)
	}

	if streamResponse.TargetID != targetID {
		t.Errorf("Expected target_id %s, got %s", targetID, streamResponse.TargetID)
	}
	if !streamResponse.Streaming {
		t.Error("Expected streaming=true in response")
	}

	t.Logf("Successfully initiated log streaming for target %s", streamResponse.TargetID)
}

// testAttachDebugAPI tests the attach_debug WebSocket endpoint (opcode 0x22).
// debugType is sent as raw trailing bytes after the target ID. The test
// expects a data packet echoing targetID and debugType. "attached" is only
// logged, because the server's stub implementation reports false.
func testAttachDebugAPI(t *testing.T, targetID, debugType string) {
	t.Logf("Testing attach_debug API with target: %s, debug_type: %s", targetID, debugType)

	wsURL := fmt.Sprintf("ws://%s:%s/ws", apiHost, apiPort)

	// Connect to WebSocket with retry
	conn, resp, err := connectWebSocketWithRetry(t, wsURL, 5)
	if resp != nil && resp.Body != nil {
		defer resp.Body.Close()
	}
	if err != nil {
		t.Fatalf("Failed to connect to WebSocket after retries: %v", err)
	}
	defer conn.Close()

	// [opcode:1][api_key_hash:16][target_id_len:1][target_id:var][debug_type:var]
	message, err := buildLogsDebugRequest(0x22, targetID, []byte(debugType)) // OpcodeAttachDebug
	if err != nil {
		t.Fatalf("Failed to build attach_debug message: %v", err)
	}

	// Send message
	if err = conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
		t.Fatalf("Failed to send attach_debug message: %v", err)
	}

	// Read response
	if err = conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
		t.Fatalf("Failed to set read deadline: %v", err)
	}
	messageType, response, err := conn.ReadMessage()
	if err != nil {
		t.Fatalf("Failed to read attach_debug response: %v", err)
	}

	t.Logf("Received debug response type %d, length %d bytes", messageType, len(response))

	// Parse response
	packet, err := parseResponsePacket(response)
	if err != nil {
		t.Fatalf("Failed to parse response packet: %v", err)
	}

	// Verify response
	if packet.PacketType != 0x04 { // PacketTypeData
		t.Errorf("Expected data packet (0x04), got 0x%02x", packet.PacketType)
	}

	var debugResponse struct {
		TargetID   string `json:"target_id"`
		DebugType  string `json:"debug_type"`
		Attached   bool   `json:"attached"`
		Message    string `json:"message"`
		Suggestion string `json:"suggestion"`
	}
	if err := json.Unmarshal(packet.Payload, &debugResponse); err != nil {
		t.Fatalf("Failed to parse debug JSON response: %v", err)
	}

	if debugResponse.TargetID != targetID {
		t.Errorf("Expected target_id %s, got %s", targetID, debugResponse.TargetID)
	}
	if debugResponse.DebugType != debugType {
		t.Errorf("Expected debug_type %s, got %s", debugType, debugResponse.DebugType)
	}

	// Note: attached is false in stub implementation
	t.Logf("Debug attachment response: target=%s, type=%s, attached=%v",
		debugResponse.TargetID, debugResponse.DebugType, debugResponse.Attached)
}

// ResponsePacket represents a parsed WebSocket response packet.
type ResponsePacket struct {
	// PacketType is the first byte of the frame (0x00 success, 0x01 error,
	// 0x04 data, or any other value passed through unparsed).
	PacketType byte
	// Payload is a sub-slice of the input frame; it is nil when the frame
	// carries no payload.
	Payload []byte
}

// parseResponsePacket parses a binary WebSocket response packet with layout
// [type:1][timestamp:8][type-specific body].
//
// Body handling by type:
//   - 0x00 (success): [len:uvarint][bytes], returned as the payload.
//   - 0x01 (error): always returns a non-nil error carrying the error code and
//     any trailing bytes.
//   - 0x04 (data): [type_len:uvarint][type][payload_len:uvarint][payload].
//   - other types: everything after the timestamp is returned as the payload.
//
// All declared lengths are checked against the remaining bytes, so a
// truncated or malicious frame yields an error instead of a slice-bounds
// panic.
func parseResponsePacket(data []byte) (*ResponsePacket, error) {
	const headerLen = 9 // type(1) + timestamp(8)
	if len(data) < headerLen {
		return nil, fmt.Errorf("packet too short: %d bytes", len(data))
	}

	packetType := data[0]
	// Skip timestamp (8 bytes)
	payloadStart := headerLen

	switch packetType {
	case 0x00: // PacketTypeSuccess
		if len(data) <= payloadStart {
			return &ResponsePacket{PacketType: packetType, Payload: nil}, nil
		}
		// Parse varint length + string
		strLen, n := binary.Uvarint(data[payloadStart:])
		if n <= 0 {
			return nil, fmt.Errorf("invalid varint")
		}
		strStart := payloadStart + n
		// Compare in uint64 so a huge declared length can neither overflow int
		// nor slice past the end of the frame.
		if strLen > uint64(len(data)-strStart) {
			return nil, fmt.Errorf("success packet payload incomplete")
		}
		return &ResponsePacket{
			PacketType: packetType,
			Payload:    data[strStart : strStart+int(strLen)],
		}, nil

	case 0x01: // PacketTypeError
		if len(data) < payloadStart+1 {
			return nil, fmt.Errorf("error packet too short")
		}
		errorCode := data[payloadStart]
		// Skip error code. Trailing bytes are included verbatim only when there
		// are at least two of them. NOTE(review): if the server length-prefixes
		// the message, the prefix byte(s) end up in this text — confirm format.
		msgStart := payloadStart + 1
		if len(data) > msgStart+1 {
			return nil, fmt.Errorf("server error (code: 0x%02x): %s", errorCode, string(data[msgStart:]))
		}
		return nil, fmt.Errorf("server error (code: 0x%02x)", errorCode)

	case 0x04: // PacketTypeData
		// Format after timestamp: [data_type_len:varint][data_type][payload_len:varint][payload]
		if len(data) <= payloadStart {
			return &ResponsePacket{PacketType: packetType, Payload: nil}, nil
		}

		// Read data_type length (varint)
		typeLen, n1 := binary.Uvarint(data[payloadStart:])
		if n1 <= 0 {
			return nil, fmt.Errorf("invalid type length varint")
		}
		typeStart := payloadStart + n1
		if typeLen > uint64(len(data)-typeStart) {
			return nil, fmt.Errorf("data packet type incomplete")
		}

		// Skip data_type string
		payloadLenStart := typeStart + int(typeLen)
		if payloadLenStart == len(data) {
			// No payload section after the data_type string.
			return &ResponsePacket{PacketType: packetType, Payload: nil}, nil
		}

		// Read payload length (varint)
		payloadLen, n2 := binary.Uvarint(data[payloadLenStart:])
		if n2 <= 0 {
			return nil, fmt.Errorf("invalid payload length varint")
		}
		payloadDataStart := payloadLenStart + n2
		if payloadLen > uint64(len(data)-payloadDataStart) {
			return nil, fmt.Errorf("data packet payload incomplete")
		}
		return &ResponsePacket{
			PacketType: packetType,
			Payload:    data[payloadDataStart : payloadDataStart+int(payloadLen)],
		}, nil

	default:
		// Unknown packet type, return raw data after timestamp
		if len(data) > payloadStart {
			return &ResponsePacket{
				PacketType: packetType,
				Payload:    data[payloadStart:],
			}, nil
		}
		return &ResponsePacket{PacketType: packetType}, nil
	}
}

// cleanupDockerCompose stops and removes the test project's containers,
// orphans, and volumes. Failure is logged rather than fatal, because this runs
// both before startup and during teardown.
func cleanupDockerCompose(t *testing.T, composeFile string) {
	t.Log("Cleaning up Docker Compose...")
	downCmd := exec.CommandContext(context.Background(), "docker-compose",
		"-f", composeFile, "-p", logsDebugProjectName, "down", "--remove-orphans", "--volumes")
	downCmd.Stdout = os.Stdout
	downCmd.Stderr = os.Stderr
	if err := downCmd.Run(); err != nil {
		t.Logf("Warning: Failed to cleanup Docker Compose: %v", err)
	}
}

// waitForServicesHealthy polls `docker ps` roughly once per second. The filter
// matches container names containing projectName. It returns true once at
// least two matching containers exist (redis and api-server) and every one of
// them reports "(healthy)". It returns false if timeout elapses first.
func waitForServicesHealthy(t *testing.T, projectName string, timeout time.Duration) bool {
	t.Helper()
	start := time.Now()
	for time.Since(start) < timeout {
		// Check container status
		psCmd := exec.CommandContext(context.Background(), "docker", "ps",
			"--filter", fmt.Sprintf("name=%s", projectName),
			"--format", "{{.Names}}\t{{.Status}}")
		output, err := psCmd.CombinedOutput()
		if err != nil {
			t.Logf("docker ps failed (will retry): %v", err)
		} else {
			status := string(output)
			t.Logf("Container status:\n%s", status)

			// Check if all containers are healthy (not just "Up")
			allHealthy := true
			containerCount := 0
			for _, line := range strings.Split(status, "\n") {
				if strings.TrimSpace(line) == "" {
					continue
				}
				containerCount++
				// Status looks like "Up 5s (healthy)", "Up 5s (health: starting)"
				// or "Up 5s (unhealthy)". Match the parenthesized form: a bare
				// "healthy" substring would also accept "(unhealthy)".
				if !strings.Contains(line, "(healthy)") {
					allHealthy = false
					break
				}
			}

			if allHealthy && containerCount >= 2 { // redis and api-server
				t.Log("All services are healthy")
				return true
			}
		}

		time.Sleep(1 * time.Second)
	}
	t.Logf("Timeout waiting for services after %v", timeout)
	return false
}

// TestLogsDebugHTTPHealthE2E tests the HTTP health endpoint for logs/debug
// services. It is opt-in via FETCH_ML_E2E_LOGS_DEBUG=1 and skips when nothing
// is listening at apiHost:apiPort. It does not start any services itself.
func TestLogsDebugHTTPHealthE2E(t *testing.T) {
	if os.Getenv("FETCH_ML_E2E_LOGS_DEBUG") != "1" {
		t.Skip("Skipping LogsDebugHTTPHealthE2E (set FETCH_ML_E2E_LOGS_DEBUG=1 to enable)")
	}

	// Check if API is already running
	healthURL := fmt.Sprintf("http://%s:%s/health", apiHost, apiPort)
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get(healthURL)
	if err != nil {
		t.Skipf("API server not available at %s: %v", healthURL, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		t.Errorf("Health check failed: status %d", resp.StatusCode)
	}

	// Parse health response (best-effort: decode failures are only logged)
	var healthResponse struct {
		Status  string `json:"status"`
		Version string `json:"version"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&healthResponse); err != nil {
		t.Logf("Failed to parse health response: %v", err)
	} else {
		t.Logf("API health: status=%s, version=%s", healthResponse.Status, healthResponse.Version)
	}
}