From a93908fd13bcfa40281f38fb45779cca8414cbdd Mon Sep 17 00:00:00 2001 From: Alexandre Pasmantier Date: Sun, 1 Dec 2024 21:33:46 +0100 Subject: [PATCH] refactor(simd): improvement attempt broke the whole thing --- Cargo.lock | 7 + .../src/channels/stdin_simd.rs | 4 +- .../television-channels/src/channels/text.rs | 48 +++-- crates/television-fuzzy/Cargo.toml | 1 + crates/television-fuzzy/src/simd/mod.rs | 179 +++++++++++------- 5 files changed, 149 insertions(+), 90 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9c11d12..bc15157 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -247,6 +247,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "boxcar" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f839cdf7e2d3198ac6ca003fd8ebc61715755f41c1cad15ff13df67531e00ed" + [[package]] name = "bstr" version = "1.11.0" @@ -2933,6 +2939,7 @@ dependencies = [ name = "television-fuzzy" version = "0.0.7" dependencies = [ + "boxcar", "crossbeam-channel", "frizbee", "nucleo", diff --git a/crates/television-channels/src/channels/stdin_simd.rs b/crates/television-channels/src/channels/stdin_simd.rs index c91e5ca..a6ca0ac 100644 --- a/crates/television-channels/src/channels/stdin_simd.rs +++ b/crates/television-channels/src/channels/stdin_simd.rs @@ -17,7 +17,7 @@ pub struct Channel { impl Channel { pub fn new() -> Self { - let matcher = SimdMatcher::new(|s: &String| s.trim_end().to_string()); + let matcher = SimdMatcher::new(|s: &String| s.trim_end()); let injector = matcher.injector(); spawn(move || stream_from_stdin(injector.clone())); @@ -85,7 +85,7 @@ impl OnAir for Channel { fn get_result(&self, index: u32) -> Option { self.matcher .get_result(index as usize) - .map(|s| Entry::new(s.clone(), PreviewType::Basic)) + .map(|s| Entry::new(s.matched_string.clone(), PreviewType::Basic)) } fn result_count(&self) -> u32 { diff --git a/crates/television-channels/src/channels/text.rs b/crates/television-channels/src/channels/text.rs index 300fba2..e44135a 100644 --- a/crates/television-channels/src/channels/text.rs +++ b/crates/television-channels/src/channels/text.rs @@ -8,7 +8,9 @@ use std::{ path::{Path, PathBuf}, sync::{atomic::AtomicUsize, Arc}, }; -use television_fuzzy::{NucleoConfig, NucleoInjector, NucleoMatcher}; +use television_fuzzy::{ + NucleoConfig, NucleoInjector, NucleoMatcher, SimdInjector, SimdMatcher, +}; use television_utils::files::{walk_builder, DEFAULT_NUM_THREADS}; use television_utils::strings::{ proportion_of_printable_ascii_characters, PRINTABLE_ASCII_THRESHOLD, @@ -34,13 +36,15 @@ impl CandidateLine { #[allow(clippy::module_name_repetitions)] pub struct Channel { - matcher: NucleoMatcher, + //matcher: NucleoMatcher, + matcher: SimdMatcher, crawl_handle: tokio::task::JoinHandle<()>, } impl Channel { pub fn new(directories: Vec) -> Self { - let matcher = NucleoMatcher::new(NucleoConfig::default()); + //let matcher = NucleoMatcher::new(NucleoConfig::default()); + let matcher = SimdMatcher::new(|c: &CandidateLine| &*c.line); // start loading files in the background let crawl_handle = tokio::spawn(crawl_for_candidates( directories, @@ -53,7 +57,8 @@ impl Channel { } fn from_file_paths(file_paths: Vec) -> Self { - let matcher = NucleoMatcher::new(NucleoConfig::default()); + //let matcher = NucleoMatcher::new(NucleoConfig::default()); + let matcher = SimdMatcher::new(|c: &CandidateLine| &*c.line); let injector = matcher.injector(); let current_dir = std::env::current_dir().unwrap(); let crawl_handle = tokio::spawn(async move { @@ -77,7 +82,8 @@ impl Channel { } fn from_text_entries(entries: Vec) -> Self { - let matcher = NucleoMatcher::new(NucleoConfig::default()); + //let matcher = NucleoMatcher::new(NucleoConfig::default()); + let matcher = SimdMatcher::new(|c: &CandidateLine| &*c.line); let injector = matcher.injector(); let load_handle = tokio::spawn(async move { for entry in entries.into_iter().take(MAX_LINES_IN_MEM) { @@ -87,9 +93,9 @@ impl Channel { entry.value.unwrap(), entry.line_number.unwrap(), ), - |c, cols| { - cols[0] = c.line.clone().into(); - }, + //|c, cols| { + // cols[0] = c.line.clone().into(); + //}, ); } }); @@ -178,7 +184,7 @@ impl OnAir for Channel { } fn get_result(&self, index: u32) -> Option { - self.matcher.get_result(index).map(|item| { + self.matcher.get_result(index as usize).map(|item| { let display_path = item.inner.path.to_string_lossy().to_string(); Entry::new(display_path, PreviewType::Files) .with_icon(FileIcon::from(item.inner.path.as_path())) @@ -187,15 +193,18 @@ impl OnAir for Channel { } fn result_count(&self) -> u32 { - self.matcher.matched_item_count + //self.matcher.matched_item_count + self.matcher.result_count().try_into().unwrap() } fn total_count(&self) -> u32 { - self.matcher.total_item_count + //self.matcher.total_item_count + self.matcher.total_count().try_into().unwrap() } fn running(&self) -> bool { - self.matcher.status.running + //self.matcher.status.running + self.matcher.running() } fn shutdown(&self) { @@ -221,12 +230,13 @@ const MAX_FILE_SIZE: u64 = 4 * 1024 * 1024; /// /// A typical line should take somewhere around 100 bytes in memory (for utf8 english text), /// so this should take around 100 x `5_000_000` = 500MB of memory. -const MAX_LINES_IN_MEM: usize = 5_000_000; +const MAX_LINES_IN_MEM: usize = 50_000_000; #[allow(clippy::unused_async)] async fn crawl_for_candidates( directories: Vec, - injector: NucleoInjector, + //injector: NucleoInjector, + injector: SimdInjector, ) { if directories.is_empty() { return; @@ -274,7 +284,8 @@ async fn crawl_for_candidates( } fn try_inject_lines( - injector: &NucleoInjector, + //injector: &NucleoInjector, + injector: &SimdInjector, current_dir: &PathBuf, path: &Path, ) -> Option { @@ -315,9 +326,10 @@ fn try_inject_lines( l, line_number, ); - let () = injector.push(candidate, |c, cols| { - cols[0] = c.line.clone().into(); - }); + //let () = injector.push(candidate, |c, cols| { + // cols[0] = c.line.clone().into(); + //}); + injector.push(candidate); injected_lines += 1; } Err(e) => { diff --git a/crates/television-fuzzy/Cargo.toml b/crates/television-fuzzy/Cargo.toml index 5bcc870..ab8658f 100644 --- a/crates/television-fuzzy/Cargo.toml +++ b/crates/television-fuzzy/Cargo.toml @@ -20,3 +20,4 @@ threadpool = "1.8.1" crossbeam-channel = "0.5.13" tracing = "0.1.41" rayon = "1.10.0" +boxcar = "0.2.7" diff --git a/crates/television-fuzzy/src/simd/mod.rs b/crates/television-fuzzy/src/simd/mod.rs index 3635e07..d623cdd 100644 --- a/crates/television-fuzzy/src/simd/mod.rs +++ b/crates/television-fuzzy/src/simd/mod.rs @@ -1,14 +1,16 @@ +use std::fmt::Debug; use std::num::NonZero; +use std::ops::Deref; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use std::thread::available_parallelism; -use tracing::debug; +use std::thread::{available_parallelism, spawn}; use crossbeam_channel::{unbounded, Receiver, Sender}; use frizbee::{match_list, match_list_for_matched_indices, Options}; use parking_lot::Mutex; use rayon::prelude::ParallelSliceMut; use threadpool::ThreadPool; +use tracing::debug; pub struct Injector where @@ -108,20 +110,19 @@ impl From> for MatcherStatus { } } -type IntoHaystackFn = fn(&I) -> String; +type IntoHaystackFn = fn(&I) -> &str; pub struct Matcher where - I: Sync + Send + Clone + 'static, + I: Sync + Send + Clone + 'static + Debug, { pattern: String, - items: Arc>>, + items: Arc>, into_haystack: IntoHaystackFn, worker_pool: WorkerPool, injection_channel_rx: Receiver, injection_channel_tx: Sender, - /// The indices of the matched items. - results: Arc>>, + results: Arc>, status: Arc, } @@ -129,9 +130,10 @@ const DEFAULT_ITEMS_CAPACITY: usize = 1024 * 1024; /// The maximum number of items that can be acquired per tick. /// /// This is used to prevent item acquisition from holding onto the lock on `self.items` for too long. -const MAX_ACQUIRED_ITEMS_PER_TICK: usize = 1024 * 1024; +const MAX_ACQUIRED_ITEMS_PER_TICK: usize = 1024 * 1024 * 4; -const JOB_CHUNK_SIZE: usize = 1024 * 64; +/// Number of items to match in a single simd job. +const JOB_CHUNK_SIZE: usize = 1024 * 1024; const SMITH_WATERMAN_OPTS: Options = Options { indices: false, prefilter: true, @@ -142,26 +144,27 @@ const SMITH_WATERMAN_OPTS: Options = Options { impl Matcher where - I: Sync + Send + Clone + 'static, + I: Sync + Send + Clone + 'static + Debug, { pub fn new(f: IntoHaystackFn) -> Self { - debug!("Creating threadpool"); - let thread_pool = ThreadPool::new(usize::from( - available_parallelism().unwrap_or(NonZero::new(8).unwrap()), - )); + let thread_pool = ThreadPool::with_name( + "SimdMatcher".to_string(), + usize::from( + available_parallelism().unwrap_or(NonZero::new(8).unwrap()), + ), + ); let worker_pool = WorkerPool::new(thread_pool); let (sender, receiver) = unbounded(); - debug!("finished initializing matcher"); Self { pattern: String::new(), - items: Arc::new(Mutex::new(Vec::with_capacity( + items: Arc::new(boxcar::Vec::with_capacity( DEFAULT_ITEMS_CAPACITY, - ))), + )), into_haystack: f, worker_pool, injection_channel_rx: receiver, injection_channel_tx: sender, - results: Arc::new(Mutex::new(Vec::new())), + results: Arc::new(boxcar::Vec::new()), status: Arc::new(Status::default()), } } @@ -177,7 +180,8 @@ where .results_need_sorting .load(std::sync::atomic::Ordering::Relaxed) { - let mut results = self.results.lock_arc(); + debug!("Sorting results"); + // let mut results = self.results.clone(); // results.par_sort_unstable_by_key(|r| std::cmp::Reverse(r.score)); self.status .results_need_sorting @@ -196,65 +200,100 @@ where pub fn match_items(&mut self) { // debug!("items.len(): {}, injected: {}", self.items.lock_arc().len(), self.worker_pool.num_injected_items); // if all items have already been fed to the worker pool, simply return - if self.items.lock_arc().len() == self.worker_pool.num_injected_items { + let item_count = self.items.count(); + if item_count == self.worker_pool.num_injected_items { return; } let n_injected_items = self.worker_pool.num_injected_items; - let items = self.items.lock_arc(); - let new_item_chunks: Vec<&[I]> = - items[n_injected_items..].chunks(JOB_CHUNK_SIZE).collect(); - let into_haystack = self.into_haystack; - let mut item_offset = n_injected_items; - for chunk in new_item_chunks { - let chunk = chunk.to_vec(); - let chunk_size = chunk.len(); - let pattern = self.pattern.clone(); + let pattern = self.pattern.clone(); + + let mut chunks = Vec::new(); + let mut offsets = Vec::new(); + let mut current_offset = n_injected_items; + let items = Arc::clone(&self.items); + loop { + if current_offset >= item_count { + break; + } + let chunk_size = (item_count - current_offset).min(JOB_CHUNK_SIZE); + chunks.push( + items + .iter() + .skip(current_offset) + .take(chunk_size) + .map(|(_, v)| (self.into_haystack)(v)), + ); + offsets.push(current_offset); + current_offset += chunk_size; + } + + let offsets_c = offsets.clone(); + + for (i, chunk) in chunks.into_iter().enumerate() { + let pattern = pattern.clone(); let results = Arc::clone(&self.results); let status = Arc::clone(&self.status); + let cur_offset = offsets_c[i]; self.worker_pool.execute(move || { - let strings: Vec = - chunk.iter().map(|item| (into_haystack)(item)).collect(); - let matches = - match_list(&pattern, &strings.iter().map(|s| s.as_str()).collect::>()[..], SMITH_WATERMAN_OPTS); - // debug!("matches: {:?}", matches); + let matches = match_list( + &pattern, + &chunk.collect::>(), + SMITH_WATERMAN_OPTS, + ); if matches.is_empty() { return; } - let mut results = results.lock_arc(); - results.extend(matches.into_iter().map(|m| { - MatchResult::new( - m.index_in_haystack + item_offset, + for m in &matches { + results.push(MatchResult::new( + m.index_in_haystack + cur_offset, m.score, - ) - })); + )); + } status .results_need_sorting .store(true, std::sync::atomic::Ordering::Relaxed); }); - self.worker_pool.num_injected_items += chunk_size; - item_offset += chunk_size; } + self.worker_pool.num_injected_items = item_count; } /// reads from the injection channel and puts new items into the items vec fn acquire_new_items(&self) { let items = Arc::clone(&self.items); + if self.injection_channel_rx.is_empty() { + return; + } + debug!("Acquiring new items"); let injection_channel_rx = self.injection_channel_rx.clone(); let status = Arc::clone(&self.status); - self.worker_pool.execute(move || { - let injection_channel_rx = injection_channel_rx; + spawn(move || { status .injector_running .store(true, std::sync::atomic::Ordering::Relaxed); - items.lock_arc().extend( - injection_channel_rx - .try_iter() - .take(MAX_ACQUIRED_ITEMS_PER_TICK), - ); + for item in injection_channel_rx + .try_iter() + .take(MAX_ACQUIRED_ITEMS_PER_TICK) + { + items.push(item); + } status .injector_running .store(false, std::sync::atomic::Ordering::Relaxed); }); + //self.worker_pool.execute(move || { + // let injection_channel_rx = injection_channel_rx; + // status + // .injector_running + // .store(true, std::sync::atomic::Ordering::Relaxed); + // items.lock_arc().extend( + // injection_channel_rx + // .try_iter() + // .take(MAX_ACQUIRED_ITEMS_PER_TICK), + // ); + // status + // .injector_running + // .store(false, std::sync::atomic::Ordering::Relaxed); + //}); } pub fn injector(&self) -> Injector { @@ -269,7 +308,7 @@ where return; } self.pattern = pattern.to_string(); - self.results.lock_arc().clear(); + self.results = Arc::new(boxcar::Vec::new()); self.worker_pool.num_injected_items = 0; } @@ -278,28 +317,23 @@ where num_entries: u32, offset: u32, ) -> Vec> { - let global_results = self.results.lock_arc(); let mut indices = Vec::new(); - let items = self.items.lock_arc(); - global_results + self.results .iter() .skip(offset as usize) + .map(|(_, v)| v) .take(num_entries as usize) .for_each(|r| { indices.push(r.index_in_haystack); }); let matched_inner: Vec<_> = - indices.iter().map(|i| items[*i].clone()).collect(); - let matched_strings = matched_inner - .iter() - .map(|s| (self.into_haystack)(s)) - .collect::>(); + indices.iter().map(|i| self.items[*i].clone()).collect(); let matched_indices = match_list_for_matched_indices( &self.pattern, - &matched_strings + &matched_inner .iter() - .map(|s| s.as_str()) - .collect::>()[..], + .map(|item| (self.into_haystack)(item)) + .collect::>(), ); let mut matched_items = Vec::new(); for (inner, indices) in @@ -307,7 +341,7 @@ where { matched_items.push(MatchedItem { inner: inner.clone(), - matched_string: (self.into_haystack)(inner), + matched_string: (self.into_haystack)(inner).to_string(), match_indices: indices .iter() .map(|i| (*i as u32, *i as u32 + 1)) @@ -317,22 +351,27 @@ where matched_items } - pub fn get_result(&self, index: usize) -> Option { - let results = self.results.lock_arc(); - if index >= results.len() { + pub fn get_result(&self, index: usize) -> Option> { + if index >= self.results.count() { return None; } - let result = &results[index]; - let items = self.items.lock_arc(); - Some(items[result.index_in_haystack].clone()) + let result = &self.results[index]; + Some(MatchedItem { + inner: self.items[result.index_in_haystack].clone(), + matched_string: (self.into_haystack)( + &self.items[result.index_in_haystack], + ) + .to_string(), + match_indices: vec![], + }) } pub fn result_count(&self) -> usize { - self.results.lock_arc().len() + self.results.count() } pub fn total_count(&self) -> usize { - self.items.lock_arc().len() + self.items.count() } pub fn running(&self) -> bool {