fetch_ml/internal/queue/dedup.go
Jeremie Fraeys 37aad7ae87
feat: add manifest signing and native hashing support
- Integrate RunManifest.Validate with existing Validator
- Add manifest Sign() and Verify() methods
- Add native C++ hashing libraries (dataset_hash, queue_index)
- Add native bridge for Go/C++ integration
- Add deduplication support in queue
2026-02-19 15:34:39 -05:00

78 lines
1.8 KiB
Go

package queue
import (
"fmt"
"sync"
"time"
)
// ErrAlreadyQueued is the sentinel error reported when a job is submitted
// again with the same commit shortly after an identical submission.
var (
	ErrAlreadyQueued error = fmt.Errorf("job already queued with this commit")
)
// CommitDedup tracks recently queued job+commit combinations so that the same
// commit of the same job is not submitted twice within a TTL window.
//
// Every method takes the internal lock, so a *CommitDedup may be shared
// between goroutines. Entries are never evicted automatically; call Cleanup
// periodically (e.g. every few minutes) to bound memory.
type CommitDedup struct {
	mu      sync.RWMutex
	commits map[string]time.Time // key: d.key(jobName, commitID) -> time marked
	ttl     time.Duration
}

// NewCommitDedup creates a deduplication tracker whose entries stay live for
// ttl after they are marked. A non-positive ttl falls back to one hour.
func NewCommitDedup(ttl time.Duration) *CommitDedup {
	if ttl <= 0 {
		ttl = time.Hour // default TTL
	}
	return &CommitDedup{
		commits: make(map[string]time.Time),
		ttl:     ttl,
	}
}

// IsDuplicate reports whether jobName+commitID was marked less than ttl ago.
//
// Calling IsDuplicate and then MarkQueued is not atomic: two concurrent callers
// can both observe false and both proceed. Use TryMarkQueued when the check and
// the mark must happen as one step.
func (d *CommitDedup) IsDuplicate(jobName, commitID string) bool {
	key := d.key(jobName, commitID)
	d.mu.RLock()
	defer d.mu.RUnlock()
	t, ok := d.commits[key]
	return ok && time.Since(t) < d.ttl
}

// MarkQueued records that jobName+commitID was queued now, (re)starting its
// TTL window unconditionally.
func (d *CommitDedup) MarkQueued(jobName, commitID string) {
	key := d.key(jobName, commitID)
	d.mu.Lock()
	defer d.mu.Unlock()
	d.commits[key] = time.Now()
}

// TryMarkQueued atomically checks and marks jobName+commitID under a single
// lock. It returns true if the combination was not live and has now been
// marked, or false if it was already marked less than ttl ago; in the false
// case the existing timestamp is left untouched.
func (d *CommitDedup) TryMarkQueued(jobName, commitID string) bool {
	key := d.key(jobName, commitID)
	d.mu.Lock()
	defer d.mu.Unlock()
	now := time.Now()
	if t, ok := d.commits[key]; ok && now.Sub(t) < d.ttl {
		return false
	}
	d.commits[key] = now
	return true
}

// Cleanup removes every entry whose TTL has elapsed. Call it periodically;
// nothing else in this type removes entries.
func (d *CommitDedup) Cleanup() {
	d.mu.Lock()
	defer d.mu.Unlock()
	now := time.Now()
	for key, t := range d.commits {
		// Same boundary as IsDuplicate/TryMarkQueued: an entry is live only
		// while its age is strictly less than ttl, so age >= ttl is expired.
		if now.Sub(t) >= d.ttl {
			delete(d.commits, key) // deleting during range is permitted in Go
		}
	}
}

// key builds an unambiguous map key for a job+commit pair. The job name is
// length-prefixed so that pairs such as ("a:b", "c") and ("a", "b:c") do not
// collapse onto the same key.
func (d *CommitDedup) key(jobName, commitID string) string {
	return fmt.Sprintf("%d:%s:%s", len(jobName), jobName, commitID)
}

// Size returns the number of tracked entries (for metrics/debugging). The count
// includes expired entries that Cleanup has not yet removed.
func (d *CommitDedup) Size() int {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return len(d.commits)
}