fetch_ml/native/rust/queue_index/src/index.rs
Jeremie Fraeys 7efefa1933
feat(native): implement Rust native layer as a test
- queue_index: mmap-based priority queue with safe storage wrapper
- dataset_hash: BLAKE3 parallel hashing with rayon
- common: FFI utilities with panic recovery
- Minimal deps: ~20 total (rayon, blake3, memmap2, walkdir, chrono)
- Drop crossbeam, prometheus - use stdlib + manual metrics
- Makefile: cargo build targets, help text updated
- Forgejo CI: clippy, tests, miri, cargo-deny
- C FFI compatible with existing Go bindings
2026-03-14 17:45:58 -04:00

171 lines
4.5 KiB
Rust

//! Priority queue index implementation
use crate::storage::SharedStorage;
use crate::task::Task;
use std::collections::BinaryHeap;
use std::io;
use std::path::Path;
/// Task with ordering for priority queue
///
/// Wraps a [`Task`] together with copies of its `priority` and
/// `created_at` fields, so heap comparisons (see the `Ord` impl below)
/// read the cached keys instead of reaching into `task`.
#[derive(Clone, Eq, PartialEq)]
struct QueuedTask {
    // Copied from `task.priority`; primary ordering key.
    priority: i64,
    // Copied from `task.created_at`; tie-breaker (older task wins).
    created_at: i64,
    // The wrapped task itself.
    task: Task,
}
impl Ord for QueuedTask {
    /// Total ordering for the max-heap: higher `priority` compares as
    /// `Greater` (and so pops first from `BinaryHeap`); among equal
    /// priorities, the older task (smaller `created_at`) compares as
    /// `Greater` and pops first.
    ///
    /// NOTE(review): this compares only the two ordering keys, while the
    /// derived `PartialEq` also compares `task`. Harmless for
    /// `BinaryHeap`, but strictly the `Ord` contract expects
    /// `cmp == Equal` to agree with `==` — worth aligning eventually.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Higher priority first, then older tasks first.
        //
        // Bug fix: the previous implementation appended `.reverse()`,
        // which inverted the whole ordering and made the max-heap pop
        // the LOWEST priority first — contradicting the contract and
        // failing `test_priority_ordering`.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.created_at.cmp(&self.created_at))
    }
}
impl PartialOrd for QueuedTask {
    /// Delegates to the total ordering defined by [`Ord`]; two queued
    /// tasks are always comparable.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl QueuedTask {
    /// Builds a heap entry from `task`, caching the two ordering keys
    /// (`priority`, `created_at`) alongside the task itself.
    fn from_task(task: Task) -> Self {
        let (priority, created_at) = (task.priority, task.created_at);
        Self {
            priority,
            created_at,
            task,
        }
    }
}
/// Main queue index implementation
///
/// Pairs the persistent backing store with an in-memory priority heap
/// of pending tasks.
pub struct QueueIndexImpl {
    // Shared backing storage; holds the persisted header (entry count).
    // NOTE(review): task bodies are not yet persisted here — the
    // storage-backed load/exists paths are stubs below.
    storage: SharedStorage,
    // In-memory max-heap keyed by (priority, age); see `QueuedTask`.
    heap: BinaryHeap<QueuedTask>,
}
impl QueueIndexImpl {
    /// Open or create a queue index at the given path.
    ///
    /// # Errors
    /// Propagates any I/O error from opening the backing storage or
    /// loading previously persisted tasks.
    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let storage = SharedStorage::open(path)?;
        let mut index = Self {
            storage,
            heap: BinaryHeap::new(),
        };
        // Rehydrate the in-memory heap from storage (currently a no-op
        // stub; see `load_tasks`).
        index.load_tasks()?;
        Ok(index)
    }

    /// Add tasks to the index, skipping tasks that already exist.
    ///
    /// Returns the number of tasks actually added.
    ///
    /// # Errors
    /// Propagates I/O errors from the duplicate check or from flushing
    /// the updated storage header.
    pub fn add_tasks(&mut self, tasks: &[Task]) -> io::Result<usize> {
        let mut added = 0usize;
        for task in tasks {
            // `task_exists` is currently a stub that always reports
            // "absent", so every incoming task is accepted for now.
            if self.task_exists(&task.id)? {
                continue;
            }
            self.heap.push(QueuedTask::from_task(task.clone()));
            added += 1;
        }
        // Only touch (and flush) the storage header when something
        // actually changed; previously a no-op batch still took the
        // write lock, added 0 to the count, and flushed.
        if added > 0 {
            let mut storage = self.storage.write();
            storage.header_mut().entry_count += added as u64;
            storage.flush()?;
        }
        Ok(added)
    }

    /// Get the next batch of ready tasks, up to `max_count`.
    ///
    /// Returned tasks are removed from the heap; popped tasks that are
    /// not yet ready are pushed back and remain queued.
    ///
    /// # Errors
    /// Currently infallible, but returns `io::Result` so a future
    /// storage-backed implementation can report I/O failures.
    pub fn get_next_batch(&mut self, max_count: usize) -> io::Result<Vec<Task>> {
        let mut tasks = Vec::with_capacity(max_count);
        let mut to_requeue = Vec::new();
        while tasks.len() < max_count {
            match self.heap.pop() {
                Some(queued) => {
                    if queued.task.is_ready() {
                        tasks.push(queued.task);
                    } else {
                        // Not ready yet; set aside so this loop cannot
                        // pop it again, then requeue below.
                        to_requeue.push(queued);
                    }
                }
                None => break,
            }
        }
        // Requeue tasks that weren't ready.
        for queued in to_requeue {
            self.heap.push(queued);
        }
        Ok(tasks)
    }

    /// Get count of tasks with the given status.
    ///
    /// Stub: the status argument is ignored and the current heap size
    /// is returned as an approximation. A full implementation would
    /// scan storage per-status. (Underscore-prefixed to avoid an
    /// unused-variable warning under `clippy -D warnings`.)
    pub fn get_task_count(&self, _status: &str) -> usize {
        self.heap.len()
    }

    /// Check whether a task with the given id is already indexed.
    ///
    /// Stub: always reports `false`; a full implementation would
    /// consult storage.
    fn task_exists(&self, _id: &str) -> io::Result<bool> {
        Ok(false)
    }

    /// Load persisted tasks from storage into the heap.
    ///
    /// Stub: currently a no-op; a full implementation would
    /// deserialize task records from the backing storage.
    fn load_tasks(&mut self) -> io::Result<()> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Round-trip: tasks added to a fresh index come back out of
    /// `get_next_batch`.
    #[test]
    fn test_add_and_get_tasks() {
        let dir = TempDir::new().unwrap();
        let mut index = QueueIndexImpl::open(dir.path()).unwrap();

        let pending = vec![Task::new("task-1", "job-a"), Task::new("task-2", "job-b")];
        assert_eq!(index.add_tasks(&pending).unwrap(), 2);

        let drained = index.get_next_batch(10).unwrap();
        assert_eq!(drained.len(), 2);
    }

    /// A high-priority task must be popped before a default-priority one.
    #[test]
    fn test_priority_ordering() {
        let low = Task::new("task-1", "job-a");
        let mut high = Task::new("task-2", "job-b");
        high.priority = 100; // Higher priority

        let mut heap = BinaryHeap::new();
        heap.push(QueuedTask::from_task(low));
        heap.push(QueuedTask::from_task(high));

        // Higher priority should come first
        let first = heap.pop().unwrap();
        assert_eq!(first.task.id, "task-2");
    }
}