fetch_ml/tests/integration/storage_redis_integration_test.go
Jeremie Fraeys ea15af1833 Fix multi-user authentication and clean up debug code
- Fix YAML tags in auth config struct (json -> yaml)
- Update CLI configs to use pre-hashed API keys
- Remove double hashing in WebSocket client
- Fix port mapping (9102 -> 9103) in CLI commands
- Update permission keys to use jobs:read, jobs:create, etc.
- Clean up all debug logging from CLI and server
- All user roles now authenticate correctly:
  * Admin: Can queue jobs and see all jobs
  * Researcher: Can queue jobs and see own jobs
  * Analyst: Can see status (read-only access)

Multi-user authentication is now fully functional.
2025-12-06 12:35:32 -05:00

351 lines
8.8 KiB
Go

package tests
import (
"context"
"testing"
"time"
"github.com/jfraeys/fetch_ml/internal/storage"
fixtures "github.com/jfraeys/fetch_ml/tests/fixtures"
"github.com/redis/go-redis/v9"
)
// setupRedis returns a go-redis client for integration tests, connected to
// localhost:6379 and using DB 1.
//
// If the server does not answer a PING, the calling test is skipped. Otherwise
// DB 1 is flushed right away, and a t.Cleanup hook is registered that flushes
// it again and closes the client once the test has finished. Because the hook
// owns the client's lifetime, callers should not close the returned client
// themselves: doing so makes the cleanup flush run against a closed client.
func setupRedis(t *testing.T) *redis.Client {
	t.Helper()

	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       1, // Use DB 1 for tests to avoid conflicts
	})

	ctx := context.Background()
	if err := rdb.Ping(ctx).Err(); err != nil {
		// Skipf stops the test and does not return here, so release the
		// client first. The close error is irrelevant for an unreachable server.
		_ = rdb.Close()
		t.Skipf("Redis not available, skipping integration test: %v", err)
	}

	// Start from an empty test database; a failed flush would leave stale
	// keys behind that could invalidate the test's assertions.
	if err := rdb.FlushDB(ctx).Err(); err != nil {
		_ = rdb.Close()
		t.Fatalf("Failed to flush Redis test database: %v", err)
	}

	t.Cleanup(func() {
		// Best-effort teardown: report problems without failing the test.
		if err := rdb.FlushDB(ctx).Err(); err != nil {
			t.Logf("Failed to flush Redis test database during cleanup: %v", err)
		}
		// Close errors are ignored: the client is not used after this point.
		_ = rdb.Close()
	})

	return rdb
}
// TestStorageRedisIntegration creates a job in the storage database, pushes
// its ID onto the "ml:queue" Redis list, and verifies that the job can be read
// back from the database and that the list holds exactly that ID.
func TestStorageRedisIntegration(t *testing.T) {
	// Deliberately not parallel: setupRedis flushes the whole Redis test
	// database, so running alongside other Redis-backed tests would let them
	// delete each other's keys.

	// The client is flushed and closed by the cleanup that setupRedis
	// registers. Closing it here would run before that cleanup and make its
	// FlushDB fail on a closed client.
	redisHelper := setupRedis(t)

	tempDir := t.TempDir()
	db, err := storage.NewDBFromPath(tempDir + "/test.db")
	if err != nil {
		t.Fatalf("Failed to create database: %v", err)
	}
	defer func() { _ = db.Close() }()

	// Initialize database schema
	if err := db.Initialize(fixtures.TestSchema); err != nil {
		t.Fatalf("Failed to initialize database: %v", err)
	}

	// Test 1: Create job in storage and queue in Redis
	now := time.Now()
	job := &storage.Job{
		ID:        "test-job-1",
		JobName:   "Test Job",
		Status:    "pending",
		CreatedAt: now,
		UpdatedAt: now,
		Args:      "",
		Priority:  0,
	}

	// Store job in database
	if err := db.CreateJob(job); err != nil {
		t.Fatalf("Failed to create job: %v", err)
	}

	// Queue job in Redis
	ctx := context.Background()
	if err := redisHelper.RPush(ctx, "ml:queue", job.ID).Err(); err != nil {
		t.Fatalf("Failed to queue job in Redis: %v", err)
	}

	// Verify job exists in both systems
	retrievedJob, err := db.GetJob(job.ID)
	if err != nil {
		t.Fatalf("Failed to retrieve job from database: %v", err)
	}
	if retrievedJob.ID != job.ID {
		t.Errorf("Expected job ID %s, got %s", job.ID, retrievedJob.ID)
	}

	// Verify job is in Redis queue. Result is used instead of Val so that a
	// Redis error is reported as such rather than as a zero-value mismatch.
	queueLength, err := redisHelper.LLen(ctx, "ml:queue").Result()
	if err != nil {
		t.Fatalf("Failed to read queue length from Redis: %v", err)
	}
	if queueLength != 1 {
		t.Errorf("Expected queue length 1, got %d", queueLength)
	}

	queuedJobID, err := redisHelper.LIndex(ctx, "ml:queue", 0).Result()
	if err != nil {
		t.Fatalf("Failed to read queued job ID from Redis: %v", err)
	}
	if queuedJobID != job.ID {
		t.Errorf("Expected queued job ID %s, got %s", job.ID, queuedJobID)
	}
}
// TestStorageRedisWorkerIntegration registers a worker in the storage
// database, writes a heartbeat timestamp for it into the
// "ml:workers:heartbeat" Redis hash, and verifies that the worker is returned
// by GetActiveWorkers and that the heartbeat field is present in Redis.
func TestStorageRedisWorkerIntegration(t *testing.T) {
	// Deliberately not parallel: setupRedis flushes the whole Redis test
	// database, so running alongside other Redis-backed tests would let them
	// delete each other's keys.

	// The client is flushed and closed by the cleanup that setupRedis
	// registers. Closing it here would run before that cleanup and make its
	// FlushDB fail on a closed client.
	redisHelper := setupRedis(t)

	tempDir := t.TempDir()
	db, err := storage.NewDBFromPath(tempDir + "/test.db")
	if err != nil {
		t.Fatalf("Failed to create database: %v", err)
	}
	defer func() { _ = db.Close() }()

	if err := db.Initialize(fixtures.TestSchema); err != nil {
		t.Fatalf("Failed to initialize database: %v", err)
	}

	// Test 2: Worker registration and heartbeat integration
	worker := &storage.Worker{
		ID:            "worker-1",
		Hostname:      "test-host",
		LastHeartbeat: time.Now(),
		Status:        "active",
		CurrentJobs:   0,
		MaxJobs:       1,
	}

	// Register worker in database
	if err := db.RegisterWorker(worker); err != nil {
		t.Fatalf("Failed to register worker: %v", err)
	}

	// Update worker heartbeat in Redis
	ctx := context.Background()
	heartbeatKey := "ml:workers:heartbeat"
	if err := redisHelper.HSet(ctx, heartbeatKey, worker.ID, time.Now().Unix()).Err(); err != nil {
		t.Fatalf("Failed to set worker heartbeat in Redis: %v", err)
	}

	// Verify worker exists in database
	activeWorkers, err := db.GetActiveWorkers()
	if err != nil {
		t.Fatalf("Failed to get active workers: %v", err)
	}
	// Fatal rather than Error: the element access below would panic on an
	// empty result.
	if len(activeWorkers) != 1 {
		t.Fatalf("Expected 1 active worker, got %d", len(activeWorkers))
	}
	if activeWorkers[0].ID != worker.ID {
		t.Errorf("Expected worker ID %s, got %s", worker.ID, activeWorkers[0].ID)
	}

	// Verify heartbeat exists in Redis. Result is used instead of Val so that
	// a Redis error (including a missing field) is reported explicitly.
	heartbeatTime, err := redisHelper.HGet(ctx, heartbeatKey, worker.ID).Result()
	if err != nil {
		t.Fatalf("Failed to read worker heartbeat from Redis: %v", err)
	}
	if heartbeatTime == "" {
		t.Error("Worker heartbeat not found in Redis")
	}
}
// TestStorageRedisMetricsIntegration records two per-job metrics in the
// storage database and a set of system metrics in the "ml:metrics:system"
// Redis hash, then verifies that both job metrics are returned by
// GetJobMetrics and that the cpu_total and memory_total fields read back from
// Redis with the expected string values.
func TestStorageRedisMetricsIntegration(t *testing.T) {
	// Deliberately not parallel: setupRedis flushes the whole Redis test
	// database, so running alongside other Redis-backed tests would let them
	// delete each other's keys.

	// The client is flushed and closed by the cleanup that setupRedis
	// registers. Closing it here would run before that cleanup and make its
	// FlushDB fail on a closed client.
	redisHelper := setupRedis(t)

	tempDir := t.TempDir()
	db, err := storage.NewDBFromPath(tempDir + "/test.db")
	if err != nil {
		t.Fatalf("Failed to create database: %v", err)
	}
	defer func() { _ = db.Close() }()

	if err := db.Initialize(fixtures.TestSchema); err != nil {
		t.Fatalf("Failed to initialize database: %v", err)
	}

	// Test 3: Metrics recording in both systems
	jobID := "metrics-job-1"

	// Create job first to satisfy foreign key constraint
	now := time.Now()
	job := &storage.Job{
		ID:        jobID,
		JobName:   "Metrics Test Job",
		Status:    statusRunning,
		CreatedAt: now,
		UpdatedAt: now,
		Args:      "",
		Priority:  0,
	}
	if err := db.CreateJob(job); err != nil {
		t.Fatalf("Failed to create job: %v", err)
	}

	// Record job metrics in database
	if err := db.RecordJobMetric(jobID, "cpu_usage", "75.5"); err != nil {
		t.Fatalf("Failed to record job metric: %v", err)
	}
	if err := db.RecordJobMetric(jobID, "memory_usage", "1024.0"); err != nil {
		t.Fatalf("Failed to record job metric: %v", err)
	}

	// Record system metrics in Redis
	ctx := context.Background()
	systemMetricsKey := "ml:metrics:system"
	metricsData := map[string]interface{}{
		"timestamp":    time.Now().Unix(),
		"cpu_total":    85.2,
		"memory_total": 4096.0,
		"disk_usage":   75.0,
	}
	// HSet accepts a field map directly; HMSet is the deprecated spelling.
	if err := redisHelper.HSet(ctx, systemMetricsKey, metricsData).Err(); err != nil {
		t.Fatalf("Failed to record system metrics in Redis: %v", err)
	}

	// Verify job metrics in database
	jobMetrics, err := db.GetJobMetrics(jobID)
	if err != nil {
		t.Fatalf("Failed to get job metrics: %v", err)
	}
	if len(jobMetrics) != 2 {
		t.Errorf("Expected 2 job metrics, got %d", len(jobMetrics))
	}

	// Verify system metrics in Redis. Result is used instead of Val so that a
	// Redis error is reported as such rather than as an empty-string mismatch.
	cpuTotal, err := redisHelper.HGet(ctx, systemMetricsKey, "cpu_total").Result()
	if err != nil {
		t.Fatalf("Failed to read cpu_total from Redis: %v", err)
	}
	if cpuTotal != "85.2" {
		t.Errorf("Expected CPU total 85.2, got %s", cpuTotal)
	}

	memoryTotal, err := redisHelper.HGet(ctx, systemMetricsKey, "memory_total").Result()
	if err != nil {
		t.Fatalf("Failed to read memory_total from Redis: %v", err)
	}
	// The float 4096.0 is expected to come back without a fractional part.
	if memoryTotal != "4096" {
		t.Errorf("Expected memory total 4096, got %s", memoryTotal)
	}
}
// TestStorageRedisJobStatusIntegration creates a pending job, then moves it
// through statusRunning and statusCompleted. After each transition the status
// is written to the storage database via UpdateJobStatus and to the Redis key
// "ml:status:<jobID>" (with a one-hour TTL), and both stores are read back to
// confirm they report the new status.
func TestStorageRedisJobStatusIntegration(t *testing.T) {
	// Deliberately not parallel: setupRedis flushes the whole Redis test
	// database, so running alongside other Redis-backed tests would let them
	// delete each other's keys.

	// The client is flushed and closed by the cleanup that setupRedis
	// registers. Closing it here would run before that cleanup and make its
	// FlushDB fail on a closed client.
	redisHelper := setupRedis(t)

	tempDir := t.TempDir()
	db, err := storage.NewDBFromPath(tempDir + "/test.db")
	if err != nil {
		t.Fatalf("Failed to create database: %v", err)
	}
	defer func() { _ = db.Close() }()

	if err := db.Initialize(fixtures.TestSchema); err != nil {
		t.Fatalf("Failed to initialize database: %v", err)
	}

	// Test 4: Job status updates across both systems
	jobID := "status-job-1"

	// Create initial job
	now := time.Now()
	job := &storage.Job{
		ID:        jobID,
		JobName:   "Status Test Job",
		Status:    "pending",
		CreatedAt: now,
		UpdatedAt: now,
		Args:      "",
		Priority:  0,
	}
	if err := db.CreateJob(job); err != nil {
		t.Fatalf("Failed to create job: %v", err)
	}

	ctx := context.Background()
	statusKey := "ml:status:" + jobID

	// Update job status to running
	if err := db.UpdateJobStatus(jobID, statusRunning, "worker-1", ""); err != nil {
		t.Fatalf("Failed to update job status: %v", err)
	}

	// Set job status in Redis for real-time tracking
	if err := redisHelper.Set(ctx, statusKey, statusRunning, time.Hour).Err(); err != nil {
		t.Fatalf("Failed to set job status in Redis: %v", err)
	}

	// Verify status in database
	updatedJob, err := db.GetJob(jobID)
	if err != nil {
		t.Fatalf("Failed to get updated job: %v", err)
	}
	if updatedJob.Status != statusRunning {
		t.Errorf("Expected job status 'running', got '%s'", updatedJob.Status)
	}

	// Verify status in Redis. Result is used instead of Val so that a Redis
	// error is reported as such rather than as an empty-string mismatch.
	redisStatus, err := redisHelper.Get(ctx, statusKey).Result()
	if err != nil {
		t.Fatalf("Failed to get job status from Redis: %v", err)
	}
	if redisStatus != statusRunning {
		t.Errorf("Expected Redis status 'running', got '%s'", redisStatus)
	}

	// Test status progression to completed
	if err := db.UpdateJobStatus(jobID, statusCompleted, "worker-1", ""); err != nil {
		t.Fatalf("Failed to update job status to completed: %v", err)
	}
	if err := redisHelper.Set(ctx, statusKey, statusCompleted, time.Hour).Err(); err != nil {
		t.Fatalf("Failed to update Redis status: %v", err)
	}

	// Final verification
	finalJob, err := db.GetJob(jobID)
	if err != nil {
		t.Fatalf("Failed to get final job: %v", err)
	}
	if finalJob.Status != statusCompleted {
		t.Errorf("Expected final job status 'completed', got '%s'", finalJob.Status)
	}

	// Final Redis verification
	finalRedisStatus, err := redisHelper.Get(ctx, statusKey).Result()
	if err != nil {
		t.Fatalf("Failed to get final job status from Redis: %v", err)
	}
	if finalRedisStatus != statusCompleted {
		t.Errorf("Expected final Redis status 'completed', got '%s'", finalRedisStatus)
	}
}