Phase 1: Fix Redis Schema Leak
- Create internal/storage/dataset.go with DatasetStore abstraction
- Remove all direct Redis calls from cmd/data_manager/data_sync.go
- data_manager now uses DatasetStore for transfer tracking and metadata

Phase 2: Simplify TUI Services
- Embed *queue.TaskQueue directly in services.TaskQueue
- Eliminate 60% of wrapper boilerplate (203 -> ~100 lines)
- Keep only TUI-specific methods (EnqueueTask, GetJobStatus, experiment methods)

Phase 5: Clean go.mod Dependencies
- Remove duplicate go-redis/redis/v8 dependency
- Migrate internal/storage/migrate.go to redis/go-redis/v9
- Separate test-only deps (miniredis, testify) into own block

Results:
- Zero direct Redis calls in cmd/
- 60% fewer lines in TUI services
- Cleaner dependency structure
264 lines
6.8 KiB
Go
264 lines
6.8 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// Migrator handles migration from Redis to SQLite
|
|
type Migrator struct {
|
|
redisClient *redis.Client
|
|
sqliteDB *DB
|
|
}
|
|
|
|
// NewMigrator creates a new migrator for Redis to SQLite migration.
|
|
func NewMigrator(redisAddr, sqlitePath string) (*Migrator, error) {
|
|
// Connect to Redis
|
|
rdb := redis.NewClient(&redis.Options{
|
|
Addr: redisAddr,
|
|
})
|
|
|
|
// Connect to SQLite
|
|
db, err := NewDBFromPath(sqlitePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to SQLite: %w", err)
|
|
}
|
|
|
|
return &Migrator{
|
|
redisClient: rdb,
|
|
sqliteDB: db,
|
|
}, nil
|
|
}
|
|
|
|
// Close closes both Redis and SQLite connections.
|
|
func (m *Migrator) Close() error {
|
|
if err := m.sqliteDB.Close(); err != nil {
|
|
return err
|
|
}
|
|
return m.redisClient.Close()
|
|
}
|
|
|
|
// MigrateJobs migrates job data from Redis to SQLite
|
|
func (m *Migrator) MigrateJobs(ctx context.Context) error {
|
|
log.Println("Starting job migration from Redis to SQLite...")
|
|
|
|
// Get all job keys from Redis
|
|
jobKeys, err := m.redisClient.Keys(ctx, "job:*").Result()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get job keys from Redis: %w", err)
|
|
}
|
|
|
|
for _, jobKey := range jobKeys {
|
|
jobData, err := m.redisClient.HGetAll(ctx, jobKey).Result()
|
|
if err != nil {
|
|
log.Printf("Failed to get job data for %s: %v", jobKey, err)
|
|
continue
|
|
}
|
|
|
|
// Parse job data
|
|
job := &Job{
|
|
ID: jobKey[4:], // Remove "job:" prefix
|
|
JobName: jobData["job_name"],
|
|
Args: jobData["args"],
|
|
Status: jobData["status"],
|
|
Priority: parsePriority(jobData["priority"]),
|
|
WorkerID: jobData["worker_id"],
|
|
Error: jobData["error"],
|
|
}
|
|
|
|
// Parse timestamps
|
|
if createdAtStr := jobData["created_at"]; createdAtStr != "" {
|
|
if ts, err := time.Parse(time.RFC3339, createdAtStr); err == nil {
|
|
job.CreatedAt = ts
|
|
}
|
|
}
|
|
|
|
if startedAtStr := jobData["started_at"]; startedAtStr != "" {
|
|
if ts, err := time.Parse(time.RFC3339, startedAtStr); err == nil {
|
|
job.StartedAt = &ts
|
|
}
|
|
}
|
|
|
|
if endedAtStr := jobData["ended_at"]; endedAtStr != "" {
|
|
if ts, err := time.Parse(time.RFC3339, endedAtStr); err == nil {
|
|
job.EndedAt = &ts
|
|
}
|
|
}
|
|
|
|
// Parse JSON fields
|
|
if datasetsStr := jobData["datasets"]; datasetsStr != "" {
|
|
_ = json.Unmarshal([]byte(datasetsStr), &job.Datasets)
|
|
}
|
|
|
|
if metadataStr := jobData["metadata"]; metadataStr != "" {
|
|
_ = json.Unmarshal([]byte(metadataStr), &job.Metadata)
|
|
}
|
|
|
|
// Insert into SQLite
|
|
if err := m.sqliteDB.CreateJob(job); err != nil {
|
|
log.Printf("Failed to create job %s in SQLite: %v", job.ID, err)
|
|
continue
|
|
}
|
|
|
|
log.Printf("Migrated job: %s", job.ID)
|
|
}
|
|
|
|
log.Printf("Migrated %d jobs from Redis to SQLite", len(jobKeys))
|
|
return nil
|
|
}
|
|
|
|
// MigrateMetrics migrates metrics from Redis to SQLite
|
|
func (m *Migrator) MigrateMetrics(ctx context.Context) error {
|
|
log.Println("Starting metrics migration from Redis to SQLite...")
|
|
|
|
// Get all metric keys from Redis
|
|
metricKeys, err := m.redisClient.Keys(ctx, "metrics:*").Result()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get metric keys from Redis: %w", err)
|
|
}
|
|
|
|
for _, metricKey := range metricKeys {
|
|
metricData, err := m.redisClient.HGetAll(ctx, metricKey).Result()
|
|
if err != nil {
|
|
log.Printf("Failed to get metric data for %s: %v", metricKey, err)
|
|
continue
|
|
}
|
|
|
|
// Parse metric key format: metrics:job:job_id or metrics:system
|
|
parts := parseMetricKey(metricKey)
|
|
if len(parts) < 2 {
|
|
continue
|
|
}
|
|
|
|
metricType := parts[1] // "job" or "system"
|
|
|
|
for name, value := range metricData {
|
|
if metricType == "job" && len(parts) == 3 {
|
|
// Job metric
|
|
jobID := parts[2]
|
|
if err := m.sqliteDB.RecordJobMetric(jobID, name, value); err != nil {
|
|
log.Printf("Failed to record job metric %s for job %s: %v", name, jobID, err)
|
|
}
|
|
} else if metricType == "system" {
|
|
// System metric
|
|
if err := m.sqliteDB.RecordSystemMetric(name, value); err != nil {
|
|
log.Printf("Failed to record system metric %s: %v", name, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Printf("Migrated %d metric keys from Redis to SQLite", len(metricKeys))
|
|
return nil
|
|
}
|
|
|
|
// MigrateWorkers migrates worker data from Redis to SQLite
|
|
func (m *Migrator) MigrateWorkers(ctx context.Context) error {
|
|
log.Println("Starting worker migration from Redis to SQLite...")
|
|
|
|
// Get all worker keys from Redis
|
|
workerKeys, err := m.redisClient.Keys(ctx, "worker:*").Result()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get worker keys from Redis: %w", err)
|
|
}
|
|
|
|
for _, workerKey := range workerKeys {
|
|
workerData, err := m.redisClient.HGetAll(ctx, workerKey).Result()
|
|
if err != nil {
|
|
log.Printf("Failed to get worker data for %s: %v", workerKey, err)
|
|
continue
|
|
}
|
|
|
|
worker := &Worker{
|
|
ID: workerKey[8:], // Remove "worker:" prefix
|
|
Hostname: workerData["hostname"],
|
|
Status: workerData["status"],
|
|
CurrentJobs: parseInt(workerData["current_jobs"]),
|
|
MaxJobs: parseInt(workerData["max_jobs"]),
|
|
}
|
|
|
|
// Parse heartbeat
|
|
if heartbeatStr := workerData["last_heartbeat"]; heartbeatStr != "" {
|
|
if ts, err := time.Parse(time.RFC3339, heartbeatStr); err == nil {
|
|
worker.LastHeartbeat = ts
|
|
}
|
|
}
|
|
|
|
// Parse metadata
|
|
if metadataStr := workerData["metadata"]; metadataStr != "" {
|
|
_ = json.Unmarshal([]byte(metadataStr), &worker.Metadata)
|
|
}
|
|
|
|
// Insert into SQLite
|
|
if err := m.sqliteDB.RegisterWorker(worker); err != nil {
|
|
log.Printf("Failed to register worker %s in SQLite: %v", worker.ID, err)
|
|
continue
|
|
}
|
|
|
|
log.Printf("Migrated worker: %s", worker.ID)
|
|
}
|
|
|
|
log.Printf("Migrated %d workers from Redis to SQLite", len(workerKeys))
|
|
return nil
|
|
}
|
|
|
|
// MigrateAll performs complete migration from Redis to SQLite
|
|
func (m *Migrator) MigrateAll(ctx context.Context) error {
|
|
log.Println("Starting complete migration from Redis to SQLite...")
|
|
|
|
// Test connections
|
|
if err := m.redisClient.Ping(ctx).Err(); err != nil {
|
|
return fmt.Errorf("failed to connect to Redis: %w", err)
|
|
}
|
|
|
|
// Run migrations in order
|
|
if err := m.MigrateJobs(ctx); err != nil {
|
|
return fmt.Errorf("job migration failed: %w", err)
|
|
}
|
|
|
|
if err := m.MigrateWorkers(ctx); err != nil {
|
|
return fmt.Errorf("worker migration failed: %w", err)
|
|
}
|
|
|
|
if err := m.MigrateMetrics(ctx); err != nil {
|
|
return fmt.Errorf("metrics migration failed: %w", err)
|
|
}
|
|
|
|
log.Println("Migration completed successfully!")
|
|
return nil
|
|
}
|
|
|
|
// parseMetricKey splits a Redis metric key into its colon-separated parts.
//
// Keys are expected to look like "metrics:job:<job_id>" or "metrics:system".
// At most three parts are returned, so a job ID that itself contains ":" is
// kept whole as the third element instead of being broken into extra parts.
func parseMetricKey(key string) []string {
	return strings.SplitN(key, ":", 3)
}
|
|
|
|
// parsePriority converts a base-10 priority string to an int64.
// An empty string, or any value that does not parse as a 64-bit integer,
// yields 0.
func parsePriority(s string) int64 {
	// ParseInt already rejects the empty string, so no separate check is needed.
	priority, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0
	}
	return priority
}
|
|
|
|
// parseInt converts a base-10 string to an int.
// An empty string, or any value that does not parse as an int, yields 0.
func parseInt(s string) int {
	// Atoi already rejects the empty string, so no separate check is needed.
	n, err := strconv.Atoi(s)
	if err != nil {
		return 0
	}
	return n
}
|