// Package storage provides storage abstractions for datasets and transfer tracking. package storage import ( "context" "encoding/json" "fmt" "time" "github.com/redis/go-redis/v9" ) // DatasetInfo contains information about a dataset. type DatasetInfo struct { LastAccess time.Time `json:"last_access"` Name string `json:"name"` Location string `json:"location"` SizeBytes int64 `json:"size_bytes"` } // DatasetStore manages dataset metadata and transfer tracking. type DatasetStore struct { client redis.UniversalClient ctx context.Context } // NewDatasetStore creates a new DatasetStore with the given Redis client. func NewDatasetStore(client redis.UniversalClient) *DatasetStore { return &DatasetStore{ client: client, ctx: context.Background(), } } // NewDatasetStoreWithContext creates a new DatasetStore with a custom context. func NewDatasetStoreWithContext(client redis.UniversalClient, ctx context.Context) *DatasetStore { return &DatasetStore{ client: client, ctx: ctx, } } // datasetKey returns the Redis key for dataset info. func datasetKey(name string) string { return fmt.Sprintf("ml:dataset:%s", name) } // transferKey returns the Redis key for transfer tracking. func transferKey(datasetName string) string { return fmt.Sprintf("ml:data:transfer:%s", datasetName) } // RecordTransferStart records the start of a dataset transfer. func (s *DatasetStore) RecordTransferStart(ctx context.Context, datasetName, jobName string, sizeBytes int64) error { if s.client == nil { return nil } return s.client.HSet(ctx, transferKey(datasetName), "status", "transferring", "job_name", jobName, "size_bytes", sizeBytes, "started_at", time.Now().Unix(), ).Err() } // RecordTransferComplete records the successful completion of a dataset transfer. func (s *DatasetStore) RecordTransferComplete(ctx context.Context, datasetName string, duration time.Duration) error { if s.client == nil { return nil } return s.client.HSet(ctx, transferKey(datasetName), "status", "completed", "completed_at", time.Now().Unix(), "duration_seconds", duration.Seconds(), ).Err() } // RecordTransferFailure records a failed dataset transfer. func (s *DatasetStore) RecordTransferFailure(ctx context.Context, datasetName string, transferErr error) error { if s.client == nil { return nil } return s.client.HSet(ctx, transferKey(datasetName), "status", "failed", "error", transferErr.Error(), ).Err() } // SaveDatasetInfo saves dataset metadata to Redis. func (s *DatasetStore) SaveDatasetInfo(ctx context.Context, info DatasetInfo) error { if s.client == nil { return nil } data, err := json.Marshal(info) if err != nil { return fmt.Errorf("failed to marshal dataset info: %w", err) } return s.client.Set(ctx, datasetKey(info.Name), data, 0).Err() } // GetDatasetInfo retrieves dataset metadata from Redis. func (s *DatasetStore) GetDatasetInfo(ctx context.Context, name string) (*DatasetInfo, error) { if s.client == nil { return nil, nil } data, err := s.client.Get(ctx, datasetKey(name)).Result() if err == redis.Nil { return nil, nil } if err != nil { return nil, fmt.Errorf("failed to get dataset info: %w", err) } var info DatasetInfo if err := json.Unmarshal([]byte(data), &info); err != nil { return nil, fmt.Errorf("failed to unmarshal dataset info: %w", err) } return &info, nil } // UpdateLastAccess updates the last access time for a dataset. func (s *DatasetStore) UpdateLastAccess(ctx context.Context, name string) error { if s.client == nil { return nil } info, err := s.GetDatasetInfo(ctx, name) if err != nil { return err } if info == nil { return nil // No record to update } info.LastAccess = time.Now() return s.SaveDatasetInfo(ctx, *info) } // DeleteDatasetInfo removes dataset metadata from Redis. func (s *DatasetStore) DeleteDatasetInfo(ctx context.Context, name string) error { if s.client == nil { return nil } return s.client.Del(ctx, datasetKey(name)).Err() }