fix(frecency): replace unbounded batching with bounded channels to prevent memory exhaustion

Replace unbounded channel with bounded (32 capacity) + dedicated update
task.
Prevents OutOfMemory on large datasets, adds proper synchronization with
Arc<RwLock>. Better error recovery.
This commit is contained in:
lalvarezt 2025-07-28 18:08:46 +02:00
parent acba4b5f11
commit 86e3d924f3

View File

@ -14,7 +14,7 @@ use std::collections::HashSet;
use std::io::{BufRead, BufReader};
use std::process::Stdio;
use std::sync::{
Arc,
Arc, RwLock,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
@ -22,6 +22,11 @@ use tokio::sync::mpsc;
use tracing::{debug, trace};
const RELOAD_RENDERING_DELAY: Duration = Duration::from_millis(200);
const DEFAULT_LINE_BUFFER_SIZE: usize = 512;
const DATASET_CHANNEL_CAPACITY: usize = 32;
const DATASET_UPDATE_INTERVAL: Duration = Duration::from_millis(50);
const LOAD_CANDIDATE_BATCH_SIZE: usize = 100;
pub struct Channel {
pub prototype: ChannelPrototype,
@ -32,10 +37,10 @@ pub struct Channel {
/// Indicates if the channel is currently reloading to prevent UI flickering
/// by delaying the rendering of a new frame.
pub reloading: Arc<AtomicBool>,
/// Track current dataset items as they're loaded for frecency filtering
current_dataset: FxHashSet<String>,
/// Receiver for batched dataset updates from the loading task
dataset_rx: Option<mpsc::UnboundedReceiver<Vec<String>>>,
/// Track current dataset items
current_dataset: Arc<RwLock<FxHashSet<String>>>,
/// Handle for the dataset update task
dataset_update_handle: Option<tokio::task::JoinHandle<()>>,
}
impl Channel {
@ -50,17 +55,26 @@ impl Channel {
crawl_handle: None,
current_source_index,
reloading: Arc::new(AtomicBool::new(false)),
current_dataset: FxHashSet::default(),
dataset_rx: None,
current_dataset: Arc::new(RwLock::new(FxHashSet::default())),
dataset_update_handle: None,
}
}
pub fn load(&mut self) {
// Clear the current dataset at the start of each load
self.current_dataset.clear();
if let Ok(mut dataset) = self.current_dataset.write() {
dataset.clear();
}
let (dataset_tx, dataset_rx) = mpsc::unbounded_channel();
self.dataset_rx = Some(dataset_rx);
// Create bounded channel to prevent unbounded memory growth
let (dataset_tx, dataset_rx) = mpsc::channel(DATASET_CHANNEL_CAPACITY);
// Create dedicated dataset update task
let dataset_clone = self.current_dataset.clone();
let dataset_update_handle = tokio::spawn(async move {
dataset_update_task(dataset_rx, dataset_clone).await;
});
self.dataset_update_handle = Some(dataset_update_handle);
let injector = self.matcher.injector();
let crawl_handle = tokio::spawn(load_candidates(
@ -79,11 +93,17 @@ impl Channel {
}
self.reloading.store(true, Ordering::Relaxed);
// Abort existing tasks
if let Some(handle) = self.crawl_handle.take() {
if !handle.is_finished() {
handle.abort();
}
}
if let Some(handle) = self.dataset_update_handle.take() {
if !handle.is_finished() {
handle.abort();
}
}
self.matcher.restart();
self.load();
// Spawn a thread that turns off reloading after a short delay
@ -107,27 +127,24 @@ impl Channel {
self.matcher.find(pattern);
}
/// Try to update the dataset from the loading task if available
fn try_update_dataset(&mut self) {
if let Some(rx) = &mut self.dataset_rx {
// Process all available batches (non-blocking)
while let Ok(batch) = rx.try_recv() {
// Extend current dataset with the new batch
self.current_dataset.extend(batch);
}
}
}
/// Filter recent items to only include those that exist in the current dataset
fn filter_recent_items_by_current_dataset(
&self,
recent_items: &[String],
) -> Vec<String> {
// Try to read dataset, return empty on lock failure to prevent blocking
let Ok(dataset) = self.current_dataset.read() else {
debug!(
"Failed to acquire dataset read lock, skipping frecency filtering"
);
return Vec::new();
};
let mut filtered = Vec::with_capacity(recent_items.len());
filtered.extend(
recent_items
.iter()
.filter(|item| self.current_dataset.contains(*item))
.filter(|item| dataset.contains(*item))
.cloned(),
);
filtered
@ -172,9 +189,6 @@ impl Channel {
offset: u32,
frecency: Option<&Frecency>,
) -> Vec<Entry> {
// Try to update dataset from loading task
self.try_update_dataset();
self.matcher.tick();
let results = if let Some(frecency_data) = frecency {
@ -314,28 +328,96 @@ impl Channel {
}
}
const DEFAULT_LINE_BUFFER_SIZE: usize = 512;
const BATCH_SIZE: usize = 100;
/// Helper function to send a batch if it's not empty
fn send_batch_if_not_empty(
batch: &mut Vec<String>,
dataset_tx: &mpsc::UnboundedSender<Vec<String>>,
/// Dedicated task for updating the dataset from batched updates
/// This runs independently from the UI to prevent blocking
async fn dataset_update_task(
mut dataset_rx: mpsc::Receiver<Vec<String>>,
current_dataset: Arc<RwLock<FxHashSet<String>>>,
) {
if !batch.is_empty() {
let _ = dataset_tx.send(std::mem::take(batch));
debug!("Starting dataset update task");
let mut update_interval = tokio::time::interval(DATASET_UPDATE_INTERVAL);
let mut pending_updates = Vec::new();
loop {
tokio::select! {
// Receive new batches
batch_result = dataset_rx.recv() => {
if let Some(batch) = batch_result {
pending_updates.push(batch);
} else {
// Channel closed, process remaining updates and exit
if !pending_updates.is_empty() {
apply_pending_updates(&pending_updates, &current_dataset);
}
debug!("Dataset update task exiting - channel closed");
break;
}
}
// Periodic updates to apply accumulated batches
_ = update_interval.tick() => {
if !pending_updates.is_empty() {
apply_pending_updates(&pending_updates, &current_dataset);
pending_updates.clear();
}
}
}
}
}
/// Apply accumulated updates to the dataset with proper error handling
fn apply_pending_updates(
pending_updates: &[Vec<String>],
current_dataset: &Arc<RwLock<FxHashSet<String>>>,
) {
match current_dataset.write() {
Ok(mut dataset) => {
// Pre-calculate capacity to minimize reallocations
let total_items: usize =
pending_updates.iter().map(Vec::len).sum();
dataset.reserve(total_items);
// Apply all pending updates
for batch in pending_updates {
dataset.extend(batch.iter().cloned());
}
trace!(
"Applied {} batches containing {} total items to dataset",
pending_updates.len(),
total_items
);
}
Err(e) => {
debug!(
"Failed to acquire dataset write lock: {:?}. Skipping batch updates.",
e
);
}
}
}
/// Helper function to send a batch if it's not empty with backpressure handling
async fn send_batch_if_not_empty(
batch: &mut Vec<String>,
dataset_tx: &mpsc::Sender<Vec<String>>,
) -> Result<(), mpsc::error::SendError<Vec<String>>> {
if batch.is_empty() {
Ok(())
} else {
let batch_to_send = std::mem::take(batch);
dataset_tx.send(batch_to_send).await
}
}
#[allow(clippy::unused_async)]
async fn load_candidates(
source: SourceSpec,
source_command_index: usize,
injector: Injector<String>,
dataset_tx: mpsc::UnboundedSender<Vec<String>>,
dataset_tx: mpsc::Sender<Vec<String>>,
) {
debug!("Loading candidates from command: {:?}", source.command);
let mut current_batch = Vec::with_capacity(BATCH_SIZE);
let mut current_batch = Vec::with_capacity(LOAD_CANDIDATE_BATCH_SIZE);
let mut child = shell_command(
source.command.get_nth(source_command_index).raw(),
@ -385,11 +467,19 @@ async fn load_candidates(
current_batch.push(entry.clone());
// Send batch if it reaches the batch size
if current_batch.len() >= BATCH_SIZE {
send_batch_if_not_empty(
if current_batch.len() >= LOAD_CANDIDATE_BATCH_SIZE {
if let Err(e) = send_batch_if_not_empty(
&mut current_batch,
&dataset_tx,
);
)
.await
{
debug!(
"Failed to send dataset batch: {:?}. Dataset may be incomplete.",
e
);
break; // Exit loop if channel is closed
}
}
let () = injector.push(entry, |e, cols| {
@ -428,11 +518,19 @@ async fn load_candidates(
current_batch.push(line.clone());
// Send batch if it reaches the batch size
if current_batch.len() >= BATCH_SIZE {
send_batch_if_not_empty(
if current_batch.len() >= LOAD_CANDIDATE_BATCH_SIZE {
if let Err(e) = send_batch_if_not_empty(
&mut current_batch,
&dataset_tx,
);
)
.await
{
debug!(
"Failed to send dataset batch: {:?}. Dataset may be incomplete.",
e
);
break; // Exit loop if channel is closed
}
}
let () = injector.push(line, |e, cols| {
@ -445,7 +543,11 @@ async fn load_candidates(
let _ = child.wait();
// Send any remaining entries in the final batch
send_batch_if_not_empty(&mut current_batch, &dataset_tx);
if let Err(e) =
send_batch_if_not_empty(&mut current_batch, &dataset_tx).await
{
debug!("Failed to send final dataset batch: {:?}", e);
}
}
#[cfg(test)]
@ -464,7 +566,8 @@ mod tests {
let mut matcher = Matcher::<String>::new(&Config::default());
let injector = matcher.injector();
let (dataset_tx, _dataset_rx) = mpsc::unbounded_channel();
let (dataset_tx, _dataset_rx) =
mpsc::channel(DATASET_CHANNEL_CAPACITY);
load_candidates(source_spec, 0, injector, dataset_tx).await;
@ -488,7 +591,8 @@ mod tests {
let mut matcher = Matcher::<String>::new(&Config::default());
let injector = matcher.injector();
let (dataset_tx, _dataset_rx) = mpsc::unbounded_channel();
let (dataset_tx, _dataset_rx) =
mpsc::channel(DATASET_CHANNEL_CAPACITY);
load_candidates(source_spec, 0, injector, dataset_tx).await;
@ -512,7 +616,8 @@ mod tests {
let mut matcher = Matcher::<String>::new(&Config::default());
let injector = matcher.injector();
let (dataset_tx, _dataset_rx) = mpsc::unbounded_channel();
let (dataset_tx, _dataset_rx) =
mpsc::channel(DATASET_CHANNEL_CAPACITY);
load_candidates(source_spec, 0, injector, dataset_tx).await;