From e5303a3052caba10a07fb1ac8421ab5aa8b6dced Mon Sep 17 00:00:00 2001 From: lalvarezt Date: Mon, 28 Jul 2025 13:47:35 +0200 Subject: [PATCH] refactor(frecency): drop ArcMutex and opt for unbounded batch instead, no need to always lock --- television/channels/channel.rs | 112 ++++++++++++++++++++++----------- 1 file changed, 75 insertions(+), 37 deletions(-) diff --git a/television/channels/channel.rs b/television/channels/channel.rs index be45f3e..7d2b69e 100644 --- a/television/channels/channel.rs +++ b/television/channels/channel.rs @@ -14,10 +14,11 @@ use std::collections::HashSet; use std::io::{BufRead, BufReader}; use std::process::Stdio; use std::sync::{ - Arc, Mutex, + Arc, atomic::{AtomicBool, Ordering}, }; use std::time::Duration; +use tokio::sync::mpsc; use tracing::{debug, trace}; const RELOAD_RENDERING_DELAY: Duration = Duration::from_millis(200); @@ -32,7 +33,9 @@ pub struct Channel { /// by delaying the rendering of a new frame. pub reloading: Arc, /// Track current dataset items as they're loaded for frecency filtering - current_dataset: Arc>>, + current_dataset: FxHashSet, + /// Receiver for batched dataset updates from the loading task + dataset_rx: Option>>, } impl Channel { @@ -47,23 +50,24 @@ impl Channel { crawl_handle: None, current_source_index, reloading: Arc::new(AtomicBool::new(false)), - current_dataset: Arc::new(Mutex::new(FxHashSet::default())), + current_dataset: FxHashSet::default(), + dataset_rx: None, } } pub fn load(&mut self) { // Clear the current dataset at the start of each load - if let Ok(mut dataset) = self.current_dataset.lock() { - dataset.clear(); - } + self.current_dataset.clear(); + + let (dataset_tx, dataset_rx) = mpsc::unbounded_channel(); + self.dataset_rx = Some(dataset_rx); let injector = self.matcher.injector(); - let current_dataset = self.current_dataset.clone(); let crawl_handle = tokio::spawn(load_candidates( self.prototype.source.clone(), self.current_source_index, injector, - current_dataset, + dataset_tx, )); self.crawl_handle = Some(crawl_handle); } @@ -103,27 +107,30 @@ impl Channel { self.matcher.find(pattern); } + /// Try to update the dataset from the loading task if available + fn try_update_dataset(&mut self) { + if let Some(rx) = &mut self.dataset_rx { + // Process all available batches (non-blocking) + while let Ok(batch) = rx.try_recv() { + // Extend current dataset with the new batch + self.current_dataset.extend(batch); + } + } + } + /// Filter recent items to only include those that exist in the current dataset fn filter_recent_items_by_current_dataset( &self, recent_items: &[String], ) -> Vec { - match self.current_dataset.lock() { - Ok(current_dataset) => { - let mut filtered = Vec::with_capacity(recent_items.len()); - filtered.extend( - recent_items - .iter() - .filter(|item| current_dataset.contains(*item)) - .cloned(), - ); - filtered - } - Err(_) => { - // If we can't lock, return empty to prevent inconsistent results - Vec::new() - } - } + let mut filtered = Vec::with_capacity(recent_items.len()); + filtered.extend( + recent_items + .iter() + .filter(|item| self.current_dataset.contains(*item)) + .cloned(), + ); + filtered } /// Fuzzy match against a list of recent items @@ -165,6 +172,9 @@ impl Channel { offset: u32, frecency: Option<&Frecency>, ) -> Vec { + // Try to update dataset from loading task + self.try_update_dataset(); + self.matcher.tick(); let results = if let Some(frecency_data) = frecency { @@ -315,15 +325,28 @@ impl Channel { } const DEFAULT_LINE_BUFFER_SIZE: usize = 512; +const BATCH_SIZE: usize = 100; + +/// Helper function to send a batch if it's not empty +fn send_batch_if_not_empty( + batch: &mut Vec, + dataset_tx: &mpsc::UnboundedSender>, +) { + if !batch.is_empty() { + let _ = dataset_tx.send(std::mem::take(batch)); + } +} #[allow(clippy::unused_async)] async fn load_candidates( source: SourceSpec, source_command_index: usize, injector: Injector, - current_dataset: Arc>>, + dataset_tx: mpsc::UnboundedSender>, ) { debug!("Loading candidates from command: {:?}", source.command); + let mut current_batch = Vec::with_capacity(BATCH_SIZE); + let mut child = shell_command( source.command.get_nth(source_command_index).raw(), source.command.interactive, @@ -368,9 +391,15 @@ async fn load_candidates( if !l.trim().is_empty() { let entry = l.to_string(); - // Track this item in our current dataset - if let Ok(mut dataset) = current_dataset.lock() { - dataset.insert(entry.clone()); + // Add to current batch + current_batch.push(entry.clone()); + + // Send batch if it reaches the batch size + if current_batch.len() >= BATCH_SIZE { + send_batch_if_not_empty( + &mut current_batch, + &dataset_tx, + ); } let () = injector.push(entry, |e, cols| { @@ -405,9 +434,15 @@ async fn load_candidates( for line in reader.lines() { let line = line.unwrap(); if !line.trim().is_empty() { - // Track this item in our current dataset - if let Ok(mut dataset) = current_dataset.lock() { - dataset.insert(line.clone()); + // Add to current batch + current_batch.push(line.clone()); + + // Send batch if it reaches the batch size + if current_batch.len() >= BATCH_SIZE { + send_batch_if_not_empty( + &mut current_batch, + &dataset_tx, + ); } let () = injector.push(line, |e, cols| { @@ -418,6 +453,9 @@ async fn load_candidates( } } let _ = child.wait(); + + // Send any remaining entries in the final batch + send_batch_if_not_empty(&mut current_batch, &dataset_tx); } #[cfg(test)] @@ -436,9 +474,9 @@ mod tests { let mut matcher = Matcher::::new(&Config::default()); let injector = matcher.injector(); - let current_dataset = Arc::new(Mutex::new(FxHashSet::default())); + let (dataset_tx, _dataset_rx) = mpsc::unbounded_channel(); - load_candidates(source_spec, 0, injector, current_dataset).await; + load_candidates(source_spec, 0, injector, dataset_tx).await; // Check if the matcher has the expected results matcher.find("test"); @@ -460,9 +498,9 @@ mod tests { let mut matcher = Matcher::::new(&Config::default()); let injector = matcher.injector(); - let current_dataset = Arc::new(Mutex::new(FxHashSet::default())); + let (dataset_tx, _dataset_rx) = mpsc::unbounded_channel(); - load_candidates(source_spec, 0, injector, current_dataset).await; + load_candidates(source_spec, 0, injector, dataset_tx).await; // Check if the matcher has the expected results matcher.find("test"); @@ -484,9 +522,9 @@ mod tests { let mut matcher = Matcher::::new(&Config::default()); let injector = matcher.injector(); - let current_dataset = Arc::new(Mutex::new(FxHashSet::default())); + let (dataset_tx, _dataset_rx) = mpsc::unbounded_channel(); - load_candidates(source_spec, 0, injector, current_dataset).await; + load_candidates(source_spec, 0, injector, dataset_tx).await; // Check if the matcher has the expected results matcher.find("test");