fetch_ml/internal/storage/dataset_test.go
Jeremie Fraeys 50b6506243
test(storage): add comprehensive storage layer tests
Add tests for:
- dataset: Redis dataset operations, transfer tracking
- db_audit: audit logging with hash chain, access tracking
- db_experiments: experiment metadata, dataset associations
- db_tasks: task listing with pagination for users and groups
- db_jobs: job CRUD, state transitions, worker assignment

Coverage: storage package ~40%+
2026-03-13 23:26:33 -04:00

296 lines
7.3 KiB
Go

package storage_test
import (
"context"
"encoding/json"
"errors"
"testing"
"time"
"github.com/jfraeys/fetch_ml/internal/storage"
tests "github.com/jfraeys/fetch_ml/tests/fixtures"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// setupDatasetStore creates a DatasetStore backed by a live Redis instance on
// localhost:6379, using DB 15 to keep test keys away from development data.
// The DB is flushed before the store is returned, so tests that share this
// helper mutate shared state: they should not assume data survives a sibling
// test's setup (a concurrent FlushDB wipes everything in DB 15).
func setupDatasetStore(t *testing.T) *storage.DatasetStore {
	t.Helper()
	cleanup := tests.EnsureRedis(t)
	t.Cleanup(cleanup)
	client := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
		DB:   15, // Use a separate DB for tests
	})
	// Close the client's connection pool when the test ends; without this
	// every test leaks a pool for the duration of the package run.
	t.Cleanup(func() { _ = client.Close() })
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	err := client.Ping(ctx).Err()
	require.NoError(t, err, "Redis must be available")
	// Clean up test DB
	err = client.FlushDB(ctx).Err()
	require.NoError(t, err)
	return storage.NewDatasetStore(client)
}
// TestNewDatasetStore verifies the constructor returns a non-nil store for a
// valid Redis client.
func TestNewDatasetStore(t *testing.T) {
	t.Parallel()
	cleanup := tests.EnsureRedis(t)
	defer cleanup()
	client := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
		DB:   15,
	})
	// Close the client so its connection pool is not leaked for the rest of
	// the package's test run.
	defer client.Close()
	store := storage.NewDatasetStore(client)
	require.NotNil(t, store)
}
// TestNewDatasetStoreWithContext verifies the context-aware constructor
// returns a non-nil store when given a caller-supplied context.
func TestNewDatasetStoreWithContext(t *testing.T) {
	t.Parallel()
	cleanup := tests.EnsureRedis(t)
	defer cleanup()
	client := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
		DB:   15,
	})
	// Close the client so its connection pool is not leaked for the rest of
	// the package's test run.
	defer client.Close()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	store := storage.NewDatasetStoreWithContext(client, ctx)
	require.NotNil(t, store)
}
// TestRecordTransferStart tests recording transfer start
func TestRecordTransferStart(t *testing.T) {
	t.Parallel()
	store := setupDatasetStore(t)
	ctx := context.Background()
	// Recording the start of a 1 MiB transfer should succeed.
	const sizeBytes = 1024 * 1024
	require.NoError(t, store.RecordTransferStart(ctx, "dataset-1", "job-1", sizeBytes))
}
// TestRecordTransferComplete tests recording successful transfer
func TestRecordTransferComplete(t *testing.T) {
	t.Parallel()
	store := setupDatasetStore(t)
	ctx := context.Background()
	const (
		datasetName = "dataset-complete"
		jobName     = "job-transfer"
	)
	// A transfer is started first, then marked complete with its duration.
	require.NoError(t, store.RecordTransferStart(ctx, datasetName, jobName, 1024))
	require.NoError(t, store.RecordTransferComplete(ctx, datasetName, 5*time.Second))
}
// TestRecordTransferFailure tests recording failed transfer
func TestRecordTransferFailure(t *testing.T) {
	t.Parallel()
	store := setupDatasetStore(t)
	ctx := context.Background()
	// Recording a failure with a concrete cause should succeed.
	cause := errors.New("network timeout")
	require.NoError(t, store.RecordTransferFailure(ctx, "dataset-fail", cause))
}
// TestSaveDatasetInfo tests saving dataset metadata and verifies the round
// trip through GetDatasetInfo.
func TestSaveDatasetInfo(t *testing.T) {
	// Not parallel: setupDatasetStore flushes the shared Redis test DB, so a
	// concurrently-running sibling test could wipe the saved record between
	// SaveDatasetInfo and the verification read, making this test flaky.
	store := setupDatasetStore(t)
	ctx := context.Background()
	info := storage.DatasetInfo{
		Name:       "test-dataset",
		Location:   "s3://bucket/dataset",
		SizeBytes:  1024 * 1024 * 100, // 100MB
		LastAccess: time.Now(),
	}
	err := store.SaveDatasetInfo(ctx, info)
	require.NoError(t, err)
	// Verify by retrieving
	retrieved, err := store.GetDatasetInfo(ctx, info.Name)
	require.NoError(t, err)
	require.NotNil(t, retrieved)
	assert.Equal(t, info.Name, retrieved.Name)
	assert.Equal(t, info.Location, retrieved.Location)
	assert.Equal(t, info.SizeBytes, retrieved.SizeBytes)
}
// TestGetDatasetInfo tests retrieving dataset metadata for both missing and
// existing datasets.
func TestGetDatasetInfo(t *testing.T) {
	// Not parallel: setupDatasetStore flushes the shared Redis test DB, so a
	// concurrently-running sibling could delete the record between save and
	// read, making this test flaky.
	store := setupDatasetStore(t)
	ctx := context.Background()
	// A missing dataset yields (nil, nil) rather than an error.
	info, err := store.GetDatasetInfo(ctx, "nonexistent-dataset")
	require.NoError(t, err)
	assert.Nil(t, info)
	// Save and retrieve
	savedInfo := storage.DatasetInfo{
		Name:     "existing-dataset",
		Location: "s3://bucket/data",
	}
	err = store.SaveDatasetInfo(ctx, savedInfo)
	require.NoError(t, err)
	retrieved, err := store.GetDatasetInfo(ctx, savedInfo.Name)
	require.NoError(t, err)
	require.NotNil(t, retrieved)
	assert.Equal(t, savedInfo.Name, retrieved.Name)
	assert.Equal(t, savedInfo.Location, retrieved.Location)
}
// TestUpdateLastAccess tests updating a dataset's last-access timestamp, both
// for a nonexistent dataset (no error) and for an existing one (timestamp
// moves forward).
func TestUpdateLastAccess(t *testing.T) {
	// Not parallel: setupDatasetStore flushes the shared Redis test DB, so a
	// concurrently-running sibling could wipe the record between the save and
	// the final verification read, making this test flaky.
	store := setupDatasetStore(t)
	ctx := context.Background()
	datasetName := "access-test-dataset"
	// Update nonexistent dataset should not error
	err := store.UpdateLastAccess(ctx, datasetName)
	require.NoError(t, err)
	// Create dataset info with a stale last-access time an hour in the past.
	info := storage.DatasetInfo{
		Name:       datasetName,
		Location:   "s3://bucket/test",
		LastAccess: time.Now().Add(-1 * time.Hour),
	}
	err = store.SaveDatasetInfo(ctx, info)
	require.NoError(t, err)
	// Update last access
	time.Sleep(10 * time.Millisecond) // Ensure time difference
	err = store.UpdateLastAccess(ctx, datasetName)
	require.NoError(t, err)
	// Verify the stored timestamp advanced past the stale one.
	retrieved, err := store.GetDatasetInfo(ctx, datasetName)
	require.NoError(t, err)
	require.NotNil(t, retrieved)
	assert.True(t, retrieved.LastAccess.After(info.LastAccess))
}
// TestDeleteDatasetInfo tests that dataset metadata can be deleted and that a
// subsequent read returns nil.
func TestDeleteDatasetInfo(t *testing.T) {
	// Not parallel: setupDatasetStore flushes the shared Redis test DB, so a
	// concurrently-running sibling could remove the record before the
	// "exists" verification, making this test flaky.
	store := setupDatasetStore(t)
	ctx := context.Background()
	datasetName := "delete-test-dataset"
	// Create dataset info
	info := storage.DatasetInfo{
		Name:     datasetName,
		Location: "s3://bucket/delete-test",
	}
	err := store.SaveDatasetInfo(ctx, info)
	require.NoError(t, err)
	// Verify exists
	retrieved, err := store.GetDatasetInfo(ctx, datasetName)
	require.NoError(t, err)
	require.NotNil(t, retrieved)
	// Delete
	err = store.DeleteDatasetInfo(ctx, datasetName)
	require.NoError(t, err)
	// Verify deleted: a missing dataset reads back as (nil, nil).
	retrieved, err = store.GetDatasetInfo(ctx, datasetName)
	require.NoError(t, err)
	assert.Nil(t, retrieved)
}
// TestDatasetStoreWithNilClient tests behavior with nil client
func TestDatasetStoreWithNilClient(t *testing.T) {
	t.Parallel()
	// A nil Redis client must produce an inert store: every operation is a
	// no-op that returns nil rather than panicking or erroring.
	store := storage.NewDatasetStore(nil)
	require.NotNil(t, store)
	ctx := context.Background()
	assert.NoError(t, store.RecordTransferStart(ctx, "test", "job", 100))
	assert.NoError(t, store.RecordTransferComplete(ctx, "test", time.Second))
	assert.NoError(t, store.RecordTransferFailure(ctx, "test", errors.New("test")))
	assert.NoError(t, store.SaveDatasetInfo(ctx, storage.DatasetInfo{Name: "test"}))
	// Reads against the inert store report "not found" without error.
	info, err := store.GetDatasetInfo(ctx, "test")
	assert.NoError(t, err)
	assert.Nil(t, info)
	assert.NoError(t, store.UpdateLastAccess(ctx, "test"))
	assert.NoError(t, store.DeleteDatasetInfo(ctx, "test"))
}
// TestDatasetInfoJSONSerialization verifies DatasetInfo survives a full JSON
// marshal/unmarshal round trip, including the LastAccess timestamp the
// original assertions omitted.
func TestDatasetInfoJSONSerialization(t *testing.T) {
	t.Parallel()
	info := storage.DatasetInfo{
		Name:       "json-test",
		Location:   "s3://bucket/test",
		SizeBytes:  123456,
		LastAccess: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC),
	}
	data, err := json.Marshal(info)
	require.NoError(t, err)
	var decoded storage.DatasetInfo
	err = json.Unmarshal(data, &decoded)
	require.NoError(t, err)
	assert.Equal(t, info.Name, decoded.Name)
	assert.Equal(t, info.Location, decoded.Location)
	assert.Equal(t, info.SizeBytes, decoded.SizeBytes)
	// Compare times with Equal rather than ==, which would also compare
	// location pointers and monotonic-clock state.
	assert.True(t, info.LastAccess.Equal(decoded.LastAccess), "LastAccess should round-trip")
}