Phase 1: Event Sourcing - Add TaskEvent types (queued, started, completed, failed, etc.) - Create EventStore with Redis Streams (append-only) - Support event querying by task ID and time range Phase 3: Diagnosable Failures - Enhance TaskExecutionError with Context map, Timestamp, Recoverable flag - Update container.go to populate error context (image, GPU, duration) - Add WithContext helper for building error context - Create cmd/errors CLI for querying task errors Phase 4: Testable Security - Add security fields to PodmanConfig (Privileged, Network, ReadOnlyMounts) - Create ValidateSecurityPolicy() with ErrSecurityViolation - Add security contract tests (privileged rejection, host network rejection) - Tests serve as executable security documentation Phase 7: Reproducible Builds - Add BuildHash and BuildTime ldflags to Makefile - Create verify-build target for reproducibility testing - Add -version and -verify flags to api-server All tests pass: - go test ./internal/errtypes/... - go test ./internal/container/... -run Security - go test ./internal/queue/... - go build ./cmd/api-server/...
194 lines
4.8 KiB
Go
194 lines
4.8 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/jfraeys/fetch_ml/internal/domain"
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// EventStore provides append-only event storage using Redis Streams.
// Events are stored chronologically and can be queried for audit trails.
// Each task gets its own stream, keyed "task_events:<taskID>".
type EventStore struct {
	// client is the shared Redis connection pool used for all stream ops.
	client *redis.Client
	// ctx is the background context applied to every Redis call.
	// NOTE(review): storing a context in a struct is discouraged in Go;
	// consider accepting a ctx parameter per method — TODO confirm callers.
	ctx context.Context
	// retentionDays is the TTL (in days) refreshed on each stream write.
	retentionDays int
	// maxStreamLen is the approximate per-task cap applied via XADD MAXLEN.
	maxStreamLen int64
}
|
|
|
|
// EventStoreConfig holds configuration for the event store.
type EventStoreConfig struct {
	RedisAddr     string // Redis server address, host:port
	RedisPassword string // optional AUTH password; empty means no auth
	RedisDB       int    // logical Redis database index
	RetentionDays int    // How long to keep events (default: 7)
	MaxStreamLen  int64  // Max events per task stream (default: 1000)
}
|
|
|
|
// NewEventStore creates a new event store instance.
|
|
func NewEventStore(cfg EventStoreConfig) (*EventStore, error) {
|
|
retentionDays := cfg.RetentionDays
|
|
if retentionDays == 0 {
|
|
retentionDays = 7
|
|
}
|
|
|
|
maxStreamLen := cfg.MaxStreamLen
|
|
if maxStreamLen == 0 {
|
|
maxStreamLen = 1000
|
|
}
|
|
|
|
opts := &redis.Options{
|
|
Addr: cfg.RedisAddr,
|
|
Password: cfg.RedisPassword,
|
|
DB: cfg.RedisDB,
|
|
PoolSize: 50,
|
|
}
|
|
|
|
client := redis.NewClient(opts)
|
|
ctx := context.Background()
|
|
|
|
// Test connection
|
|
if err := client.Ping(ctx).Err(); err != nil {
|
|
return nil, fmt.Errorf("failed to connect to redis: %w", err)
|
|
}
|
|
|
|
return &EventStore{
|
|
client: client,
|
|
ctx: ctx,
|
|
retentionDays: retentionDays,
|
|
maxStreamLen: maxStreamLen,
|
|
}, nil
|
|
}
|
|
|
|
// Close closes the event store.
|
|
func (es *EventStore) Close() error {
|
|
return es.client.Close()
|
|
}
|
|
|
|
// RecordEvent records a task event to the append-only stream.
|
|
func (es *EventStore) RecordEvent(event domain.TaskEvent) error {
|
|
streamKey := fmt.Sprintf("task_events:%s", event.TaskID)
|
|
|
|
data, err := json.Marshal(event.Data)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal event data: %w", err)
|
|
}
|
|
|
|
values := map[string]interface{}{
|
|
"type": event.EventType,
|
|
"who": event.Who,
|
|
"ts": event.Timestamp.Unix(),
|
|
}
|
|
|
|
if len(data) > 0 {
|
|
values["data"] = string(data)
|
|
}
|
|
|
|
// Add to stream with approximate max length trimming
|
|
_, err = es.client.XAdd(es.ctx, &redis.XAddArgs{
|
|
Stream: streamKey,
|
|
MaxLen: es.maxStreamLen,
|
|
Approx: true, // Allow approximate trimming for performance
|
|
Values: values,
|
|
}).Result()
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to record event: %w", err)
|
|
}
|
|
|
|
// Set expiration on the stream
|
|
es.client.Expire(es.ctx, streamKey, time.Duration(es.retentionDays)*24*time.Hour)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetEvents retrieves all events for a task, ordered chronologically.
|
|
func (es *EventStore) GetEvents(taskID string) ([]domain.TaskEvent, error) {
|
|
streamKey := fmt.Sprintf("task_events:%s", taskID)
|
|
|
|
// Read all events from the stream
|
|
messages, err := es.client.XRange(es.ctx, streamKey, "-", "+").Result()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get events: %w", err)
|
|
}
|
|
|
|
var events []domain.TaskEvent
|
|
for _, msg := range messages {
|
|
event, err := es.parseEvent(taskID, msg)
|
|
if err != nil {
|
|
continue // Skip malformed events
|
|
}
|
|
events = append(events, event)
|
|
}
|
|
|
|
return events, nil
|
|
}
|
|
|
|
// GetEventsSince retrieves events for a task since a specific time.
|
|
func (es *EventStore) GetEventsSince(taskID string, since time.Time) ([]domain.TaskEvent, error) {
|
|
streamKey := fmt.Sprintf("task_events:%s", taskID)
|
|
|
|
// Use XRANGEBYTIME equivalent - scan from timestamp
|
|
start := fmt.Sprintf("%d-0", since.Unix()*1000) // Redis stream IDs are ms-based
|
|
|
|
messages, err := es.client.XRange(es.ctx, streamKey, start, "+").Result()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get events: %w", err)
|
|
}
|
|
|
|
var events []domain.TaskEvent
|
|
for _, msg := range messages {
|
|
event, err := es.parseEvent(taskID, msg)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
// Filter by actual timestamp
|
|
if event.Timestamp.After(since) {
|
|
events = append(events, event)
|
|
}
|
|
}
|
|
|
|
return events, nil
|
|
}
|
|
|
|
// parseEvent converts a Redis stream message to a TaskEvent.
|
|
func (es *EventStore) parseEvent(taskID string, msg redis.XMessage) (domain.TaskEvent, error) {
|
|
var event domain.TaskEvent
|
|
event.TaskID = taskID
|
|
|
|
// Parse event type
|
|
if v, ok := msg.Values["type"]; ok {
|
|
event.EventType = domain.TaskEventType(v.(string))
|
|
}
|
|
|
|
// Parse who
|
|
if v, ok := msg.Values["who"]; ok {
|
|
event.Who = v.(string)
|
|
}
|
|
|
|
// Parse timestamp
|
|
if v, ok := msg.Values["ts"]; ok {
|
|
ts, err := strconv.ParseInt(v.(string), 10, 64)
|
|
if err == nil {
|
|
event.Timestamp = time.Unix(ts, 0).UTC()
|
|
}
|
|
}
|
|
|
|
// Parse data
|
|
if v, ok := msg.Values["data"]; ok {
|
|
event.Data = json.RawMessage(v.(string))
|
|
}
|
|
|
|
return event, nil
|
|
}
|
|
|
|
// DeleteOldEvents manually deletes events older than retention period.
|
|
// This is normally handled by Redis TTL, but can be called for cleanup.
|
|
func (es *EventStore) DeleteOldEvents(taskID string) error {
|
|
streamKey := fmt.Sprintf("task_events:%s", taskID)
|
|
return es.client.Del(es.ctx, streamKey).Err()
|
|
}
|