Update queue and storage systems for scheduler integration: - Queue backend with scheduler coordination - Filesystem queue with batch operations - Deduplication with tenant-aware keys - Storage layer with audit logging hooks - Domain models (Task, Events, Errors) with scheduler fields - Database layer with tenant isolation - Dataset storage with integrity checks
144 lines
3.9 KiB
Go
144 lines
3.9 KiB
Go
// Package storage provides storage abstractions for datasets and transfer tracking.
|
|
package storage
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)
|
|
|
|
// DatasetInfo describes a single dataset as persisted by DatasetStore.
//
// SaveDatasetInfo serializes the value with encoding/json, so the struct tags
// below define the stored field names and the field order defines the order
// of keys in the stored document.
type DatasetInfo struct {
	// LastAccess is the last-access timestamp; UpdateLastAccess overwrites it
	// with the current time.
	LastAccess time.Time `json:"last_access"`
	// Name identifies the dataset and is used to build its Redis key.
	Name string `json:"name"`
	// Location records where the dataset lives (format is not defined here).
	Location string `json:"location"`
	// SizeBytes is the dataset size in bytes.
	SizeBytes int64 `json:"size_bytes"`
}
|
|
|
|
// DatasetStore manages dataset metadata and transfer tracking.
|
|
type DatasetStore struct {
|
|
client redis.UniversalClient
|
|
ctx context.Context
|
|
}
|
|
|
|
// NewDatasetStore creates a new DatasetStore with the given Redis client.
|
|
func NewDatasetStore(client redis.UniversalClient) *DatasetStore {
|
|
return &DatasetStore{
|
|
client: client,
|
|
ctx: context.Background(),
|
|
}
|
|
}
|
|
|
|
// NewDatasetStoreWithContext creates a new DatasetStore with a custom context.
|
|
func NewDatasetStoreWithContext(client redis.UniversalClient, ctx context.Context) *DatasetStore {
|
|
return &DatasetStore{
|
|
client: client,
|
|
ctx: ctx,
|
|
}
|
|
}
|
|
|
|
// datasetKey builds the Redis key under which the metadata for the dataset
// called name is stored.
func datasetKey(name string) string {
	const keyFormat = "ml:dataset:%s"
	key := fmt.Sprintf(keyFormat, name)
	return key
}
|
|
|
|
// transferKey builds the Redis key of the hash that tracks the transfer state
// of the dataset called datasetName.
func transferKey(datasetName string) string {
	const keyFormat = "ml:data:transfer:%s"
	key := fmt.Sprintf(keyFormat, datasetName)
	return key
}
|
|
|
|
// RecordTransferStart records the start of a dataset transfer.
|
|
func (s *DatasetStore) RecordTransferStart(ctx context.Context, datasetName, jobName string, sizeBytes int64) error {
|
|
if s.client == nil {
|
|
return nil
|
|
}
|
|
return s.client.HSet(ctx, transferKey(datasetName),
|
|
"status", "transferring",
|
|
"job_name", jobName,
|
|
"size_bytes", sizeBytes,
|
|
"started_at", time.Now().Unix(),
|
|
).Err()
|
|
}
|
|
|
|
// RecordTransferComplete records the successful completion of a dataset transfer.
|
|
func (s *DatasetStore) RecordTransferComplete(ctx context.Context, datasetName string, duration time.Duration) error {
|
|
if s.client == nil {
|
|
return nil
|
|
}
|
|
return s.client.HSet(ctx, transferKey(datasetName),
|
|
"status", "completed",
|
|
"completed_at", time.Now().Unix(),
|
|
"duration_seconds", duration.Seconds(),
|
|
).Err()
|
|
}
|
|
|
|
// RecordTransferFailure records a failed dataset transfer.
|
|
func (s *DatasetStore) RecordTransferFailure(ctx context.Context, datasetName string, transferErr error) error {
|
|
if s.client == nil {
|
|
return nil
|
|
}
|
|
return s.client.HSet(ctx, transferKey(datasetName),
|
|
"status", "failed",
|
|
"error", transferErr.Error(),
|
|
).Err()
|
|
}
|
|
|
|
// SaveDatasetInfo saves dataset metadata to Redis.
|
|
func (s *DatasetStore) SaveDatasetInfo(ctx context.Context, info DatasetInfo) error {
|
|
if s.client == nil {
|
|
return nil
|
|
}
|
|
data, err := json.Marshal(info)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal dataset info: %w", err)
|
|
}
|
|
return s.client.Set(ctx, datasetKey(info.Name), data, 0).Err()
|
|
}
|
|
|
|
// GetDatasetInfo retrieves dataset metadata from Redis.
|
|
func (s *DatasetStore) GetDatasetInfo(ctx context.Context, name string) (*DatasetInfo, error) {
|
|
if s.client == nil {
|
|
return nil, nil
|
|
}
|
|
data, err := s.client.Get(ctx, datasetKey(name)).Result()
|
|
if err == redis.Nil {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get dataset info: %w", err)
|
|
}
|
|
|
|
var info DatasetInfo
|
|
if err := json.Unmarshal([]byte(data), &info); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal dataset info: %w", err)
|
|
}
|
|
return &info, nil
|
|
}
|
|
|
|
// UpdateLastAccess updates the last access time for a dataset.
|
|
func (s *DatasetStore) UpdateLastAccess(ctx context.Context, name string) error {
|
|
if s.client == nil {
|
|
return nil
|
|
}
|
|
info, err := s.GetDatasetInfo(ctx, name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if info == nil {
|
|
return nil // No record to update
|
|
}
|
|
|
|
info.LastAccess = time.Now()
|
|
return s.SaveDatasetInfo(ctx, *info)
|
|
}
|
|
|
|
// DeleteDatasetInfo removes dataset metadata from Redis.
|
|
func (s *DatasetStore) DeleteDatasetInfo(ctx context.Context, name string) error {
|
|
if s.client == nil {
|
|
return nil
|
|
}
|
|
return s.client.Del(ctx, datasetKey(name)).Err()
|
|
}
|