//! Priority queue index implementation use crate::storage::SharedStorage; use crate::task::Task; use std::collections::BinaryHeap; use std::io; use std::path::Path; /// Task with ordering for priority queue #[derive(Clone, Eq, PartialEq)] struct QueuedTask { priority: i64, created_at: i64, task: Task, } impl Ord for QueuedTask { fn cmp(&self, other: &Self) -> std::cmp::Ordering { // BinaryHeap is a max-heap - Greater elements are popped first // We want: higher priority first, then older tasks first self.priority.cmp(&other.priority) // Higher priority = Greater .then_with(|| other.created_at.cmp(&self.created_at)) // Older (smaller created_at) = Greater } } impl PartialOrd for QueuedTask { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl QueuedTask { fn from_task(task: Task) -> Self { Self { priority: task.priority, created_at: task.created_at, task, } } } /// Main queue index implementation pub struct QueueIndexImpl { storage: SharedStorage, heap: BinaryHeap, } impl QueueIndexImpl { /// Open or create a queue index at the given path pub fn open>(path: P) -> io::Result { let storage = SharedStorage::open(path)?; // Load existing tasks from storage let heap = BinaryHeap::new(); let mut index = Self { storage, heap }; index.load_tasks()?; Ok(index) } /// Add tasks to the index (idiomatic Rust version) pub fn add_tasks(&mut self, tasks: &[Task]) -> io::Result { // Use iterator methods instead of manual loop let new_tasks: Vec<_> = tasks .iter() .filter(|t| !self.exists(&t.id)) .cloned() .collect(); let added = new_tasks.len(); // Extend with iterator instead of loop self.heap.extend(new_tasks.into_iter().map(QueuedTask::from_task)); // Update storage header if added > 0 { let mut storage = self.storage.write(); storage.header_mut().entry_count += added as u64; storage.flush()?; } Ok(added) } /// Get the next batch of ready tasks pub fn get_next_batch(&mut self, max_count: usize) -> io::Result> { let mut tasks = Vec::with_capacity(max_count); let mut to_requeue = Vec::new(); while tasks.len() < max_count { match self.heap.pop() { Some(queued) => { if queued.task.is_ready() { tasks.push(queued.task); } else { // Not ready yet, requeue to_requeue.push(queued); } } None => break, } } // Requeue tasks that weren't ready for queued in to_requeue { self.heap.push(queued); } Ok(tasks) } /// Peek at the next ready task without removing it pub fn peek_next(&self) -> Option<&Task> { // Find first ready task without removing self.heap.iter().find(|q| q.task.is_ready()).map(|q| &q.task) } /// Get a task by ID (scans heap) pub fn get_task_by_id(&self, id: &str) -> Option<&Task> { self.heap.iter().find(|q| q.task.id == id).map(|q| &q.task) } /// Get all tasks pub fn get_all_tasks(&self) -> Vec { self.heap.iter().map(|q| q.task.clone()).collect() } /// Get count of tasks with given status pub fn get_task_count(&self, _status: &str) -> usize { // For now, return heap size as approximation // Full implementation would scan storage self.heap.len() } /// Remove tasks by ID, returns count removed pub fn remove_tasks(&mut self, ids: &[String]) -> usize { let initial_len = self.heap.len(); // Rebuild heap without the removed tasks let tasks: Vec = self.heap .drain() .filter(|q| !ids.contains(&q.task.id)) .map(|q| q.task) .collect(); self.heap.extend(tasks.into_iter().map(QueuedTask::from_task)); initial_len - self.heap.len() } /// Update existing tasks (by ID match) pub fn update_tasks(&mut self, updates: &[Task]) -> usize { let mut updated = 0; // BinaryHeap doesn't support iter_mut, so we drain and rebuild let mut tasks: Vec = self.heap.drain().map(|q| q.task).collect(); for update in updates { if let Some(existing) = tasks.iter_mut().find(|t| t.id == update.id) { *existing = update.clone(); updated += 1; } } self.heap.extend(tasks.into_iter().map(QueuedTask::from_task)); updated } /// Mark a task for retry pub fn retry_task(&mut self, id: &str, next_retry_at: i64, max_retries: u32) -> io::Result { // BinaryHeap doesn't support iter_mut, so we drain and rebuild let mut tasks: Vec = self.heap.drain().map(|q| q.task).collect(); let mut found = false; if let Some(task) = tasks.iter_mut().find(|t| t.id == id) { if task.retries >= max_retries { self.heap.extend(tasks.into_iter().map(QueuedTask::from_task)); return Ok(false); } task.status = "queued".to_string(); task.next_retry = next_retry_at; task.retries += 1; found = true; } self.heap.extend(tasks.into_iter().map(QueuedTask::from_task)); Ok(found) } /// Move a task to DLQ (mark as failed permanently) pub fn move_to_dlq(&mut self, id: &str, _reason: &str) -> io::Result { // BinaryHeap doesn't support iter_mut, so we drain and rebuild let mut tasks: Vec = self.heap.drain().map(|q| q.task).collect(); let mut found = false; if let Some(task) = tasks.iter_mut().find(|t| t.id == id) { task.status = "failed".to_string(); found = true; } self.heap.extend(tasks.into_iter().map(QueuedTask::from_task)); Ok(found) } /// Renew lease for a running task pub fn renew_lease(&mut self, id: &str, _worker_id: &str, _lease_expiry: i64) -> io::Result { // BinaryHeap doesn't support iter_mut, so we drain and rebuild let mut tasks: Vec = self.heap.drain().map(|q| q.task).collect(); let mut found = false; if let Some(task) = tasks.iter_mut().find(|t| t.id == id) { if task.status == "running" { found = true; } } self.heap.extend(tasks.into_iter().map(QueuedTask::from_task)); Ok(found) } /// Release lease for a task (mark as available) pub fn release_lease(&mut self, id: &str, _worker_id: &str) -> io::Result { // BinaryHeap doesn't support iter_mut, so we drain and rebuild let mut tasks: Vec = self.heap.drain().map(|q| q.task).collect(); let mut found = false; if let Some(task) = tasks.iter_mut().find(|t| t.id == id) { task.status = "queued".to_string(); found = true; } self.heap.extend(tasks.into_iter().map(QueuedTask::from_task)); Ok(found) } /// Rebuild index from storage (placeholder) pub fn rebuild_index(&mut self) -> io::Result<()> { // In full implementation, would scan storage and rebuild heap // For now, just reload from storage self.load_tasks() } /// Compact index storage (placeholder) pub fn compact_index(&mut self) -> io::Result<()> { // In full implementation, would defragment storage // For now, no-op since storage isn't fully implemented Ok(()) } fn exists(&self, _id: &str) -> bool { // Full implementation would check storage false } fn load_tasks(&mut self) -> io::Result<()> { // Load tasks from storage into heap // Full implementation would deserialize from storage Ok(()) } } #[cfg(test)] mod tests { use super::*; use tempfile::TempDir; #[test] fn test_add_and_get_tasks() { let temp = TempDir::new().unwrap(); let mut index = QueueIndexImpl::open(temp.path()).unwrap(); let tasks = vec![ Task::new("task-1", "job-a"), Task::new("task-2", "job-b"), ]; let added = index.add_tasks(&tasks).unwrap(); assert_eq!(added, 2); let batch = index.get_next_batch(10).unwrap(); assert_eq!(batch.len(), 2); } #[test] fn test_priority_ordering() { let mut heap = BinaryHeap::new(); let task1 = Task::new("task-1", "job-a"); let mut task2 = Task::new("task-2", "job-b"); task2.priority = 100; // Higher priority heap.push(QueuedTask::from_task(task1)); heap.push(QueuedTask::from_task(task2)); // Higher priority should come first let first = heap.pop().unwrap(); assert_eq!(first.task.id, "task-2"); } }