package queue import ( "context" "encoding/json" "fmt" "strconv" "time" "github.com/jfraeys/fetch_ml/internal/domain" "github.com/redis/go-redis/v9" ) // EventStore provides append-only event storage using Redis Streams. // Events are stored chronologically and can be queried for audit trails. type EventStore struct { client *redis.Client ctx context.Context retentionDays int maxStreamLen int64 } // EventStoreConfig holds configuration for the event store. type EventStoreConfig struct { RedisAddr string RedisPassword string RedisDB int RetentionDays int // How long to keep events (default: 7) MaxStreamLen int64 // Max events per task stream (default: 1000) } // NewEventStore creates a new event store instance. func NewEventStore(cfg EventStoreConfig) (*EventStore, error) { retentionDays := cfg.RetentionDays if retentionDays == 0 { retentionDays = 7 } maxStreamLen := cfg.MaxStreamLen if maxStreamLen == 0 { maxStreamLen = 1000 } opts := &redis.Options{ Addr: cfg.RedisAddr, Password: cfg.RedisPassword, DB: cfg.RedisDB, PoolSize: 50, } client := redis.NewClient(opts) ctx := context.Background() // Test connection if err := client.Ping(ctx).Err(); err != nil { return nil, fmt.Errorf("failed to connect to redis: %w", err) } return &EventStore{ client: client, ctx: ctx, retentionDays: retentionDays, maxStreamLen: maxStreamLen, }, nil } // Close closes the event store. func (es *EventStore) Close() error { return es.client.Close() } // RecordEvent records a task event to the append-only stream. func (es *EventStore) RecordEvent(event domain.TaskEvent) error { streamKey := fmt.Sprintf("task_events:%s", event.TaskID) data, err := json.Marshal(event.Data) if err != nil { return fmt.Errorf("failed to marshal event data: %w", err) } values := map[string]interface{}{ "type": event.EventType, "who": event.Who, "ts": event.Timestamp.Unix(), } if len(data) > 0 { values["data"] = string(data) } // Add to stream with approximate max length trimming _, err = es.client.XAdd(es.ctx, &redis.XAddArgs{ Stream: streamKey, MaxLen: es.maxStreamLen, Approx: true, // Allow approximate trimming for performance Values: values, }).Result() if err != nil { return fmt.Errorf("failed to record event: %w", err) } // Set expiration on the stream es.client.Expire(es.ctx, streamKey, time.Duration(es.retentionDays)*24*time.Hour) return nil } // GetEvents retrieves all events for a task, ordered chronologically. func (es *EventStore) GetEvents(taskID string) ([]domain.TaskEvent, error) { streamKey := fmt.Sprintf("task_events:%s", taskID) // Read all events from the stream messages, err := es.client.XRange(es.ctx, streamKey, "-", "+").Result() if err != nil { return nil, fmt.Errorf("failed to get events: %w", err) } var events []domain.TaskEvent for _, msg := range messages { event, err := es.parseEvent(taskID, msg) if err != nil { continue // Skip malformed events } events = append(events, event) } return events, nil } // GetEventsSince retrieves events for a task since a specific time. func (es *EventStore) GetEventsSince(taskID string, since time.Time) ([]domain.TaskEvent, error) { streamKey := fmt.Sprintf("task_events:%s", taskID) // Use XRANGEBYTIME equivalent - scan from timestamp start := fmt.Sprintf("%d-0", since.Unix()*1000) // Redis stream IDs are ms-based messages, err := es.client.XRange(es.ctx, streamKey, start, "+").Result() if err != nil { return nil, fmt.Errorf("failed to get events: %w", err) } var events []domain.TaskEvent for _, msg := range messages { event, err := es.parseEvent(taskID, msg) if err != nil { continue } // Filter by actual timestamp if event.Timestamp.After(since) { events = append(events, event) } } return events, nil } // parseEvent converts a Redis stream message to a TaskEvent. func (es *EventStore) parseEvent(taskID string, msg redis.XMessage) (domain.TaskEvent, error) { var event domain.TaskEvent event.TaskID = taskID // Parse event type if v, ok := msg.Values["type"]; ok { event.EventType = domain.TaskEventType(v.(string)) } // Parse who if v, ok := msg.Values["who"]; ok { event.Who = v.(string) } // Parse timestamp if v, ok := msg.Values["ts"]; ok { ts, err := strconv.ParseInt(v.(string), 10, 64) if err == nil { event.Timestamp = time.Unix(ts, 0).UTC() } } // Parse data if v, ok := msg.Values["data"]; ok { event.Data = json.RawMessage(v.(string)) } return event, nil } // DeleteOldEvents manually deletes events older than retention period. // This is normally handled by Redis TTL, but can be called for cleanup. func (es *EventStore) DeleteOldEvents(taskID string) error { streamKey := fmt.Sprintf("task_events:%s", taskID) return es.client.Del(es.ctx, streamKey).Err() }