fetch_ml/tests/integration/queue_execution_test.go
Jeremie Fraeys ea15af1833 Fix multi-user authentication and clean up debug code
- Fix YAML tags in auth config struct (json -> yaml)
- Update CLI configs to use pre-hashed API keys
- Remove double hashing in WebSocket client
- Fix port mapping (9102 -> 9103) in CLI commands
- Update permission keys to use jobs:read, jobs:create, etc.
- Clean up all debug logging from CLI and server
- All user roles now authenticate correctly:
  * Admin: Can queue jobs and see all jobs
  * Researcher: Can queue jobs and see own jobs
  * Analyst: Can see status (read-only access)

Multi-user authentication is now fully functional.
2025-12-06 12:35:32 -05:00

531 lines
16 KiB
Go

package tests
import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"github.com/jfraeys/fetch_ml/internal/fileutil"
	tests "github.com/jfraeys/fetch_ml/tests/fixtures"
)
// TestQueueExecution tests that experiments are processed sequentially through the queue.
// NOTE: the subtests are order-dependent — SequentialProcessing consumes the
// jobs that QueueSubmission places into the pending queue under testDir.
func TestQueueExecution(t *testing.T) {
	t.Parallel()

	testDir := t.TempDir()
	examplesDir := tests.NewExamplesDir(filepath.Join("..", "fixtures", "examples"))

	subtests := []struct {
		name string
		run  func(*testing.T)
	}{
		{"QueueSubmission", func(st *testing.T) { runQueueSubmissionTest(st, testDir, examplesDir) }},
		{"SequentialProcessing", func(st *testing.T) { runSequentialProcessingTest(st, testDir) }},
	}
	for _, sub := range subtests {
		t.Run(sub.name, sub.run)
	}
}
// experimentCase describes one queued experiment: its logical name, its
// queue priority (jobs are processed in ascending priority order by
// runSequentialProcessingTest), and the fixtures example directory its
// artifacts are copied from.
type experimentCase struct {
	name       string // experiment/job base name, embedded in the queue directory name
	priority   int    // queue priority; embedded in the directory name as "_priority_N"
	exampleDir string // subdirectory under tests/fixtures/examples to copy from
}
// runQueueSubmissionTest copies three example projects into the pending queue,
// one per priority level, then verifies each produced a queue job directory.
func runQueueSubmissionTest(t *testing.T, testDir string, examplesDir *tests.ExamplesDir) {
	t.Helper()

	queueDir := filepath.Join(testDir, "server", "home", "mluser", "ml_jobs", "pending")

	cases := []experimentCase{
		{name: "sklearn_classification", priority: 1, exampleDir: "sklearn_project"},
		{name: "xgboost_classification", priority: 2, exampleDir: "xgboost_project"},
		{name: "pytorch_nn", priority: 3, exampleDir: "pytorch_project"},
	}
	for _, c := range cases {
		setupQueueExperiment(t, testDir, queueDir, examplesDir, c)
	}
	verifyQueuedExperiments(t, queueDir, cases)
}
// setupQueueExperiment copies one example project into the test directory,
// creates a fresh queue job directory for it, copies its artifacts there,
// and writes the job's queue metadata file.
func setupQueueExperiment(t *testing.T, testDir, queueDir string, examplesDir *tests.ExamplesDir, exp experimentCase) {
	t.Helper()

	src := examplesDir.GetPath(exp.exampleDir)
	dst := filepath.Join(testDir, exp.name)
	if copyErr := tests.CopyDir(src, dst); copyErr != nil {
		t.Fatalf("Failed to copy example %s: %v", exp.exampleDir, copyErr)
	}

	jobDir := createQueueJobDir(t, queueDir, exp)
	copyExperimentArtifacts(t, dst, jobDir)
	writeQueueMetadata(t, jobDir, exp)
}
// createQueueJobDir creates and returns a pending-queue directory whose name
// encodes the experiment name, a second-resolution timestamp, and the
// priority (the "_priority_N" suffix is what selectJobByPriority globs on).
func createQueueJobDir(t *testing.T, queueDir string, exp experimentCase) string {
	t.Helper()

	stamp := time.Now().Format("20060102_150405")
	jobDir := filepath.Join(queueDir, fmt.Sprintf("%s_%s_priority_%d", exp.name, stamp, exp.priority))
	if err := os.MkdirAll(jobDir, 0750); err != nil {
		t.Fatalf("Failed to create queue directory for %s: %v", exp.name, err)
	}
	return jobDir
}
// copyExperimentArtifacts copies the well-known experiment files from
// experimentDir into the queue job directory, silently skipping any file
// that is absent in the source.
func copyExperimentArtifacts(t *testing.T, experimentDir, jobDir string) {
	t.Helper()

	for _, name := range []string{"train.py", "requirements.txt", "README.md"} {
		src := filepath.Join(experimentDir, name)
		if _, statErr := os.Stat(src); os.IsNotExist(statErr) {
			continue // optional artifact; not every example ships all three
		}
		data, readErr := fileutil.SecureFileRead(src)
		if readErr != nil {
			t.Fatalf("Failed to read %s: %v", name, readErr)
		}
		//nolint:gosec // G306: Script needs execute permissions
		if writeErr := os.WriteFile(filepath.Join(jobDir, name), data, 0750); writeErr != nil {
			t.Fatalf("Failed to copy %s: %v", name, writeErr)
		}
	}
}
// writeQueueMetadata writes queue_metadata.json into jobDir describing the
// queued experiment (job name, experiment name, source example, priority,
// status, submission time).
//
// Fix: the original built the JSON by hand with fmt.Sprintf, which performs
// no escaping — any quote/backslash in a name would produce invalid JSON.
// encoding/json guarantees well-formed output with the same fields.
func writeQueueMetadata(t *testing.T, jobDir string, exp experimentCase) {
	t.Helper()

	jobName := filepath.Base(jobDir)
	metadata := map[string]any{
		"job_name":        jobName,
		"experiment_name": exp.name,
		"example_source":  exp.exampleDir,
		"priority":        exp.priority,
		"status":          "pending",
		"submitted_at":    time.Now().Format(time.RFC3339),
	}
	data, err := json.MarshalIndent(metadata, "", "  ")
	if err != nil {
		t.Fatalf("Failed to marshal queue metadata for %s: %v", exp.name, err)
	}
	queueMetadata := filepath.Join(jobDir, "queue_metadata.json")
	if err := os.WriteFile(queueMetadata, data, 0600); err != nil {
		t.Fatalf("Failed to create queue metadata for %s: %v", exp.name, err)
	}
}
// verifyQueuedExperiments asserts that every experiment produced at least one
// pending-queue job directory matching its "<name>_*_priority_<p>" pattern.
//
// Fix: the original folded a Glob error into the "job should exist" message,
// hiding the underlying cause; report pattern errors explicitly.
func verifyQueuedExperiments(t *testing.T, queueDir string, experiments []experimentCase) {
	t.Helper()

	for _, exp := range experiments {
		pattern := filepath.Join(queueDir, fmt.Sprintf("%s_*_priority_%d", exp.name, exp.priority))
		queueJobs, err := filepath.Glob(pattern)
		if err != nil {
			t.Fatalf("Failed to glob %s: %v", pattern, err)
		}
		if len(queueJobs) == 0 {
			t.Errorf("Queue job should exist for %s with priority %d", exp.name, exp.priority)
		}
	}
}
// runSequentialProcessingTest drains the pending queue in priority order
// (1 through 3), moving each job through running to finished, and then
// checks the final queue state.
func runSequentialProcessingTest(t *testing.T, testDir string) {
	t.Helper()

	jobsRoot := filepath.Join(testDir, "server", "home", "mluser", "ml_jobs")
	pendingDir := filepath.Join(jobsRoot, "pending")
	runningDir := filepath.Join(jobsRoot, "running")
	finishedDir := filepath.Join(jobsRoot, "finished")

	ensureDir(t, runningDir)
	ensureDir(t, finishedDir)

	for _, priority := range []int{1, 2, 3} {
		processJobByPriority(t, pendingDir, runningDir, finishedDir, priority)
	}
	verifyFinalQueueState(t, pendingDir, runningDir, finishedDir)
}
func ensureDir(t *testing.T, dir string) {
t.Helper()
if err := os.MkdirAll(dir, 0750); err != nil {
t.Fatalf("Failed to create directory %s: %v", dir, err)
}
}
// processJobByPriority picks the pending job with the given priority, moves
// it to running (verifying it is the only running job), simulates its
// execution, and finally moves it to finished — checking that the job has
// left both the pending and running directories.
func processJobByPriority(t *testing.T, pendingDir, runningDir, finishedDir string, priority int) {
	t.Helper()

	pendingJobDir := selectJobByPriority(t, pendingDir, priority)
	jobName := filepath.Base(pendingJobDir)

	runningJobDir := filepath.Join(runningDir, jobName)
	if err := os.Rename(pendingJobDir, runningJobDir); err != nil {
		t.Fatalf("Failed to move job %s to running: %v", jobName, err)
	}
	ensureSingleRunningJob(t, runningDir)

	simulateJobExecution(t, runningJobDir, jobName, priority)

	if err := os.Rename(runningJobDir, filepath.Join(finishedDir, jobName)); err != nil {
		t.Fatalf("Failed to move job %s to finished: %v", jobName, err)
	}
	assertDirectoryAbsent(t, pendingJobDir, "pending")
	assertDirectoryAbsent(t, runningJobDir, "running")
}
func selectJobByPriority(t *testing.T, pendingDir string, priority int) string {
t.Helper()
pattern := filepath.Join(pendingDir, fmt.Sprintf("*_priority_%d", priority))
jobs, err := filepath.Glob(pattern)
if err != nil {
t.Fatalf("Failed to find jobs with priority %d: %v", priority, err)
}
if len(jobs) == 0 {
t.Fatalf("No job found with priority %d", priority)
}
return jobs[0]
}
func ensureSingleRunningJob(t *testing.T, runningDir string) {
t.Helper()
runningJobs, err := filepath.Glob(filepath.Join(runningDir, "*"))
if err != nil || len(runningJobs) != 1 {
t.Errorf("Expected exactly 1 running job, found %d", len(runningJobs))
}
}
// simulateJobExecution writes a results.json into the running job's results/
// directory, recording the detected framework and fabricated timing data
// that encodes the execution order.
func simulateJobExecution(t *testing.T, runningJobDir, jobName string, priority int) {
	t.Helper()

	resultsDir := filepath.Join(runningJobDir, "results")
	if err := os.MkdirAll(resultsDir, 0750); err != nil {
		t.Fatalf("Failed to create output directory for %s: %v", jobName, err)
	}

	framework := detectFramework(t, filepath.Join(runningJobDir, "train.py"))
	// Fabricate a start time `priority` minutes in the past so the
	// timestamps reflect the processing order.
	startedAt := time.Now().Add(-time.Duration(priority) * time.Minute).Format(time.RFC3339)
	completedAt := time.Now().Format(time.RFC3339)

	payload := fmt.Sprintf(`{
"job_name": "%s",
"framework": "%s",
"priority": %d,
"status": "completed",
"execution_order": %d,
"started_at": "%s",
"completed_at": "%s",
"source": "actual_example"
}`, jobName, framework, priority, priority, startedAt, completedAt)

	if err := os.WriteFile(filepath.Join(resultsDir, "results.json"), []byte(payload), 0600); err != nil {
		t.Fatalf("Failed to create results for %s: %v", jobName, err)
	}
}
// detectFramework reads the given train.py and maps the first matching
// import marker to a framework label; "unknown" if none match. Marker order
// matters ("sklearn" is checked before "torch", etc.), so the table below
// preserves the original precedence.
func detectFramework(t *testing.T, trainScript string) string {
	t.Helper()

	scriptContent, err := fileutil.SecureFileRead(trainScript)
	if err != nil {
		t.Fatalf("Failed to read train.py: %v", err)
	}
	script := string(scriptContent)

	markers := []struct {
		needle string
		label  string
	}{
		{"sklearn", "scikit-learn"},
		{"xgboost", "xgboost"},
		{"torch", "pytorch"},
		{"tensorflow", "tensorflow"},
		{"statsmodels", "statsmodels"},
	}
	for _, m := range markers {
		if contains(script, m.needle) {
			return m.label
		}
	}
	return "unknown"
}
func assertDirectoryAbsent(t *testing.T, path, location string) {
t.Helper()
if _, err := os.Stat(path); !os.IsNotExist(err) {
t.Errorf("Job should no longer be in %s directory: %s", location, path)
}
}
func verifyFinalQueueState(t *testing.T, pendingDir, runningDir, finishedDir string) {
t.Helper()
finishedJobs, err := filepath.Glob(filepath.Join(finishedDir, "*"))
if err != nil || len(finishedJobs) != 3 {
t.Errorf("Expected 3 finished jobs, got %d", len(finishedJobs))
}
pendingJobs, err := filepath.Glob(filepath.Join(pendingDir, "*"))
if err != nil || len(pendingJobs) != 0 {
t.Errorf("Expected 0 pending jobs after processing, found %d", len(pendingJobs))
}
runningJobs, err := filepath.Glob(filepath.Join(runningDir, "*"))
if err != nil || len(runningJobs) != 0 {
t.Errorf("Expected 0 running jobs after processing, found %d", len(runningJobs))
}
}
// TestQueueCapacity tests queue capacity and resource limits: it queues more
// jobs than run at once and verifies they drain strictly one at a time —
// the running directory holds exactly one entry while a job is in flight
// and zero entries between jobs.
func TestQueueCapacity(t *testing.T) {
	t.Parallel() // Enable parallel execution
	testDir := t.TempDir()
	t.Run("QueueCapacityLimits", func(t *testing.T) {
		// Use fixtures for examples directory operations
		examplesDir := tests.NewExamplesDir(filepath.Join("..", "fixtures", "examples"))
		pendingDir := filepath.Join(testDir, "server", "home", "mluser", "ml_jobs", "pending")
		runningDir := filepath.Join(testDir, "server", "home", "mluser", "ml_jobs", "running")
		finishedDir := filepath.Join(testDir, "server", "home", "mluser", "ml_jobs", "finished")
		// Create directories
		if err := os.MkdirAll(pendingDir, 0750); err != nil {
			t.Fatalf("Failed to create pending directory: %v", err)
		}
		if err := os.MkdirAll(runningDir, 0750); err != nil {
			t.Fatalf("Failed to create running directory: %v", err)
		}
		if err := os.MkdirAll(finishedDir, 0750); err != nil {
			t.Fatalf("Failed to create finished directory: %v", err)
		}
		// Create more jobs than server can handle simultaneously using actual examples
		examples := []string{
			"standard_ml_project", "sklearn_project", "xgboost_project",
			"pytorch_project", "tensorflow_project",
		}
		totalJobs := len(examples)
		for i, example := range examples {
			jobName := fmt.Sprintf("capacity_test_job_%d", i)
			jobDir := filepath.Join(pendingDir, jobName)
			if err := os.MkdirAll(jobDir, 0750); err != nil {
				t.Fatalf("Failed to create job directory %s: %v", jobDir, err)
			}
			// Copy actual example files using fixtures
			sourceDir := examplesDir.GetPath(example)
			// Copy actual example files
			if _, err := os.Stat(sourceDir); os.IsNotExist(err) {
				// Create minimal files if example doesn't exist.
				// NOTE(review): the embedded Python below appears to have lost its
				// indentation (the body of main() is flush-left) — confirm against
				// the original file. The script is never executed by this test, so
				// the queue-capacity assertions are unaffected either way.
				trainScript := filepath.Join(jobDir, "train.py")
				script := fmt.Sprintf(`#!/usr/bin/env python3
import json, time
from pathlib import Path
def main():
results = {
"job_id": %d,
"example": "%s",
"status": "completed",
"completion_time": time.strftime("%%Y-%%m-%%d %%H:%%M:%%S")
}
output_dir = Path("./results")
output_dir.mkdir(parents=True, exist_ok=True)
with open(output_dir / "results.json", "w") as f:
json.dump(results, f, indent=2)
if __name__ == "__main__":
main()
`, i, example)
				//nolint:gosec // G306: Script needs execute permissions
				if err := os.WriteFile(trainScript, []byte(script), 0750); err != nil {
					t.Fatalf("Failed to create train script for job %d: %v", i, err)
				}
			} else {
				// Copy actual example files (skip any that are absent).
				files := []string{"train.py", "requirements.txt"}
				for _, file := range files {
					src := filepath.Join(sourceDir, file)
					dst := filepath.Join(jobDir, file)
					if _, err := os.Stat(src); os.IsNotExist(err) {
						continue // Skip if file doesn't exist
					}
					data, err := fileutil.SecureFileRead(src)
					if err != nil {
						t.Fatalf("Failed to read %s for job %d: %v", file, i, err)
					}
					//nolint:gosec // G306: Script needs execute permissions
					if err := os.WriteFile(dst, data, 0750); err != nil {
						t.Fatalf("Failed to copy %s for job %d: %v", file, i, err)
					}
				}
			}
		}
		// Verify all jobs are in pending queue
		pendingJobs, err := filepath.Glob(filepath.Join(pendingDir, "capacity_test_job_*"))
		if err != nil || len(pendingJobs) != totalJobs {
			t.Errorf("Expected %d pending jobs, found %d", totalJobs, len(pendingJobs))
		}
		// Process one job at a time (sequential execution)
		for i := 0; i < totalJobs; i++ {
			// Move one job to running
			jobName := fmt.Sprintf("capacity_test_job_%d", i)
			pendingJobDir := filepath.Join(pendingDir, jobName)
			runningJobDir := filepath.Join(runningDir, jobName)
			if err := os.Rename(pendingJobDir, runningJobDir); err != nil {
				t.Fatalf("Failed to move job %d to running: %v", i, err)
			}
			// Verify only one job is running — the capacity invariant under test
			runningJobs, err := filepath.Glob(filepath.Join(runningDir, "*"))
			if err != nil || len(runningJobs) != 1 {
				t.Errorf("Expected exactly 1 running job, found %d", len(runningJobs))
			}
			// Simulate job completion
			time.Sleep(5 * time.Millisecond) // Reduced from 10ms
			// Move to finished
			finishedJobDir := filepath.Join(finishedDir, jobName)
			if err := os.Rename(runningJobDir, finishedJobDir); err != nil {
				t.Fatalf("Failed to move job %d to finished: %v", i, err)
			}
			// Verify no jobs are running between jobs
			runningJobs, err = filepath.Glob(filepath.Join(runningDir, "*"))
			if err != nil || len(runningJobs) != 0 {
				t.Errorf("Expected 0 running jobs between jobs, found %d", len(runningJobs))
			}
		}
		// Verify all jobs completed
		finishedJobs, err := filepath.Glob(filepath.Join(finishedDir, "capacity_test_job_*"))
		if err != nil || len(finishedJobs) != totalJobs {
			t.Errorf("Expected %d finished jobs, found %d", totalJobs, len(finishedJobs))
		}
		// Verify queue is empty
		pendingJobs, err = filepath.Glob(filepath.Join(pendingDir, "capacity_test_job_*"))
		if err != nil || len(pendingJobs) != 0 {
			t.Errorf("Expected 0 pending jobs after processing, found %d", len(pendingJobs))
		}
	})
}
// TestResourceIsolation tests that experiments have isolated resources:
// several jobs created with the same timestamp each get their own results
// directory and a results.json whose content is unique to that job.
func TestResourceIsolation(t *testing.T) {
	t.Parallel() // Enable parallel execution
	testDir := t.TempDir()
	t.Run("OutputDirectoryIsolation", func(t *testing.T) {
		// Use fixtures for examples directory operations
		examplesDir := tests.NewExamplesDir(filepath.Join("..", "fixtures", "examples"))
		// Create multiple experiments with same timestamp using actual examples.
		// A fixed timestamp forces the name collision scenario this test exists
		// to cover: isolation must come from the per-job directory, not time.
		timestamp := "20231201_143022"
		examples := []string{"sklearn_project", "xgboost_project", "pytorch_project"}
		runningDir := filepath.Join(testDir, "server", "home", "mluser", "ml_jobs", "running")
		for i, expName := range examples {
			jobName := fmt.Sprintf("exp%d_%s", i, timestamp)
			outputDir := filepath.Join(runningDir, jobName, "results")
			if err := os.MkdirAll(outputDir, 0750); err != nil {
				t.Fatalf("Failed to create output directory: %v", err)
			}
			// Copy actual example files using fixtures
			sourceDir := examplesDir.GetPath(expName)
			// Read actual example to create realistic results.
			// NOTE(review): this duplicates detectFramework's marker matching but
			// deliberately tolerates a read error (framework stays "unknown"),
			// whereas detectFramework would fail the test on a read error.
			trainScript := filepath.Join(sourceDir, "train.py")
			framework := "unknown"
			if content, err := fileutil.SecureFileRead(trainScript); err == nil {
				scriptStr := string(content)
				switch {
				case contains(scriptStr, "sklearn"):
					framework = "scikit-learn"
				case contains(scriptStr, "xgboost"):
					framework = "xgboost"
				case contains(scriptStr, "torch"):
					framework = "pytorch"
				}
			}
			// Create unique results file based on actual framework
			resultsFile := filepath.Join(outputDir, "results.json")
			results := fmt.Sprintf(`{
"experiment": "exp%d",
"framework": "%s",
"job_name": "%s",
"output_dir": "%s",
"example_source": "%s",
"unique_id": "exp%d_%d"
}`, i, framework, jobName, outputDir, expName, i, time.Now().UnixNano())
			if err := os.WriteFile(resultsFile, []byte(results), 0600); err != nil {
				t.Fatalf("Failed to create results for %s: %v", expName, err)
			}
		}
		// Verify each experiment has its own isolated output directory
		for i, expName := range examples {
			jobName := fmt.Sprintf("exp%d_%s", i, timestamp)
			outputDir := filepath.Join(runningDir, jobName, "results")
			resultsFile := filepath.Join(outputDir, "results.json")
			if _, err := os.Stat(resultsFile); os.IsNotExist(err) {
				t.Errorf("Results file should exist for %s in isolated directory", expName)
			}
			// Verify content is unique to this experiment
			content, err := fileutil.SecureFileRead(resultsFile)
			if err != nil {
				t.Fatalf("Failed to read results for %s: %v", expName, err)
			}
			if !contains(string(content), fmt.Sprintf("exp%d", i)) {
				t.Errorf("Results file should contain experiment ID exp%d", i)
			}
			if !contains(string(content), expName) {
				t.Errorf("Results file should contain example source %s", expName)
			}
		}
	})
}
// contains reports whether substr is within s.
//
// Fix: the original hand-rolled a redundant combination of length, equality,
// prefix/suffix checks and a manual scan; strings.Contains is the idiomatic,
// behaviorally identical replacement (including the empty-substring case).
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}
// findSubstring reports whether substr occurs anywhere in s using a
// sliding-window scan; the empty substring matches any s.
func findSubstring(s, substr string) bool {
	if len(substr) == 0 {
		return true
	}
	last := len(s) - len(substr)
	for start := 0; start <= last; start++ {
		if s[start:start+len(substr)] == substr {
			return true
		}
	}
	return false
}