fetch_ml/cmd/data_manager/data_sync.go
Jeremie Fraeys dbf96020af
refactor(dependency-hygiene): Fix Redis leak, simplify TUI wrapper, clean go.mod
Phase 1: Fix Redis Schema Leak
- Create internal/storage/dataset.go with DatasetStore abstraction
- Remove all direct Redis calls from cmd/data_manager/data_sync.go
- data_manager now uses DatasetStore for transfer tracking and metadata

Phase 2: Simplify TUI Services
- Embed *queue.TaskQueue directly in services.TaskQueue
- Eliminate 60% of wrapper boilerplate (203 -> ~100 lines)
- Keep only TUI-specific methods (EnqueueTask, GetJobStatus, experiment methods)

Phase 5: Clean go.mod Dependencies
- Remove duplicate go-redis/redis/v8 dependency
- Migrate internal/storage/migrate.go to redis/go-redis/v9
- Separate test-only deps (miniredis, testify) into own block

Results:
- Zero direct Redis calls in cmd/
- 60% fewer lines in TUI services
- Cleaner dependency structure
2026-02-17 21:13:49 -05:00

853 lines
23 KiB
Go

// data_manager.go - Fetch data from NAS to ML server on-demand
package main
import (
"context"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"path/filepath"
"sort"
"strings"
"syscall"
"time"
"github.com/jfraeys/fetch_ml/internal/auth"
"github.com/jfraeys/fetch_ml/internal/container"
"github.com/jfraeys/fetch_ml/internal/errtypes"
"github.com/jfraeys/fetch_ml/internal/logging"
"github.com/jfraeys/fetch_ml/internal/network"
"github.com/jfraeys/fetch_ml/internal/queue"
"github.com/jfraeys/fetch_ml/internal/storage"
"github.com/jfraeys/fetch_ml/internal/telemetry"
)
func shellQuote(s string) string {
if s == "" {
return "''"
}
return "'" + strings.ReplaceAll(s, "'", "'\"'\"'") + "'"
}
// SSHClient alias for convenience.
type SSHClient = network.SSHClient
// DataManager manages data synchronization between NAS and ML server.
type DataManager struct {
config *DataConfig
mlServer *SSHClient
nasServer *SSHClient
taskQueue *queue.TaskQueue
datasetStore *storage.DatasetStore
ctx context.Context
cancel context.CancelFunc
logger *logging.Logger
}
func (dm *DataManager) archiveDatasetOnML(datasetName string) (string, error) {
datasetName = strings.TrimSpace(datasetName)
if err := container.ValidateJobName(datasetName); err != nil {
return "", fmt.Errorf("invalid dataset name: %w", err)
}
if strings.TrimSpace(dm.config.MLDataDir) == "" {
return "", fmt.Errorf("missing ml_data_dir")
}
stamp := time.Now().UTC().Format("20060102-150405")
archiveRoot := filepath.Join(dm.config.MLDataDir, ".archive", stamp)
src := filepath.Join(dm.config.MLDataDir, datasetName)
dst := filepath.Join(archiveRoot, datasetName)
cmd := fmt.Sprintf(
"mkdir -p %s && mv %s %s",
shellQuote(archiveRoot),
shellQuote(src),
shellQuote(dst),
)
if _, err := dm.mlServer.Exec(cmd); err != nil {
return "", err
}
return dst, nil
}
// DataFetchRequest represents a request to fetch datasets.
type DataFetchRequest struct {
JobName string `json:"job_name"`
Datasets []string `json:"datasets"` // Dataset names to fetch
Priority int `json:"priority"`
RequestedAt time.Time `json:"requested_at"`
}
// DatasetInfo contains information about a dataset.
type DatasetInfo struct {
Name string `json:"name"`
SizeBytes int64 `json:"size_bytes"`
Location string `json:"location"` // "nas" or "ml"
LastAccess time.Time `json:"last_access"`
}
// NewDataManager creates a new DataManager instance.
func NewDataManager(cfg *DataConfig, _ string) (*DataManager, error) {
mlServer, err := network.NewSSHClient(cfg.MLHost, cfg.MLUser, cfg.MLSSHKey, cfg.MLPort, "")
if err != nil {
return nil, fmt.Errorf("ML server connection failed: %w", err)
}
defer func() {
if err != nil {
if closeErr := mlServer.Close(); closeErr != nil {
log.Printf("Warning: failed to close ML server connection: %v", closeErr)
}
}
}()
nasServer, err := network.NewSSHClient(cfg.NASHost, cfg.NASUser, cfg.NASSSHKey, cfg.NASPort, "")
if err != nil {
return nil, fmt.Errorf("NAS connection failed: %w", err)
}
defer func() {
if err != nil {
if closeErr := nasServer.Close(); closeErr != nil {
log.Printf("Warning: failed to close NAS server connection: %v", closeErr)
}
}
}()
// Create MLDataDir if it doesn't exist (for production without NAS)
if cfg.MLDataDir != "" {
if _, err := mlServer.Exec(fmt.Sprintf("mkdir -p %s", cfg.MLDataDir)); err != nil {
logger := logging.NewLogger(slog.LevelInfo, false)
logger.Job(context.Background(), "data_manager", "").Error(
"Failed to create ML data directory",
"dir", cfg.MLDataDir,
"error", err,
)
}
}
// Setup Redis using internal queue
ctx, cancel := context.WithCancel(context.Background())
logger := logging.NewLogger(slog.LevelInfo, false)
var taskQueue *queue.TaskQueue
var datasetStore *storage.DatasetStore
if cfg.RedisAddr != "" {
queueCfg := queue.Config{
RedisAddr: cfg.RedisAddr,
RedisPassword: cfg.RedisPassword,
RedisDB: cfg.RedisDB,
}
var err error
taskQueue, err = queue.NewTaskQueue(queueCfg)
if err != nil {
// FIXED: Check error return values for cleanup
if closeErr := mlServer.Close(); closeErr != nil {
logger.Warn("failed to close ML server during error cleanup", "error", closeErr)
}
if closeErr := nasServer.Close(); closeErr != nil {
logger.Warn("failed to close NAS server during error cleanup", "error", closeErr)
}
cancel() // Cancel context to prevent leak
return nil, fmt.Errorf("redis connection failed: %w", err)
}
// Initialize dataset store with the Redis client
datasetStore = storage.NewDatasetStoreWithContext(taskQueue.GetRedisClient(), ctx)
} else {
taskQueue = nil // Local mode - no Redis
datasetStore = nil
}
return &DataManager{
config: cfg,
mlServer: mlServer,
nasServer: nasServer,
taskQueue: taskQueue,
datasetStore: datasetStore,
ctx: ctx,
cancel: cancel,
logger: logger,
}, nil
}
// FetchDataset fetches a dataset from NAS to ML server.
func (dm *DataManager) FetchDataset(jobName, datasetName string) error {
ctx, cancel := context.WithTimeout(dm.ctx, 30*time.Minute)
defer cancel()
return network.RetryForNetworkOperations(ctx, func() error {
return dm.fetchDatasetInternal(ctx, jobName, datasetName)
})
}
func (dm *DataManager) fetchDatasetInternal(
ctx context.Context,
jobName string,
datasetName string,
) error {
if err := container.ValidateJobName(datasetName); err != nil {
return &errtypes.DataFetchError{
Dataset: datasetName,
JobName: jobName,
Err: fmt.Errorf("invalid dataset name: %w", err),
}
}
logger := dm.logger.Job(ctx, jobName, "")
logger.Info("fetching dataset", "dataset", datasetName)
// Validate dataset size and run cleanup if needed
if err := dm.ValidateDatasetWithCleanup(datasetName); err != nil {
return &errtypes.DataFetchError{
Dataset: datasetName,
JobName: jobName,
Err: fmt.Errorf("dataset size validation failed: %w", err),
}
}
nasPath := filepath.Join(dm.config.NASDataDir, datasetName)
mlPath := filepath.Join(dm.config.MLDataDir, datasetName)
// Check if dataset exists on NAS
if !dm.nasServer.FileExists(nasPath) {
return &errtypes.DataFetchError{
Dataset: datasetName,
JobName: jobName,
Err: fmt.Errorf("dataset not found on NAS"),
}
}
// Check if already on ML server
if dm.mlServer.FileExists(mlPath) {
logger.Info("dataset already on ML server", "dataset", datasetName)
dm.updateLastAccess(datasetName)
return nil
}
// Get size for progress tracking
size, err := dm.nasServer.GetFileSize(nasPath)
if err != nil {
logger.Warn("could not get dataset size", "dataset", datasetName, "error", err)
size = 0
}
sizeGB := float64(size) / (1024 * 1024 * 1024)
logger.Info("transferring dataset",
"dataset", datasetName,
"size_gb", sizeGB,
"nas_path", nasPath,
"ml_path", mlPath)
if dm.datasetStore != nil {
if err := dm.datasetStore.RecordTransferStart(dm.ctx, datasetName, jobName, size); err != nil {
logger.Warn("failed to record transfer start in Redis", "error", err)
}
}
// Use local copy for local mode, rsync for remote mode
var rsyncCmd string
if dm.config.NASHost == "" || dm.config.NASUser == "" {
// Local mode - use cp
rsyncCmd = fmt.Sprintf("mkdir -p %s && cp -r %s %s/", dm.config.MLDataDir, nasPath, mlPath)
} else {
// Remote mode - use rsync over SSH
rsyncCmd = fmt.Sprintf(
"mkdir -p %s && rsync -avz --progress %s@%s:%s/ %s/",
dm.config.MLDataDir,
dm.config.NASUser,
dm.config.NASHost,
nasPath,
mlPath,
)
}
ioBefore, ioErr := telemetry.ReadProcessIO()
start := time.Now()
out, err := telemetry.ExecWithMetrics(
dm.logger,
"dataset transfer",
time.Since(start),
func() (string, error) {
return dm.nasServer.ExecContext(ctx, rsyncCmd)
},
)
duration := time.Since(start)
if err != nil {
logger.Error("transfer failed",
"dataset", datasetName,
"duration", duration,
"error", err,
"output", out)
if ioErr == nil {
if after, readErr := telemetry.ReadProcessIO(); readErr == nil {
delta := telemetry.DiffIO(ioBefore, after)
logger.Debug("transfer io stats",
"dataset", datasetName,
"read_bytes", delta.ReadBytes,
"write_bytes", delta.WriteBytes)
}
}
if dm.datasetStore != nil {
if redisErr := dm.datasetStore.RecordTransferFailure(dm.ctx, datasetName, err); redisErr != nil {
logger.Warn("failed to record transfer failure in Redis", "error", redisErr)
}
}
return err
}
logger.Info("transfer complete",
"dataset", datasetName,
"duration", duration,
"size_gb", sizeGB)
if ioErr == nil {
if after, readErr := telemetry.ReadProcessIO(); readErr == nil {
delta := telemetry.DiffIO(ioBefore, after)
logger.Debug("transfer io stats",
"dataset", datasetName,
"read_bytes", delta.ReadBytes,
"write_bytes", delta.WriteBytes)
}
}
if dm.datasetStore != nil {
if err := dm.datasetStore.RecordTransferComplete(dm.ctx, datasetName, duration); err != nil {
logger.Warn("failed to record transfer completion in Redis", "error", err)
}
}
// Track dataset metadata
dm.saveDatasetInfo(datasetName, size)
return nil
}
func (dm *DataManager) saveDatasetInfo(name string, size int64) {
if dm.datasetStore == nil {
return // Skip in local mode
}
info := storage.DatasetInfo{
Name: name,
SizeBytes: size,
Location: "ml",
LastAccess: time.Now(),
}
if err := dm.datasetStore.SaveDatasetInfo(dm.ctx, info); err != nil {
dm.logger.Job(dm.ctx, "data_manager", "").Warn("failed to save dataset info to Redis",
"dataset", name, "error", err)
}
}
func (dm *DataManager) updateLastAccess(name string) {
if dm.datasetStore == nil {
return // Skip in local mode
}
if err := dm.datasetStore.UpdateLastAccess(dm.ctx, name); err != nil {
dm.logger.Job(dm.ctx, "data_manager", "").Warn("failed to update last access in Redis",
"dataset", name, "error", err)
}
}
// ListDatasetsOnML returns a list of all datasets currently stored on the ML server.
func (dm *DataManager) ListDatasetsOnML() ([]DatasetInfo, error) {
out, err := dm.mlServer.Exec(fmt.Sprintf("ls -1 %s 2>/dev/null", dm.config.MLDataDir))
if err != nil {
return nil, err
}
var datasets []DatasetInfo
for name := range strings.SplitSeq(strings.TrimSpace(out), "\n") {
if name == "" {
continue
}
var info DatasetInfo
// Only use Redis if available
if dm.datasetStore != nil {
dsInfo, err := dm.datasetStore.GetDatasetInfo(dm.ctx, name)
if err == nil && dsInfo != nil {
info = DatasetInfo{
Name: dsInfo.Name,
SizeBytes: dsInfo.SizeBytes,
Location: dsInfo.Location,
LastAccess: dsInfo.LastAccess,
}
} else {
// Fallback: get from disk
size, _ := dm.mlServer.GetFileSize(filepath.Join(dm.config.MLDataDir, name))
info = DatasetInfo{
Name: name,
SizeBytes: size,
Location: "ml",
}
}
} else {
// Local mode: get from disk
size, _ := dm.mlServer.GetFileSize(filepath.Join(dm.config.MLDataDir, name))
info = DatasetInfo{
Name: name,
SizeBytes: size,
Location: "ml",
}
}
datasets = append(datasets, info)
}
return datasets, nil
}
// CleanupOldData removes old datasets based on age and size limits.
func (dm *DataManager) CleanupOldData() error {
logger := dm.logger.Job(dm.ctx, "data_manager", "")
logger.Info("running data cleanup")
datasets, err := dm.ListDatasetsOnML()
if err != nil {
return err
}
var totalSize int64
for _, ds := range datasets {
totalSize += ds.SizeBytes
}
totalSizeGB := float64(totalSize) / (1024 * 1024 * 1024)
logger.Info("current storage usage",
"total_size_gb", totalSizeGB,
"dataset_count", len(datasets))
// Archive datasets older than max age or if over size limit
maxAge := time.Duration(dm.config.MaxAgeHours) * time.Hour
maxSize := int64(dm.config.MaxSizeGB) * 1024 * 1024 * 1024
// Ensure deterministic ordering when needing to reduce size.
sort.Slice(datasets, func(i, j int) bool {
ai := datasets[i].LastAccess
aj := datasets[j].LastAccess
if ai.IsZero() && aj.IsZero() {
return datasets[i].Name < datasets[j].Name
}
if ai.IsZero() {
return true
}
if aj.IsZero() {
return false
}
if ai.Equal(aj) {
return datasets[i].Name < datasets[j].Name
}
return ai.Before(aj)
})
valid := make([]DatasetInfo, 0, len(datasets))
for _, ds := range datasets {
name := strings.TrimSpace(ds.Name)
if err := container.ValidateJobName(name); err != nil {
logger.Warn("skipping dataset with invalid name", "dataset", ds.Name)
continue
}
ds.Name = name
valid = append(valid, ds)
}
archivedSet := make(map[string]struct{}, len(valid))
var archived []string
archiveOne := func(ds DatasetInfo, reason string) {
if _, ok := archivedSet[ds.Name]; ok {
return
}
path := filepath.Join(dm.config.MLDataDir, ds.Name)
logger.Info("archiving dataset", "dataset", ds.Name, "path", path, "reason", reason)
if _, err := dm.archiveDatasetOnML(ds.Name); err != nil {
logger.Error("failed to archive dataset",
"dataset", ds.Name,
"error", err)
return
}
archivedSet[ds.Name] = struct{}{}
archived = append(archived, ds.Name)
totalSize -= ds.SizeBytes
totalSizeGB = float64(totalSize) / (1024 * 1024 * 1024)
if dm.datasetStore != nil {
if err := dm.datasetStore.DeleteDatasetInfo(dm.ctx, ds.Name); err != nil {
logger.Warn("failed to delete dataset from Redis",
"dataset", ds.Name,
"error", err)
}
}
}
// First archive datasets older than maxAge.
now := time.Now()
for _, ds := range valid {
if ds.LastAccess.IsZero() {
continue
}
if now.Sub(ds.LastAccess) > maxAge {
archiveOne(ds, "max_age")
}
}
// Then archive additional oldest datasets until we're under maxSize.
for totalSize > maxSize {
found := false
for _, ds := range valid {
if _, ok := archivedSet[ds.Name]; ok {
continue
}
archiveOne(ds, "max_size")
found = true
break
}
if !found {
break
}
}
if len(archived) > 0 {
logger.Info("cleanup complete",
"archived_count", len(archived),
"archived_datasets", archived)
} else {
logger.Info("cleanup complete", "archived_count", 0)
}
return nil
}
// GetAvailableDiskSpace returns available disk space in bytes.
func (dm *DataManager) GetAvailableDiskSpace() int64 {
logger := dm.logger.Job(dm.ctx, "data_manager", "")
// Check disk space on ML server
cmd := "df -k " + dm.config.MLDataDir + " | tail -1 | awk '{print $4}'"
output, err := dm.mlServer.Exec(cmd)
if err != nil {
logger.Error("failed to get disk space", "error", err)
return 0
}
// Parse KB to bytes
var freeKB int64
_, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &freeKB)
if err != nil {
logger.Error("failed to parse disk space", "error", err, "output", output)
return 0
}
return freeKB * 1024 // Convert KB to bytes
}
// GetDatasetInfo returns information about a dataset from NAS.
func (dm *DataManager) GetDatasetInfo(datasetName string) (*DatasetInfo, error) {
// Check if dataset exists on NAS
nasPath := filepath.Join(dm.config.NASDataDir, datasetName)
cmd := fmt.Sprintf("test -d %s && echo 'exists'", nasPath)
output, err := dm.nasServer.Exec(cmd)
if err != nil || strings.TrimSpace(output) != "exists" {
return nil, fmt.Errorf("dataset %s not found on NAS", datasetName)
}
// Get dataset size
cmd = fmt.Sprintf("du -sb %s | cut -f1", nasPath)
output, err = dm.nasServer.Exec(cmd)
if err != nil {
return nil, fmt.Errorf("failed to get dataset size: %w", err)
}
var sizeBytes int64
_, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &sizeBytes)
if err != nil {
return nil, fmt.Errorf("failed to parse dataset size: %w", err)
}
// Get last modification time as proxy for last access
cmd = fmt.Sprintf("stat -c %%Y %s", nasPath)
output, err = dm.nasServer.Exec(cmd)
if err != nil {
return nil, fmt.Errorf("failed to get dataset timestamp: %w", err)
}
var modTime int64
_, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &modTime)
if err != nil {
return nil, fmt.Errorf("failed to parse timestamp: %w", err)
}
return &DatasetInfo{
Name: datasetName,
SizeBytes: sizeBytes,
Location: "nas",
LastAccess: time.Unix(modTime, 0),
}, nil
}
// ValidateDatasetWithCleanup checks if dataset fits and runs cleanup if needed.
func (dm *DataManager) ValidateDatasetWithCleanup(datasetName string) error {
logger := dm.logger.Job(dm.ctx, "data_manager", "")
// Get dataset info
info, err := dm.GetDatasetInfo(datasetName)
if err != nil {
return fmt.Errorf("failed to get dataset info: %w", err)
}
// Check current available space
availableSpace := dm.GetAvailableDiskSpace()
logger.Info("dataset size validation",
"dataset", datasetName,
"dataset_size_gb", float64(info.SizeBytes)/(1024*1024*1024),
"available_gb", float64(availableSpace)/(1024*1024*1024))
// If enough space, proceed
if info.SizeBytes <= availableSpace {
logger.Info("sufficient space available", "dataset", datasetName)
return nil
}
// Try cleanup first
logger.Info("insufficient space, running cleanup",
"dataset", datasetName,
"required_gb", float64(info.SizeBytes)/(1024*1024*1024),
"available_gb", float64(availableSpace)/(1024*1024*1024))
if err := dm.CleanupOldData(); err != nil {
return fmt.Errorf("cleanup failed: %w", err)
}
// Check space again after cleanup
availableSpace = dm.GetAvailableDiskSpace()
logger.Info("space after cleanup",
"available_gb", float64(availableSpace)/(1024*1024*1024))
// If now enough space, proceed
if info.SizeBytes <= availableSpace {
logger.Info("cleanup freed enough space", "dataset", datasetName)
return nil
}
// Still not enough space
return fmt.Errorf("dataset %s (%.2fGB) too large for available space (%.2fGB) even after cleanup",
datasetName,
float64(info.SizeBytes)/(1024*1024*1024),
float64(availableSpace)/(1024*1024*1024))
}
// StartCleanupLoop starts the periodic cleanup loop.
func (dm *DataManager) StartCleanupLoop() {
logger := dm.logger.Job(dm.ctx, "data_manager", "")
ticker := time.NewTicker(time.Duration(dm.config.CleanupInterval) * time.Minute)
go func() {
defer ticker.Stop()
for {
select {
case <-dm.ctx.Done():
logger.Info("cleanup loop stopping")
return
case <-ticker.C:
if err := dm.CleanupOldData(); err != nil {
logger.Error("cleanup error", "error", err)
}
}
}
}()
}
// Close gracefully shuts down the DataManager, stopping the cleanup loop and
// closing all connections to ML server, NAS server, and Redis.
func (dm *DataManager) Close() {
dm.cancel() // Cancel context to stop cleanup loop
// Wait a moment for cleanup loop to finish
time.Sleep(100 * time.Millisecond)
if dm.mlServer != nil {
if err := dm.mlServer.Close(); err != nil {
dm.logger.Job(dm.ctx, "data_manager", "").Warn(
"error closing ML server connection",
"error",
err,
)
}
}
if dm.nasServer != nil {
if err := dm.nasServer.Close(); err != nil {
dm.logger.Job(dm.ctx, "data_manager", "").Warn(
"error closing NAS server connection",
"error",
err,
)
}
}
if dm.taskQueue != nil {
if err := dm.taskQueue.Close(); err != nil {
dm.logger.Job(dm.ctx, "data_manager", "").Warn(
"error closing Redis connection",
"error",
err,
)
}
}
}
func main() {
// Parse authentication flags
authFlags := auth.ParseAuthFlags()
if err := auth.ValidateFlags(authFlags); err != nil {
log.Fatalf("Authentication flag error: %v", err)
}
// Get API key from various sources
apiKey := auth.GetAPIKeyFromSources(authFlags)
configFile := "configs/api/dev.yaml"
if authFlags.ConfigFile != "" {
configFile = authFlags.ConfigFile
}
// Parse command line args
if len(os.Args) < 2 {
fmt.Println("Usage:")
fmt.Println(" data_manager [--config configs/api/dev.yaml] [--api-key <key>] " +
"fetch <job-name> <dataset> [dataset...]")
fmt.Println(" data_manager [--config configs/api/dev.yaml] [--api-key <key>] list")
fmt.Println(" data_manager [--config configs/api/dev.yaml] [--api-key <key>] cleanup")
fmt.Println(" data_manager [--config configs/api/dev.yaml] [--api-key <key>] validate <dataset>")
fmt.Println(" data_manager [--config configs/api/dev.yaml] [--api-key <key>] daemon")
fmt.Println()
auth.PrintAuthHelp()
os.Exit(1)
}
// Check for --config flag
if len(os.Args) >= 3 && os.Args[1] == "--config" {
configFile = os.Args[2]
// Shift args
os.Args = append([]string{os.Args[0]}, os.Args[3:]...)
}
cfg, err := LoadDataConfig(configFile)
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// Validate authentication configuration
if err := cfg.Auth.ValidateAuthConfig(); err != nil {
log.Fatalf("Invalid authentication configuration: %v", err)
}
// Validate configuration
if err := cfg.Validate(); err != nil {
log.Fatalf("Invalid configuration: %v", err)
}
// Test authentication if enabled
if cfg.Auth.Enabled && apiKey != "" {
user, err := cfg.Auth.ValidateAPIKey(apiKey)
if err != nil {
log.Fatalf("Authentication failed: %v", err)
}
log.Printf("Data manager authenticated as user: %s (admin: %v)", user.Name, user.Admin)
} else if cfg.Auth.Enabled {
log.Fatal("Authentication required but no API key provided")
}
dm, err := NewDataManager(cfg, apiKey)
if err != nil {
log.Fatalf("Failed to create data manager: %v", err)
}
defer dm.Close()
cmd := os.Args[1]
switch cmd {
case "fetch":
if len(os.Args) < 4 {
log.Printf("Usage: data_manager fetch <job-name> <dataset> [dataset...]")
return
}
jobName := os.Args[2]
datasets := os.Args[3:]
for _, dataset := range datasets {
if err := dm.FetchDataset(jobName, dataset); err != nil {
dm.logger.Job(context.Background(), jobName, "").Error("failed to fetch dataset",
"dataset", dataset,
"error", err)
}
}
case "list":
datasets, err := dm.ListDatasetsOnML()
if err != nil {
log.Printf("Failed to list datasets: %v", err)
return
}
fmt.Println("Datasets on ML server:")
fmt.Println("======================")
var totalSize int64
for _, ds := range datasets {
sizeMB := float64(ds.SizeBytes) / (1024 * 1024)
lastAccess := "unknown"
if !ds.LastAccess.IsZero() {
lastAccess = ds.LastAccess.Format("2006-01-02 15:04:05")
}
fmt.Printf("%-30s %10.2f MB Last access: %s\n", ds.Name, sizeMB, lastAccess)
totalSize += ds.SizeBytes
}
fmt.Printf("\nTotal: %.2f GB\n", float64(totalSize)/(1024*1024*1024))
case "validate":
if len(os.Args) < 3 {
log.Printf("Usage: data_manager validate <dataset>")
return
}
dataset := os.Args[2]
fmt.Printf("Validating dataset: %s\n", dataset)
if err := dm.ValidateDatasetWithCleanup(dataset); err != nil {
log.Printf("Validation failed: %v", err)
return
}
fmt.Printf("✅ Dataset %s can be downloaded\n", dataset)
case "cleanup":
if err := dm.CleanupOldData(); err != nil {
log.Printf("Cleanup failed: %v", err)
return
}
case "daemon":
logger := dm.logger.Job(context.Background(), "data_manager", "")
logger.Info("starting data manager daemon")
dm.StartCleanupLoop()
logger.Info("cleanup configuration",
"interval_minutes", cfg.CleanupInterval,
"max_age_hours", cfg.MaxAgeHours,
"max_size_gb", cfg.MaxSizeGB)
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigChan
logger.Info("received shutdown signal", "signal", sig)
dm.Close()
logger.Info("data manager shut down gracefully")
default:
log.Printf("Unknown command: %s", cmd)
}
}