fetch_ml/tests/e2e/logs_debug_e2e_test.go
Jeremie Fraeys 7305e2bc21
test: add comprehensive test coverage and command improvements
- Add logs and debug end-to-end tests
- Add test helper utilities
- Improve test fixtures and templates
- Update API server and config lint commands
- Add multi-user database initialization
2026-02-16 20:38:15 -05:00

590 lines
18 KiB
Go

package tests
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"github.com/gorilla/websocket"
)
// Connection settings shared by the logs/debug end-to-end tests.
const (
	// apiHost is the host on which the API server is reached.
	apiHost = "localhost"
	// apiPort is the API server port exposed by the docker-compose stack.
	apiPort = "9102"
	// logsDebugComposeFile is the compose file, relative to this test directory.
	logsDebugComposeFile = "docker-compose.logs-debug.yml"
)
// TestLogsDebugE2E runs the logs and debug WebSocket API checks end-to-end
// against a Docker Compose stack. It is opt-in (FETCH_ML_E2E_LOGS_DEBUG=1) and
// skips when the compose file is missing or when either the docker or the
// docker-compose binary is not on PATH. Each subtest brings the stack up and
// tears it down on its own.
func TestLogsDebugE2E(t *testing.T) {
	if enabled := os.Getenv("FETCH_ML_E2E_LOGS_DEBUG"); enabled != "1" {
		t.Skip("Skipping LogsDebugE2E (set FETCH_ML_E2E_LOGS_DEBUG=1 to enable)")
	}

	composeFile := filepath.Join(".", logsDebugComposeFile)
	if _, statErr := os.Stat(composeFile); os.IsNotExist(statErr) {
		t.Skipf("Docker compose file not found: %s", composeFile)
	}

	// Both the docker CLI and the standalone docker-compose binary are required.
	requiredTools := []struct{ bin, skipMsg string }{
		{"docker", "Docker not found in PATH"},
		{"docker-compose", "docker-compose not found in PATH"},
	}
	for _, tool := range requiredTools {
		if _, lookErr := exec.LookPath(tool.bin); lookErr != nil {
			t.Skip(tool.skipMsg)
		}
	}

	subtests := []struct {
		name string
		run  func(*testing.T, string)
	}{
		{"SetupAndLogsAPI", testLogsAPIWithDockerCompose},
		{"SetupAndDebugAPI", testDebugAPIWithDockerCompose},
	}
	for _, st := range subtests {
		run := st.run
		t.Run(st.name, func(t *testing.T) {
			run(t, composeFile)
		})
	}
}
// testLogsAPIWithDockerCompose brings up the "logsdebug-e2e" compose project
// from composeFile and waits (up to 90s) for its services to report healthy.
// It then exercises the get_logs and stream_logs WebSocket operations. The
// project is torn down when this function returns, including when
// `docker-compose up` itself fails part-way through.
func testLogsAPIWithDockerCompose(t *testing.T, composeFile string) {
	// Remove leftovers from a previous (possibly aborted) run.
	cleanupDockerCompose(t, composeFile)
	// Register teardown BEFORE starting anything: if `up` fails after some
	// containers were already created, they still have to be removed.
	defer cleanupDockerCompose(t, composeFile)

	t.Log("Starting Docker Compose services for logs API test...")
	upCmd := exec.CommandContext(context.Background(),
		"docker-compose", "-f", composeFile, "-p", "logsdebug-e2e", "up", "-d", "--build")
	if upOutput, err := upCmd.CombinedOutput(); err != nil {
		t.Fatalf("Failed to start Docker Compose: %v\nOutput: %s", err, string(upOutput))
	}

	t.Log("Waiting for services to be healthy...")
	if !waitForServicesHealthy(t, "logsdebug-e2e", 90*time.Second) {
		t.Fatal("Services failed to become healthy")
	}

	// Extra grace period so the server is fully ready for WebSocket connections
	// after the containers report healthy.
	t.Log("Waiting additional 3 seconds for WebSocket to be ready...")
	time.Sleep(3 * time.Second)

	// NOTE(review): "ab" is presumably a target the compose fixtures provide
	// logs for (get_logs asserts non-empty logs); confirm against the fixtures.
	testTargetID := "ab"
	testGetLogsAPI(t, testTargetID)
	testStreamLogsAPI(t, testTargetID)
}
// testDebugAPIWithDockerCompose brings up the "logsdebug-e2e" compose project
// from composeFile and waits (up to 90s) for its services to report healthy.
// It then exercises attach_debug with the "interactive", "gdb" and "pdb" debug
// types. The project is torn down when this function returns, including when
// `docker-compose up` itself fails part-way through.
func testDebugAPIWithDockerCompose(t *testing.T, composeFile string) {
	// Remove leftovers from a previous (possibly aborted) run.
	cleanupDockerCompose(t, composeFile)
	// Register teardown BEFORE starting anything: if `up` fails after some
	// containers were already created, they still have to be removed.
	defer cleanupDockerCompose(t, composeFile)

	t.Log("Starting Docker Compose services for debug API test...")
	upCmd := exec.CommandContext(context.Background(),
		"docker-compose", "-f", composeFile, "-p", "logsdebug-e2e", "up", "-d", "--build")
	if upOutput, err := upCmd.CombinedOutput(); err != nil {
		t.Fatalf("Failed to start Docker Compose: %v\nOutput: %s", err, string(upOutput))
	}

	t.Log("Waiting for services to be healthy...")
	if !waitForServicesHealthy(t, "logsdebug-e2e", 90*time.Second) {
		t.Fatal("Services failed to become healthy")
	}

	// Extra grace period so the server is fully ready for WebSocket connections
	// after the containers report healthy.
	t.Log("Waiting additional 3 seconds for WebSocket to be ready...")
	time.Sleep(3 * time.Second)

	testTargetID := "test-job-456"
	for _, debugType := range []string{"interactive", "gdb", "pdb"} {
		testAttachDebugAPI(t, testTargetID, debugType)
	}
}
// connectWebSocketWithRetry dials wsURL up to maxRetries times (always at
// least once). Between attempts it sleeps a linearly increasing delay:
// 500ms, 1s, 1.5s, and so on. Each handshake is limited to 10 seconds.
//
// On success it returns the connection and the handshake response with a nil
// error. On failure it returns a nil connection, the last dial error, and the
// last attempt's response if the server answered. That response's body has
// already been read and closed to log it.
func connectWebSocketWithRetry(t *testing.T, wsURL string, maxRetries int) (*websocket.Conn, *http.Response, error) {
	t.Helper()

	// Copy the default dialer: assigning through websocket.DefaultDialer (a
	// pointer) would mutate the package-wide dialer for every other user.
	dialer := *websocket.DefaultDialer
	dialer.HandshakeTimeout = 10 * time.Second

	// Guarantee at least one attempt; with zero attempts the caller would get
	// (nil, nil, nil) and dereference a nil connection.
	attempts := maxRetries
	if attempts < 1 {
		attempts = 1
	}

	var (
		resp *http.Response
		err  error
	)
	for i := 0; i < attempts; i++ {
		var conn *websocket.Conn
		// nil request headers, matching the other WebSocket tests.
		conn, resp, err = dialer.Dial(wsURL, nil)
		if err == nil {
			return conn, resp, nil
		}

		if resp != nil {
			var body []byte
			if resp.Body != nil {
				body, _ = io.ReadAll(resp.Body) // best effort: only used for the log line
				resp.Body.Close()
			}
			t.Logf("WebSocket connection attempt %d/%d failed: status=%d, body=%s", i+1, attempts, resp.StatusCode, string(body))
		} else {
			t.Logf("WebSocket connection attempt %d/%d failed: %v", i+1, attempts, err)
		}

		if i < attempts-1 {
			delay := time.Duration(i+1) * 500 * time.Millisecond
			t.Logf("Waiting %v before retry...", delay)
			time.Sleep(delay)
		}
	}
	return nil, resp, err
}
// testGetLogsAPI checks the get_logs WebSocket operation for targetID.
//
// It first requires GET /health to return 200, and logs (without asserting)
// what a plain HTTP GET of /ws returns. It then sends one request frame laid
// out as
//
//	[opcode:1][api_key_hash:16][target_id_len:1][target_id:var]
//
// It reads a single response frame, which must be a data packet (0x04). The
// frame's JSON payload must echo targetID and contain non-empty logs.
func testGetLogsAPI(t *testing.T, targetID string) {
	t.Helper()
	t.Logf("Testing get_logs API with target: %s", targetID)

	// target_id_len is a single byte; a longer ID would wrap and corrupt the frame.
	if len(targetID) > 255 {
		t.Fatalf("target ID too long for 1-byte length prefix: %d bytes", len(targetID))
	}

	// Bounded client for the plain-HTTP probes so a stuck server cannot hang the test.
	httpClient := &http.Client{Timeout: 5 * time.Second}

	healthURL := fmt.Sprintf("http://%s:%s/health", apiHost, apiPort)
	resp, err := httpClient.Get(healthURL)
	if err != nil {
		t.Fatalf("HTTP health check failed: %v", err)
	}
	resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		t.Fatalf("HTTP health check failed: status=%d", resp.StatusCode)
	}
	t.Logf("HTTP health check passed: status=%d", resp.StatusCode)

	// Diagnostic only: a plain GET of the WebSocket endpoint is expected to be
	// rejected (e.g. 400 / upgrade required); the outcome is just logged.
	wsHTTPURL := fmt.Sprintf("http://%s:%s/ws", apiHost, apiPort)
	if probe, probeErr := httpClient.Get(wsHTTPURL); probeErr != nil {
		t.Logf("HTTP GET to /ws failed: %v", probeErr)
	} else {
		body, _ := io.ReadAll(probe.Body) // best effort: only logged
		probe.Body.Close()
		t.Logf("HTTP GET /ws: status=%d, body=%s", probe.StatusCode, string(body))
	}

	wsURL := fmt.Sprintf("ws://%s:%s/ws", apiHost, apiPort)
	conn, wsResp, err := connectWebSocketWithRetry(t, wsURL, 5)
	if wsResp != nil && wsResp.Body != nil {
		defer wsResp.Body.Close()
	}
	if err != nil {
		t.Fatalf("Failed to connect to WebSocket after retries: %v", err)
	}
	defer conn.Close()

	opcode := byte(0x20)           // OpcodeGetLogs
	apiKeyHash := make([]byte, 16) // zero-filled for testing (no auth)
	message := make([]byte, 0, 1+len(apiKeyHash)+1+len(targetID))
	message = append(message, opcode)
	message = append(message, apiKeyHash...)
	message = append(message, byte(len(targetID)))
	message = append(message, targetID...)
	t.Logf("Sending message: opcode=0x%02x, len=%d, payload_len=%d", opcode, len(message), len(message)-1)

	if err := conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
		t.Fatalf("Failed to send get_logs message: %v", err)
	}

	if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
		t.Fatalf("Failed to set read deadline: %v", err)
	}
	messageType, response, err := conn.ReadMessage()
	if err != nil {
		t.Fatalf("Failed to read get_logs response: %v", err)
	}
	t.Logf("Received response type %d, length %d bytes", messageType, len(response))

	if len(response) < 1 {
		t.Fatal("Empty response received")
	}

	packet, err := parseResponsePacket(response)
	if err != nil {
		t.Fatalf("Failed to parse response packet: %v", err)
	}
	if packet.PacketType != 0x04 { // PacketTypeData
		t.Errorf("Expected data packet (0x04), got 0x%02x", packet.PacketType)
	}

	var logsResponse struct {
		TargetID   string `json:"target_id"`
		Logs       string `json:"logs"`
		Truncated  bool   `json:"truncated"`
		TotalLines int    `json:"total_lines"`
	}
	if err := json.Unmarshal(packet.Payload, &logsResponse); err != nil {
		t.Fatalf("Failed to parse logs JSON response: %v", err)
	}

	if logsResponse.TargetID != targetID {
		t.Errorf("Expected target_id %s, got %s", targetID, logsResponse.TargetID)
	}
	if logsResponse.Logs == "" {
		t.Error("Expected non-empty logs content")
	}
	t.Logf("Successfully received logs for target %s (%d lines, truncated=%v)",
		logsResponse.TargetID, logsResponse.TotalLines, logsResponse.Truncated)
}
// testStreamLogsAPI checks the stream_logs WebSocket operation for targetID.
//
// It sends one request frame laid out as
//
//	[opcode:1][api_key_hash:16][target_id_len:1][target_id:var]
//
// It reads only the first response frame, which must be a data packet (0x04).
// The frame's JSON payload must echo targetID and report streaming=true.
func testStreamLogsAPI(t *testing.T, targetID string) {
	t.Helper()
	t.Logf("Testing stream_logs API with target: %s", targetID)

	// target_id_len is a single byte; a longer ID would wrap and corrupt the frame.
	if len(targetID) > 255 {
		t.Fatalf("target ID too long for 1-byte length prefix: %d bytes", len(targetID))
	}

	wsURL := fmt.Sprintf("ws://%s:%s/ws", apiHost, apiPort)
	conn, resp, err := connectWebSocketWithRetry(t, wsURL, 5)
	if resp != nil && resp.Body != nil {
		defer resp.Body.Close()
	}
	if err != nil {
		t.Fatalf("Failed to connect to WebSocket after retries: %v", err)
	}
	defer conn.Close()

	opcode := byte(0x21)           // OpcodeStreamLogs
	apiKeyHash := make([]byte, 16) // zero-filled API key hash
	message := make([]byte, 0, 1+len(apiKeyHash)+1+len(targetID))
	message = append(message, opcode)
	message = append(message, apiKeyHash...)
	message = append(message, byte(len(targetID)))
	message = append(message, targetID...)

	if err := conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
		t.Fatalf("Failed to send stream_logs message: %v", err)
	}

	if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
		t.Fatalf("Failed to set read deadline: %v", err)
	}
	messageType, response, err := conn.ReadMessage()
	if err != nil {
		t.Fatalf("Failed to read stream_logs response: %v", err)
	}
	t.Logf("Received stream response type %d, length %d bytes", messageType, len(response))

	packet, err := parseResponsePacket(response)
	if err != nil {
		t.Fatalf("Failed to parse response packet: %v", err)
	}
	if packet.PacketType != 0x04 { // PacketTypeData
		t.Errorf("Expected data packet (0x04), got 0x%02x", packet.PacketType)
	}

	var streamResponse struct {
		TargetID  string `json:"target_id"`
		Streaming bool   `json:"streaming"`
		Message   string `json:"message"`
	}
	if err := json.Unmarshal(packet.Payload, &streamResponse); err != nil {
		t.Fatalf("Failed to parse stream JSON response: %v", err)
	}

	if streamResponse.TargetID != targetID {
		t.Errorf("Expected target_id %s, got %s", targetID, streamResponse.TargetID)
	}
	if !streamResponse.Streaming {
		t.Error("Expected streaming=true in response")
	}
	t.Logf("Successfully initiated log streaming for target %s", streamResponse.TargetID)
}
// testAttachDebugAPI checks the attach_debug WebSocket operation for targetID
// with the given debugType.
//
// It sends one request frame laid out as
//
//	[opcode:1][api_key_hash:16][target_id_len:1][target_id:var][debug_type:var]
//
// debug_type is not length-prefixed; it runs to the end of the frame. The
// first response frame must be a data packet (0x04). Its JSON payload must
// echo both targetID and debugType. The "attached" flag is only logged.
func testAttachDebugAPI(t *testing.T, targetID, debugType string) {
	t.Helper()
	t.Logf("Testing attach_debug API with target: %s, debug_type: %s", targetID, debugType)

	// target_id_len is a single byte; a longer ID would wrap and corrupt the frame.
	if len(targetID) > 255 {
		t.Fatalf("target ID too long for 1-byte length prefix: %d bytes", len(targetID))
	}

	wsURL := fmt.Sprintf("ws://%s:%s/ws", apiHost, apiPort)
	conn, resp, err := connectWebSocketWithRetry(t, wsURL, 5)
	if resp != nil && resp.Body != nil {
		defer resp.Body.Close()
	}
	if err != nil {
		t.Fatalf("Failed to connect to WebSocket after retries: %v", err)
	}
	defer conn.Close()

	opcode := byte(0x22)           // OpcodeAttachDebug
	apiKeyHash := make([]byte, 16) // zero-filled API key hash
	message := make([]byte, 0, 1+len(apiKeyHash)+1+len(targetID)+len(debugType))
	message = append(message, opcode)
	message = append(message, apiKeyHash...)
	message = append(message, byte(len(targetID)))
	message = append(message, targetID...)
	message = append(message, debugType...)

	if err := conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
		t.Fatalf("Failed to send attach_debug message: %v", err)
	}

	if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
		t.Fatalf("Failed to set read deadline: %v", err)
	}
	messageType, response, err := conn.ReadMessage()
	if err != nil {
		t.Fatalf("Failed to read attach_debug response: %v", err)
	}
	t.Logf("Received debug response type %d, length %d bytes", messageType, len(response))

	packet, err := parseResponsePacket(response)
	if err != nil {
		t.Fatalf("Failed to parse response packet: %v", err)
	}
	if packet.PacketType != 0x04 { // PacketTypeData
		t.Errorf("Expected data packet (0x04), got 0x%02x", packet.PacketType)
	}

	var debugResponse struct {
		TargetID   string `json:"target_id"`
		DebugType  string `json:"debug_type"`
		Attached   bool   `json:"attached"`
		Message    string `json:"message"`
		Suggestion string `json:"suggestion"`
	}
	if err := json.Unmarshal(packet.Payload, &debugResponse); err != nil {
		t.Fatalf("Failed to parse debug JSON response: %v", err)
	}

	if debugResponse.TargetID != targetID {
		t.Errorf("Expected target_id %s, got %s", targetID, debugResponse.TargetID)
	}
	if debugResponse.DebugType != debugType {
		t.Errorf("Expected debug_type %s, got %s", debugType, debugResponse.DebugType)
	}
	// The server side is a stub that reports attached=false, so the flag is
	// logged rather than asserted.
	t.Logf("Debug attachment response: target=%s, type=%s, attached=%v",
		debugResponse.TargetID, debugResponse.DebugType, debugResponse.Attached)
}
// ResponsePacket is a decoded binary WebSocket response frame.
type ResponsePacket struct {
	// PacketType is the frame's first byte (0x00 success, 0x01 error,
	// 0x04 data; any other value is passed through unchanged).
	PacketType byte
	// Payload is the type-specific body. It aliases the slice passed to
	// parseResponsePacket and may be nil or empty when the frame has no body.
	Payload []byte
}

// parseResponsePacket decodes a binary WebSocket response frame of the form
//
//	[packet_type:1][timestamp:8][body...]
//
// The body is interpreted by packet type:
//   - 0x00 success: [len:uvarint][bytes]; Payload is those bytes.
//   - 0x01 error: [error_code:1][message...]. This always yields a non-nil
//     error that contains the code. The raw message bytes are appended when
//     there is more than one of them.
//   - 0x04 data: [type_len:uvarint][type][payload_len:uvarint][payload].
//     Payload is the payload bytes; the data-type string is skipped. If
//     nothing follows the data-type string, Payload is nil.
//   - any other type: Payload is the raw body.
//
// Every length prefix is checked against the bytes actually present. A
// truncated or malformed frame therefore yields an error (or an empty payload)
// instead of a slice-bounds panic.
func parseResponsePacket(data []byte) (*ResponsePacket, error) {
	const headerLen = 9 // type(1) + timestamp(8)
	if len(data) < headerLen {
		return nil, fmt.Errorf("packet too short: %d bytes", len(data))
	}
	packetType := data[0]
	body := data[headerLen:] // the timestamp is not needed by the tests

	switch packetType {
	case 0x00: // PacketTypeSuccess
		if len(body) == 0 {
			return &ResponsePacket{PacketType: packetType}, nil
		}
		strLen, n := binary.Uvarint(body)
		if n <= 0 {
			return nil, fmt.Errorf("invalid varint")
		}
		rest := body[n:]
		// Compare as uint64: converting a huge length to int could go negative.
		if strLen > uint64(len(rest)) {
			return nil, fmt.Errorf("success packet payload incomplete: want %d bytes, have %d", strLen, len(rest))
		}
		return &ResponsePacket{PacketType: packetType, Payload: rest[:strLen]}, nil

	case 0x01: // PacketTypeError
		if len(body) == 0 {
			return nil, fmt.Errorf("error packet too short")
		}
		errorCode := body[0]
		// NOTE(review): the message bytes are reported raw (any length prefix
		// included), and a single trailing byte is treated as "no message".
		if msg := body[1:]; len(msg) > 1 {
			return nil, fmt.Errorf("server error (code: 0x%02x): %s", errorCode, string(msg))
		}
		return nil, fmt.Errorf("server error (code: 0x%02x)", errorCode)

	case 0x04: // PacketTypeData
		if len(body) == 0 {
			return &ResponsePacket{PacketType: packetType}, nil
		}
		typeLen, n1 := binary.Uvarint(body)
		if n1 <= 0 {
			return nil, fmt.Errorf("invalid type length varint")
		}
		rest := body[n1:]
		// Nothing left after the data-type string (or it is truncated): no payload.
		if typeLen >= uint64(len(rest)) {
			return &ResponsePacket{PacketType: packetType}, nil
		}
		rest = rest[typeLen:]

		payloadLen, n2 := binary.Uvarint(rest)
		if n2 <= 0 {
			return nil, fmt.Errorf("invalid payload length varint")
		}
		rest = rest[n2:]
		if payloadLen > uint64(len(rest)) {
			return nil, fmt.Errorf("data packet payload incomplete")
		}
		return &ResponsePacket{PacketType: packetType, Payload: rest[:payloadLen]}, nil

	default:
		// Unknown packet type: hand back the raw body after the timestamp.
		if len(body) == 0 {
			return &ResponsePacket{PacketType: packetType}, nil
		}
		return &ResponsePacket{PacketType: packetType, Payload: body}, nil
	}
}
// cleanupDockerCompose runs
// `docker-compose -f composeFile -p logsdebug-e2e down --remove-orphans --volumes`,
// streaming its output to the process's stdout/stderr. Teardown is best-effort:
// a failure is logged, not fatal. The command is bounded by a timeout so a
// hung docker-compose cannot stall the test run indefinitely.
func cleanupDockerCompose(t *testing.T, composeFile string) {
	t.Helper()
	t.Log("Cleaning up Docker Compose...")

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	downCmd := exec.CommandContext(ctx,
		"docker-compose", "-f", composeFile, "-p", "logsdebug-e2e", "down", "--remove-orphans", "--volumes")
	downCmd.Stdout = os.Stdout
	downCmd.Stderr = os.Stderr
	if err := downCmd.Run(); err != nil {
		t.Logf("Warning: Failed to cleanup Docker Compose: %v", err)
	}
}
// waitForServicesHealthy polls `docker ps`, filtered to containers whose name
// matches projectName, about once per second until timeout elapses. It
// returns true as soon as at least two matching containers are listed and
// every one of them reports "(healthy)". It returns false on timeout.
//
// Statuses such as "(unhealthy)" or "(health: starting)" are not healthy.
// Matching the bare substring "healthy" would wrongly accept "(unhealthy)".
func waitForServicesHealthy(t *testing.T, projectName string, timeout time.Duration) bool {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		psCmd := exec.CommandContext(context.Background(),
			"docker", "ps", "--filter", fmt.Sprintf("name=%s", projectName), "--format", "{{.Names}}\t{{.Status}}")
		// Output (not CombinedOutput): CLI warnings on stderr must not be
		// parsed as container rows.
		output, err := psCmd.Output()
		if err != nil {
			t.Logf("docker ps failed (will retry): %v", err)
		} else {
			status := string(output)
			t.Logf("Container status:\n%s", status)

			allHealthy := true
			containerCount := 0
			for _, line := range strings.Split(status, "\n") {
				if strings.TrimSpace(line) == "" {
					continue
				}
				containerCount++
				if !strings.Contains(line, "(healthy)") {
					allHealthy = false
					break
				}
			}
			if allHealthy && containerCount >= 2 { // redis and api-server
				t.Log("All services are healthy")
				return true
			}
		}
		time.Sleep(1 * time.Second)
	}
	t.Logf("Timeout waiting for services after %v", timeout)
	return false
}
// TestLogsDebugHTTPHealthE2E probes the /health endpoint of an API server that
// is expected to be already listening on apiHost:apiPort. It is opt-in via
// FETCH_ML_E2E_LOGS_DEBUG=1 and skips, rather than fails, when the server
// cannot be reached. A non-200 status is a test error. A body that does not
// decode as {"status","version"} JSON is only logged.
func TestLogsDebugHTTPHealthE2E(t *testing.T) {
	if os.Getenv("FETCH_ML_E2E_LOGS_DEBUG") != "1" {
		t.Skip("Skipping LogsDebugHTTPHealthE2E (set FETCH_ML_E2E_LOGS_DEBUG=1 to enable)")
	}

	target := fmt.Sprintf("http://%s:%s/health", apiHost, apiPort)
	httpClient := &http.Client{Timeout: 5 * time.Second}

	res, getErr := httpClient.Get(target)
	if getErr != nil {
		t.Skipf("API server not available at %s: %v", target, getErr)
	}
	defer res.Body.Close()

	if code := res.StatusCode; code != http.StatusOK {
		t.Errorf("Health check failed: status %d", code)
	}

	var health struct {
		Status  string `json:"status"`
		Version string `json:"version"`
	}
	if decodeErr := json.NewDecoder(res.Body).Decode(&health); decodeErr != nil {
		t.Logf("Failed to parse health response: %v", decodeErr)
		return
	}
	t.Logf("API health: status=%s, version=%s", health.Status, health.Version)
}