From 10d54856c17085ece82317723c94c96ccef9b58d Mon Sep 17 00:00:00 2001 From: Demmie <2e3s19@gmail.com> Date: Sun, 2 Jul 2023 14:34:13 -0400 Subject: [PATCH] Move the code to async instead of threads --- Cargo.lock | 20 +-- Cargo.toml | 4 +- src/bundle.rs | 12 +- src/bundle/menu.rs | 16 +- src/bundle/server.rs | 26 +-- src/main.rs | 52 ++++-- watchers/Cargo.toml | 4 +- watchers/src/lib.rs | 4 +- watchers/src/report_client.rs | 15 +- watchers/src/watchers.rs | 148 +++++++----------- watchers/src/watchers/gnome_idle.rs | 8 +- watchers/src/watchers/gnome_window.rs | 36 +++-- watchers/src/watchers/idle.rs | 18 ++- watchers/src/watchers/kwin_window.rs | 20 ++- watchers/src/watchers/wl_foreign_toplevel.rs | 11 +- watchers/src/watchers/wl_kwin_idle.rs | 50 +++--- watchers/src/watchers/x11_screensaver_idle.rs | 9 +- watchers/src/watchers/x11_window.rs | 11 +- 18 files changed, 228 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ac9b37..797d3d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -291,14 +291,15 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aw-client-rust" version = "0.1.0" -source = "git+https://github.com/ActivityWatch/aw-server-rust#49b2026b2b61f8d8d017e93dca1d3754dcfef0d5" +source = "git+https://github.com/ActivityWatch/aw-server-rust?rev=d4253ed#d4253ed64dc6547a821190cc29bf7dca72c616d1" dependencies = [ - "aw-models 0.1.0 (git+https://github.com/ActivityWatch/aw-server-rust)", + "aw-models 0.1.0 (git+https://github.com/ActivityWatch/aw-server-rust?rev=d4253ed)", "chrono", "gethostname 0.4.1", "reqwest", "serde", "serde_json", + "tokio", ] [[package]] @@ -332,7 +333,7 @@ dependencies = [ [[package]] name = "aw-models" version = "0.1.0" -source = "git+https://github.com/ActivityWatch/aw-server-rust#49b2026b2b61f8d8d017e93dca1d3754dcfef0d5" +source = "git+https://github.com/ActivityWatch/aw-server-rust?rev=d4253ed#d4253ed64dc6547a821190cc29bf7dca72c616d1" dependencies = [ "chrono", "log", @@ -418,7 +419,6 @@ dependencies = [ "log", "open", "rust-embed", - "signal-hook", "tokio", "toml 0.7.3", "watchers", @@ -3146,16 +3146,6 @@ dependencies = [ "dirs 4.0.0", ] -[[package]] -name = "signal-hook" -version = "0.3.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "732768f1176d21d09e076c23a93123d40bba92d50c4058da34d45c8de8e682b9" -dependencies = [ - "libc", - "signal-hook-registry", -] - [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -3848,6 +3838,7 @@ name = "watchers" version = "0.1.1" dependencies = [ "anyhow", + "async-trait", "aw-client-rust", "chrono", "dirs 5.0.0", @@ -3858,6 +3849,7 @@ dependencies = [ "serde", "serde_default", "serde_json", + "tokio", "toml 0.7.3", "wayland-backend", "wayland-client", diff --git a/Cargo.toml b/Cargo.toml index e5ce53a..788b68a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ version = "0.1.1" [workspace.dependencies] anyhow = "1.0.70" log = { version = "0.4.17", features = ["std"] } +tokio = { version = "1.28.2" } [dependencies] watchers = { path = "./watchers", default-features = false } @@ -32,8 +33,7 @@ clap = { version = "4.2.1", features = ["string"] } fern = { version = "0.6.2", features = ["colored"] } log = { workspace = true } anyhow = { workspace = true } -signal-hook = "0.3.15" -tokio = { version = "1.28.2", features = ["rt", "macros"] } +tokio = { workspace = true, features = ["rt", "macros", "signal"] } ksni = {version = "0.2.0", optional = true} aw-server = { git = "https://github.com/2e3s/aw-server-rust", optional = true, rev = "1c23a55" } diff --git a/src/bundle.rs b/src/bundle.rs index d0b1fde..16a3295 100644 --- a/src/bundle.rs +++ b/src/bundle.rs @@ -2,27 +2,25 @@ mod menu; mod server; pub use menu::Tray; -use std::{ - path::PathBuf, - sync::{atomic::AtomicBool, Arc}, -}; +use std::path::PathBuf; +use tokio::sync::mpsc::UnboundedSender; pub async fn run( host: String, port: u32, config_file: PathBuf, no_tray: bool, - is_stopped: Arc, + shutdown_sender: UnboundedSender<()>, ) { if !no_tray { let service = ksni::TrayService::new(Tray { server_host: host, server_port: port, config_file, - is_stopped: Arc::clone(&is_stopped), + shutdown_sender, }); service.spawn(); } - server::run(port, is_stopped).await; + server::run(port).await; } diff --git a/src/bundle/menu.rs b/src/bundle/menu.rs index 6a94dc9..4892f8a 100644 --- a/src/bundle/menu.rs +++ b/src/bundle/menu.rs @@ -1,17 +1,13 @@ -use std::{ - path::PathBuf, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, -}; +use std::path::PathBuf; + +use tokio::sync::mpsc::UnboundedSender; #[derive(Debug)] pub struct Tray { pub server_host: String, pub server_port: u32, pub config_file: PathBuf, - pub is_stopped: Arc, + pub shutdown_sender: UnboundedSender<()>, } impl ksni::Tray for Tray { @@ -59,10 +55,10 @@ impl ksni::Tray for Tray { label: "Exit".into(), icon_name: "application-exit".into(), activate: { - let is_stopped = Arc::clone(&self.is_stopped); + let shutdown_sender = self.shutdown_sender.clone(); Box::new(move |_| { - is_stopped.store(true, Ordering::Relaxed); + shutdown_sender.send(()).unwrap(); }) }, ..Default::default() diff --git a/src/bundle/server.rs b/src/bundle/server.rs index c738628..2348c8b 100644 --- a/src/bundle/server.rs +++ b/src/bundle/server.rs @@ -1,12 +1,8 @@ use anyhow::anyhow; use aw_server::endpoints::{build_rocket, embed_asset_resolver}; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, -}; -use tokio::time::{sleep, Duration}; +use std::sync::Mutex; -pub async fn run(port: u32, is_stopped: Arc) { +pub async fn run(port: u32) { let db_path = aw_server::dirs::db_path(false) .map_err(|_| anyhow!("DB path is not found")) .unwrap() @@ -24,21 +20,5 @@ pub async fn run(port: u32, is_stopped: Arc) { asset_resolver: embed_asset_resolver!("$AW_WEBUI_DIST"), device_id, }; - let server = build_rocket(server_state, config).launch(); - - let check = async { - loop { - if is_stopped.load(Ordering::Relaxed) { - warn!("Received an exit signal, stopping the server"); - break; - } - - sleep(Duration::from_secs(1)).await; - } - }; - - tokio::select! ( - r = server => {r.unwrap();}, - _ = check => {}, - ); + build_rocket(server_state, config).launch().await.unwrap(); } diff --git a/src/main.rs b/src/main.rs index 50dec18..88b7b33 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,17 +8,14 @@ extern crate log; mod bundle; mod config; -use std::sync::atomic::AtomicBool; use std::sync::Arc; -use watchers::run_first_supported; -use watchers::ReportClient; +use tokio::signal::unix::{signal, SignalKind}; +#[cfg(feature = "bundle")] +use tokio::sync::mpsc; +use watchers::{run_first_supported, ReportClient, WatcherType}; #[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { - let is_stopped = Arc::new(AtomicBool::new(false)); - signal_hook::flag::register(signal_hook::consts::SIGTERM, Arc::clone(&is_stopped))?; - signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&is_stopped))?; - let config = config::from_cli()?; #[cfg(feature = "bundle")] let no_tray = config.no_tray; @@ -40,28 +37,49 @@ async fn main() -> anyhow::Result<()> { "Window polling period: {} seconds", config.poll_time_window.as_secs() ); + #[cfg(feature = "bundle")] + let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel(); + #[cfg(feature = "bundle")] + let bundle_handle = tokio::spawn(bundle::run( + config.host.clone(), + config.port, + config_file, + no_tray, + shutdown_send, + )); - let client = ReportClient::new(config)?; - let client = Arc::new(client); + let client = Arc::new(ReportClient::new(config).await?); - let idle_handler = run_first_supported(watchers::IDLE, &client, Arc::clone(&is_stopped)); - let active_window_handler = - run_first_supported(watchers::ACTIVE_WINDOW, &client, Arc::clone(&is_stopped)); + let idle_future = run_first_supported(Arc::clone(&client), &WatcherType::Idle); + let active_window_future = run_first_supported(Arc::clone(&client), &WatcherType::ActiveWindow); + let sigterm = async { + signal(SignalKind::terminate()).unwrap().recv().await; + warn!("Caught SIGTERM, shutting down..."); + }; + let sigint = async { + signal(SignalKind::interrupt()).unwrap().recv().await; + warn!("Caught SIGINT, shutting down..."); + }; #[cfg(not(feature = "bundle"))] { tokio::select!( - _ = idle_handler => Ok(()), - _ = active_window_handler => Ok(()), + _ = tokio::spawn(idle_future) => Ok(()), + _ = tokio::spawn(active_window_future) => Ok(()), + _ = sigterm => Ok(()), + _ = sigint => Ok(()), ) } #[cfg(feature = "bundle")] { tokio::select!( - _ = idle_handler => Ok(()), - _ = active_window_handler => Ok(()), - _ = bundle::run(client.config.host.clone(), client.config.port, config_file, no_tray, Arc::clone(&is_stopped)) => Ok(()), + _ = bundle_handle => Ok(()), + _ = tokio::spawn(idle_future) => Ok(()), + _ = tokio::spawn(active_window_future) => Ok(()), + _ = sigterm => Ok(()), + _ = sigint => Ok(()), + _ = shutdown_recv.recv() => Ok(()), ) } } diff --git a/watchers/Cargo.toml b/watchers/Cargo.toml index e85b89c..396e94d 100644 --- a/watchers/Cargo.toml +++ b/watchers/Cargo.toml @@ -13,7 +13,7 @@ path = "src/lib.rs" rstest = "0.17.0" [dependencies] -aw-client-rust = { git = "https://github.com/ActivityWatch/aw-server-rust" } +aw-client-rust = { git = "https://github.com/ActivityWatch/aw-server-rust", rev = "d4253ed" } wayland-client = "0.30.1" wayland-scanner = "0.30" wayland-backend = "0.1" @@ -29,6 +29,8 @@ regex = "1.8.1" gethostname = "0.4.1" log = { workspace = true } anyhow = { workspace = true } +async-trait = "0.1.68" +tokio = { workspace = true, features = ["time", "sync"] } [features] default = ["gnome", "kwin_window"] diff --git a/watchers/src/lib.rs b/watchers/src/lib.rs index 2924926..174d5ae 100644 --- a/watchers/src/lib.rs +++ b/watchers/src/lib.rs @@ -7,6 +7,4 @@ mod watchers; pub use crate::report_client::ReportClient; pub use crate::watchers::run_first_supported; -pub use crate::watchers::Watcher; -pub use crate::watchers::ACTIVE_WINDOW; -pub use crate::watchers::IDLE; +pub use crate::watchers::WatcherType; diff --git a/watchers/src/report_client.rs b/watchers/src/report_client.rs index a033f3d..85c12de 100644 --- a/watchers/src/report_client.rs +++ b/watchers/src/report_client.rs @@ -12,15 +12,15 @@ pub struct ReportClient { } impl ReportClient { - pub fn new(config: Config) -> anyhow::Result { + pub async fn new(config: Config) -> anyhow::Result { let client = AwClient::new(&config.host, &config.port.to_string(), "awatcher"); let hostname = gethostname::gethostname().into_string().unwrap(); let idle_bucket_name = format!("aw-watcher-afk_{hostname}"); let active_window_bucket_name = format!("aw-watcher-window_{hostname}"); if !config.no_server { - Self::create_bucket(&client, &idle_bucket_name, "afkstatus")?; - Self::create_bucket(&client, &active_window_bucket_name, "currentwindow")?; + Self::create_bucket(&client, &idle_bucket_name, "afkstatus").await?; + Self::create_bucket(&client, &active_window_bucket_name, "currentwindow").await?; } Ok(Self { @@ -31,7 +31,7 @@ impl ReportClient { }) } - pub fn ping( + pub async fn ping( &self, is_idle: bool, timestamp: DateTime, @@ -57,10 +57,11 @@ impl ReportClient { let pulsetime = (self.config.idle_timeout + self.config.poll_time_idle).as_secs_f64(); self.client .heartbeat(&self.idle_bucket_name, &event, pulsetime) + .await .with_context(|| "Failed to send heartbeat") } - pub fn send_active_window(&self, app_id: &str, title: &str) -> anyhow::Result<()> { + pub async fn send_active_window(&self, app_id: &str, title: &str) -> anyhow::Result<()> { let mut data = Map::new(); let replacement = self.config.window_data_replacement(app_id, title); @@ -97,16 +98,18 @@ impl ReportClient { let interval_margin = self.config.poll_time_idle.as_secs_f64() + 1.0; self.client .heartbeat(&self.active_window_bucket_name, &event, interval_margin) + .await .with_context(|| "Failed to send heartbeat for active window") } - fn create_bucket( + async fn create_bucket( client: &AwClient, bucket_name: &str, bucket_type: &str, ) -> anyhow::Result<()> { client .create_bucket_simple(bucket_name, bucket_type) + .await .with_context(|| format!("Failed to create bucket {bucket_name}")) } } diff --git a/watchers/src/watchers.rs b/watchers/src/watchers.rs index 963ac13..080bd06 100644 --- a/watchers/src/watchers.rs +++ b/watchers/src/watchers.rs @@ -14,15 +14,9 @@ mod x11_screensaver_idle; mod x11_window; use crate::{config::Config, report_client::ReportClient}; -use std::{ - fmt::Display, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, - time::Duration, -}; +use async_trait::async_trait; +use std::{fmt::Display, sync::Arc}; +use tokio::time; pub enum WatcherType { Idle, @@ -30,10 +24,10 @@ pub enum WatcherType { } impl WatcherType { - fn sleep_time(&self, config: &Config) -> Duration { + fn sleep_time(&self, config: &Config) -> time::Duration { match self { WatcherType::Idle => config.poll_time_idle, - WatcherType::ActiveWindow => config.poll_time_idle, + WatcherType::ActiveWindow => config.poll_time_window, } } } @@ -47,97 +41,69 @@ impl Display for WatcherType { } } +#[async_trait] pub trait Watcher: Send { - fn new(client: &Arc) -> anyhow::Result + async fn new(client: &Arc) -> anyhow::Result where Self: Sized; - fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()>; + async fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()>; } -type BoxedWatcher = Box; -type WatcherConstructors = [( - &'static str, - WatcherType, - fn(&Arc) -> anyhow::Result, -)]; - -pub fn filter_first_supported( - watcher_constructors: &'static WatcherConstructors, - client: &Arc, -) -> Option<(&'static WatcherType, BoxedWatcher)> { - watcher_constructors - .iter() - .find_map(|(name, watcher_type, watcher)| match watcher(client) { +macro_rules! watch { + ($client:expr, $watcher_type:expr, $watcher_struct:ty) => { + match <$watcher_struct>::new($client).await { Ok(watcher) => { - info!("Selected {name} as {watcher_type} watcher"); - Some((watcher_type, watcher)) + info!( + "Selected {} as {} watcher", + stringify!($watcher_struct), + $watcher_type + ); + return Some(Box::new(watcher)); } Err(e) => { - debug!("{name} cannot run: {e}"); - None + debug!("{} cannot run: {e}", stringify!($watcher_struct)); } - }) -} - -async fn run_watcher( - watcher: &mut Box, - watcher_type: &WatcherType, - client: &Arc, - is_stopped: Arc, -) { - info!("Starting {watcher_type} watcher"); - loop { - if is_stopped.load(Ordering::Relaxed) { - warn!("Received an exit signal, shutting down {watcher_type}"); - break; - } - if let Err(e) = watcher.run_iteration(client) { - error!("Error on {watcher_type} iteration: {e}"); - } - thread::sleep(watcher_type.sleep_time(&client.config)); - } -} - -pub async fn run_first_supported( - watcher_constructors: &'static WatcherConstructors, - client: &Arc, - is_stopped: Arc, -) -> bool { - let supported_watcher = filter_first_supported(watcher_constructors, client); - if let Some((watcher_type, mut watcher)) = supported_watcher { - let thread_client = Arc::clone(client); - run_watcher(&mut watcher, watcher_type, &thread_client, is_stopped).await; - true - } else { - false - } -} - -macro_rules! watcher { - ($watcher_struct:ty, $watcher_type:expr) => { - (stringify!($watcher_struct), $watcher_type, |client| { - Ok(Box::new(<$watcher_struct>::new(client)?)) - }) + }; }; } -pub const IDLE: &WatcherConstructors = &[ - watcher!(wl_kwin_idle::IdleWatcher, WatcherType::Idle), - watcher!(x11_screensaver_idle::IdleWatcher, WatcherType::Idle), - #[cfg(feature = "gnome")] - watcher!(gnome_idle::IdleWatcher, WatcherType::Idle), -]; +async fn filter_first_supported( + client: &Arc, + watcher_type: &WatcherType, +) -> Option> { + match watcher_type { + WatcherType::Idle => { + watch!(client, watcher_type, wl_kwin_idle::IdleWatcher); + watch!(client, watcher_type, x11_screensaver_idle::IdleWatcher); + #[cfg(feature = "gnome")] + watch!(client, watcher_type, gnome_idle::IdleWatcher); + } + WatcherType::ActiveWindow => { + watch!(client, watcher_type, wl_foreign_toplevel::WindowWatcher); + // XWayland gives _NET_WM_NAME on some windows in KDE, but not on others + #[cfg(feature = "kwin_window")] + watch!(client, watcher_type, kwin_window::WindowWatcher); + watch!(client, watcher_type, x11_window::WindowWatcher); + #[cfg(feature = "gnome")] + watch!(client, watcher_type, gnome_window::WindowWatcher); + } + }; -pub const ACTIVE_WINDOW: &WatcherConstructors = &[ - watcher!( - wl_foreign_toplevel::WindowWatcher, - WatcherType::ActiveWindow - ), - // XWayland gives _NET_WM_NAME on some windows in KDE, but not on others - #[cfg(feature = "kwin_window")] - watcher!(kwin_window::WindowWatcher, WatcherType::ActiveWindow), - watcher!(x11_window::WindowWatcher, WatcherType::ActiveWindow), - #[cfg(feature = "gnome")] - watcher!(gnome_window::WindowWatcher, WatcherType::ActiveWindow), -]; + None +} + +pub async fn run_first_supported(client: Arc, watcher_type: &WatcherType) -> bool { + let supported_watcher = filter_first_supported(&client, watcher_type).await; + if let Some(mut watcher) = supported_watcher { + info!("Starting {watcher_type} watcher"); + loop { + if let Err(e) = watcher.run_iteration(&client).await { + error!("Error on {watcher_type} iteration: {e}"); + } + time::sleep(watcher_type.sleep_time(&client.config)).await; + } + } + + false +} diff --git a/watchers/src/watchers/gnome_idle.rs b/watchers/src/watchers/gnome_idle.rs index 9c8cb5e..f59addd 100644 --- a/watchers/src/watchers/gnome_idle.rs +++ b/watchers/src/watchers/gnome_idle.rs @@ -1,6 +1,7 @@ use super::{idle, Watcher}; use crate::report_client::ReportClient; use anyhow::Context; +use async_trait::async_trait; use std::sync::Arc; use zbus::blocking::Connection; @@ -25,8 +26,9 @@ impl idle::SinceLastInput for IdleWatcher { } } +#[async_trait] impl Watcher for IdleWatcher { - fn new(_: &Arc) -> anyhow::Result { + async fn new(_: &Arc) -> anyhow::Result { let mut watcher = Self { dbus_connection: Connection::session()?, is_idle: false, @@ -36,8 +38,8 @@ impl Watcher for IdleWatcher { Ok(watcher) } - fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { - self.is_idle = idle::ping_since_last_input(self, self.is_idle, client)?; + async fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { + self.is_idle = idle::ping_since_last_input(self, self.is_idle, client).await?; Ok(()) } diff --git a/watchers/src/watchers/gnome_window.rs b/watchers/src/watchers/gnome_window.rs index 50d5de9..8769369 100644 --- a/watchers/src/watchers/gnome_window.rs +++ b/watchers/src/watchers/gnome_window.rs @@ -1,8 +1,9 @@ use crate::report_client::ReportClient; use anyhow::Context; +use async_trait::async_trait; use serde::Deserialize; use std::sync::Arc; -use zbus::blocking::Connection; +use zbus::Connection; use super::Watcher; @@ -19,14 +20,17 @@ struct WindowData { } impl WindowWatcher { - fn get_window_data(&self) -> anyhow::Result { - let call_response = self.dbus_connection.call_method( - Some("org.gnome.Shell"), - "/org/gnome/shell/extensions/FocusedWindow", - Some("org.gnome.shell.extensions.FocusedWindow"), - "Get", - &(), - ); + async fn get_window_data(&self) -> anyhow::Result { + let call_response = self + .dbus_connection + .call_method( + Some("org.gnome.Shell"), + "/org/gnome/shell/extensions/FocusedWindow", + Some("org.gnome.shell.extensions.FocusedWindow"), + "Get", + &(), + ) + .await; match call_response { Ok(json) => { @@ -48,8 +52,8 @@ impl WindowWatcher { } } - fn send_active_window(&mut self, client: &ReportClient) -> anyhow::Result<()> { - let data = self.get_window_data(); + async fn send_active_window(&mut self, client: &ReportClient) -> anyhow::Result<()> { + let data = self.get_window_data().await; if let Err(e) = data { if e.to_string().contains("Object does not exist at path") { trace!("The extension seems to have stopped"); @@ -70,14 +74,16 @@ impl WindowWatcher { client .send_active_window(&self.last_app_id, &self.last_title) + .await .with_context(|| "Failed to send heartbeat for active window") } } +#[async_trait] impl Watcher for WindowWatcher { - fn new(_: &Arc) -> anyhow::Result { + async fn new(_: &Arc) -> anyhow::Result { let watcher = Self { - dbus_connection: Connection::session()?, + dbus_connection: Connection::session().await?, last_app_id: String::new(), last_title: String::new(), }; @@ -85,7 +91,7 @@ impl Watcher for WindowWatcher { Ok(watcher) } - fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { - self.send_active_window(client) + async fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { + self.send_active_window(client).await } } diff --git a/watchers/src/watchers/idle.rs b/watchers/src/watchers/idle.rs index 85d3558..d9bc08b 100644 --- a/watchers/src/watchers/idle.rs +++ b/watchers/src/watchers/idle.rs @@ -6,7 +6,7 @@ pub trait SinceLastInput { fn seconds_since_input(&mut self) -> anyhow::Result; } -pub fn ping_since_last_input( +pub async fn ping_since_last_input( watcher: &mut impl SinceLastInput, is_idle: bool, client: &Arc, @@ -24,24 +24,28 @@ pub fn ping_since_last_input( if is_idle && u64::from(seconds_since_input) < client.config.idle_timeout.as_secs() { debug!("No longer idle"); - client.ping(is_idle, last_input, duration_zero)?; + client.ping(is_idle, last_input, duration_zero).await?; is_idle_again = false; // ping with timestamp+1ms with the next event (to ensure the latest event gets retrieved by get_event) - client.ping(is_idle, last_input + duration_1ms, duration_zero)?; + client + .ping(is_idle, last_input + duration_1ms, duration_zero) + .await?; } else if !is_idle && u64::from(seconds_since_input) >= client.config.idle_timeout.as_secs() { debug!("Idle again"); - client.ping(is_idle, last_input, duration_zero)?; + client.ping(is_idle, last_input, duration_zero).await?; is_idle_again = true; // ping with timestamp+1ms with the next event (to ensure the latest event gets retrieved by get_event) - client.ping(is_idle, last_input + duration_1ms, time_since_input)?; + client + .ping(is_idle, last_input + duration_1ms, time_since_input) + .await?; } else { // Send a heartbeat if no state change was made if is_idle { trace!("Reporting as idle"); - client.ping(is_idle, last_input, time_since_input)?; + client.ping(is_idle, last_input, time_since_input).await?; } else { trace!("Reporting as not idle"); - client.ping(is_idle, last_input, duration_zero)?; + client.ping(is_idle, last_input, duration_zero).await?; } } diff --git a/watchers/src/watchers/kwin_window.rs b/watchers/src/watchers/kwin_window.rs index 4b6e2cc..5855ad8 100644 --- a/watchers/src/watchers/kwin_window.rs +++ b/watchers/src/watchers/kwin_window.rs @@ -6,10 +6,12 @@ use super::Watcher; use crate::report_client::ReportClient; use anyhow::{anyhow, Context}; +use async_trait::async_trait; use std::env::temp_dir; use std::path::Path; -use std::sync::{mpsc::channel, Arc, Mutex}; +use std::sync::{mpsc::channel, Arc}; use std::thread; +use tokio::sync::Mutex; use zbus::blocking::{Connection, ConnectionBuilder}; use zbus::dbus_interface; @@ -112,14 +114,15 @@ impl Drop for KWinScript { } } -fn send_active_window( +async fn send_active_window( client: &ReportClient, active_window: &Arc>, ) -> anyhow::Result<()> { - let active_window = active_window.lock().expect("Lock cannot be acquired"); + let active_window = active_window.lock().await; client .send_active_window(&active_window.resource_class, &active_window.caption) + .await .with_context(|| "Failed to send heartbeat for active window") } @@ -135,14 +138,14 @@ struct ActiveWindowInterface { #[dbus_interface(name = "com._2e3s.Awatcher")] impl ActiveWindowInterface { - fn notify_active_window( + async fn notify_active_window( &mut self, caption: String, resource_class: String, resource_name: String, ) { debug!("Active window class: \"{resource_class}\", name: \"{resource_name}\", caption: \"{caption}\""); - let mut active_window = self.active_window.lock().unwrap(); + let mut active_window = self.active_window.lock().await; active_window.caption = caption; active_window.resource_class = resource_class; active_window.resource_name = resource_name; @@ -155,8 +158,9 @@ pub struct WindowWatcher { _kwin_script: KWinScript, } +#[async_trait] impl Watcher for WindowWatcher { - fn new(_: &Arc) -> anyhow::Result { + async fn new(_: &Arc) -> anyhow::Result { let mut kwin_script = KWinScript::new(Connection::session()?); if kwin_script.is_loaded()? { debug!("KWin script is already loaded, unloading"); @@ -201,7 +205,7 @@ impl Watcher for WindowWatcher { }) } - fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { - send_active_window(client, &self.active_window) + async fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { + send_active_window(client, &self.active_window).await } } diff --git a/watchers/src/watchers/wl_foreign_toplevel.rs b/watchers/src/watchers/wl_foreign_toplevel.rs index 4de86fc..88e3133 100644 --- a/watchers/src/watchers/wl_foreign_toplevel.rs +++ b/watchers/src/watchers/wl_foreign_toplevel.rs @@ -8,6 +8,7 @@ use super::wl_connection::WlEventConnection; use super::{wl_connection::subscribe_state, Watcher}; use crate::report_client::ReportClient; use anyhow::{anyhow, Context}; +use async_trait::async_trait; use std::collections::HashMap; use std::sync::Arc; use wayland_client::{ @@ -117,7 +118,7 @@ pub struct WindowWatcher { } impl WindowWatcher { - fn send_active_window(&self, client: &Arc) -> anyhow::Result<()> { + async fn send_active_window(&self, client: &Arc) -> anyhow::Result<()> { let active_window_id = self .toplevel_state .current_window_id @@ -133,12 +134,14 @@ impl WindowWatcher { client .send_active_window(&active_window.app_id, &active_window.title) + .await .with_context(|| "Failed to send heartbeat for active window") } } +#[async_trait] impl Watcher for WindowWatcher { - fn new(_: &Arc) -> anyhow::Result { + async fn new(_: &Arc) -> anyhow::Result { let mut connection: WlEventConnection = WlEventConnection::connect()?; connection.get_foreign_toplevel_manager()?; @@ -155,12 +158,12 @@ impl Watcher for WindowWatcher { }) } - fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { + async fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { self.connection .event_queue .roundtrip(&mut self.toplevel_state) .map_err(|e| anyhow!("Event queue is not processed: {e}"))?; - self.send_active_window(client) + self.send_active_window(client).await } } diff --git a/watchers/src/watchers/wl_kwin_idle.rs b/watchers/src/watchers/wl_kwin_idle.rs index 3b8350e..232d706 100644 --- a/watchers/src/watchers/wl_kwin_idle.rs +++ b/watchers/src/watchers/wl_kwin_idle.rs @@ -3,6 +3,7 @@ use super::wl_connection::{subscribe_state, WlEventConnection}; use super::Watcher; use crate::report_client::ReportClient; use anyhow::anyhow; +use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; use std::sync::Arc; use wayland_client::{ @@ -52,7 +53,7 @@ impl IdleState { debug!("Resumed"); } - fn send_ping(&mut self, client: &Arc) -> anyhow::Result<()> { + async fn send_ping(&mut self, client: &Arc) -> anyhow::Result<()> { let now = Utc::now(); if !self.is_idle { self.last_input_time = now; @@ -61,30 +62,42 @@ impl IdleState { if self.is_changed { let result = if self.is_idle { debug!("Reporting as changed to idle"); - client.ping(false, self.last_input_time, Duration::zero())?; - client.ping( - true, - self.last_input_time + Duration::milliseconds(1), - Duration::zero(), - ) + client + .ping(false, self.last_input_time, Duration::zero()) + .await?; + client + .ping( + true, + self.last_input_time + Duration::milliseconds(1), + Duration::zero(), + ) + .await } else { debug!("Reporting as no longer idle"); - client.ping(true, self.last_input_time, Duration::zero())?; - client.ping( - false, - self.last_input_time + Duration::milliseconds(1), - Duration::zero(), - ) + client + .ping(true, self.last_input_time, Duration::zero()) + .await?; + client + .ping( + false, + self.last_input_time + Duration::milliseconds(1), + Duration::zero(), + ) + .await }; self.is_changed = false; result } else if self.is_idle { trace!("Reporting as idle"); - client.ping(true, self.last_input_time, now - self.last_input_time) + client + .ping(true, self.last_input_time, now - self.last_input_time) + .await } else { trace!("Reporting as not idle"); - client.ping(false, self.last_input_time, Duration::zero()) + client + .ping(false, self.last_input_time, Duration::zero()) + .await } } } @@ -116,8 +129,9 @@ pub struct IdleWatcher { idle_state: IdleState, } +#[async_trait] impl Watcher for IdleWatcher { - fn new(client: &Arc) -> anyhow::Result { + async fn new(client: &Arc) -> anyhow::Result { let mut connection: WlEventConnection = WlEventConnection::connect()?; connection.get_kwin_idle()?; @@ -132,12 +146,12 @@ impl Watcher for IdleWatcher { }) } - fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { + async fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { self.connection .event_queue .roundtrip(&mut self.idle_state) .map_err(|e| anyhow!("Event queue is not processed: {e}"))?; - self.idle_state.send_ping(client) + self.idle_state.send_ping(client).await } } diff --git a/watchers/src/watchers/x11_screensaver_idle.rs b/watchers/src/watchers/x11_screensaver_idle.rs index 4964f29..0349393 100644 --- a/watchers/src/watchers/x11_screensaver_idle.rs +++ b/watchers/src/watchers/x11_screensaver_idle.rs @@ -1,3 +1,5 @@ +use async_trait::async_trait; + use super::{idle, x11_connection::X11Client, Watcher}; use crate::report_client::ReportClient; use std::sync::Arc; @@ -13,8 +15,9 @@ impl idle::SinceLastInput for IdleWatcher { } } +#[async_trait] impl Watcher for IdleWatcher { - fn new(_: &Arc) -> anyhow::Result { + async fn new(_: &Arc) -> anyhow::Result { let mut client = X11Client::new()?; // Check if screensaver extension is supported @@ -26,8 +29,8 @@ impl Watcher for IdleWatcher { }) } - fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { - self.is_idle = idle::ping_since_last_input(self, self.is_idle, client)?; + async fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { + self.is_idle = idle::ping_since_last_input(self, self.is_idle, client).await?; Ok(()) } diff --git a/watchers/src/watchers/x11_window.rs b/watchers/src/watchers/x11_window.rs index 231d233..3776a72 100644 --- a/watchers/src/watchers/x11_window.rs +++ b/watchers/src/watchers/x11_window.rs @@ -1,6 +1,7 @@ use super::{x11_connection::X11Client, Watcher}; use crate::report_client::ReportClient; use anyhow::Context; +use async_trait::async_trait; use std::sync::Arc; pub struct WindowWatcher { @@ -10,7 +11,7 @@ pub struct WindowWatcher { } impl WindowWatcher { - fn send_active_window(&mut self, client: &ReportClient) -> anyhow::Result<()> { + async fn send_active_window(&mut self, client: &ReportClient) -> anyhow::Result<()> { let data = self.client.active_window_data()?; if data.app_id != self.last_app_id || data.title != self.last_title { @@ -24,12 +25,14 @@ impl WindowWatcher { client .send_active_window(&self.last_app_id, &self.last_title) + .await .with_context(|| "Failed to send heartbeat for active window") } } +#[async_trait] impl Watcher for WindowWatcher { - fn new(_: &Arc) -> anyhow::Result { + async fn new(_: &Arc) -> anyhow::Result { let mut client = X11Client::new()?; client.active_window_data()?; @@ -40,7 +43,7 @@ impl Watcher for WindowWatcher { }) } - fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { - self.send_active_window(client) + async fn run_iteration(&mut self, client: &Arc) -> anyhow::Result<()> { + self.send_active_window(client).await } }