// data_manager.go - Fetch data from NAS to ML server on-demand
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"log/slog"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"syscall"
	"time"

	"github.com/jfraeys/fetch_ml/internal/auth"
	"github.com/jfraeys/fetch_ml/internal/container"
	"github.com/jfraeys/fetch_ml/internal/errtypes"
	"github.com/jfraeys/fetch_ml/internal/logging"
	"github.com/jfraeys/fetch_ml/internal/network"
	"github.com/jfraeys/fetch_ml/internal/queue"
	"github.com/jfraeys/fetch_ml/internal/telemetry"
)

// SSHClient alias for convenience.
type SSHClient = network.SSHClient

// DataManager manages data synchronization between NAS and ML server.
type DataManager struct {
	config    *DataConfig
	mlServer  *SSHClient
	nasServer *SSHClient
	taskQueue *queue.TaskQueue
	ctx       context.Context
	cancel    context.CancelFunc
	logger    *logging.Logger
}

// DataFetchRequest represents a request to fetch datasets.
type DataFetchRequest struct {
	JobName     string    `json:"job_name"`
	Datasets    []string  `json:"datasets"` // Dataset names to fetch
	Priority    int       `json:"priority"`
	RequestedAt time.Time `json:"requested_at"`
}

// DatasetInfo contains information about a dataset.
type DatasetInfo struct {
	Name       string    `json:"name"`
	SizeBytes  int64     `json:"size_bytes"`
	Location   string    `json:"location"` // "nas" or "ml"
	LastAccess time.Time `json:"last_access"`
}
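
// Redis key layout used throughout this file (skipped entirely in local
// mode, i.e. when taskQueue is nil):
//
//	ml:dataset:<name>        - JSON-encoded DatasetInfo for a dataset on the ML server
//	ml:data:transfer:<name>  - hash tracking a transfer: status, job_name, size_bytes,
//	                           started_at, completed_at, duration_seconds, error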

// NewDataManager creates a new DataManager instance.
func NewDataManager(cfg *DataConfig, _ string) (*DataManager, error) {
	mlServer, err := network.NewSSHClient(cfg.MLHost, cfg.MLUser, cfg.MLSSHKey, cfg.MLPort, "")
	if err != nil {
		return nil, fmt.Errorf("ML server connection failed: %w", err)
	}
	// The deferred closers below fire only when a later setup step fails,
	// i.e. when the outer err is non-nil at return.
	defer func() {
		if err != nil {
			if closeErr := mlServer.Close(); closeErr != nil {
				log.Printf("Warning: failed to close ML server connection: %v", closeErr)
			}
		}
	}()

	nasServer, err := network.NewSSHClient(cfg.NASHost, cfg.NASUser, cfg.NASSSHKey, cfg.NASPort, "")
	if err != nil {
		return nil, fmt.Errorf("NAS connection failed: %w", err)
	}
	defer func() {
		if err != nil {
			if closeErr := nasServer.Close(); closeErr != nil {
				log.Printf("Warning: failed to close NAS server connection: %v", closeErr)
			}
		}
	}()

	// Create MLDataDir if it doesn't exist (for production without NAS).
	if cfg.MLDataDir != "" {
		if _, err := mlServer.Exec(fmt.Sprintf("mkdir -p %s", cfg.MLDataDir)); err != nil {
			logger := logging.NewLogger(slog.LevelInfo, false)
			logger.Job(context.Background(), "data_manager", "").Error(
				"Failed to create ML data directory",
				"dir", cfg.MLDataDir,
				"error", err,
			)
		}
	}

	// Set up Redis via the internal queue package.
	ctx, cancel := context.WithCancel(context.Background())
	logger := logging.NewLogger(slog.LevelInfo, false)

	var taskQueue *queue.TaskQueue
	if cfg.RedisAddr != "" {
		queueCfg := queue.Config{
			RedisAddr:     cfg.RedisAddr,
			RedisPassword: cfg.RedisPassword,
			RedisDB:       cfg.RedisDB,
		}

		var err error
		taskQueue, err = queue.NewTaskQueue(queueCfg)
		if err != nil {
			// This err shadows the outer one, so the deferred closers above
			// will not fire for this failure; close explicitly.
			if closeErr := mlServer.Close(); closeErr != nil {
				logger.Warn("failed to close ML server during error cleanup", "error", closeErr)
			}
			if closeErr := nasServer.Close(); closeErr != nil {
				logger.Warn("failed to close NAS server during error cleanup", "error", closeErr)
			}
			cancel() // Cancel context to prevent a leak
			return nil, fmt.Errorf("redis connection failed: %w", err)
		}
	} else {
		taskQueue = nil // Local mode - no Redis
	}

	return &DataManager{
		config:    cfg,
		mlServer:  mlServer,
		nasServer: nasServer,
		taskQueue: taskQueue,
		ctx:       ctx,
		cancel:    cancel,
		logger:    logger,
	}, nil
}
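
// A typical programmatic lifecycle, sketched (error handling elided; the job
// and dataset names are placeholders):
//
//	cfg, _ := LoadDataConfig("configs/config-local.yaml")
//	dm, _ := NewDataManager(cfg, "")
//	defer dm.Close()
//	_ = dm.FetchDataset("job-123", "imagenet-mini")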

// FetchDataset fetches a dataset from NAS to ML server.
func (dm *DataManager) FetchDataset(jobName, datasetName string) error {
	ctx, cancel := context.WithTimeout(dm.ctx, 30*time.Minute)
	defer cancel()

	return network.RetryForNetworkOperations(ctx, func() error {
		return dm.fetchDatasetInternal(ctx, jobName, datasetName)
	})
}
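
// fetchDatasetInternal is invoked through network.RetryForNetworkOperations,
// which (by its name) retries on transient network failures, so the body must
// stay safe to re-run: it exits early when the dataset is already present on
// the ML server.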

func (dm *DataManager) fetchDatasetInternal(ctx context.Context, jobName, datasetName string) error {
	// Dataset names are checked against the same character rules as job names.
	if err := container.ValidateJobName(datasetName); err != nil {
		return &errtypes.DataFetchError{
			Dataset: datasetName,
			JobName: jobName,
			Err:     fmt.Errorf("invalid dataset name: %w", err),
		}
	}

	logger := dm.logger.Job(ctx, jobName, "")
	logger.Info("fetching dataset", "dataset", datasetName)

	// Validate dataset size and run cleanup if needed.
	if err := dm.ValidateDatasetWithCleanup(datasetName); err != nil {
		return &errtypes.DataFetchError{
			Dataset: datasetName,
			JobName: jobName,
			Err:     fmt.Errorf("dataset size validation failed: %w", err),
		}
	}

	nasPath := filepath.Join(dm.config.NASDataDir, datasetName)
	mlPath := filepath.Join(dm.config.MLDataDir, datasetName)

	// Check if the dataset exists on the NAS.
	if !dm.nasServer.FileExists(nasPath) {
		return &errtypes.DataFetchError{
			Dataset: datasetName,
			JobName: jobName,
			Err:     fmt.Errorf("dataset not found on NAS"),
		}
	}

	// Check if it is already on the ML server.
	if dm.mlServer.FileExists(mlPath) {
		logger.Info("dataset already on ML server", "dataset", datasetName)
		dm.updateLastAccess(datasetName)
		return nil
	}

	// Get the size for progress tracking.
	size, err := dm.nasServer.GetFileSize(nasPath)
	if err != nil {
		logger.Warn("could not get dataset size", "dataset", datasetName, "error", err)
		size = 0
	}

	sizeGB := float64(size) / (1024 * 1024 * 1024)
	logger.Info("transferring dataset",
		"dataset", datasetName,
		"size_gb", sizeGB,
		"nas_path", nasPath,
		"ml_path", mlPath)

	if dm.taskQueue != nil {
		redisClient := dm.taskQueue.GetRedisClient()
		if err := redisClient.HSet(dm.ctx, fmt.Sprintf("ml:data:transfer:%s", datasetName),
			"status", "transferring",
			"job_name", jobName,
			"size_bytes", size,
			"started_at", time.Now().Unix()).Err(); err != nil {
			logger.Warn("failed to record transfer start in Redis", "error", err)
		}
	}

	// Use a local copy in local mode, rsync over SSH in remote mode.
	var rsyncCmd string
	if dm.config.NASHost == "" || dm.config.NASUser == "" {
		// Local mode - use cp
		rsyncCmd = fmt.Sprintf("mkdir -p %s && cp -r %s %s/", dm.config.MLDataDir, nasPath, mlPath)
	} else {
		// Remote mode - pull from the NAS over SSH
		rsyncCmd = fmt.Sprintf(
			"mkdir -p %s && rsync -avz --progress %s@%s:%s/ %s/",
			dm.config.MLDataDir,
			dm.config.NASUser,
			dm.config.NASHost,
			nasPath,
			mlPath,
		)
	}

	ioBefore, ioErr := telemetry.ReadProcessIO()
	start := time.Now()
	out, err := telemetry.ExecWithMetrics(dm.logger, "dataset transfer", time.Since(start), func() (string, error) {
		// Run the command on the ML server: the rsync source is remote
		// (user@NASHost:...), so the command must execute on the destination
		// host for the data to land there.
		return dm.mlServer.ExecContext(ctx, rsyncCmd)
	})
	duration := time.Since(start)
	if err != nil {
		logger.Error("transfer failed",
			"dataset", datasetName,
			"duration", duration,
			"error", err,
			"output", out)

		if ioErr == nil {
			if after, readErr := telemetry.ReadProcessIO(); readErr == nil {
				delta := telemetry.DiffIO(ioBefore, after)
				logger.Debug("transfer io stats",
					"dataset", datasetName,
					"read_bytes", delta.ReadBytes,
					"write_bytes", delta.WriteBytes)
			}
		}

		if dm.taskQueue != nil {
			redisClient := dm.taskQueue.GetRedisClient()
			if redisErr := redisClient.HSet(dm.ctx, fmt.Sprintf("ml:data:transfer:%s", datasetName),
				"status", "failed",
				"error", err.Error()).Err(); redisErr != nil {
				logger.Warn("failed to record transfer failure in Redis", "error", redisErr)
			}
		}
		return err
	}

	logger.Info("transfer complete",
		"dataset", datasetName,
		"duration", duration,
		"size_gb", sizeGB)

	if ioErr == nil {
		if after, readErr := telemetry.ReadProcessIO(); readErr == nil {
			delta := telemetry.DiffIO(ioBefore, after)
			logger.Debug("transfer io stats",
				"dataset", datasetName,
				"read_bytes", delta.ReadBytes,
				"write_bytes", delta.WriteBytes)
		}
	}

	if dm.taskQueue != nil {
		redisClient := dm.taskQueue.GetRedisClient()
		if err := redisClient.HSet(dm.ctx, fmt.Sprintf("ml:data:transfer:%s", datasetName),
			"status", "completed",
			"completed_at", time.Now().Unix(),
			"duration_seconds", duration.Seconds()).Err(); err != nil {
			logger.Warn("failed to record transfer completion in Redis", "error", err)
		}
	}

	// Track dataset metadata
	dm.saveDatasetInfo(datasetName, size)

	return nil
}
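
// saveDatasetInfo and updateLastAccess below are the write/refresh sides of
// the ml:dataset:<name> record that ListDatasetsOnML and CleanupOldData read.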

func (dm *DataManager) saveDatasetInfo(name string, size int64) {
	if dm.taskQueue == nil {
		return // Skip in local mode
	}

	info := DatasetInfo{
		Name:       name,
		SizeBytes:  size,
		Location:   "ml",
		LastAccess: time.Now(),
	}

	data, err := json.Marshal(info)
	if err != nil {
		dm.logger.Job(dm.ctx, "data_manager", "").Warn("failed to marshal dataset info",
			"dataset", name, "error", err)
		return
	}

	redisClient := dm.taskQueue.GetRedisClient()
	if err := redisClient.Set(dm.ctx, fmt.Sprintf("ml:dataset:%s", name), data, 0).Err(); err != nil {
		dm.logger.Job(dm.ctx, "data_manager", "").Warn("failed to save dataset info to Redis",
			"dataset", name, "error", err)
	}
}

func (dm *DataManager) updateLastAccess(name string) {
	if dm.taskQueue == nil {
		return // Skip in local mode
	}

	key := fmt.Sprintf("ml:dataset:%s", name)
	redisClient := dm.taskQueue.GetRedisClient()
	data, err := redisClient.Get(dm.ctx, key).Result()
	if err != nil {
		return
	}

	var info DatasetInfo
	if err := json.Unmarshal([]byte(data), &info); err != nil {
		return
	}

	info.LastAccess = time.Now()
	newData, err := json.Marshal(info)
	if err != nil {
		return
	}
	if err := redisClient.Set(dm.ctx, key, newData, 0).Err(); err != nil {
		dm.logger.Job(dm.ctx, "data_manager", "").Warn("failed to update last access in Redis",
			"dataset", name, "error", err)
	}
}

// ListDatasetsOnML returns a list of all datasets currently stored on the ML server.
func (dm *DataManager) ListDatasetsOnML() ([]DatasetInfo, error) {
	out, err := dm.mlServer.Exec(fmt.Sprintf("ls -1 %s 2>/dev/null", dm.config.MLDataDir))
	if err != nil {
		return nil, err
	}

	var datasets []DatasetInfo
	// strings.SplitSeq (Go 1.24+) iterates the lines without allocating a slice.
	for name := range strings.SplitSeq(strings.TrimSpace(out), "\n") {
		if name == "" {
			continue
		}

		// Prefer the Redis metadata when it is available and parses cleanly.
		var info DatasetInfo
		haveInfo := false
		if dm.taskQueue != nil {
			redisClient := dm.taskQueue.GetRedisClient()
			key := fmt.Sprintf("ml:dataset:%s", name)
			if data, err := redisClient.Get(dm.ctx, key).Result(); err == nil {
				haveInfo = json.Unmarshal([]byte(data), &info) == nil
			}
		}

		// Fallback (local mode, missing key, or corrupt entry): stat the directory.
		if !haveInfo {
			size, _ := dm.mlServer.GetFileSize(filepath.Join(dm.config.MLDataDir, name))
			info = DatasetInfo{
				Name:      name,
				SizeBytes: size,
				Location:  "ml",
			}
		}

		datasets = append(datasets, info)
	}

	return datasets, nil
}
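
// Cleanup policy, as implemented below: a dataset is deleted when its recorded
// last access is older than MaxAgeHours, or while total usage still exceeds
// MaxSizeGB. Datasets are visited in listing order, so size-based eviction is
// not strictly least-recently-used.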

// CleanupOldData removes old datasets based on age and size limits.
func (dm *DataManager) CleanupOldData() error {
	logger := dm.logger.Job(dm.ctx, "data_manager", "")
	logger.Info("running data cleanup")

	datasets, err := dm.ListDatasetsOnML()
	if err != nil {
		return err
	}

	var totalSize int64
	for _, ds := range datasets {
		totalSize += ds.SizeBytes
	}

	totalSizeGB := float64(totalSize) / (1024 * 1024 * 1024)
	logger.Info("current storage usage",
		"total_size_gb", totalSizeGB,
		"dataset_count", len(datasets))

	// Delete datasets older than the max age, or while over the size limit.
	maxAge := time.Duration(dm.config.MaxAgeHours) * time.Hour
	maxSize := int64(dm.config.MaxSizeGB) * 1024 * 1024 * 1024

	var deleted []string
	for _, ds := range datasets {
		shouldDelete := false

		// Check age
		if !ds.LastAccess.IsZero() && time.Since(ds.LastAccess) > maxAge {
			logger.Info("dataset is old, marking for deletion",
				"dataset", ds.Name,
				"last_access", ds.LastAccess,
				"age_hours", time.Since(ds.LastAccess).Hours())
			shouldDelete = true
		}

		// Check whether we are still over the size limit
		if totalSize > maxSize {
			logger.Info("over size limit, deleting dataset",
				"dataset", ds.Name,
				"current_size_gb", totalSizeGB,
				"max_size_gb", dm.config.MaxSizeGB)
			shouldDelete = true
		}

		if shouldDelete {
			path := filepath.Join(dm.config.MLDataDir, ds.Name)
			logger.Info("deleting dataset", "dataset", ds.Name, "path", path)

			if _, err := dm.mlServer.Exec(fmt.Sprintf("rm -rf %s", path)); err != nil {
				logger.Error("failed to delete dataset",
					"dataset", ds.Name,
					"error", err)
				continue
			}

			deleted = append(deleted, ds.Name)
			totalSize -= ds.SizeBytes

			// Remove the Redis entry only if Redis is available.
			if dm.taskQueue != nil {
				redisClient := dm.taskQueue.GetRedisClient()
				if err := redisClient.Del(dm.ctx, fmt.Sprintf("ml:dataset:%s", ds.Name)).Err(); err != nil {
					logger.Warn("failed to delete dataset from Redis",
						"dataset", ds.Name,
						"error", err)
				}
			}
		}
	}

	if len(deleted) > 0 {
		logger.Info("cleanup complete",
			"deleted_count", len(deleted),
			"deleted_datasets", deleted)
	} else {
		logger.Info("cleanup complete", "deleted_count", 0)
	}

	return nil
}

// GetAvailableDiskSpace returns available disk space in bytes.
func (dm *DataManager) GetAvailableDiskSpace() int64 {
	logger := dm.logger.Job(dm.ctx, "data_manager", "")

	// Check disk space on the ML server. df -k reports 1K blocks; the fourth
	// column of the last line is the available space.
	cmd := "df -k " + dm.config.MLDataDir + " | tail -1 | awk '{print $4}'"
	output, err := dm.mlServer.Exec(cmd)
	if err != nil {
		logger.Error("failed to get disk space", "error", err)
		return 0
	}

	// Parse KB to bytes
	var freeKB int64
	_, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &freeKB)
	if err != nil {
		logger.Error("failed to parse disk space", "error", err, "output", output)
		return 0
	}

	return freeKB * 1024 // Convert KB to bytes
}
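
// For example, if df -k reports 123456789 available 1K blocks for MLDataDir
// (a made-up figure), the function returns 123456789 * 1024 bytes, i.e.
// roughly 117.7 GiB.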

// GetDatasetInfo returns information about a dataset from NAS.
func (dm *DataManager) GetDatasetInfo(datasetName string) (*DatasetInfo, error) {
	// Check if the dataset exists on the NAS
	nasPath := filepath.Join(dm.config.NASDataDir, datasetName)
	cmd := fmt.Sprintf("test -d %s && echo 'exists'", nasPath)
	output, err := dm.nasServer.Exec(cmd)
	if err != nil || strings.TrimSpace(output) != "exists" {
		return nil, fmt.Errorf("dataset %s not found on NAS", datasetName)
	}

	// Get the dataset size
	cmd = fmt.Sprintf("du -sb %s | cut -f1", nasPath)
	output, err = dm.nasServer.Exec(cmd)
	if err != nil {
		return nil, fmt.Errorf("failed to get dataset size: %w", err)
	}

	var sizeBytes int64
	_, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &sizeBytes)
	if err != nil {
		return nil, fmt.Errorf("failed to parse dataset size: %w", err)
	}

	// Get the last modification time as a proxy for last access.
	// Note: stat -c and du -b are GNU coreutils flags, so this assumes a
	// Linux NAS.
	cmd = fmt.Sprintf("stat -c %%Y %s", nasPath)
	output, err = dm.nasServer.Exec(cmd)
	if err != nil {
		return nil, fmt.Errorf("failed to get dataset timestamp: %w", err)
	}

	var modTime int64
	_, err = fmt.Sscanf(strings.TrimSpace(output), "%d", &modTime)
	if err != nil {
		return nil, fmt.Errorf("failed to parse timestamp: %w", err)
	}

	return &DatasetInfo{
		Name:       datasetName,
		SizeBytes:  sizeBytes,
		Location:   "nas",
		LastAccess: time.Unix(modTime, 0),
	}, nil
}

// ValidateDatasetWithCleanup checks if dataset fits and runs cleanup if needed.
func (dm *DataManager) ValidateDatasetWithCleanup(datasetName string) error {
	logger := dm.logger.Job(dm.ctx, "data_manager", "")

	// Get dataset info
	info, err := dm.GetDatasetInfo(datasetName)
	if err != nil {
		return fmt.Errorf("failed to get dataset info: %w", err)
	}

	// Check current available space
	availableSpace := dm.GetAvailableDiskSpace()

	logger.Info("dataset size validation",
		"dataset", datasetName,
		"dataset_size_gb", float64(info.SizeBytes)/(1024*1024*1024),
		"available_gb", float64(availableSpace)/(1024*1024*1024))

	// If enough space, proceed
	if info.SizeBytes <= availableSpace {
		logger.Info("sufficient space available", "dataset", datasetName)
		return nil
	}

	// Try cleanup first
	logger.Info("insufficient space, running cleanup",
		"dataset", datasetName,
		"required_gb", float64(info.SizeBytes)/(1024*1024*1024),
		"available_gb", float64(availableSpace)/(1024*1024*1024))

	if err := dm.CleanupOldData(); err != nil {
		return fmt.Errorf("cleanup failed: %w", err)
	}

	// Check space again after cleanup
	availableSpace = dm.GetAvailableDiskSpace()
	logger.Info("space after cleanup",
		"available_gb", float64(availableSpace)/(1024*1024*1024))

	// If now enough space, proceed
	if info.SizeBytes <= availableSpace {
		logger.Info("cleanup freed enough space", "dataset", datasetName)
		return nil
	}

	// Still not enough space
	return fmt.Errorf("dataset %s (%.2fGB) too large for available space (%.2fGB) even after cleanup",
		datasetName,
		float64(info.SizeBytes)/(1024*1024*1024),
		float64(availableSpace)/(1024*1024*1024))
}

// StartCleanupLoop starts the periodic cleanup loop.
func (dm *DataManager) StartCleanupLoop() {
	logger := dm.logger.Job(dm.ctx, "data_manager", "")
	// CleanupInterval is interpreted in minutes.
	ticker := time.NewTicker(time.Duration(dm.config.CleanupInterval) * time.Minute)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-dm.ctx.Done():
				logger.Info("cleanup loop stopping")
				return
			case <-ticker.C:
				if err := dm.CleanupOldData(); err != nil {
					logger.Error("cleanup error", "error", err)
				}
			}
		}
	}()
}

// Close gracefully shuts down the DataManager, stopping the cleanup loop and
// closing all connections to ML server, NAS server, and Redis.
func (dm *DataManager) Close() {
	dm.cancel() // Cancel context to stop the cleanup loop

	// Best-effort pause so the cleanup goroutine can observe the cancellation.
	time.Sleep(100 * time.Millisecond)

	if dm.mlServer != nil {
		if err := dm.mlServer.Close(); err != nil {
			dm.logger.Job(dm.ctx, "data_manager", "").Warn("error closing ML server connection", "error", err)
		}
	}
	if dm.nasServer != nil {
		if err := dm.nasServer.Close(); err != nil {
			dm.logger.Job(dm.ctx, "data_manager", "").Warn("error closing NAS server connection", "error", err)
		}
	}
	if dm.taskQueue != nil {
		if err := dm.taskQueue.Close(); err != nil {
			dm.logger.Job(dm.ctx, "data_manager", "").Warn("error closing Redis connection", "error", err)
		}
	}
}
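
// main dispatches the data_manager CLI. Subcommands (see the usage text below):
//
//	fetch <job-name> <dataset> [dataset...]  - pull datasets from the NAS to the ML server
//	list                                     - show datasets currently on the ML server
//	validate <dataset>                       - check that a dataset fits, cleaning up if needed
//	cleanup                                  - run the age/size cleanup once
//	daemon                                   - run the periodic cleanup loop until SIGINT/SIGTERM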
func main() {
	// Parse authentication flags
	authFlags := auth.ParseAuthFlags()
	if err := auth.ValidateFlags(authFlags); err != nil {
		log.Fatalf("Authentication flag error: %v", err)
	}

	// Get API key from various sources
	apiKey := auth.GetAPIKeyFromSources(authFlags)

	configFile := "configs/config-local.yaml"
	if authFlags.ConfigFile != "" {
		configFile = authFlags.ConfigFile
	}

	// Parse command line args
	if len(os.Args) < 2 {
		fmt.Println("Usage:")
		fmt.Println(" data_manager [--config configs/config-local.yaml] [--api-key <key>] " +
			"fetch <job-name> <dataset> [dataset...]")
		fmt.Println(" data_manager [--config configs/config-local.yaml] [--api-key <key>] list")
		fmt.Println(" data_manager [--config configs/config-local.yaml] [--api-key <key>] cleanup")
		fmt.Println(" data_manager [--config configs/config-local.yaml] [--api-key <key>] validate <dataset>")
		fmt.Println(" data_manager [--config configs/config-local.yaml] [--api-key <key>] daemon")
		fmt.Println()
		auth.PrintAuthHelp()
		os.Exit(1)
	}

	// A leading --config flag (it must be the first argument) overrides the
	// config file chosen above.
	if len(os.Args) >= 3 && os.Args[1] == "--config" {
		configFile = os.Args[2]
		// Shift args so the subcommand is back at os.Args[1]
		os.Args = append([]string{os.Args[0]}, os.Args[3:]...)
	}

	cfg, err := LoadDataConfig(configFile)
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}

	// Validate authentication configuration
	if err := cfg.Auth.ValidateAuthConfig(); err != nil {
		log.Fatalf("Invalid authentication configuration: %v", err)
	}

	// Validate the rest of the configuration
	if err := cfg.Validate(); err != nil {
		log.Fatalf("Invalid configuration: %v", err)
	}

	// Test authentication if enabled
	if cfg.Auth.Enabled && apiKey != "" {
		user, err := cfg.Auth.ValidateAPIKey(apiKey)
		if err != nil {
			log.Fatalf("Authentication failed: %v", err)
		}
		log.Printf("Data manager authenticated as user: %s (admin: %v)", user.Name, user.Admin)
	} else if cfg.Auth.Enabled {
		log.Fatal("Authentication required but no API key provided")
	}

	dm, err := NewDataManager(cfg, apiKey)
	if err != nil {
		log.Fatalf("Failed to create data manager: %v", err)
	}
	defer dm.Close()

	cmd := os.Args[1]

	switch cmd {
	case "fetch":
		if len(os.Args) < 4 {
			log.Printf("Usage: data_manager fetch <job-name> <dataset> [dataset...]")
			return
		}
		jobName := os.Args[2]
		datasets := os.Args[3:]

		for _, dataset := range datasets {
			if err := dm.FetchDataset(jobName, dataset); err != nil {
				dm.logger.Job(context.Background(), jobName, "").Error("failed to fetch dataset",
					"dataset", dataset,
					"error", err)
			}
		}

	case "list":
		datasets, err := dm.ListDatasetsOnML()
		if err != nil {
			log.Printf("Failed to list datasets: %v", err)
			return
		}

		fmt.Println("Datasets on ML server:")
		fmt.Println("======================")
		var totalSize int64
		for _, ds := range datasets {
			sizeMB := float64(ds.SizeBytes) / (1024 * 1024)
			lastAccess := "unknown"
			if !ds.LastAccess.IsZero() {
				lastAccess = ds.LastAccess.Format("2006-01-02 15:04:05")
			}
			fmt.Printf("%-30s %10.2f MB Last access: %s\n", ds.Name, sizeMB, lastAccess)
			totalSize += ds.SizeBytes
		}
		fmt.Printf("\nTotal: %.2f GB\n", float64(totalSize)/(1024*1024*1024))

	case "validate":
		if len(os.Args) < 3 {
			log.Printf("Usage: data_manager validate <dataset>")
			return
		}
		dataset := os.Args[2]

		fmt.Printf("Validating dataset: %s\n", dataset)
		if err := dm.ValidateDatasetWithCleanup(dataset); err != nil {
			log.Printf("Validation failed: %v", err)
			return
		}
		fmt.Printf("✅ Dataset %s can be downloaded\n", dataset)

	case "cleanup":
		if err := dm.CleanupOldData(); err != nil {
			log.Printf("Cleanup failed: %v", err)
			return
		}

	case "daemon":
		logger := dm.logger.Job(context.Background(), "data_manager", "")
		logger.Info("starting data manager daemon")
		dm.StartCleanupLoop()
		logger.Info("cleanup configuration",
			"interval_minutes", cfg.CleanupInterval,
			"max_age_hours", cfg.MaxAgeHours,
			"max_size_gb", cfg.MaxSizeGB)

		// Handle graceful shutdown
		sigChan := make(chan os.Signal, 1)
		signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

		sig := <-sigChan
		logger.Info("received shutdown signal", "signal", sig)
		// The deferred dm.Close() above tears everything down; calling Close
		// here as well would close the SSH connections twice.
		logger.Info("data manager shutting down")

	default:
		log.Printf("Unknown command: %s", cmd)
	}
}