From d3c16af4e94e2f47b9e966b8bd6284392368a37b Mon Sep 17 00:00:00 2001 From: Alexandre Pasmantier <47638216+alexpasmantier@users.noreply.github.com> Date: Wed, 27 Nov 2024 22:56:30 +0100 Subject: [PATCH] fix(stdin): better handling of long running stdin streams (#81) fix: long running pipes --- .../television-channels/src/channels/stdin.rs | 59 +++++++++++++------ crates/television/app.rs | 2 +- crates/television/render.rs | 1 + 3 files changed, 44 insertions(+), 18 deletions(-) diff --git a/crates/television-channels/src/channels/stdin.rs b/crates/television-channels/src/channels/stdin.rs index b19b621..13b8927 100644 --- a/crates/television-channels/src/channels/stdin.rs +++ b/crates/television-channels/src/channels/stdin.rs @@ -1,34 +1,28 @@ -use std::io::BufRead; -use std::path::Path; +use std::{ + io::{stdin, BufRead}, + path::Path, + thread::spawn, +}; use devicons::FileIcon; +use tracing::debug; use super::OnAir; use crate::entry::{Entry, PreviewType}; -use television_fuzzy::matcher::{config::Config, Matcher}; +use television_fuzzy::matcher::{config::Config, injector::Injector, Matcher}; pub struct Channel { matcher: Matcher, icon: FileIcon, } -const NUM_THREADS: usize = 2; - impl Channel { pub fn new() -> Self { - let mut lines = Vec::new(); - for line in std::io::stdin().lock().lines().map_while(Result::ok) { - if !line.trim().is_empty() { - lines.push(line); - } - } - let matcher = Matcher::new(Config::default().n_threads(NUM_THREADS)); + let matcher = Matcher::new(Config::default()); let injector = matcher.injector(); - for line in lines.iter().rev() { - let () = injector.push(line.clone(), |e, cols| { - cols[0] = e.clone().into(); - }); - } + + spawn(move || stream_from_stdin(injector.clone())); + Self { matcher, icon: FileIcon::from("nu"), @@ -42,6 +36,37 @@ impl Default for Channel { } } +const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +fn stream_from_stdin(injector: Injector) { + let mut stdin = stdin().lock(); + let mut buffer = String::new(); + + let instant = std::time::Instant::now(); + loop { + match stdin.read_line(&mut buffer) { + Ok(c) if c > 0 => { + if !buffer.trim().is_empty() { + injector.push(buffer.clone(), |e, cols| { + cols[0] = e.clone().into(); + }); + } + buffer.clear(); + } + Ok(0) => { + debug!("EOF"); + break; + } + _ => { + debug!("Error reading from stdin"); + if instant.elapsed() > TIMEOUT { + break; + } + } + } + } +} + impl OnAir for Channel { fn find(&mut self, pattern: &str) { self.matcher.find(pattern); diff --git a/crates/television/app.rs b/crates/television/app.rs index 3bdc9d1..f3e0114 100644 --- a/crates/television/app.rs +++ b/crates/television/app.rs @@ -172,7 +172,7 @@ impl App { /// # Errors /// If an error occurs during the execution of the application. pub async fn run(&mut self, is_output_tty: bool) -> Result { - info!("Starting backend event loop"); + debug!("Starting backend event loop"); let event_loop = EventLoop::new(self.tick_rate, true); self.event_rx = event_loop.rx; self.event_abort_tx = event_loop.abort_tx; diff --git a/crates/television/render.rs b/crates/television/render.rs index 93f710d..f13b95b 100644 --- a/crates/television/render.rs +++ b/crates/television/render.rs @@ -111,6 +111,7 @@ pub async fn render( tui.enter()?; } RenderingTask::Quit => { + debug!("Exiting rendering loop"); tui.exit()?; break Ok(()); }