From 790277071b2738adc8ac2d97cd991f64e149cfa4 Mon Sep 17 00:00:00 2001 From: Demmie <2e3s19@gmail.com> Date: Sat, 17 Jun 2023 02:14:09 -0400 Subject: [PATCH] Add basic async support over threads --- Cargo.toml | 6 +-- src/bundle.rs | 16 +++--- src/bundle/server.rs | 70 +++++++++++-------------- src/main.rs | 39 ++++++-------- watchers/src/lib.rs | 2 +- watchers/src/watchers.rs | 109 +++++++++++++++++---------------------- 6 files changed, 109 insertions(+), 133 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 03a96d9..e5ce53a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,19 +33,19 @@ fern = { version = "0.6.2", features = ["colored"] } log = { workspace = true } anyhow = { workspace = true } signal-hook = "0.3.15" +tokio = { version = "1.28.2", features = ["rt", "macros"] } ksni = {version = "0.2.0", optional = true} aw-server = { git = "https://github.com/2e3s/aw-server-rust", optional = true, rev = "1c23a55" } aw-datastore = { git = "https://github.com/2e3s/aw-server-rust", optional = true, rev = "1c23a55" } open = { version = "4.1.0", optional = true } -rust-embed = { version = "6.6.1", features = ["interpolate-folder-path"] } -tokio = { version = "1.28.2", optional = true } +rust-embed = { version = "6.6.1", features = ["interpolate-folder-path"], optional = true } [features] default = ["gnome", "kwin_window"] gnome = ["watchers/gnome"] kwin_window = ["watchers/kwin_window"] -bundle = ["ksni", "tokio", "aw-server", "aw-datastore", "open"] +bundle = ["ksni", "aw-server", "aw-datastore", "open", "rust-embed"] [package.metadata.deb] features = ["bundle"] diff --git a/src/bundle.rs b/src/bundle.rs index eea164c..d0b1fde 100644 --- a/src/bundle.rs +++ b/src/bundle.rs @@ -6,19 +6,23 @@ use std::{ path::PathBuf, sync::{atomic::AtomicBool, Arc}, }; -use watchers::config::Config; -pub fn run(config: &Config, config_file: PathBuf, no_tray: bool, is_stopped: Arc) { +pub async fn run( + host: String, + port: u32, + config_file: PathBuf, + no_tray: bool, + is_stopped: Arc, +) { if !no_tray { let service = ksni::TrayService::new(Tray { - server_host: config.host.clone(), - server_port: config.port, + server_host: host, + server_port: port, config_file, is_stopped: Arc::clone(&is_stopped), }); service.spawn(); } - let port = config.port; - server::run(port, is_stopped); + server::run(port, is_stopped).await; } diff --git a/src/bundle/server.rs b/src/bundle/server.rs index ac91880..c738628 100644 --- a/src/bundle/server.rs +++ b/src/bundle/server.rs @@ -6,47 +6,39 @@ use std::sync::{ }; use tokio::time::{sleep, Duration}; -pub fn run(port: u32, is_stopped: Arc) { - std::thread::spawn(move || { - let db_path = aw_server::dirs::db_path(false) - .map_err(|_| anyhow!("DB path is not found")) - .unwrap() - .to_str() - .unwrap() - .to_string(); - let device_id = aw_server::device_id::get_device_id(); - let mut config = aw_server::config::create_config(false); - config.address = "127.0.0.1".to_string(); - config.port = u16::try_from(port).unwrap(); +pub async fn run(port: u32, is_stopped: Arc) { + let db_path = aw_server::dirs::db_path(false) + .map_err(|_| anyhow!("DB path is not found")) + .unwrap() + .to_str() + .unwrap() + .to_string(); + let device_id = aw_server::device_id::get_device_id(); + let mut config = aw_server::config::create_config(false); + config.address = "127.0.0.1".to_string(); + config.port = u16::try_from(port).unwrap(); - let legacy_import = false; - let server_state = aw_server::endpoints::ServerState { - datastore: Mutex::new(aw_datastore::Datastore::new(db_path, legacy_import)), - asset_resolver: embed_asset_resolver!("$AW_WEBUI_DIST"), - device_id, - }; - let server = build_rocket(server_state, config).launch(); + let legacy_import = false; + let server_state = aw_server::endpoints::ServerState { + datastore: Mutex::new(aw_datastore::Datastore::new(db_path, legacy_import)), + asset_resolver: embed_asset_resolver!("$AW_WEBUI_DIST"), + device_id, + }; + let server = build_rocket(server_state, config).launch(); - let check = async { - loop { - if is_stopped.load(Ordering::Relaxed) { - warn!("Received an exit signal, stopping the server"); - break; - } - - sleep(Duration::from_secs(1)).await; + let check = async { + loop { + if is_stopped.load(Ordering::Relaxed) { + warn!("Received an exit signal, stopping the server"); + break; } - }; - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { - tokio::select! ( - r = server => {r.unwrap();}, - _ = check => {}, - ); - }); - }); + sleep(Duration::from_secs(1)).await; + } + }; + + tokio::select! ( + r = server => {r.unwrap();}, + _ = check => {}, + ); } diff --git a/src/main.rs b/src/main.rs index f2b9b70..50dec18 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,10 +10,11 @@ mod config; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use watchers::ConstructorFilter; +use watchers::run_first_supported; use watchers::ReportClient; -fn main() -> anyhow::Result<()> { +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { let is_stopped = Arc::new(AtomicBool::new(false)); signal_hook::flag::register(signal_hook::consts::SIGTERM, Arc::clone(&is_stopped))?; signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&is_stopped))?; @@ -40,33 +41,27 @@ fn main() -> anyhow::Result<()> { config.poll_time_window.as_secs() ); - #[cfg(feature = "bundle")] - bundle::run(&config, config_file, no_tray, Arc::clone(&is_stopped)); - let client = ReportClient::new(config)?; let client = Arc::new(client); - let mut thread_handlers = Vec::new(); + let idle_handler = run_first_supported(watchers::IDLE, &client, Arc::clone(&is_stopped)); + let active_window_handler = + run_first_supported(watchers::ACTIVE_WINDOW, &client, Arc::clone(&is_stopped)); - if let Some(idle_handler) = watchers::IDLE.run_first_supported(&client, Arc::clone(&is_stopped)) + #[cfg(not(feature = "bundle"))] { - thread_handlers.push(idle_handler); - } else { - warn!("No supported idle handler is found"); + tokio::select!( + _ = idle_handler => Ok(()), + _ = active_window_handler => Ok(()), + ) } - if let Some(active_window_handler) = - watchers::ACTIVE_WINDOW.run_first_supported(&client, is_stopped) + #[cfg(feature = "bundle")] { - thread_handlers.push(active_window_handler); - } else { - warn!("No supported active window handler is found"); + tokio::select!( + _ = idle_handler => Ok(()), + _ = active_window_handler => Ok(()), + _ = bundle::run(client.config.host.clone(), client.config.port, config_file, no_tray, Arc::clone(&is_stopped)) => Ok(()), + ) } - - for handler in thread_handlers { - if handler.join().is_err() { - error!("Thread failed with error"); - } - } - Ok(()) } diff --git a/watchers/src/lib.rs b/watchers/src/lib.rs index 34a96e4..2924926 100644 --- a/watchers/src/lib.rs +++ b/watchers/src/lib.rs @@ -6,7 +6,7 @@ mod report_client; mod watchers; pub use crate::report_client::ReportClient; -pub use crate::watchers::ConstructorFilter; +pub use crate::watchers::run_first_supported; pub use crate::watchers::Watcher; pub use crate::watchers::ACTIVE_WINDOW; pub use crate::watchers::IDLE; diff --git a/watchers/src/watchers.rs b/watchers/src/watchers.rs index 21f88bd..963ac13 100644 --- a/watchers/src/watchers.rs +++ b/watchers/src/watchers.rs @@ -20,7 +20,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, - thread::{self, JoinHandle}, + thread, time::Duration, }; @@ -52,25 +52,6 @@ pub trait Watcher: Send { where Self: Sized; - fn run( - &mut self, - watcher_type: &WatcherType, - client: &Arc, - is_stopped: Arc, - ) { - info!("Starting {watcher_type} watcher"); - loop { - if is_stopped.load(Ordering::Relaxed) { - warn!("Received an exit signal, shutting down {watcher_type}"); - break; - } - if let Err(e) = self.run_iteration(client) { - error!("Error on {watcher_type} iteration: {e}"); - } - thread::sleep(watcher_type.sleep_time(&client.config)); - } - } - fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()>; } @@ -81,51 +62,55 @@ type WatcherConstructors = [( fn(&Arc) -> anyhow::Result, )]; -pub trait ConstructorFilter { - fn filter_first_supported( - &self, - client: &Arc, - ) -> Option<(&WatcherType, BoxedWatcher)>; - - fn run_first_supported( - &'static self, - client: &Arc, - is_stopped: Arc, - ) -> Option>; +pub fn filter_first_supported( + watcher_constructors: &'static WatcherConstructors, + client: &Arc, +) -> Option<(&'static WatcherType, BoxedWatcher)> { + watcher_constructors + .iter() + .find_map(|(name, watcher_type, watcher)| match watcher(client) { + Ok(watcher) => { + info!("Selected {name} as {watcher_type} watcher"); + Some((watcher_type, watcher)) + } + Err(e) => { + debug!("{name} cannot run: {e}"); + None + } + }) } -impl ConstructorFilter for WatcherConstructors { - fn filter_first_supported( - &self, - client: &Arc, - ) -> Option<(&WatcherType, BoxedWatcher)> { - self.iter() - .find_map(|(name, watcher_type, watcher)| match watcher(client) { - Ok(watcher) => { - info!("Selected {name} as {watcher_type} watcher"); - Some((watcher_type, watcher)) - } - Err(e) => { - debug!("{name} cannot run: {e}"); - None - } - }) - } - - fn run_first_supported( - &'static self, - client: &Arc, - is_stopped: Arc, - ) -> Option> { - let idle_watcher = self.filter_first_supported(client); - if let Some((watcher_type, mut watcher)) = idle_watcher { - let thread_client = Arc::clone(client); - let idle_handler = - thread::spawn(move || watcher.run(watcher_type, &thread_client, is_stopped)); - Some(idle_handler) - } else { - None +async fn run_watcher( + watcher: &mut Box, + watcher_type: &WatcherType, + client: &Arc, + is_stopped: Arc, +) { + info!("Starting {watcher_type} watcher"); + loop { + if is_stopped.load(Ordering::Relaxed) { + warn!("Received an exit signal, shutting down {watcher_type}"); + break; } + if let Err(e) = watcher.run_iteration(client) { + error!("Error on {watcher_type} iteration: {e}"); + } + thread::sleep(watcher_type.sleep_time(&client.config)); + } +} + +pub async fn run_first_supported( + watcher_constructors: &'static WatcherConstructors, + client: &Arc, + is_stopped: Arc, +) -> bool { + let supported_watcher = filter_first_supported(watcher_constructors, client); + if let Some((watcher_type, mut watcher)) = supported_watcher { + let thread_client = Arc::clone(client); + run_watcher(&mut watcher, watcher_type, &thread_client, is_stopped).await; + true + } else { + false } }