fetch_ml/internal/domain/events.go
Jeremie Fraeys 6866ba9366
refactor(queue): integrate scheduler backend and storage improvements
Update queue and storage systems for scheduler integration:
- Queue backend with scheduler coordination
- Filesystem queue with batch operations
- Deduplication with tenant-aware keys
- Storage layer with audit logging hooks
- Domain models (Task, Events, Errors) with scheduler fields
- Database layer with tenant isolation
- Dataset storage with integrity checks
2026-02-26 12:06:46 -05:00

86 lines
2.6 KiB
Go

package domain
import (
"encoding/json"
"time"
)
// TaskEventType names the kind of lifecycle event recorded for a task.
// Its underlying type is string, so each value serializes as the literal
// text given in the constant set below.
type TaskEventType string

// The task event types defined by this package.
const (
	// TaskEventQueued marks a task being added to the queue.
	TaskEventQueued = TaskEventType("queued")
	// TaskEventStarted marks the beginning of a task's execution.
	TaskEventStarted = TaskEventType("started")
	// TaskEventCompleted marks a task finishing successfully.
	TaskEventCompleted = TaskEventType("completed")
	// TaskEventFailed marks a task failing.
	TaskEventFailed = TaskEventType("failed")
	// TaskEventCancelled marks a task being cancelled.
	TaskEventCancelled = TaskEventType("cancelled")
	// TaskEventRetrying marks a task being retried.
	TaskEventRetrying = TaskEventType("retrying")
	// TaskEventCheckpointSaved marks a checkpoint being saved.
	TaskEventCheckpointSaved = TaskEventType("checkpoint_saved")
	// TaskEventGPUAssigned marks a GPU being assigned.
	TaskEventGPUAssigned = TaskEventType("gpu_assigned")
)
// TaskEvent is a single entry in a task's lifecycle history.
// Events are stored in Redis Streams for append-only audit trails.
//
// Data is an optional, already-encoded JSON payload whose shape depends on
// EventType; it is left out of the encoded event when empty.
type TaskEvent struct {
	Timestamp time.Time       `json:"timestamp"`
	TaskID    string          `json:"task_id"`
	EventType TaskEventType   `json:"event_type"`
	Who       string          `json:"who"`
	Data      json.RawMessage `json:"data,omitempty"`
}
// EventDataStarted is the payload attached to a "started" event.
// WorkerID is always encoded; Image and GPUDevices are dropped from the
// JSON output when they are empty.
type EventDataStarted struct {
	WorkerID   string   `json:"worker_id"`
	Image      string   `json:"image,omitempty"`
	GPUDevices []string `json:"gpu_devices,omitempty"`
}
// EventDataFailed is the payload attached to a "failed" event.
// All three fields are always present in the JSON output, including when
// they hold their zero values.
type EventDataFailed struct {
	Error       string `json:"error"`
	Phase       string `json:"phase"`
	Recoverable bool   `json:"recoverable"`
}
// EventDataGPUAssigned is the payload attached to a "gpu_assigned" event.
// GPUDevices is always encoded (a nil slice becomes JSON null), whereas
// GPUEnvVar is dropped from the output when it is empty.
type EventDataGPUAssigned struct {
	GPUEnvVar  string   `json:"gpu_env_var,omitempty"`
	GPUDevices []string `json:"gpu_devices"`
}
// NewTaskEvent builds a TaskEvent for the given task, event type and actor,
// stamped with the current time in UTC. The Data payload is left empty.
func NewTaskEvent(taskID string, eventType TaskEventType, who string) TaskEvent {
	var ev TaskEvent
	ev.Timestamp = time.Now().UTC()
	ev.TaskID = taskID
	ev.EventType = eventType
	ev.Who = who
	return ev
}
// WithData returns a copy of the event whose Data field holds the JSON
// encoding of data. The receiver is a value, so the caller's event is never
// modified. If encoding fails, the event is returned unchanged together with
// the error from json.Marshal.
func (e TaskEvent) WithData(data any) (TaskEvent, error) {
	payload, err := json.Marshal(data)
	if err == nil {
		// Only attach the payload when encoding succeeded; e is already a copy.
		e.Data = payload
	}
	return e, err
}
// ParseData decodes the event's Data payload as JSON into out.
// An event without a payload is a no-op: out is left untouched and nil is
// returned. Otherwise out is handed straight to json.Unmarshal and its
// result is returned.
func (e TaskEvent) ParseData(out any) error {
	if len(e.Data) > 0 {
		return json.Unmarshal(e.Data, out)
	}
	return nil
}