diff --git a/src/commands/mod.rs b/src/commands/mod.rs index d303f57..f5027f9 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -4,11 +4,7 @@ mod compress; mod decompress; mod list; -use std::{ - ops::ControlFlow, - path::PathBuf, - sync::{Arc, Condvar, Mutex}, -}; +use std::{ops::ControlFlow, path::PathBuf}; use rayon::prelude::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use utils::colors; @@ -22,7 +18,7 @@ use crate::{ list::ListOptions, utils::{ self, - logger::{info_accessible, setup_channel, spawn_logger_thread, warning}, + logger::{info_accessible, spawn_logger_thread, warning}, to_utf, EscapedPathDisplay, FileVisibilityPolicy, }, CliArgs, QuestionPolicy, @@ -57,20 +53,9 @@ pub fn run( question_policy: QuestionPolicy, file_visibility_policy: FileVisibilityPolicy, ) -> crate::Result<()> { - let (log_receiver, dropper) = setup_channel(); - - let synchronization_pair = Arc::new((Mutex::new(false), Condvar::new())); - spawn_logger_thread(log_receiver, synchronization_pair.clone()); + let handler = spawn_logger_thread(); run_cmd(args, question_policy, file_visibility_policy)?; - - // Send a message for the logger thread to shut down. - // This is needed, otherwise the logging thread will never exit. - drop(dropper); - - // Hold the main thread from exiting until the background thread signals its shutdown. - let (lock, cvar) = &*synchronization_pair; - let guard = lock.lock().unwrap(); - let _flushed = cvar.wait(guard).unwrap(); + handler.shutdown_and_wait(); Ok(()) } diff --git a/src/utils/logger.rs b/src/utils/logger.rs index 22ac039..e985c0a 100644 --- a/src/utils/logger.rs +++ b/src/utils/logger.rs @@ -1,6 +1,6 @@ -use std::sync::{mpsc, Arc, Condvar, Mutex, OnceLock}; +use std::sync::{mpsc, OnceLock}; -pub use logger_thread::{setup_channel, spawn_logger_thread}; +pub use logger_thread::spawn_logger_thread; use super::colors::{ORANGE, RESET, YELLOW}; use crate::accessible::is_running_in_accessible_mode; @@ -49,7 +49,7 @@ pub fn warning(contents: String) { /// Message object used for sending logs from worker threads to a logging thread via channels. /// See #[derive(Debug)] -pub struct PrintMessage { +struct PrintMessage { contents: String, accessible: bool, level: MessageLevel, @@ -89,6 +89,8 @@ enum MessageLevel { } mod logger_thread { + use std::sync::{Arc, Barrier}; + use super::*; type LogReceiver = mpsc::Receiver>; @@ -97,10 +99,10 @@ mod logger_thread { static SENDER: OnceLock = OnceLock::new(); #[track_caller] - pub fn setup_channel() -> (LogReceiver, LoggerDropper) { + fn setup_channel() -> LogReceiver { let (tx, rx) = mpsc::channel(); SENDER.set(tx).expect("`setup_channel` should only be called once"); - (rx, LoggerDropper) + rx } #[track_caller] @@ -108,44 +110,8 @@ mod logger_thread { SENDER.get().expect("No sender, you need to call `setup_channel` first") } - pub fn spawn_logger_thread(log_receiver: LogReceiver, synchronization_pair: Arc<(Mutex, Condvar)>) { - rayon::spawn(move || { - const BUFFER_CAPACITY: usize = 10; - let mut buffer = Vec::::with_capacity(BUFFER_CAPACITY); - - loop { - let msg = log_receiver.recv().expect("Failed to receive log message"); - - let is_shutdown_message = msg.is_none(); - - // Append message to buffer - if let Some(msg) = msg.as_ref().and_then(PrintMessage::to_processed_message) { - buffer.push(msg); - } - - let should_flush = buffer.len() == BUFFER_CAPACITY || is_shutdown_message; - - if should_flush { - let text = buffer.join("\n"); - eprintln!("{text}"); - buffer.clear(); - } - - if is_shutdown_message { - break; - } - } - - // Wake up the main thread - let (lock, cvar) = &*synchronization_pair; - let mut flushed = lock.lock().unwrap(); - *flushed = true; - cvar.notify_one(); - }); - } - #[track_caller] - pub fn send_log_message(msg: PrintMessage) { + pub(super) fn send_log_message(msg: PrintMessage) { send_message(Some(msg)); } @@ -154,11 +120,61 @@ mod logger_thread { get_sender().send(msg).expect("Failed to send internal message"); } - pub struct LoggerDropper; + pub struct LoggerThreadHandle { + shutdown_barrier: Arc, + } - impl Drop for LoggerDropper { - fn drop(&mut self) { + impl LoggerThreadHandle { + /// Tell logger to shutdown and waits till it does. + pub fn shutdown_and_wait(self) { + // Signal the shutdown send_message(None); + // Wait for confirmation + self.shutdown_barrier.wait(); } } + + pub fn spawn_logger_thread() -> LoggerThreadHandle { + let log_receiver = setup_channel(); + + let shutdown_barrier = Arc::new(Barrier::new(2)); + + let handle = LoggerThreadHandle { + shutdown_barrier: shutdown_barrier.clone(), + }; + + rayon::spawn(move || run_logger(log_receiver, shutdown_barrier)); + + handle + } + + fn run_logger(log_receiver: LogReceiver, shutdown_barrier: Arc) { + const BUFFER_CAPACITY: usize = 10; + let mut buffer = Vec::::with_capacity(BUFFER_CAPACITY); + + loop { + let msg = log_receiver.recv().expect("Failed to receive log message"); + + let is_shutdown_message = msg.is_none(); + + // Append message to buffer + if let Some(msg) = msg.as_ref().and_then(PrintMessage::to_processed_message) { + buffer.push(msg); + } + + let should_flush = buffer.len() == BUFFER_CAPACITY || is_shutdown_message; + + if should_flush { + let text = buffer.join("\n"); + eprintln!("{text}"); + buffer.clear(); + } + + if is_shutdown_message { + break; + } + } + + shutdown_barrier.wait(); + } }