// data_manager.go - Fetch data from NAS to ML server on-demand package main import ( "context" "encoding/json" "fmt" "log" "log/slog" "os" "os/signal" "path/filepath" "strings" "syscall" "time" "github.com/jfraeys/fetch_ml/internal/auth" "github.com/jfraeys/fetch_ml/internal/container" "github.com/jfraeys/fetch_ml/internal/errors" "github.com/jfraeys/fetch_ml/internal/logging" "github.com/jfraeys/fetch_ml/internal/network" "github.com/jfraeys/fetch_ml/internal/queue" "github.com/jfraeys/fetch_ml/internal/telemetry" ) // SSHClient alias for convenience type SSHClient = network.SSHClient type DataManager struct { config *DataConfig mlServer *SSHClient nasServer *SSHClient taskQueue *queue.TaskQueue ctx context.Context cancel context.CancelFunc logger *logging.Logger } type DataFetchRequest struct { JobName string `json:"job_name"` Datasets []string `json:"datasets"` // Dataset names to fetch Priority int `json:"priority"` RequestedAt time.Time `json:"requested_at"` } type DatasetInfo struct { Name string `json:"name"` SizeBytes int64 `json:"size_bytes"` Location string `json:"location"` // "nas" or "ml" LastAccess time.Time `json:"last_access"` } func NewDataManager(cfg *DataConfig, apiKey string) (*DataManager, error) { mlServer, err := network.NewSSHClient(cfg.MLHost, cfg.MLUser, cfg.MLSSHKey, cfg.MLPort, "") if err != nil { return nil, fmt.Errorf("ML server connection failed: %w", err) } defer func() { if err != nil { if closeErr := mlServer.Close(); closeErr != nil { log.Printf("Warning: failed to close ML server connection: %v", closeErr) } } }() nasServer, err := network.NewSSHClient(cfg.NASHost, cfg.NASUser, cfg.NASSSHKey, cfg.NASPort, "") if err != nil { return nil, fmt.Errorf("NAS connection failed: %w", err) } defer func() { if err != nil { if closeErr := nasServer.Close(); closeErr != nil { log.Printf("Warning: failed to close NAS server connection: %v", closeErr) } } }() // Create MLDataDir if it doesn't exist (for production without NAS) if cfg.MLDataDir != "" { if _, err := mlServer.Exec(fmt.Sprintf("mkdir -p %s", cfg.MLDataDir)); err != nil { logger := logging.NewLogger(slog.LevelInfo, false) logger.Job(context.Background(), "data_manager", "").Error("Failed to create ML data directory", "dir", cfg.MLDataDir, "error", err) } } // Setup Redis using internal queue ctx, cancel := context.WithCancel(context.Background()) logger := logging.NewLogger(slog.LevelInfo, false) var taskQueue *queue.TaskQueue if cfg.RedisAddr != "" { queueCfg := queue.Config{ RedisAddr: cfg.RedisAddr, RedisPassword: cfg.RedisPassword, RedisDB: cfg.RedisDB, } var err error taskQueue, err = queue.NewTaskQueue(queueCfg) if err != nil { // FIXED: Check error return values for cleanup if closeErr := mlServer.Close(); closeErr != nil { logger.Warn("failed to close ML server during error cleanup", "error", closeErr) } if closeErr := nasServer.Close(); closeErr != nil { logger.Warn("failed to close NAS server during error cleanup", "error", closeErr) } cancel() // Cancel context to prevent leak return nil, fmt.Errorf("redis connection failed: %w", err) } } else { taskQueue = nil // Local mode - no Redis } return &DataManager{ config: cfg, mlServer: mlServer, nasServer: nasServer, taskQueue: taskQueue, ctx: ctx, cancel: cancel, logger: logger, }, nil } func (dm *DataManager) FetchDataset(jobName, datasetName string) error { ctx, cancel := context.WithTimeout(dm.ctx, 30*time.Minute) defer cancel() return network.RetryForNetworkOperations(ctx, func() error { return dm.fetchDatasetInternal(ctx, jobName, datasetName) }) } func (dm *DataManager) fetchDatasetInternal(ctx context.Context, jobName, datasetName string) error { if err := container.ValidateJobName(datasetName); err != nil { return &errors.DataFetchError{ Dataset: datasetName, JobName: jobName, Err: fmt.Errorf("invalid dataset name: %w", err), } } logger := dm.logger.Job(ctx, jobName, "") logger.Info("fetching dataset", "dataset", datasetName) // Validate dataset size and run cleanup if needed if err := dm.ValidateDatasetWithCleanup(datasetName); err != nil { return &errors.DataFetchError{ Dataset: datasetName, JobName: jobName, Err: fmt.Errorf("dataset size validation failed: %w", err), } } nasPath := filepath.Join(dm.config.NASDataDir, datasetName) mlPath := filepath.Join(dm.config.MLDataDir, datasetName) // Check if dataset exists on NAS if !dm.nasServer.FileExists(nasPath) { return &errors.DataFetchError{ Dataset: datasetName, JobName: jobName, Err: fmt.Errorf("dataset not found on NAS"), } } // Check if already on ML server if dm.mlServer.FileExists(mlPath) { logger.Info("dataset already on ML server", "dataset", datasetName) dm.updateLastAccess(datasetName) return nil } // Get size for progress tracking size, err := dm.nasServer.GetFileSize(nasPath) if err != nil { logger.Warn("could not get dataset size", "dataset", datasetName, "error", err) size = 0 } sizeGB := float64(size) / (1024 * 1024 * 1024) logger.Info("transferring dataset", "dataset", datasetName, "size_gb", sizeGB, "nas_path", nasPath, "ml_path", mlPath) if dm.taskQueue != nil { redisClient := dm.taskQueue.GetRedisClient() if err := redisClient.HSet(dm.ctx, fmt.Sprintf("ml:data:transfer:%s", datasetName), "status", "transferring", "job_name", jobName, "size_bytes", size, "started_at", time.Now().Unix()).Err(); err != nil { logger.Warn("failed to record transfer start in Redis", "error", err) } } // Use local copy for local mode, rsync for remote mode var rsyncCmd string if dm.config.NASHost == "" || dm.config.NASUser == "" { // Local mode - use cp rsyncCmd = fmt.Sprintf("mkdir -p %s && cp -r %s %s/", dm.config.MLDataDir, nasPath, mlPath) } else { // Remote mode - use rsync over SSH rsyncCmd = fmt.Sprintf( "mkdir -p %s && rsync -avz --progress %s@%s:%s/ %s/", dm.config.MLDataDir, dm.config.NASUser, dm.config.NASHost, nasPath, mlPath, ) } ioBefore, ioErr := telemetry.ReadProcessIO() start := time.Now() out, err := telemetry.ExecWithMetrics(dm.logger, "dataset transfer", time.Since(start), func() (string, error) { return dm.nasServer.ExecContext(ctx, rsyncCmd) }) duration := time.Since(start) if err != nil { logger.Error("transfer failed", "dataset", datasetName, "duration", duration, "error", err, "output", out) if ioErr == nil { if after, readErr := telemetry.ReadProcessIO(); readErr == nil { delta := telemetry.DiffIO(ioBefore, after) logger.Debug("transfer io stats", "dataset", datasetName, "read_bytes", delta.ReadBytes, "write_bytes", delta.WriteBytes) } } if dm.taskQueue != nil { redisClient := dm.taskQueue.GetRedisClient() if redisErr := redisClient.HSet(dm.ctx, fmt.Sprintf("ml:data:transfer:%s", datasetName), "status", "failed", "error", err.Error()).Err(); redisErr != nil { logger.Warn("failed to record transfer failure in Redis", "error", redisErr) } } return err } logger.Info("transfer complete", "dataset", datasetName, "duration", duration, "size_gb", sizeGB) if ioErr == nil { if after, readErr := telemetry.ReadProcessIO(); readErr == nil { delta := telemetry.DiffIO(ioBefore, after) logger.Debug("transfer io stats", "dataset", datasetName, "read_bytes", delta.ReadBytes, "write_bytes", delta.WriteBytes) } } if dm.taskQueue != nil { redisClient := dm.taskQueue.GetRedisClient() if err := redisClient.HSet(dm.ctx, fmt.Sprintf("ml:data:transfer:%s", datasetName), "status", "completed", "completed_at", time.Now().Unix(), "duration_seconds", duration.Seconds()).Err(); err != nil { logger.Warn("failed to record transfer completion in Redis", "error", err) } } // Track dataset metadata dm.saveDatasetInfo(datasetName, size) return nil } func (dm *DataManager) saveDatasetInfo(name string, size int64) { if dm.taskQueue == nil { return // Skip in local mode } info := DatasetInfo{ Name: name, SizeBytes: size, Location: "ml", LastAccess: time.Now(), } data, _ := json.Marshal(info) if dm.taskQueue != nil { redisClient := dm.taskQueue.GetRedisClient() if err := redisClient.Set(dm.ctx, fmt.Sprintf("ml:dataset:%s", name), data, 0).Err(); err != nil { dm.logger.Job(dm.ctx, "data_manager", "").Warn("failed to save dataset info to Redis", "dataset", name, "error", err) } } } func (dm *DataManager) updateLastAccess(name string) { if dm.taskQueue == nil { return // Skip in local mode } key := fmt.Sprintf("ml:dataset:%s", name) redisClient := dm.taskQueue.GetRedisClient() data, err := redisClient.Get(dm.ctx, key).Result() if err != nil { return } var info DatasetInfo if err := json.Unmarshal([]byte(data), &info); err != nil { return } info.LastAccess = time.Now() newData, _ := json.Marshal(info) redisClient = dm.taskQueue.GetRedisClient() if err := redisClient.Set(dm.ctx, key, newData, 0).Err(); err != nil { dm.logger.Job(dm.ctx, "data_manager", "").Warn("failed to update last access in Redis", "dataset", name, "error", err) } } // ListDatasetsOnML returns a list of all datasets currently stored on the ML server. func (dm *DataManager) ListDatasetsOnML() ([]DatasetInfo, error) { out, err := dm.mlServer.Exec(fmt.Sprintf("ls -1 %s 2>/dev/null", dm.config.MLDataDir)) if err != nil { return nil, err } var datasets []DatasetInfo for name := range strings.SplitSeq(strings.TrimSpace(out), "\n") { if name == "" { continue } var info DatasetInfo // Only use Redis if available if dm.taskQueue != nil { redisClient := dm.taskQueue.GetRedisClient() key := fmt.Sprintf("ml:dataset:%s", name) data, err := redisClient.Get(dm.ctx, key).Result() if err == nil { if unmarshalErr := json.Unmarshal([]byte(data), &info); unmarshalErr != nil { // Fallback to disk if unmarshal fails size, _ := dm.mlServer.GetFileSize(filepath.Join(dm.config.MLDataDir, name)) info = DatasetInfo{ Name: name, SizeBytes: size, Location: "ml", } } } else { // Fallback: get from disk size, _ := dm.mlServer.GetFileSize(filepath.Join(dm.config.MLDataDir, name)) info = DatasetInfo{ Name: name, SizeBytes: size, Location: "ml", } } } else { // Local mode: get from disk size, _ := dm.mlServer.GetFileSize(filepath.Join(dm.config.MLDataDir, name)) info = DatasetInfo{ Name: name, SizeBytes: size, Location: "ml", } } datasets = append(datasets, info) } return datasets, nil } func (dm *DataManager) CleanupOldData() error { logger := dm.logger.Job(dm.ctx, "data_manager", "") logger.Info("running data cleanup") datasets, err := dm.ListDatasetsOnML() if err != nil { return err } var totalSize int64 for _, ds := range datasets { totalSize += ds.SizeBytes } totalSizeGB := float64(totalSize) / (1024 * 1024 * 1024) logger.Info("current storage usage", "total_size_gb", totalSizeGB, "dataset_count", len(datasets)) // Delete datasets older than max age or if over size limit maxAge := time.Duration(dm.config.MaxAgeHours) * time.Hour maxSize := int64(dm.config.MaxSizeGB) * 1024 * 1024 * 1024 var deleted []string for _, ds := range datasets { shouldDelete := false // Check age if !ds.LastAccess.IsZero() && time.Since(ds.LastAccess) > maxAge { logger.Info("dataset is old, marking for deletion", "dataset", ds.Name, "last_access", ds.LastAccess, "age_hours", time.Since(ds.LastAccess).Hours()) shouldDelete = true } // Check if over size limit if totalSize > maxSize { logger.Info("over size limit, deleting oldest dataset", "dataset", ds.Name, "current_size_gb", totalSizeGB, "max_size_gb", dm.config.MaxSizeGB) shouldDelete = true } if shouldDelete { path := filepath.Join(dm.config.MLDataDir, ds.Name) logger.Info("deleting dataset", "dataset", ds.Name, "path", path) if _, err := dm.mlServer.Exec(fmt.Sprintf("rm -rf %s", path)); err != nil { logger.Error("failed to delete dataset", "dataset", ds.Name, "error", err) continue } deleted = append(deleted, ds.Name) totalSize -= ds.SizeBytes // FIXED: Remove from Redis only if available, with error handling if dm.taskQueue != nil { redisClient := dm.taskQueue.GetRedisClient() if err := redisClient.Del(dm.ctx, fmt.Sprintf("ml:dataset:%s", ds.Name)).Err(); err != nil { logger.Warn("failed to delete dataset from Redis", "dataset", ds.Name, "error", err) } } } } if len(deleted) > 0 { logger.Info("cleanup complete", "deleted_count", len(deleted), "deleted_datasets", deleted) } else { logger.Info("cleanup complete", "deleted_count", 0) } return nil } // GetAvailableDiskSpace returns available disk space in bytes func (dm *DataManager) GetAvailableDiskSpace() int64 { logger := dm.logger.Job(dm.ctx, "data_manager", "") // Check disk space on ML server cmd := "df -k " + dm.config.MLDataDir + " | tail -1 | awk '{print $4}'" output, err := dm.mlServer.Exec(cmd) if err != nil { logger.Error("failed to get disk space", "error", err) return 0 } // Parse KB to bytes var freeKB int64 _, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &freeKB) if err != nil { logger.Error("failed to parse disk space", "error", err, "output", output) return 0 } return freeKB * 1024 // Convert KB to bytes } // GetDatasetInfo returns information about a dataset from NAS func (dm *DataManager) GetDatasetInfo(datasetName string) (*DatasetInfo, error) { // Check if dataset exists on NAS nasPath := filepath.Join(dm.config.NASDataDir, datasetName) cmd := fmt.Sprintf("test -d %s && echo 'exists'", nasPath) output, err := dm.nasServer.Exec(cmd) if err != nil || strings.TrimSpace(output) != "exists" { return nil, fmt.Errorf("dataset %s not found on NAS", datasetName) } // Get dataset size cmd = fmt.Sprintf("du -sb %s | cut -f1", nasPath) output, err = dm.nasServer.Exec(cmd) if err != nil { return nil, fmt.Errorf("failed to get dataset size: %w", err) } var sizeBytes int64 _, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &sizeBytes) if err != nil { return nil, fmt.Errorf("failed to parse dataset size: %w", err) } // Get last modification time as proxy for last access cmd = fmt.Sprintf("stat -c %%Y %s", nasPath) output, err = dm.nasServer.Exec(cmd) if err != nil { return nil, fmt.Errorf("failed to get dataset timestamp: %w", err) } var modTime int64 _, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &modTime) if err != nil { return nil, fmt.Errorf("failed to parse timestamp: %w", err) } return &DatasetInfo{ Name: datasetName, SizeBytes: sizeBytes, Location: "nas", LastAccess: time.Unix(modTime, 0), }, nil } // ValidateDatasetWithCleanup checks if dataset fits and runs cleanup if needed func (dm *DataManager) ValidateDatasetWithCleanup(datasetName string) error { logger := dm.logger.Job(dm.ctx, "data_manager", "") // Get dataset info info, err := dm.GetDatasetInfo(datasetName) if err != nil { return fmt.Errorf("failed to get dataset info: %w", err) } // Check current available space availableSpace := dm.GetAvailableDiskSpace() logger.Info("dataset size validation", "dataset", datasetName, "dataset_size_gb", float64(info.SizeBytes)/(1024*1024*1024), "available_gb", float64(availableSpace)/(1024*1024*1024)) // If enough space, proceed if info.SizeBytes <= availableSpace { logger.Info("sufficient space available", "dataset", datasetName) return nil } // Try cleanup first logger.Info("insufficient space, running cleanup", "dataset", datasetName, "required_gb", float64(info.SizeBytes)/(1024*1024*1024), "available_gb", float64(availableSpace)/(1024*1024*1024)) if err := dm.CleanupOldData(); err != nil { return fmt.Errorf("cleanup failed: %w", err) } // Check space again after cleanup availableSpace = dm.GetAvailableDiskSpace() logger.Info("space after cleanup", "available_gb", float64(availableSpace)/(1024*1024*1024)) // If now enough space, proceed if info.SizeBytes <= availableSpace { logger.Info("cleanup freed enough space", "dataset", datasetName) return nil } // Still not enough space return fmt.Errorf("dataset %s (%.2fGB) too large for available space (%.2fGB) even after cleanup", datasetName, float64(info.SizeBytes)/(1024*1024*1024), float64(availableSpace)/(1024*1024*1024)) } func (dm *DataManager) StartCleanupLoop() { logger := dm.logger.Job(dm.ctx, "data_manager", "") ticker := time.NewTicker(time.Duration(dm.config.CleanupInterval) * time.Minute) go func() { defer ticker.Stop() for { select { case <-dm.ctx.Done(): logger.Info("cleanup loop stopping") return case <-ticker.C: if err := dm.CleanupOldData(); err != nil { logger.Error("cleanup error", "error", err) } } } }() } // Close gracefully shuts down the DataManager, stopping the cleanup loop and // closing all connections to ML server, NAS server, and Redis. func (dm *DataManager) Close() { dm.cancel() // Cancel context to stop cleanup loop // Wait a moment for cleanup loop to finish time.Sleep(100 * time.Millisecond) if dm.mlServer != nil { if err := dm.mlServer.Close(); err != nil { dm.logger.Job(dm.ctx, "data_manager", "").Warn("error closing ML server connection", "error", err) } } if dm.nasServer != nil { if err := dm.nasServer.Close(); err != nil { dm.logger.Job(dm.ctx, "data_manager", "").Warn("error closing NAS server connection", "error", err) } } if dm.taskQueue != nil { if err := dm.taskQueue.Close(); err != nil { dm.logger.Job(dm.ctx, "data_manager", "").Warn("error closing Redis connection", "error", err) } } } func main() { // Parse authentication flags authFlags := auth.ParseAuthFlags() if err := auth.ValidateAuthFlags(authFlags); err != nil { log.Fatalf("Authentication flag error: %v", err) } // Get API key from various sources apiKey := auth.GetAPIKeyFromSources(authFlags) configFile := "configs/config-local.yaml" if authFlags.ConfigFile != "" { configFile = authFlags.ConfigFile } // Parse command line args if len(os.Args) < 2 { fmt.Println("Usage:") fmt.Println(" data_manager [--config configs/config-local.yaml] [--api-key ] fetch [dataset...]") fmt.Println(" data_manager [--config configs/config-local.yaml] [--api-key ] list") fmt.Println(" data_manager [--config configs/config-local.yaml] [--api-key ] cleanup") fmt.Println(" data_manager [--config configs/config-local.yaml] [--api-key ] validate ") fmt.Println(" data_manager [--config configs/config-local.yaml] [--api-key ] daemon") fmt.Println() auth.PrintAuthHelp() os.Exit(1) } // Check for --config flag if len(os.Args) >= 3 && os.Args[1] == "--config" { configFile = os.Args[2] // Shift args os.Args = append([]string{os.Args[0]}, os.Args[3:]...) } cfg, err := LoadDataConfig(configFile) if err != nil { log.Fatalf("Failed to load config: %v", err) } // Validate authentication configuration if err := cfg.Auth.ValidateAuthConfig(); err != nil { log.Fatalf("Invalid authentication configuration: %v", err) } // Validate configuration if err := cfg.Validate(); err != nil { log.Fatalf("Invalid configuration: %v", err) } // Test authentication if enabled if cfg.Auth.Enabled && apiKey != "" { user, err := cfg.Auth.ValidateAPIKey(apiKey) if err != nil { log.Fatalf("Authentication failed: %v", err) } log.Printf("Data manager authenticated as user: %s (admin: %v)", user.Name, user.Admin) } else if cfg.Auth.Enabled { log.Fatal("Authentication required but no API key provided") } dm, err := NewDataManager(cfg, apiKey) if err != nil { log.Fatalf("Failed to create data manager: %v", err) } defer dm.Close() cmd := os.Args[1] switch cmd { case "fetch": if len(os.Args) < 4 { log.Fatal("Usage: data_manager fetch [dataset...]") } jobName := os.Args[2] datasets := os.Args[3:] for _, dataset := range datasets { if err := dm.FetchDataset(jobName, dataset); err != nil { dm.logger.Job(context.Background(), jobName, "").Error("failed to fetch dataset", "dataset", dataset, "error", err) } } case "list": datasets, err := dm.ListDatasetsOnML() if err != nil { log.Fatalf("Failed to list datasets: %v", err) } fmt.Println("Datasets on ML server:") fmt.Println("======================") var totalSize int64 for _, ds := range datasets { sizeMB := float64(ds.SizeBytes) / (1024 * 1024) lastAccess := "unknown" if !ds.LastAccess.IsZero() { lastAccess = ds.LastAccess.Format("2006-01-02 15:04:05") } fmt.Printf("%-30s %10.2f MB Last access: %s\n", ds.Name, sizeMB, lastAccess) totalSize += ds.SizeBytes } fmt.Printf("\nTotal: %.2f GB\n", float64(totalSize)/(1024*1024*1024)) case "validate": if len(os.Args) < 3 { log.Fatal("Usage: data_manager validate ") } dataset := os.Args[2] fmt.Printf("Validating dataset: %s\n", dataset) if err := dm.ValidateDatasetWithCleanup(dataset); err != nil { log.Fatalf("Validation failed: %v", err) } fmt.Printf("✅ Dataset %s can be downloaded\n", dataset) case "cleanup": if err := dm.CleanupOldData(); err != nil { log.Fatalf("Cleanup failed: %v", err) } case "daemon": logger := dm.logger.Job(context.Background(), "data_manager", "") logger.Info("starting data manager daemon") dm.StartCleanupLoop() logger.Info("cleanup configuration", "interval_minutes", cfg.CleanupInterval, "max_age_hours", cfg.MaxAgeHours, "max_size_gb", cfg.MaxSizeGB) // Handle graceful shutdown sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) sig := <-sigChan logger.Info("received shutdown signal", "signal", sig) dm.Close() logger.Info("data manager shut down gracefully") default: log.Fatalf("Unknown command: %s", cmd) } }