refactor(simd): improvement attempt broke the whole thing

This commit is contained in:
Alexandre Pasmantier 2024-12-01 21:33:46 +01:00
parent d88e5f7968
commit a93908fd13
5 changed files with 149 additions and 90 deletions

7
Cargo.lock generated
View File

@ -247,6 +247,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "boxcar"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f839cdf7e2d3198ac6ca003fd8ebc61715755f41c1cad15ff13df67531e00ed"
[[package]]
name = "bstr"
version = "1.11.0"
@ -2933,6 +2939,7 @@ dependencies = [
name = "television-fuzzy"
version = "0.0.7"
dependencies = [
"boxcar",
"crossbeam-channel",
"frizbee",
"nucleo",

View File

@ -17,7 +17,7 @@ pub struct Channel {
impl Channel {
pub fn new() -> Self {
let matcher = SimdMatcher::new(|s: &String| s.trim_end().to_string());
let matcher = SimdMatcher::new(|s: &String| s.trim_end());
let injector = matcher.injector();
spawn(move || stream_from_stdin(injector.clone()));
@ -85,7 +85,7 @@ impl OnAir for Channel {
fn get_result(&self, index: u32) -> Option<Entry> {
self.matcher
.get_result(index as usize)
.map(|s| Entry::new(s.clone(), PreviewType::Basic))
.map(|s| Entry::new(s.matched_string.clone(), PreviewType::Basic))
}
fn result_count(&self) -> u32 {

View File

@ -8,7 +8,9 @@ use std::{
path::{Path, PathBuf},
sync::{atomic::AtomicUsize, Arc},
};
use television_fuzzy::{NucleoConfig, NucleoInjector, NucleoMatcher};
use television_fuzzy::{
NucleoConfig, NucleoInjector, NucleoMatcher, SimdInjector, SimdMatcher,
};
use television_utils::files::{walk_builder, DEFAULT_NUM_THREADS};
use television_utils::strings::{
proportion_of_printable_ascii_characters, PRINTABLE_ASCII_THRESHOLD,
@ -34,13 +36,15 @@ impl CandidateLine {
#[allow(clippy::module_name_repetitions)]
pub struct Channel {
matcher: NucleoMatcher<CandidateLine>,
//matcher: NucleoMatcher<CandidateLine>,
matcher: SimdMatcher<CandidateLine>,
crawl_handle: tokio::task::JoinHandle<()>,
}
impl Channel {
pub fn new(directories: Vec<PathBuf>) -> Self {
let matcher = NucleoMatcher::new(NucleoConfig::default());
//let matcher = NucleoMatcher::new(NucleoConfig::default());
let matcher = SimdMatcher::new(|c: &CandidateLine| &*c.line);
// start loading files in the background
let crawl_handle = tokio::spawn(crawl_for_candidates(
directories,
@ -53,7 +57,8 @@ impl Channel {
}
fn from_file_paths(file_paths: Vec<PathBuf>) -> Self {
let matcher = NucleoMatcher::new(NucleoConfig::default());
//let matcher = NucleoMatcher::new(NucleoConfig::default());
let matcher = SimdMatcher::new(|c: &CandidateLine| &*c.line);
let injector = matcher.injector();
let current_dir = std::env::current_dir().unwrap();
let crawl_handle = tokio::spawn(async move {
@ -77,7 +82,8 @@ impl Channel {
}
fn from_text_entries(entries: Vec<Entry>) -> Self {
let matcher = NucleoMatcher::new(NucleoConfig::default());
//let matcher = NucleoMatcher::new(NucleoConfig::default());
let matcher = SimdMatcher::new(|c: &CandidateLine| &*c.line);
let injector = matcher.injector();
let load_handle = tokio::spawn(async move {
for entry in entries.into_iter().take(MAX_LINES_IN_MEM) {
@ -87,9 +93,9 @@ impl Channel {
entry.value.unwrap(),
entry.line_number.unwrap(),
),
|c, cols| {
cols[0] = c.line.clone().into();
},
//|c, cols| {
// cols[0] = c.line.clone().into();
//},
);
}
});
@ -178,7 +184,7 @@ impl OnAir for Channel {
}
fn get_result(&self, index: u32) -> Option<Entry> {
self.matcher.get_result(index).map(|item| {
self.matcher.get_result(index as usize).map(|item| {
let display_path = item.inner.path.to_string_lossy().to_string();
Entry::new(display_path, PreviewType::Files)
.with_icon(FileIcon::from(item.inner.path.as_path()))
@ -187,15 +193,18 @@ impl OnAir for Channel {
}
fn result_count(&self) -> u32 {
self.matcher.matched_item_count
//self.matcher.matched_item_count
self.matcher.result_count().try_into().unwrap()
}
fn total_count(&self) -> u32 {
self.matcher.total_item_count
//self.matcher.total_item_count
self.matcher.total_count().try_into().unwrap()
}
fn running(&self) -> bool {
self.matcher.status.running
//self.matcher.status.running
self.matcher.running()
}
fn shutdown(&self) {
@ -221,12 +230,13 @@ const MAX_FILE_SIZE: u64 = 4 * 1024 * 1024;
///
/// A typical line should take somewhere around 100 bytes in memory (for UTF-8 English text),
/// so this should take around 100 bytes x `MAX_LINES_IN_MEM` of memory: 500MB for `5_000_000` lines, 5GB for `50_000_000` lines.
const MAX_LINES_IN_MEM: usize = 5_000_000;
const MAX_LINES_IN_MEM: usize = 50_000_000;
#[allow(clippy::unused_async)]
async fn crawl_for_candidates(
directories: Vec<PathBuf>,
injector: NucleoInjector<CandidateLine>,
//injector: NucleoInjector<CandidateLine>,
injector: SimdInjector<CandidateLine>,
) {
if directories.is_empty() {
return;
@ -274,7 +284,8 @@ async fn crawl_for_candidates(
}
fn try_inject_lines(
injector: &NucleoInjector<CandidateLine>,
//injector: &NucleoInjector<CandidateLine>,
injector: &SimdInjector<CandidateLine>,
current_dir: &PathBuf,
path: &Path,
) -> Option<usize> {
@ -315,9 +326,10 @@ fn try_inject_lines(
l,
line_number,
);
let () = injector.push(candidate, |c, cols| {
cols[0] = c.line.clone().into();
});
//let () = injector.push(candidate, |c, cols| {
// cols[0] = c.line.clone().into();
//});
injector.push(candidate);
injected_lines += 1;
}
Err(e) => {

View File

@ -20,3 +20,4 @@ threadpool = "1.8.1"
crossbeam-channel = "0.5.13"
tracing = "0.1.41"
rayon = "1.10.0"
boxcar = "0.2.7"

View File

@ -1,14 +1,16 @@
use std::fmt::Debug;
use std::num::NonZero;
use std::ops::Deref;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread::available_parallelism;
use tracing::debug;
use std::thread::{available_parallelism, spawn};
use crossbeam_channel::{unbounded, Receiver, Sender};
use frizbee::{match_list, match_list_for_matched_indices, Options};
use parking_lot::Mutex;
use rayon::prelude::ParallelSliceMut;
use threadpool::ThreadPool;
use tracing::debug;
pub struct Injector<I>
where
@ -108,20 +110,19 @@ impl From<Arc<Status>> for MatcherStatus {
}
}
type IntoHaystackFn<I> = fn(&I) -> String;
type IntoHaystackFn<I> = fn(&I) -> &str;
pub struct Matcher<I>
where
I: Sync + Send + Clone + 'static,
I: Sync + Send + Clone + 'static + Debug,
{
pattern: String,
items: Arc<Mutex<Vec<I>>>,
items: Arc<boxcar::Vec<I>>,
into_haystack: IntoHaystackFn<I>,
worker_pool: WorkerPool,
injection_channel_rx: Receiver<I>,
injection_channel_tx: Sender<I>,
/// The indices of the matched items.
results: Arc<Mutex<Vec<MatchResult>>>,
results: Arc<boxcar::Vec<MatchResult>>,
status: Arc<Status>,
}
@ -129,9 +130,10 @@ const DEFAULT_ITEMS_CAPACITY: usize = 1024 * 1024;
/// The maximum number of items that can be acquired per tick.
///
/// This keeps a single acquisition pass over the injection channel bounded, so that it does not hold onto `self.items` for too long.
const MAX_ACQUIRED_ITEMS_PER_TICK: usize = 1024 * 1024;
const MAX_ACQUIRED_ITEMS_PER_TICK: usize = 1024 * 1024 * 4;
const JOB_CHUNK_SIZE: usize = 1024 * 64;
/// Number of items to match in a single simd job.
const JOB_CHUNK_SIZE: usize = 1024 * 1024;
const SMITH_WATERMAN_OPTS: Options = Options {
indices: false,
prefilter: true,
@ -142,26 +144,27 @@ const SMITH_WATERMAN_OPTS: Options = Options {
impl<I> Matcher<I>
where
I: Sync + Send + Clone + 'static,
I: Sync + Send + Clone + 'static + Debug,
{
pub fn new(f: IntoHaystackFn<I>) -> Self {
debug!("Creating threadpool");
let thread_pool = ThreadPool::new(usize::from(
let thread_pool = ThreadPool::with_name(
"SimdMatcher".to_string(),
usize::from(
available_parallelism().unwrap_or(NonZero::new(8).unwrap()),
));
),
);
let worker_pool = WorkerPool::new(thread_pool);
let (sender, receiver) = unbounded();
debug!("finished initializing matcher");
Self {
pattern: String::new(),
items: Arc::new(Mutex::new(Vec::with_capacity(
items: Arc::new(boxcar::Vec::with_capacity(
DEFAULT_ITEMS_CAPACITY,
))),
)),
into_haystack: f,
worker_pool,
injection_channel_rx: receiver,
injection_channel_tx: sender,
results: Arc::new(Mutex::new(Vec::new())),
results: Arc::new(boxcar::Vec::new()),
status: Arc::new(Status::default()),
}
}
@ -177,7 +180,8 @@ where
.results_need_sorting
.load(std::sync::atomic::Ordering::Relaxed)
{
let mut results = self.results.lock_arc();
debug!("Sorting results");
// let mut results = self.results.clone();
// results.par_sort_unstable_by_key(|r| std::cmp::Reverse(r.score));
self.status
.results_need_sorting
@ -196,65 +200,100 @@ where
pub fn match_items(&mut self) {
// debug!("items.len(): {}, injected: {}", self.items.lock_arc().len(), self.worker_pool.num_injected_items);
// if all items have already been fed to the worker pool, simply return
if self.items.lock_arc().len() == self.worker_pool.num_injected_items {
let item_count = self.items.count();
if item_count == self.worker_pool.num_injected_items {
return;
}
let n_injected_items = self.worker_pool.num_injected_items;
let items = self.items.lock_arc();
let new_item_chunks: Vec<&[I]> =
items[n_injected_items..].chunks(JOB_CHUNK_SIZE).collect();
let into_haystack = self.into_haystack;
let mut item_offset = n_injected_items;
for chunk in new_item_chunks {
let chunk = chunk.to_vec();
let chunk_size = chunk.len();
let pattern = self.pattern.clone();
let mut chunks = Vec::new();
let mut offsets = Vec::new();
let mut current_offset = n_injected_items;
let items = Arc::clone(&self.items);
loop {
if current_offset >= item_count {
break;
}
let chunk_size = (item_count - current_offset).min(JOB_CHUNK_SIZE);
chunks.push(
items
.iter()
.skip(current_offset)
.take(chunk_size)
.map(|(_, v)| (self.into_haystack)(v)),
);
offsets.push(current_offset);
current_offset += chunk_size;
}
let offsets_c = offsets.clone();
for (i, chunk) in chunks.into_iter().enumerate() {
let pattern = pattern.clone();
let results = Arc::clone(&self.results);
let status = Arc::clone(&self.status);
let cur_offset = offsets_c[i];
self.worker_pool.execute(move || {
let strings: Vec<String> =
chunk.iter().map(|item| (into_haystack)(item)).collect();
let matches =
match_list(&pattern, &strings.iter().map(|s| s.as_str()).collect::<Vec<_>>()[..], SMITH_WATERMAN_OPTS);
// debug!("matches: {:?}", matches);
let matches = match_list(
&pattern,
&chunk.collect::<Vec<&str>>(),
SMITH_WATERMAN_OPTS,
);
if matches.is_empty() {
return;
}
let mut results = results.lock_arc();
results.extend(matches.into_iter().map(|m| {
MatchResult::new(
m.index_in_haystack + item_offset,
for m in &matches {
results.push(MatchResult::new(
m.index_in_haystack + cur_offset,
m.score,
)
}));
));
}
status
.results_need_sorting
.store(true, std::sync::atomic::Ordering::Relaxed);
});
self.worker_pool.num_injected_items += chunk_size;
item_offset += chunk_size;
}
self.worker_pool.num_injected_items = item_count;
}
/// Reads pending items from the injection channel (at most `MAX_ACQUIRED_ITEMS_PER_TICK` per call) and puts them into the items vec.
fn acquire_new_items(&self) {
let items = Arc::clone(&self.items);
if self.injection_channel_rx.is_empty() {
return;
}
debug!("Acquiring new items");
let injection_channel_rx = self.injection_channel_rx.clone();
let status = Arc::clone(&self.status);
self.worker_pool.execute(move || {
let injection_channel_rx = injection_channel_rx;
spawn(move || {
status
.injector_running
.store(true, std::sync::atomic::Ordering::Relaxed);
items.lock_arc().extend(
injection_channel_rx
for item in injection_channel_rx
.try_iter()
.take(MAX_ACQUIRED_ITEMS_PER_TICK),
);
.take(MAX_ACQUIRED_ITEMS_PER_TICK)
{
items.push(item);
}
status
.injector_running
.store(false, std::sync::atomic::Ordering::Relaxed);
});
//self.worker_pool.execute(move || {
// let injection_channel_rx = injection_channel_rx;
// status
// .injector_running
// .store(true, std::sync::atomic::Ordering::Relaxed);
// items.lock_arc().extend(
// injection_channel_rx
// .try_iter()
// .take(MAX_ACQUIRED_ITEMS_PER_TICK),
// );
// status
// .injector_running
// .store(false, std::sync::atomic::Ordering::Relaxed);
//});
}
pub fn injector(&self) -> Injector<I> {
@ -269,7 +308,7 @@ where
return;
}
self.pattern = pattern.to_string();
self.results.lock_arc().clear();
self.results = Arc::new(boxcar::Vec::new());
self.worker_pool.num_injected_items = 0;
}
@ -278,28 +317,23 @@ where
num_entries: u32,
offset: u32,
) -> Vec<MatchedItem<I>> {
let global_results = self.results.lock_arc();
let mut indices = Vec::new();
let items = self.items.lock_arc();
global_results
self.results
.iter()
.skip(offset as usize)
.map(|(_, v)| v)
.take(num_entries as usize)
.for_each(|r| {
indices.push(r.index_in_haystack);
});
let matched_inner: Vec<_> =
indices.iter().map(|i| items[*i].clone()).collect();
let matched_strings = matched_inner
.iter()
.map(|s| (self.into_haystack)(s))
.collect::<Vec<_>>();
indices.iter().map(|i| self.items[*i].clone()).collect();
let matched_indices = match_list_for_matched_indices(
&self.pattern,
&matched_strings
&matched_inner
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>()[..],
.map(|item| (self.into_haystack)(item))
.collect::<Vec<_>>(),
);
let mut matched_items = Vec::new();
for (inner, indices) in
@ -307,7 +341,7 @@ where
{
matched_items.push(MatchedItem {
inner: inner.clone(),
matched_string: (self.into_haystack)(inner),
matched_string: (self.into_haystack)(inner).to_string(),
match_indices: indices
.iter()
.map(|i| (*i as u32, *i as u32 + 1))
@ -317,22 +351,27 @@ where
matched_items
}
pub fn get_result(&self, index: usize) -> Option<I> {
let results = self.results.lock_arc();
if index >= results.len() {
pub fn get_result(&self, index: usize) -> Option<MatchedItem<I>> {
if index >= self.results.count() {
return None;
}
let result = &results[index];
let items = self.items.lock_arc();
Some(items[result.index_in_haystack].clone())
let result = &self.results[index];
Some(MatchedItem {
inner: self.items[result.index_in_haystack].clone(),
matched_string: (self.into_haystack)(
&self.items[result.index_in_haystack],
)
.to_string(),
match_indices: vec![],
})
}
pub fn result_count(&self) -> usize {
self.results.lock_arc().len()
self.results.count()
}
pub fn total_count(&self) -> usize {
self.items.lock_arc().len()
self.items.count()
}
pub fn running(&self) -> bool {