refactor(frecency): drop the Arc&lt;Mutex&lt;…&gt;&gt; dataset and batch updates over an unbounded channel instead — no need to lock on every access

This commit is contained in:
lalvarezt 2025-07-28 13:47:35 +02:00
parent 26db5a95d0
commit e5303a3052

View File

@ -14,10 +14,11 @@ use std::collections::HashSet;
use std::io::{BufRead, BufReader};
use std::process::Stdio;
use std::sync::{
Arc, Mutex,
Arc,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, trace};
const RELOAD_RENDERING_DELAY: Duration = Duration::from_millis(200);
@ -32,7 +33,9 @@ pub struct Channel {
/// by delaying the rendering of a new frame.
pub reloading: Arc<AtomicBool>,
/// Track current dataset items as they're loaded for frecency filtering
current_dataset: Arc<Mutex<FxHashSet<String>>>,
current_dataset: FxHashSet<String>,
/// Receiver for batched dataset updates from the loading task
dataset_rx: Option<mpsc::UnboundedReceiver<Vec<String>>>,
}
impl Channel {
@ -47,23 +50,24 @@ impl Channel {
crawl_handle: None,
current_source_index,
reloading: Arc::new(AtomicBool::new(false)),
current_dataset: Arc::new(Mutex::new(FxHashSet::default())),
current_dataset: FxHashSet::default(),
dataset_rx: None,
}
}
pub fn load(&mut self) {
    // Clear the current dataset at the start of each load so stale
    // entries from a previous source never leak into frecency filtering.
    self.current_dataset.clear();

    // Fresh channel per load: the spawned task sends batches of entry
    // strings, and `try_update_dataset` drains them on the render path.
    let (dataset_tx, dataset_rx) = mpsc::unbounded_channel();
    self.dataset_rx = Some(dataset_rx);

    let injector = self.matcher.injector();
    // NOTE(review): the diff view showed both the old `current_dataset`
    // Arc clone/argument and the new `dataset_tx` argument; this body
    // resolves to the post-refactor (channel-based) version only.
    let crawl_handle = tokio::spawn(load_candidates(
        self.prototype.source.clone(),
        self.current_source_index,
        injector,
        dataset_tx,
    ));
    self.crawl_handle = Some(crawl_handle);
}
@ -103,27 +107,30 @@ impl Channel {
self.matcher.find(pattern);
}
/// Drain any dataset batches the loading task has produced so far.
///
/// Non-blocking: pulls every batch currently queued on the channel and
/// folds it into `current_dataset`, returning as soon as the queue is
/// empty (or immediately when no load is in progress).
fn try_update_dataset(&mut self) {
    let Some(rx) = self.dataset_rx.as_mut() else {
        return;
    };
    // `try_recv` never blocks; stop once the channel has been drained.
    while let Ok(batch) = rx.try_recv() {
        self.current_dataset.extend(batch);
    }
}
/// Filter recent items to only include those that exist in the current dataset
fn filter_recent_items_by_current_dataset(
&self,
recent_items: &[String],
) -> Vec<String> {
match self.current_dataset.lock() {
Ok(current_dataset) => {
let mut filtered = Vec::with_capacity(recent_items.len());
filtered.extend(
recent_items
.iter()
.filter(|item| current_dataset.contains(*item))
.cloned(),
);
filtered
}
Err(_) => {
// If we can't lock, return empty to prevent inconsistent results
Vec::new()
}
}
let mut filtered = Vec::with_capacity(recent_items.len());
filtered.extend(
recent_items
.iter()
.filter(|item| self.current_dataset.contains(*item))
.cloned(),
);
filtered
}
/// Fuzzy match against a list of recent items
@ -165,6 +172,9 @@ impl Channel {
offset: u32,
frecency: Option<&Frecency>,
) -> Vec<Entry> {
// Try to update dataset from loading task
self.try_update_dataset();
self.matcher.tick();
let results = if let Some(frecency_data) = frecency {
@ -315,15 +325,28 @@ impl Channel {
}
// Initial capacity hint for reading candidate lines — presumably sizes a
// per-line buffer; the use site is not visible in this fragment (TODO confirm).
const DEFAULT_LINE_BUFFER_SIZE: usize = 512;
// Number of entries accumulated before a dataset batch is flushed to the
// channel by `load_candidates` (via `send_batch_if_not_empty`).
const BATCH_SIZE: usize = 100;
/// Flush the accumulated batch to the dataset channel, doing nothing
/// when the batch is empty.
fn send_batch_if_not_empty(
    batch: &mut Vec<String>,
    dataset_tx: &mpsc::UnboundedSender<Vec<String>>,
) {
    // Guard clause: nothing accumulated, nothing to send.
    if batch.is_empty() {
        return;
    }
    // `mem::take` moves the entries out and leaves an empty Vec behind so
    // the caller keeps reusing the same buffer. A closed receiver is not
    // an error for this best-effort update, so the result is ignored.
    let _ = dataset_tx.send(std::mem::take(batch));
}
// NOTE(review): this span is a rendered diff fragment. It shows both the
// removed pre-refactor parameter (`current_dataset: Arc<Mutex<...>>`) and
// its mutex-locking insert sites, alongside the added channel-based
// replacements (`dataset_tx`, `current_batch`). Interior lines are elided
// at each `@ -…` hunk marker, so this is not a complete function body.
// Comments below describe the post-refactor (channel) intent.
#[allow(clippy::unused_async)]
async fn load_candidates(
source: SourceSpec,
source_command_index: usize,
injector: Injector<String>,
current_dataset: Arc<Mutex<FxHashSet<String>>>,
dataset_tx: mpsc::UnboundedSender<Vec<String>>,
) {
debug!("Loading candidates from command: {:?}", source.command);
// Reusable accumulator: entries are batched up to BATCH_SIZE before
// being flushed to `dataset_tx` (see `send_batch_if_not_empty`).
let mut current_batch = Vec::with_capacity(BATCH_SIZE);
// Spawn the source command; presumably its stdout supplies candidate
// lines — the spawn/stdout wiring is elided at the hunk marker below.
let mut child = shell_command(
source.command.get_nth(source_command_index).raw(),
source.command.interactive,
@ -368,9 +391,15 @@ async fn load_candidates(
// Skip blank lines; only non-empty trimmed lines become entries.
if !l.trim().is_empty() {
let entry = l.to_string();
// Track this item in our current dataset
if let Ok(mut dataset) = current_dataset.lock() {
dataset.insert(entry.clone());
// Add to current batch
current_batch.push(entry.clone());
// Send batch if it reaches the batch size
if current_batch.len() >= BATCH_SIZE {
send_batch_if_not_empty(
&mut current_batch,
&dataset_tx,
);
}
// Feed the entry to the fuzzy matcher's injector.
let () = injector.push(entry, |e, cols| {
@ -405,9 +434,15 @@ async fn load_candidates(
// Second input path (likely the non-interactive reader branch —
// elided context above; TODO confirm): same batching logic as above.
for line in reader.lines() {
let line = line.unwrap();
if !line.trim().is_empty() {
// Track this item in our current dataset
if let Ok(mut dataset) = current_dataset.lock() {
dataset.insert(line.clone());
// Add to current batch
current_batch.push(line.clone());
// Send batch if it reaches the batch size
if current_batch.len() >= BATCH_SIZE {
send_batch_if_not_empty(
&mut current_batch,
&dataset_tx,
);
}
let () = injector.push(line, |e, cols| {
@ -418,6 +453,9 @@ async fn load_candidates(
}
}
// Reap the child process; exit status is deliberately ignored.
let _ = child.wait();
// Send any remaining entries in the final batch
send_batch_if_not_empty(&mut current_batch, &dataset_tx);
}
#[cfg(test)]
@ -436,9 +474,9 @@ mod tests {
let mut matcher = Matcher::<String>::new(&Config::default());
let injector = matcher.injector();
let current_dataset = Arc::new(Mutex::new(FxHashSet::default()));
let (dataset_tx, _dataset_rx) = mpsc::unbounded_channel();
load_candidates(source_spec, 0, injector, current_dataset).await;
load_candidates(source_spec, 0, injector, dataset_tx).await;
// Check if the matcher has the expected results
matcher.find("test");
@ -460,9 +498,9 @@ mod tests {
let mut matcher = Matcher::<String>::new(&Config::default());
let injector = matcher.injector();
let current_dataset = Arc::new(Mutex::new(FxHashSet::default()));
let (dataset_tx, _dataset_rx) = mpsc::unbounded_channel();
load_candidates(source_spec, 0, injector, current_dataset).await;
load_candidates(source_spec, 0, injector, dataset_tx).await;
// Check if the matcher has the expected results
matcher.find("test");
@ -484,9 +522,9 @@ mod tests {
let mut matcher = Matcher::<String>::new(&Config::default());
let injector = matcher.injector();
let current_dataset = Arc::new(Mutex::new(FxHashSet::default()));
let (dataset_tx, _dataset_rx) = mpsc::unbounded_channel();
load_candidates(source_spec, 0, injector, current_dataset).await;
load_candidates(source_spec, 0, injector, dataset_tx).await;
// Check if the matcher has the expected results
matcher.find("test");