fetch_ml/internal/queue/event_store.go
Jeremie Fraeys 23e5f3d1dc
refactor(api): internal refactoring for TUI and worker modules
- Refactor internal/worker and internal/queue packages
- Update cmd/tui for monitoring interface
- Update test configurations
2026-02-20 15:51:23 -05:00

194 lines
4.8 KiB
Go

package queue
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/jfraeys/fetch_ml/internal/domain"
"github.com/redis/go-redis/v9"
)
// EventStore provides append-only event storage using Redis Streams.
// Events are stored chronologically and can be queried for audit trails.
type EventStore struct {
client *redis.Client
ctx context.Context
retentionDays int
maxStreamLen int64
}
// EventStoreConfig holds configuration for the event store.
type EventStoreConfig struct {
RedisAddr string
RedisPassword string
RedisDB int
RetentionDays int // How long to keep events (default: 7)
MaxStreamLen int64 // Max events per task stream (default: 1000)
}
// NewEventStore creates a new event store instance.
func NewEventStore(cfg EventStoreConfig) (*EventStore, error) {
retentionDays := cfg.RetentionDays
if retentionDays == 0 {
retentionDays = 7
}
maxStreamLen := cfg.MaxStreamLen
if maxStreamLen == 0 {
maxStreamLen = 1000
}
opts := &redis.Options{
Addr: cfg.RedisAddr,
Password: cfg.RedisPassword,
DB: cfg.RedisDB,
PoolSize: 50,
}
client := redis.NewClient(opts)
ctx := context.Background()
// Test connection
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("failed to connect to redis: %w", err)
}
return &EventStore{
client: client,
ctx: ctx,
retentionDays: retentionDays,
maxStreamLen: maxStreamLen,
}, nil
}
// Close closes the event store.
func (es *EventStore) Close() error {
return es.client.Close()
}
// RecordEvent records a task event to the append-only stream.
func (es *EventStore) RecordEvent(event domain.TaskEvent) error {
streamKey := fmt.Sprintf("task_events:%s", event.TaskID)
data, err := json.Marshal(event.Data)
if err != nil {
return fmt.Errorf("failed to marshal event data: %w", err)
}
values := map[string]interface{}{
"type": event.EventType,
"who": event.Who,
"ts": event.Timestamp.Unix(),
}
if len(data) > 0 {
values["data"] = string(data)
}
// Add to stream with approximate max length trimming
_, err = es.client.XAdd(es.ctx, &redis.XAddArgs{
Stream: streamKey,
MaxLen: es.maxStreamLen,
Approx: true, // Allow approximate trimming for performance
Values: values,
}).Result()
if err != nil {
return fmt.Errorf("failed to record event: %w", err)
}
// Set expiration on the stream
es.client.Expire(es.ctx, streamKey, time.Duration(es.retentionDays)*24*time.Hour)
return nil
}
// GetEvents retrieves all events for a task, ordered chronologically.
func (es *EventStore) GetEvents(taskID string) ([]domain.TaskEvent, error) {
streamKey := fmt.Sprintf("task_events:%s", taskID)
// Read all events from the stream
messages, err := es.client.XRange(es.ctx, streamKey, "-", "+").Result()
if err != nil {
return nil, fmt.Errorf("failed to get events: %w", err)
}
var events []domain.TaskEvent
for _, msg := range messages {
event, err := es.parseEvent(taskID, msg)
if err != nil {
continue // Skip malformed events
}
events = append(events, event)
}
return events, nil
}
// GetEventsSince retrieves events for a task since a specific time.
func (es *EventStore) GetEventsSince(taskID string, since time.Time) ([]domain.TaskEvent, error) {
streamKey := fmt.Sprintf("task_events:%s", taskID)
// Use XRANGEBYTIME equivalent - scan from timestamp
start := fmt.Sprintf("%d-0", since.Unix()*1000) // Redis stream IDs are ms-based
messages, err := es.client.XRange(es.ctx, streamKey, start, "+").Result()
if err != nil {
return nil, fmt.Errorf("failed to get events: %w", err)
}
var events []domain.TaskEvent
for _, msg := range messages {
event, err := es.parseEvent(taskID, msg)
if err != nil {
continue
}
// Filter by actual timestamp
if event.Timestamp.After(since) {
events = append(events, event)
}
}
return events, nil
}
// parseEvent converts a Redis stream message to a TaskEvent.
func (es *EventStore) parseEvent(taskID string, msg redis.XMessage) (domain.TaskEvent, error) {
var event domain.TaskEvent
event.TaskID = taskID
// Parse event type
if v, ok := msg.Values["type"]; ok {
event.EventType = domain.TaskEventType(v.(string))
}
// Parse who
if v, ok := msg.Values["who"]; ok {
event.Who = v.(string)
}
// Parse timestamp
if v, ok := msg.Values["ts"]; ok {
ts, err := strconv.ParseInt(v.(string), 10, 64)
if err == nil {
event.Timestamp = time.Unix(ts, 0).UTC()
}
}
// Parse data
if v, ok := msg.Values["data"]; ok {
event.Data = json.RawMessage(v.(string))
}
return event, nil
}
// DeleteOldEvents manually deletes events older than retention period.
// This is normally handled by Redis TTL, but can be called for cleanup.
func (es *EventStore) DeleteOldEvents(taskID string) error {
streamKey := fmt.Sprintf("task_events:%s", taskID)
return es.client.Del(es.ctx, streamKey).Err()
}