package storage

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
)

// Migrator copies jobs, workers, and metrics from Redis into SQLite.
type Migrator struct {
	redisClient *redis.Client
	sqliteDB    *DB
}

// NewMigrator creates a Migrator that reads from the Redis server at
// redisAddr and writes to the SQLite database at sqlitePath.
//
// It returns an error if the SQLite database cannot be opened. Redis
// reachability is not checked here; MigrateAll pings Redis before it starts.
func NewMigrator(redisAddr, sqlitePath string) (*Migrator, error) {
	// Open SQLite first so that a failure here leaves no Redis client behind
	// that would need to be closed.
	db, err := NewDBFromPath(sqlitePath)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to SQLite: %w", err)
	}

	rdb := redis.NewClient(&redis.Options{
		Addr: redisAddr,
	})

	return &Migrator{
		redisClient: rdb,
		sqliteDB:    db,
	}, nil
}

// Close closes both the SQLite and the Redis connection.
//
// Both connections are always closed, even if the first close fails. If both
// fail, the SQLite error is returned.
func (m *Migrator) Close() error {
	sqliteErr := m.sqliteDB.Close()
	redisErr := m.redisClient.Close()
	if sqliteErr != nil {
		return sqliteErr
	}
	return redisErr
}

// MigrateJobs copies every Redis hash matching "job:*" into SQLite.
//
// A job that cannot be read or inserted is logged and skipped; malformed
// timestamp or JSON fields are logged and left at their zero value. An error
// is returned only when the key listing fails or ctx is cancelled.
func (m *Migrator) MigrateJobs(ctx context.Context) error {
	log.Println("Starting job migration from Redis to SQLite...")

	// NOTE(review): KEYS scans the whole keyspace in one call; consider SCAN
	// if the source Redis instance is large or still serving traffic.
	jobKeys, err := m.redisClient.Keys(ctx, "job:*").Result()
	if err != nil {
		return fmt.Errorf("failed to get job keys from Redis: %w", err)
	}

	migrated := 0
	for _, jobKey := range jobKeys {
		// Stop promptly on cancellation instead of logging one failure per key.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("job migration interrupted: %w", err)
		}

		jobData, err := m.redisClient.HGetAll(ctx, jobKey).Result()
		if err != nil {
			log.Printf("Failed to get job data for %s: %v", jobKey, err)
			continue
		}

		job := &Job{
			ID:       strings.TrimPrefix(jobKey, "job:"),
			JobName:  jobData["job_name"],
			Args:     jobData["args"],
			Status:   jobData["status"],
			Priority: parsePriority(jobData["priority"]),
			WorkerID: jobData["worker_id"],
			Error:    jobData["error"],
		}

		if ts, ok := parseMigrationTime(jobKey, "created_at", jobData["created_at"]); ok {
			job.CreatedAt = ts
		}
		if ts, ok := parseMigrationTime(jobKey, "started_at", jobData["started_at"]); ok {
			job.StartedAt = &ts
		}
		if ts, ok := parseMigrationTime(jobKey, "ended_at", jobData["ended_at"]); ok {
			job.EndedAt = &ts
		}

		decodeMigrationJSON(jobKey, "datasets", jobData["datasets"], &job.Datasets)
		decodeMigrationJSON(jobKey, "metadata", jobData["metadata"], &job.Metadata)

		if err := m.sqliteDB.CreateJob(job); err != nil {
			log.Printf("Failed to create job %s in SQLite: %v", job.ID, err)
			continue
		}

		migrated++
		log.Printf("Migrated job: %s", job.ID)
	}

	log.Printf("Migrated %d of %d jobs from Redis to SQLite", migrated, len(jobKeys))
	return nil
}

// MigrateMetrics copies every Redis hash matching "metrics:*" into SQLite.
//
// Keys of the form "metrics:job:<job_id>" are recorded as job metrics and
// keys whose second segment is "system" as system metrics; any other key is
// logged and skipped. Individual write failures are logged and do not abort
// the migration. An error is returned only when the key listing fails or ctx
// is cancelled.
func (m *Migrator) MigrateMetrics(ctx context.Context) error {
	log.Println("Starting metrics migration from Redis to SQLite...")

	metricKeys, err := m.redisClient.Keys(ctx, "metrics:*").Result()
	if err != nil {
		return fmt.Errorf("failed to get metric keys from Redis: %w", err)
	}

	migrated := 0
	for _, metricKey := range metricKeys {
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("metrics migration interrupted: %w", err)
		}

		metricData, err := m.redisClient.HGetAll(ctx, metricKey).Result()
		if err != nil {
			log.Printf("Failed to get metric data for %s: %v", metricKey, err)
			continue
		}

		// Expected key formats: metrics:job:<job_id> or metrics:system.
		parts := parseMetricKey(metricKey)
		if len(parts) < 2 {
			continue
		}

		// Classify the key once; the answer is the same for every field.
		isJob := parts[1] == "job" && len(parts) == 3
		isSystem := parts[1] == "system"
		if !isJob && !isSystem {
			log.Printf("Skipping unrecognized metric key %s", metricKey)
			continue
		}

		allRecorded := true
		for name, value := range metricData {
			if isJob {
				jobID := parts[2]
				if err := m.sqliteDB.RecordJobMetric(jobID, name, value); err != nil {
					log.Printf("Failed to record job metric %s for job %s: %v", name, jobID, err)
					allRecorded = false
				}
				continue
			}
			if err := m.sqliteDB.RecordSystemMetric(name, value); err != nil {
				log.Printf("Failed to record system metric %s: %v", name, err)
				allRecorded = false
			}
		}

		// Count a key only when every one of its fields was written.
		if allRecorded {
			migrated++
		}
	}

	log.Printf("Migrated %d of %d metric keys from Redis to SQLite", migrated, len(metricKeys))
	return nil
}

// MigrateWorkers copies every Redis hash matching "worker:*" into SQLite.
//
// A worker that cannot be read or registered is logged and skipped; a
// malformed heartbeat or metadata field is logged and left at its zero value.
// An error is returned only when the key listing fails or ctx is cancelled.
func (m *Migrator) MigrateWorkers(ctx context.Context) error {
	log.Println("Starting worker migration from Redis to SQLite...")

	workerKeys, err := m.redisClient.Keys(ctx, "worker:*").Result()
	if err != nil {
		return fmt.Errorf("failed to get worker keys from Redis: %w", err)
	}

	migrated := 0
	for _, workerKey := range workerKeys {
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("worker migration interrupted: %w", err)
		}

		workerData, err := m.redisClient.HGetAll(ctx, workerKey).Result()
		if err != nil {
			log.Printf("Failed to get worker data for %s: %v", workerKey, err)
			continue
		}

		worker := &Worker{
			// "worker:" is 7 bytes long; trimming by prefix keeps the full ID
			// and cannot index past the end of a short key.
			ID:          strings.TrimPrefix(workerKey, "worker:"),
			Hostname:    workerData["hostname"],
			Status:      workerData["status"],
			CurrentJobs: parseInt(workerData["current_jobs"]),
			MaxJobs:     parseInt(workerData["max_jobs"]),
		}

		if ts, ok := parseMigrationTime(workerKey, "last_heartbeat", workerData["last_heartbeat"]); ok {
			worker.LastHeartbeat = ts
		}

		decodeMigrationJSON(workerKey, "metadata", workerData["metadata"], &worker.Metadata)

		if err := m.sqliteDB.RegisterWorker(worker); err != nil {
			log.Printf("Failed to register worker %s in SQLite: %v", worker.ID, err)
			continue
		}

		migrated++
		log.Printf("Migrated worker: %s", worker.ID)
	}

	log.Printf("Migrated %d of %d workers from Redis to SQLite", migrated, len(workerKeys))
	return nil
}

// MigrateAll pings Redis and then migrates jobs, workers, and metrics, in
// that order. It stops at the first step that returns an error.
func (m *Migrator) MigrateAll(ctx context.Context) error {
	log.Println("Starting complete migration from Redis to SQLite...")

	// Fail fast if Redis is unreachable.
	if err := m.redisClient.Ping(ctx).Err(); err != nil {
		return fmt.Errorf("failed to connect to Redis: %w", err)
	}

	if err := m.MigrateJobs(ctx); err != nil {
		return fmt.Errorf("job migration failed: %w", err)
	}
	if err := m.MigrateWorkers(ctx); err != nil {
		return fmt.Errorf("worker migration failed: %w", err)
	}
	if err := m.MigrateMetrics(ctx); err != nil {
		return fmt.Errorf("metrics migration failed: %w", err)
	}

	log.Println("Migration completed successfully!")
	return nil
}

// parseMetricKey splits a Redis metric key into its colon-separated segments.
func parseMetricKey(key string) []string {
	// Simple split - adjust based on your Redis key format.
	return strings.Split(key, ":")
}

// parsePriority parses s as a base-10 int64. It returns 0 when s is empty,
// malformed, or out of range.
func parsePriority(s string) int64 {
	val, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0
	}
	return val
}

// parseInt parses s as a base-10 int. It returns 0 when s is empty,
// malformed, or out of range.
func parseInt(s string) int {
	val, err := strconv.Atoi(s)
	if err != nil {
		return 0
	}
	return val
}

// parseMigrationTime parses raw as an RFC 3339 timestamp read from field of
// the Redis hash at key. The boolean is false when raw is empty or malformed;
// a malformed value is logged so the data loss is visible.
func parseMigrationTime(key, field, raw string) (time.Time, bool) {
	if raw == "" {
		return time.Time{}, false
	}
	ts, err := time.Parse(time.RFC3339, raw)
	if err != nil {
		log.Printf("Ignoring invalid %s %q for %s: %v", field, raw, key, err)
		return time.Time{}, false
	}
	return ts, true
}

// decodeMigrationJSON unmarshals raw, read from field of the Redis hash at
// key, into dst. An empty raw is a no-op. A decode error is logged rather
// than returned so that one bad field does not block the whole record.
func decodeMigrationJSON(key, field, raw string, dst any) {
	if raw == "" {
		return
	}
	if err := json.Unmarshal([]byte(raw), dst); err != nil {
		log.Printf("Ignoring invalid %s JSON for %s: %v", field, key, err)
	}
}