fetch_ml/internal/queue/event_store.go
Jeremie Fraeys 7194826871
feat: implement research-grade maintainability phases 1,3,4,7
Phase 1: Event Sourcing
- Add TaskEvent types (queued, started, completed, failed, etc.)
- Create EventStore with Redis Streams (append-only)
- Support event querying by task ID and time range

Phase 3: Diagnosable Failures
- Enhance TaskExecutionError with Context map, Timestamp, Recoverable flag
- Update container.go to populate error context (image, GPU, duration)
- Add WithContext helper for building error context
- Create cmd/errors CLI for querying task errors

Phase 4: Testable Security
- Add security fields to PodmanConfig (Privileged, Network, ReadOnlyMounts)
- Create ValidateSecurityPolicy() with ErrSecurityViolation
- Add security contract tests (privileged rejection, host network rejection)
- Tests serve as executable security documentation

Phase 7: Reproducible Builds
- Add BuildHash and BuildTime ldflags to Makefile
- Create verify-build target for reproducibility testing
- Add -version and -verify flags to api-server

All tests pass:
- go test ./internal/errtypes/...
- go test ./internal/container/... -run Security
- go test ./internal/queue/...
- go build ./cmd/api-server/...
2026-02-18 15:27:50 -05:00

194 lines
4.8 KiB
Go

package queue
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/jfraeys/fetch_ml/internal/domain"
"github.com/redis/go-redis/v9"
)
// EventStore provides append-only event storage using Redis Streams.
// Events are stored chronologically and can be queried for audit trails.
//
// Every task gets its own stream, keyed "task_events:<task ID>".
type EventStore struct {
// client is the Redis connection used for every stream operation.
client *redis.Client
// ctx is the context handed to each Redis call made by the store;
// NewEventStore sets it to context.Background().
ctx context.Context
// retentionDays is the number of days used as the TTL that is applied to a
// task's stream each time an event is recorded.
retentionDays int
// maxStreamLen is the approximate cap on the number of entries kept in a
// single task stream.
maxStreamLen int64
}
// EventStoreConfig holds configuration for the event store.
// The Redis* fields are copied unchanged into the Redis client options.
type EventStoreConfig struct {
// RedisAddr is the address of the Redis server.
RedisAddr string
// RedisPassword is the password used to authenticate with Redis.
RedisPassword string
// RedisDB is the index of the Redis database to select.
RedisDB int
RetentionDays int // How long to keep events, in days (0 selects the default: 7)
MaxStreamLen int64 // Max events per task stream (0 selects the default: 1000)
}
// NewEventStore connects to Redis and returns an EventStore ready for use.
//
// RetentionDays and MaxStreamLen values in cfg that are not strictly positive
// fall back to the defaults (7 days and 1000 entries). The connection is
// verified with a PING before returning; if that fails the client is closed
// and an error wrapping the underlying cause is returned.
func NewEventStore(cfg EventStoreConfig) (*EventStore, error) {
	// A negative retention would later become a negative TTL, which makes
	// Redis drop the stream instead of keeping it, and a negative length
	// cannot be honored as a cap. Treat anything non-positive as "unset".
	retentionDays := cfg.RetentionDays
	if retentionDays <= 0 {
		retentionDays = 7
	}
	maxStreamLen := cfg.MaxStreamLen
	if maxStreamLen <= 0 {
		maxStreamLen = 1000
	}

	client := redis.NewClient(&redis.Options{
		Addr:     cfg.RedisAddr,
		Password: cfg.RedisPassword,
		DB:       cfg.RedisDB,
		PoolSize: 50,
	})

	// Fail fast when the server is unreachable or rejects the credentials.
	ctx := context.Background()
	if err := client.Ping(ctx).Err(); err != nil {
		// The client never leaves this function on the failure path, so it
		// must be closed here or it would be leaked. The ping error is the
		// one worth reporting; a close error adds nothing.
		_ = client.Close()
		return nil, fmt.Errorf("failed to connect to redis: %w", err)
	}

	return &EventStore{
		client:        client,
		ctx:           ctx,
		retentionDays: retentionDays,
		maxStreamLen:  maxStreamLen,
	}, nil
}
// Close closes the underlying Redis client and reports any error that the
// client returns while doing so.
func (es *EventStore) Close() error {
	err := es.client.Close()
	return err
}
// RecordEvent appends event to the Redis stream of its task.
//
// The stream key is "task_events:<TaskID>". Each entry carries the event type
// ("type"), the actor ("who"), the timestamp as Unix seconds ("ts") and, when
// the event has a payload, its JSON encoding ("data"). The stream is trimmed
// to approximately maxStreamLen entries, and its expiry is reset to the
// retention period on every write.
//
// An error is returned when the payload cannot be marshalled, when the entry
// cannot be appended, or when the expiry cannot be set. In the last case the
// entry itself has already been written.
func (es *EventStore) RecordEvent(event domain.TaskEvent) error {
	streamKey := fmt.Sprintf("task_events:%s", event.TaskID)

	payload, err := json.Marshal(event.Data)
	if err != nil {
		return fmt.Errorf("failed to marshal event data: %w", err)
	}

	values := map[string]interface{}{
		// Convert explicitly to a plain string: the Redis client encodes only
		// a fixed set of argument types, and a named string type is not
		// guaranteed to be among them.
		"type": string(event.EventType),
		"who":  event.Who,
		"ts":   event.Timestamp.Unix(),
	}
	// json.Marshal never yields an empty result: an absent payload encodes as
	// the literal "null". Leave the field out in that case so that reading
	// the entry back gives an event without data rather than a "null" payload.
	if string(payload) != "null" {
		values["data"] = string(payload)
	}

	// Add to the stream; approximate trimming lets Redis trim lazily, which
	// is cheaper than enforcing the exact length on every write.
	_, err = es.client.XAdd(es.ctx, &redis.XAddArgs{
		Stream: streamKey,
		MaxLen: es.maxStreamLen,
		Approx: true,
		Values: values,
	}).Result()
	if err != nil {
		return fmt.Errorf("failed to record event: %w", err)
	}

	// Refresh the TTL of the stream. A failure here is reported rather than
	// dropped, because a stream without a TTL would never be cleaned up.
	ttl := time.Duration(es.retentionDays) * 24 * time.Hour
	if err := es.client.Expire(es.ctx, streamKey, ttl).Err(); err != nil {
		return fmt.Errorf("failed to set expiration on event stream: %w", err)
	}
	return nil
}
// GetEvents returns every event stored for taskID, in the order the entries
// appear in the task's stream (oldest first). Entries for which parseEvent
// reports an error are left out. The result is a nil slice when there is
// nothing to return.
func (es *EventStore) GetEvents(taskID string) ([]domain.TaskEvent, error) {
	key := fmt.Sprintf("task_events:%s", taskID)

	// "-" and "+" are the open-ended bounds, i.e. the whole stream.
	entries, err := es.client.XRange(es.ctx, key, "-", "+").Result()
	if err != nil {
		return nil, fmt.Errorf("failed to get events: %w", err)
	}

	var result []domain.TaskEvent
	for i := range entries {
		parsed, parseErr := es.parseEvent(taskID, entries[i])
		if parseErr == nil {
			result = append(result, parsed)
		}
	}
	return result, nil
}
// GetEventsSince returns the events of taskID whose recorded timestamp is
// strictly after since, in stream order.
//
// The stream is first narrowed by entry ID, starting at the beginning of the
// second that contains since, and the remaining entries are then filtered on
// the event's own timestamp. A since before the Unix epoch (including the
// zero time.Time) scans the stream from its beginning.
//
// NOTE(review): RecordEvent stores timestamps with one-second resolution, so
// an event recorded in the same second as since compares as not-after and is
// not returned. Confirm that this is the intended boundary behaviour.
func (es *EventStore) GetEventsSince(taskID string, since time.Time) ([]domain.TaskEvent, error) {
	streamKey := fmt.Sprintf("task_events:%s", taskID)

	// Stream IDs are "<milliseconds>-<sequence>" with unsigned parts. A
	// negative value is not a valid ID and would make XRANGE fail, so clamp
	// pre-epoch times to the smallest possible ID.
	startMs := since.Unix() * 1000
	if startMs < 0 {
		startMs = 0
	}
	start := fmt.Sprintf("%d-0", startMs)

	messages, err := es.client.XRange(es.ctx, streamKey, start, "+").Result()
	if err != nil {
		return nil, fmt.Errorf("failed to get events: %w", err)
	}

	var events []domain.TaskEvent
	for _, msg := range messages {
		event, err := es.parseEvent(taskID, msg)
		if err != nil {
			continue // Skip malformed events
		}
		// The ID range is only a coarse pre-filter; the event's own timestamp
		// decides whether it is returned.
		if event.Timestamp.After(since) {
			events = append(events, event)
		}
	}
	return events, nil
}
// parseEvent converts a Redis stream message into a TaskEvent for taskID.
//
// The "type", "who", "ts" and "data" fields are all optional: a missing field
// leaves the corresponding TaskEvent field at its zero value. A field that is
// present but does not hold a string, or a "ts" that is not a base-10 integer
// of Unix seconds, makes the message malformed; in that case a zero TaskEvent
// and a non-nil error are returned instead of panicking.
func (es *EventStore) parseEvent(taskID string, msg redis.XMessage) (domain.TaskEvent, error) {
	// field fetches an optional string field; ok is false when it is absent.
	field := func(name string) (value string, ok bool, err error) {
		raw, present := msg.Values[name]
		if !present {
			return "", false, nil
		}
		s, isString := raw.(string)
		if !isString {
			return "", false, fmt.Errorf("event %s: field %q has type %T, want string", msg.ID, name, raw)
		}
		return s, true, nil
	}

	event := domain.TaskEvent{TaskID: taskID}

	// Parse event type
	typ, ok, err := field("type")
	if err != nil {
		return domain.TaskEvent{}, err
	}
	if ok {
		event.EventType = domain.TaskEventType(typ)
	}

	// Parse who
	who, ok, err := field("who")
	if err != nil {
		return domain.TaskEvent{}, err
	}
	if ok {
		event.Who = who
	}

	// Parse timestamp (Unix seconds)
	rawTS, ok, err := field("ts")
	if err != nil {
		return domain.TaskEvent{}, err
	}
	if ok {
		secs, err := strconv.ParseInt(rawTS, 10, 64)
		if err != nil {
			return domain.TaskEvent{}, fmt.Errorf("event %s: invalid timestamp %q: %w", msg.ID, rawTS, err)
		}
		event.Timestamp = time.Unix(secs, 0).UTC()
	}

	// Parse data; the payload is kept as raw JSON and is not decoded here.
	data, ok, err := field("data")
	if err != nil {
		return domain.TaskEvent{}, err
	}
	if ok {
		event.Data = json.RawMessage(data)
	}

	return event, nil
}
// DeleteOldEvents removes the entries of a task's stream that are older than
// the retention period and leaves newer entries in place.
// This is normally handled by the TTL set when events are recorded, but can
// be called for explicit cleanup.
//
// Age is judged by the stream entry ID. Entries are appended without an
// explicit ID, so the ID is generated by Redis and starts with the insertion
// time in milliseconds. Trimming by minimum ID requires Redis 6.2 or newer.
// Trimming a stream that does not exist is not an error.
func (es *EventStore) DeleteOldEvents(taskID string) error {
	streamKey := fmt.Sprintf("task_events:%s", taskID)

	retention := time.Duration(es.retentionDays) * 24 * time.Hour
	cutoffMs := time.Now().Add(-retention).UnixMilli()
	if cutoffMs <= 0 {
		// The cutoff lies before the Unix epoch, so no entry can be older.
		return nil
	}

	// Entries with an ID below "<cutoff>-0" are evicted; the rest are kept.
	minID := fmt.Sprintf("%d-0", cutoffMs)
	if err := es.client.XTrimMinID(es.ctx, streamKey, minID).Err(); err != nil {
		return fmt.Errorf("failed to delete old events: %w", err)
	}
	return nil
}