fetch_ml/internal/storage/dataset.go
Jeremie Fraeys 6866ba9366
refactor(queue): integrate scheduler backend and storage improvements
Update queue and storage systems for scheduler integration:
- Queue backend with scheduler coordination
- Filesystem queue with batch operations
- Deduplication with tenant-aware keys
- Storage layer with audit logging hooks
- Domain models (Task, Events, Errors) with scheduler fields
- Database layer with tenant isolation
- Dataset storage with integrity checks
2026-02-26 12:06:46 -05:00

144 lines
3.9 KiB
Go

// Package storage provides storage abstractions for datasets and transfer tracking.
package storage
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// DatasetInfo contains information about a dataset.
type DatasetInfo struct {
LastAccess time.Time `json:"last_access"`
Name string `json:"name"`
Location string `json:"location"`
SizeBytes int64 `json:"size_bytes"`
}
// DatasetStore manages dataset metadata and transfer tracking.
type DatasetStore struct {
client redis.UniversalClient
ctx context.Context
}
// NewDatasetStore creates a new DatasetStore with the given Redis client.
func NewDatasetStore(client redis.UniversalClient) *DatasetStore {
return &DatasetStore{
client: client,
ctx: context.Background(),
}
}
// NewDatasetStoreWithContext creates a new DatasetStore with a custom context.
func NewDatasetStoreWithContext(client redis.UniversalClient, ctx context.Context) *DatasetStore {
return &DatasetStore{
client: client,
ctx: ctx,
}
}
// datasetKey returns the Redis key for dataset info.
func datasetKey(name string) string {
return fmt.Sprintf("ml:dataset:%s", name)
}
// transferKey returns the Redis key for transfer tracking.
func transferKey(datasetName string) string {
return fmt.Sprintf("ml:data:transfer:%s", datasetName)
}
// RecordTransferStart records the start of a dataset transfer.
func (s *DatasetStore) RecordTransferStart(ctx context.Context, datasetName, jobName string, sizeBytes int64) error {
if s.client == nil {
return nil
}
return s.client.HSet(ctx, transferKey(datasetName),
"status", "transferring",
"job_name", jobName,
"size_bytes", sizeBytes,
"started_at", time.Now().Unix(),
).Err()
}
// RecordTransferComplete records the successful completion of a dataset transfer.
func (s *DatasetStore) RecordTransferComplete(ctx context.Context, datasetName string, duration time.Duration) error {
if s.client == nil {
return nil
}
return s.client.HSet(ctx, transferKey(datasetName),
"status", "completed",
"completed_at", time.Now().Unix(),
"duration_seconds", duration.Seconds(),
).Err()
}
// RecordTransferFailure records a failed dataset transfer.
func (s *DatasetStore) RecordTransferFailure(ctx context.Context, datasetName string, transferErr error) error {
if s.client == nil {
return nil
}
return s.client.HSet(ctx, transferKey(datasetName),
"status", "failed",
"error", transferErr.Error(),
).Err()
}
// SaveDatasetInfo saves dataset metadata to Redis.
func (s *DatasetStore) SaveDatasetInfo(ctx context.Context, info DatasetInfo) error {
if s.client == nil {
return nil
}
data, err := json.Marshal(info)
if err != nil {
return fmt.Errorf("failed to marshal dataset info: %w", err)
}
return s.client.Set(ctx, datasetKey(info.Name), data, 0).Err()
}
// GetDatasetInfo retrieves dataset metadata from Redis.
func (s *DatasetStore) GetDatasetInfo(ctx context.Context, name string) (*DatasetInfo, error) {
if s.client == nil {
return nil, nil
}
data, err := s.client.Get(ctx, datasetKey(name)).Result()
if err == redis.Nil {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("failed to get dataset info: %w", err)
}
var info DatasetInfo
if err := json.Unmarshal([]byte(data), &info); err != nil {
return nil, fmt.Errorf("failed to unmarshal dataset info: %w", err)
}
return &info, nil
}
// UpdateLastAccess updates the last access time for a dataset.
func (s *DatasetStore) UpdateLastAccess(ctx context.Context, name string) error {
if s.client == nil {
return nil
}
info, err := s.GetDatasetInfo(ctx, name)
if err != nil {
return err
}
if info == nil {
return nil // No record to update
}
info.LastAccess = time.Now()
return s.SaveDatasetInfo(ctx, *info)
}
// DeleteDatasetInfo removes dataset metadata from Redis.
func (s *DatasetStore) DeleteDatasetInfo(ctx context.Context, name string) error {
if s.client == nil {
return nil
}
return s.client.Del(ctx, datasetKey(name)).Err()
}