// data_manager.go - Fetch data from NAS to ML server on-demand package main import ( "context" "fmt" "log" "log/slog" "os" "os/signal" "path/filepath" "sort" "strings" "syscall" "time" "github.com/jfraeys/fetch_ml/internal/auth" "github.com/jfraeys/fetch_ml/internal/container" "github.com/jfraeys/fetch_ml/internal/errtypes" "github.com/jfraeys/fetch_ml/internal/logging" "github.com/jfraeys/fetch_ml/internal/network" "github.com/jfraeys/fetch_ml/internal/queue" "github.com/jfraeys/fetch_ml/internal/storage" "github.com/jfraeys/fetch_ml/internal/telemetry" ) func shellQuote(s string) string { if s == "" { return "''" } return "'" + strings.ReplaceAll(s, "'", "'\"'\"'") + "'" } // SSHClient alias for convenience. type SSHClient = network.SSHClient // DataManager manages data synchronization between NAS and ML server. type DataManager struct { config *DataConfig mlServer *SSHClient nasServer *SSHClient taskQueue *queue.TaskQueue datasetStore *storage.DatasetStore ctx context.Context cancel context.CancelFunc logger *logging.Logger } func (dm *DataManager) archiveDatasetOnML(datasetName string) (string, error) { datasetName = strings.TrimSpace(datasetName) if err := container.ValidateJobName(datasetName); err != nil { return "", fmt.Errorf("invalid dataset name: %w", err) } if strings.TrimSpace(dm.config.MLDataDir) == "" { return "", fmt.Errorf("missing ml_data_dir") } stamp := time.Now().UTC().Format("20060102-150405") archiveRoot := filepath.Join(dm.config.MLDataDir, ".archive", stamp) src := filepath.Join(dm.config.MLDataDir, datasetName) dst := filepath.Join(archiveRoot, datasetName) cmd := fmt.Sprintf( "mkdir -p %s && mv %s %s", shellQuote(archiveRoot), shellQuote(src), shellQuote(dst), ) if _, err := dm.mlServer.Exec(cmd); err != nil { return "", err } return dst, nil } // DataFetchRequest represents a request to fetch datasets. type DataFetchRequest struct { JobName string `json:"job_name"` Datasets []string `json:"datasets"` // Dataset names to fetch Priority int `json:"priority"` RequestedAt time.Time `json:"requested_at"` } // DatasetInfo contains information about a dataset. type DatasetInfo struct { Name string `json:"name"` SizeBytes int64 `json:"size_bytes"` Location string `json:"location"` // "nas" or "ml" LastAccess time.Time `json:"last_access"` } // NewDataManager creates a new DataManager instance. func NewDataManager(cfg *DataConfig, _ string) (*DataManager, error) { mlServer, err := network.NewSSHClient(cfg.MLHost, cfg.MLUser, cfg.MLSSHKey, cfg.MLPort, "") if err != nil { return nil, fmt.Errorf("ML server connection failed: %w", err) } defer func() { if err != nil { if closeErr := mlServer.Close(); closeErr != nil { log.Printf("Warning: failed to close ML server connection: %v", closeErr) } } }() nasServer, err := network.NewSSHClient(cfg.NASHost, cfg.NASUser, cfg.NASSSHKey, cfg.NASPort, "") if err != nil { return nil, fmt.Errorf("NAS connection failed: %w", err) } defer func() { if err != nil { if closeErr := nasServer.Close(); closeErr != nil { log.Printf("Warning: failed to close NAS server connection: %v", closeErr) } } }() // Create MLDataDir if it doesn't exist (for production without NAS) if cfg.MLDataDir != "" { if _, err := mlServer.Exec(fmt.Sprintf("mkdir -p %s", cfg.MLDataDir)); err != nil { logger := logging.NewLogger(slog.LevelInfo, false) logger.Job(context.Background(), "data_manager", "").Error( "Failed to create ML data directory", "dir", cfg.MLDataDir, "error", err, ) } } // Setup Redis using internal queue ctx, cancel := context.WithCancel(context.Background()) logger := logging.NewLogger(slog.LevelInfo, false) var taskQueue *queue.TaskQueue var datasetStore *storage.DatasetStore if cfg.RedisAddr != "" { queueCfg := queue.Config{ RedisAddr: cfg.RedisAddr, RedisPassword: cfg.RedisPassword, RedisDB: cfg.RedisDB, } var err error taskQueue, err = queue.NewTaskQueue(queueCfg) if err != nil { // FIXED: Check error return values for cleanup if closeErr := mlServer.Close(); closeErr != nil { logger.Warn("failed to close ML server during error cleanup", "error", closeErr) } if closeErr := nasServer.Close(); closeErr != nil { logger.Warn("failed to close NAS server during error cleanup", "error", closeErr) } cancel() // Cancel context to prevent leak return nil, fmt.Errorf("redis connection failed: %w", err) } // Initialize dataset store with the Redis client datasetStore = storage.NewDatasetStoreWithContext(taskQueue.GetRedisClient(), ctx) } else { taskQueue = nil // Local mode - no Redis datasetStore = nil } return &DataManager{ config: cfg, mlServer: mlServer, nasServer: nasServer, taskQueue: taskQueue, datasetStore: datasetStore, ctx: ctx, cancel: cancel, logger: logger, }, nil } // FetchDataset fetches a dataset from NAS to ML server. func (dm *DataManager) FetchDataset(jobName, datasetName string) error { ctx, cancel := context.WithTimeout(dm.ctx, 30*time.Minute) defer cancel() return network.RetryForNetworkOperations(ctx, func() error { return dm.fetchDatasetInternal(ctx, jobName, datasetName) }) } func (dm *DataManager) fetchDatasetInternal( ctx context.Context, jobName string, datasetName string, ) error { if err := container.ValidateJobName(datasetName); err != nil { return &errtypes.DataFetchError{ Dataset: datasetName, JobName: jobName, Err: fmt.Errorf("invalid dataset name: %w", err), } } logger := dm.logger.Job(ctx, jobName, "") logger.Info("fetching dataset", "dataset", datasetName) // Validate dataset size and run cleanup if needed if err := dm.ValidateDatasetWithCleanup(datasetName); err != nil { return &errtypes.DataFetchError{ Dataset: datasetName, JobName: jobName, Err: fmt.Errorf("dataset size validation failed: %w", err), } } nasPath := filepath.Join(dm.config.NASDataDir, datasetName) mlPath := filepath.Join(dm.config.MLDataDir, datasetName) // Check if dataset exists on NAS if !dm.nasServer.FileExists(nasPath) { return &errtypes.DataFetchError{ Dataset: datasetName, JobName: jobName, Err: fmt.Errorf("dataset not found on NAS"), } } // Check if already on ML server if dm.mlServer.FileExists(mlPath) { logger.Info("dataset already on ML server", "dataset", datasetName) dm.updateLastAccess(datasetName) return nil } // Get size for progress tracking size, err := dm.nasServer.GetFileSize(nasPath) if err != nil { logger.Warn("could not get dataset size", "dataset", datasetName, "error", err) size = 0 } sizeGB := float64(size) / (1024 * 1024 * 1024) logger.Info("transferring dataset", "dataset", datasetName, "size_gb", sizeGB, "nas_path", nasPath, "ml_path", mlPath) if dm.datasetStore != nil { if err := dm.datasetStore.RecordTransferStart(dm.ctx, datasetName, jobName, size); err != nil { logger.Warn("failed to record transfer start in Redis", "error", err) } } // Use local copy for local mode, rsync for remote mode var rsyncCmd string if dm.config.NASHost == "" || dm.config.NASUser == "" { // Local mode - use cp rsyncCmd = fmt.Sprintf("mkdir -p %s && cp -r %s %s/", dm.config.MLDataDir, nasPath, mlPath) } else { // Remote mode - use rsync over SSH rsyncCmd = fmt.Sprintf( "mkdir -p %s && rsync -avz --progress %s@%s:%s/ %s/", dm.config.MLDataDir, dm.config.NASUser, dm.config.NASHost, nasPath, mlPath, ) } ioBefore, ioErr := telemetry.ReadProcessIO() start := time.Now() out, err := telemetry.ExecWithMetrics( dm.logger, "dataset transfer", time.Since(start), func() (string, error) { return dm.nasServer.ExecContext(ctx, rsyncCmd) }, ) duration := time.Since(start) if err != nil { logger.Error("transfer failed", "dataset", datasetName, "duration", duration, "error", err, "output", out) if ioErr == nil { if after, readErr := telemetry.ReadProcessIO(); readErr == nil { delta := telemetry.DiffIO(ioBefore, after) logger.Debug("transfer io stats", "dataset", datasetName, "read_bytes", delta.ReadBytes, "write_bytes", delta.WriteBytes) } } if dm.datasetStore != nil { if redisErr := dm.datasetStore.RecordTransferFailure(dm.ctx, datasetName, err); redisErr != nil { logger.Warn("failed to record transfer failure in Redis", "error", redisErr) } } return err } logger.Info("transfer complete", "dataset", datasetName, "duration", duration, "size_gb", sizeGB) if ioErr == nil { if after, readErr := telemetry.ReadProcessIO(); readErr == nil { delta := telemetry.DiffIO(ioBefore, after) logger.Debug("transfer io stats", "dataset", datasetName, "read_bytes", delta.ReadBytes, "write_bytes", delta.WriteBytes) } } if dm.datasetStore != nil { if err := dm.datasetStore.RecordTransferComplete(dm.ctx, datasetName, duration); err != nil { logger.Warn("failed to record transfer completion in Redis", "error", err) } } // Track dataset metadata dm.saveDatasetInfo(datasetName, size) return nil } func (dm *DataManager) saveDatasetInfo(name string, size int64) { if dm.datasetStore == nil { return // Skip in local mode } info := storage.DatasetInfo{ Name: name, SizeBytes: size, Location: "ml", LastAccess: time.Now(), } if err := dm.datasetStore.SaveDatasetInfo(dm.ctx, info); err != nil { dm.logger.Job(dm.ctx, "data_manager", "").Warn("failed to save dataset info to Redis", "dataset", name, "error", err) } } func (dm *DataManager) updateLastAccess(name string) { if dm.datasetStore == nil { return // Skip in local mode } if err := dm.datasetStore.UpdateLastAccess(dm.ctx, name); err != nil { dm.logger.Job(dm.ctx, "data_manager", "").Warn("failed to update last access in Redis", "dataset", name, "error", err) } } // ListDatasetsOnML returns a list of all datasets currently stored on the ML server. func (dm *DataManager) ListDatasetsOnML() ([]DatasetInfo, error) { out, err := dm.mlServer.Exec(fmt.Sprintf("ls -1 %s 2>/dev/null", dm.config.MLDataDir)) if err != nil { return nil, err } var datasets []DatasetInfo for name := range strings.SplitSeq(strings.TrimSpace(out), "\n") { if name == "" { continue } var info DatasetInfo // Only use Redis if available if dm.datasetStore != nil { dsInfo, err := dm.datasetStore.GetDatasetInfo(dm.ctx, name) if err == nil && dsInfo != nil { info = DatasetInfo{ Name: dsInfo.Name, SizeBytes: dsInfo.SizeBytes, Location: dsInfo.Location, LastAccess: dsInfo.LastAccess, } } else { // Fallback: get from disk size, _ := dm.mlServer.GetFileSize(filepath.Join(dm.config.MLDataDir, name)) info = DatasetInfo{ Name: name, SizeBytes: size, Location: "ml", } } } else { // Local mode: get from disk size, _ := dm.mlServer.GetFileSize(filepath.Join(dm.config.MLDataDir, name)) info = DatasetInfo{ Name: name, SizeBytes: size, Location: "ml", } } datasets = append(datasets, info) } return datasets, nil } // CleanupOldData removes old datasets based on age and size limits. func (dm *DataManager) CleanupOldData() error { logger := dm.logger.Job(dm.ctx, "data_manager", "") logger.Info("running data cleanup") datasets, err := dm.ListDatasetsOnML() if err != nil { return err } var totalSize int64 for _, ds := range datasets { totalSize += ds.SizeBytes } totalSizeGB := float64(totalSize) / (1024 * 1024 * 1024) logger.Info("current storage usage", "total_size_gb", totalSizeGB, "dataset_count", len(datasets)) // Archive datasets older than max age or if over size limit maxAge := time.Duration(dm.config.MaxAgeHours) * time.Hour maxSize := int64(dm.config.MaxSizeGB) * 1024 * 1024 * 1024 // Ensure deterministic ordering when needing to reduce size. sort.Slice(datasets, func(i, j int) bool { ai := datasets[i].LastAccess aj := datasets[j].LastAccess if ai.IsZero() && aj.IsZero() { return datasets[i].Name < datasets[j].Name } if ai.IsZero() { return true } if aj.IsZero() { return false } if ai.Equal(aj) { return datasets[i].Name < datasets[j].Name } return ai.Before(aj) }) valid := make([]DatasetInfo, 0, len(datasets)) for _, ds := range datasets { name := strings.TrimSpace(ds.Name) if err := container.ValidateJobName(name); err != nil { logger.Warn("skipping dataset with invalid name", "dataset", ds.Name) continue } ds.Name = name valid = append(valid, ds) } archivedSet := make(map[string]struct{}, len(valid)) var archived []string archiveOne := func(ds DatasetInfo, reason string) { if _, ok := archivedSet[ds.Name]; ok { return } path := filepath.Join(dm.config.MLDataDir, ds.Name) logger.Info("archiving dataset", "dataset", ds.Name, "path", path, "reason", reason) if _, err := dm.archiveDatasetOnML(ds.Name); err != nil { logger.Error("failed to archive dataset", "dataset", ds.Name, "error", err) return } archivedSet[ds.Name] = struct{}{} archived = append(archived, ds.Name) totalSize -= ds.SizeBytes totalSizeGB = float64(totalSize) / (1024 * 1024 * 1024) if dm.datasetStore != nil { if err := dm.datasetStore.DeleteDatasetInfo(dm.ctx, ds.Name); err != nil { logger.Warn("failed to delete dataset from Redis", "dataset", ds.Name, "error", err) } } } // First archive datasets older than maxAge. now := time.Now() for _, ds := range valid { if ds.LastAccess.IsZero() { continue } if now.Sub(ds.LastAccess) > maxAge { archiveOne(ds, "max_age") } } // Then archive additional oldest datasets until we're under maxSize. for totalSize > maxSize { found := false for _, ds := range valid { if _, ok := archivedSet[ds.Name]; ok { continue } archiveOne(ds, "max_size") found = true break } if !found { break } } if len(archived) > 0 { logger.Info("cleanup complete", "archived_count", len(archived), "archived_datasets", archived) } else { logger.Info("cleanup complete", "archived_count", 0) } return nil } // GetAvailableDiskSpace returns available disk space in bytes. func (dm *DataManager) GetAvailableDiskSpace() int64 { logger := dm.logger.Job(dm.ctx, "data_manager", "") // Check disk space on ML server cmd := "df -k " + dm.config.MLDataDir + " | tail -1 | awk '{print $4}'" output, err := dm.mlServer.Exec(cmd) if err != nil { logger.Error("failed to get disk space", "error", err) return 0 } // Parse KB to bytes var freeKB int64 _, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &freeKB) if err != nil { logger.Error("failed to parse disk space", "error", err, "output", output) return 0 } return freeKB * 1024 // Convert KB to bytes } // GetDatasetInfo returns information about a dataset from NAS. func (dm *DataManager) GetDatasetInfo(datasetName string) (*DatasetInfo, error) { // Check if dataset exists on NAS nasPath := filepath.Join(dm.config.NASDataDir, datasetName) cmd := fmt.Sprintf("test -d %s && echo 'exists'", nasPath) output, err := dm.nasServer.Exec(cmd) if err != nil || strings.TrimSpace(output) != "exists" { return nil, fmt.Errorf("dataset %s not found on NAS", datasetName) } // Get dataset size cmd = fmt.Sprintf("du -sb %s | cut -f1", nasPath) output, err = dm.nasServer.Exec(cmd) if err != nil { return nil, fmt.Errorf("failed to get dataset size: %w", err) } var sizeBytes int64 _, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &sizeBytes) if err != nil { return nil, fmt.Errorf("failed to parse dataset size: %w", err) } // Get last modification time as proxy for last access cmd = fmt.Sprintf("stat -c %%Y %s", nasPath) output, err = dm.nasServer.Exec(cmd) if err != nil { return nil, fmt.Errorf("failed to get dataset timestamp: %w", err) } var modTime int64 _, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &modTime) if err != nil { return nil, fmt.Errorf("failed to parse timestamp: %w", err) } return &DatasetInfo{ Name: datasetName, SizeBytes: sizeBytes, Location: "nas", LastAccess: time.Unix(modTime, 0), }, nil } // ValidateDatasetWithCleanup checks if dataset fits and runs cleanup if needed. func (dm *DataManager) ValidateDatasetWithCleanup(datasetName string) error { logger := dm.logger.Job(dm.ctx, "data_manager", "") // Get dataset info info, err := dm.GetDatasetInfo(datasetName) if err != nil { return fmt.Errorf("failed to get dataset info: %w", err) } // Check current available space availableSpace := dm.GetAvailableDiskSpace() logger.Info("dataset size validation", "dataset", datasetName, "dataset_size_gb", float64(info.SizeBytes)/(1024*1024*1024), "available_gb", float64(availableSpace)/(1024*1024*1024)) // If enough space, proceed if info.SizeBytes <= availableSpace { logger.Info("sufficient space available", "dataset", datasetName) return nil } // Try cleanup first logger.Info("insufficient space, running cleanup", "dataset", datasetName, "required_gb", float64(info.SizeBytes)/(1024*1024*1024), "available_gb", float64(availableSpace)/(1024*1024*1024)) if err := dm.CleanupOldData(); err != nil { return fmt.Errorf("cleanup failed: %w", err) } // Check space again after cleanup availableSpace = dm.GetAvailableDiskSpace() logger.Info("space after cleanup", "available_gb", float64(availableSpace)/(1024*1024*1024)) // If now enough space, proceed if info.SizeBytes <= availableSpace { logger.Info("cleanup freed enough space", "dataset", datasetName) return nil } // Still not enough space return fmt.Errorf("dataset %s (%.2fGB) too large for available space (%.2fGB) even after cleanup", datasetName, float64(info.SizeBytes)/(1024*1024*1024), float64(availableSpace)/(1024*1024*1024)) } // StartCleanupLoop starts the periodic cleanup loop. func (dm *DataManager) StartCleanupLoop() { logger := dm.logger.Job(dm.ctx, "data_manager", "") ticker := time.NewTicker(time.Duration(dm.config.CleanupInterval) * time.Minute) go func() { defer ticker.Stop() for { select { case <-dm.ctx.Done(): logger.Info("cleanup loop stopping") return case <-ticker.C: if err := dm.CleanupOldData(); err != nil { logger.Error("cleanup error", "error", err) } } } }() } // Close gracefully shuts down the DataManager, stopping the cleanup loop and // closing all connections to ML server, NAS server, and Redis. func (dm *DataManager) Close() { dm.cancel() // Cancel context to stop cleanup loop // Wait a moment for cleanup loop to finish time.Sleep(100 * time.Millisecond) if dm.mlServer != nil { if err := dm.mlServer.Close(); err != nil { dm.logger.Job(dm.ctx, "data_manager", "").Warn( "error closing ML server connection", "error", err, ) } } if dm.nasServer != nil { if err := dm.nasServer.Close(); err != nil { dm.logger.Job(dm.ctx, "data_manager", "").Warn( "error closing NAS server connection", "error", err, ) } } if dm.taskQueue != nil { if err := dm.taskQueue.Close(); err != nil { dm.logger.Job(dm.ctx, "data_manager", "").Warn( "error closing Redis connection", "error", err, ) } } } func main() { // Parse authentication flags authFlags := auth.ParseAuthFlags() if err := auth.ValidateFlags(authFlags); err != nil { log.Fatalf("Authentication flag error: %v", err) } // Get API key from various sources apiKey := auth.GetAPIKeyFromSources(authFlags) configFile := "configs/api/dev.yaml" if authFlags.ConfigFile != "" { configFile = authFlags.ConfigFile } // Parse command line args if len(os.Args) < 2 { fmt.Println("Usage:") fmt.Println(" data_manager [--config configs/api/dev.yaml] [--api-key ] " + "fetch [dataset...]") fmt.Println(" data_manager [--config configs/api/dev.yaml] [--api-key ] list") fmt.Println(" data_manager [--config configs/api/dev.yaml] [--api-key ] cleanup") fmt.Println(" data_manager [--config configs/api/dev.yaml] [--api-key ] validate ") fmt.Println(" data_manager [--config configs/api/dev.yaml] [--api-key ] daemon") fmt.Println() auth.PrintAuthHelp() os.Exit(1) } // Check for --config flag if len(os.Args) >= 3 && os.Args[1] == "--config" { configFile = os.Args[2] // Shift args os.Args = append([]string{os.Args[0]}, os.Args[3:]...) } cfg, err := LoadDataConfig(configFile) if err != nil { log.Fatalf("Failed to load config: %v", err) } // Validate authentication configuration if err := cfg.Auth.ValidateAuthConfig(); err != nil { log.Fatalf("Invalid authentication configuration: %v", err) } // Validate configuration if err := cfg.Validate(); err != nil { log.Fatalf("Invalid configuration: %v", err) } // Test authentication if enabled if cfg.Auth.Enabled && apiKey != "" { user, err := cfg.Auth.ValidateAPIKey(apiKey) if err != nil { log.Fatalf("Authentication failed: %v", err) } log.Printf("Data manager authenticated as user: %s (admin: %v)", user.Name, user.Admin) } else if cfg.Auth.Enabled { log.Fatal("Authentication required but no API key provided") } dm, err := NewDataManager(cfg, apiKey) if err != nil { log.Fatalf("Failed to create data manager: %v", err) } defer dm.Close() cmd := os.Args[1] switch cmd { case "fetch": if len(os.Args) < 4 { log.Printf("Usage: data_manager fetch [dataset...]") return } jobName := os.Args[2] datasets := os.Args[3:] for _, dataset := range datasets { if err := dm.FetchDataset(jobName, dataset); err != nil { dm.logger.Job(context.Background(), jobName, "").Error("failed to fetch dataset", "dataset", dataset, "error", err) } } case "list": datasets, err := dm.ListDatasetsOnML() if err != nil { log.Printf("Failed to list datasets: %v", err) return } fmt.Println("Datasets on ML server:") fmt.Println("======================") var totalSize int64 for _, ds := range datasets { sizeMB := float64(ds.SizeBytes) / (1024 * 1024) lastAccess := "unknown" if !ds.LastAccess.IsZero() { lastAccess = ds.LastAccess.Format("2006-01-02 15:04:05") } fmt.Printf("%-30s %10.2f MB Last access: %s\n", ds.Name, sizeMB, lastAccess) totalSize += ds.SizeBytes } fmt.Printf("\nTotal: %.2f GB\n", float64(totalSize)/(1024*1024*1024)) case "validate": if len(os.Args) < 3 { log.Printf("Usage: data_manager validate ") return } dataset := os.Args[2] fmt.Printf("Validating dataset: %s\n", dataset) if err := dm.ValidateDatasetWithCleanup(dataset); err != nil { log.Printf("Validation failed: %v", err) return } fmt.Printf("✅ Dataset %s can be downloaded\n", dataset) case "cleanup": if err := dm.CleanupOldData(); err != nil { log.Printf("Cleanup failed: %v", err) return } case "daemon": logger := dm.logger.Job(context.Background(), "data_manager", "") logger.Info("starting data manager daemon") dm.StartCleanupLoop() logger.Info("cleanup configuration", "interval_minutes", cfg.CleanupInterval, "max_age_hours", cfg.MaxAgeHours, "max_size_gb", cfg.MaxSizeGB) // Handle graceful shutdown sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) sig := <-sigChan logger.Info("received shutdown signal", "signal", sig) dm.Close() logger.Info("data manager shut down gracefully") default: log.Printf("Unknown command: %s", cmd) } }