Move the code to async instead of threads

This commit is contained in:
Demmie 2023-07-02 14:34:13 -04:00
parent 790277071b
commit 10d54856c1
No known key found for this signature in database
GPG Key ID: B06DAA3D432C6E9A
18 changed files with 228 additions and 236 deletions

20
Cargo.lock generated
View File

@ -291,14 +291,15 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]] [[package]]
name = "aw-client-rust" name = "aw-client-rust"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/ActivityWatch/aw-server-rust#49b2026b2b61f8d8d017e93dca1d3754dcfef0d5" source = "git+https://github.com/ActivityWatch/aw-server-rust?rev=d4253ed#d4253ed64dc6547a821190cc29bf7dca72c616d1"
dependencies = [ dependencies = [
"aw-models 0.1.0 (git+https://github.com/ActivityWatch/aw-server-rust)", "aw-models 0.1.0 (git+https://github.com/ActivityWatch/aw-server-rust?rev=d4253ed)",
"chrono", "chrono",
"gethostname 0.4.1", "gethostname 0.4.1",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
"tokio",
] ]
[[package]] [[package]]
@ -332,7 +333,7 @@ dependencies = [
[[package]] [[package]]
name = "aw-models" name = "aw-models"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/ActivityWatch/aw-server-rust#49b2026b2b61f8d8d017e93dca1d3754dcfef0d5" source = "git+https://github.com/ActivityWatch/aw-server-rust?rev=d4253ed#d4253ed64dc6547a821190cc29bf7dca72c616d1"
dependencies = [ dependencies = [
"chrono", "chrono",
"log", "log",
@ -418,7 +419,6 @@ dependencies = [
"log", "log",
"open", "open",
"rust-embed", "rust-embed",
"signal-hook",
"tokio", "tokio",
"toml 0.7.3", "toml 0.7.3",
"watchers", "watchers",
@ -3146,16 +3146,6 @@ dependencies = [
"dirs 4.0.0", "dirs 4.0.0",
] ]
[[package]]
name = "signal-hook"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "732768f1176d21d09e076c23a93123d40bba92d50c4058da34d45c8de8e682b9"
dependencies = [
"libc",
"signal-hook-registry",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.4.1" version = "1.4.1"
@ -3848,6 +3838,7 @@ name = "watchers"
version = "0.1.1" version = "0.1.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"aw-client-rust", "aw-client-rust",
"chrono", "chrono",
"dirs 5.0.0", "dirs 5.0.0",
@ -3858,6 +3849,7 @@ dependencies = [
"serde", "serde",
"serde_default", "serde_default",
"serde_json", "serde_json",
"tokio",
"toml 0.7.3", "toml 0.7.3",
"wayland-backend", "wayland-backend",
"wayland-client", "wayland-client",

View File

@ -23,6 +23,7 @@ version = "0.1.1"
[workspace.dependencies] [workspace.dependencies]
anyhow = "1.0.70" anyhow = "1.0.70"
log = { version = "0.4.17", features = ["std"] } log = { version = "0.4.17", features = ["std"] }
tokio = { version = "1.28.2" }
[dependencies] [dependencies]
watchers = { path = "./watchers", default-features = false } watchers = { path = "./watchers", default-features = false }
@ -32,8 +33,7 @@ clap = { version = "4.2.1", features = ["string"] }
fern = { version = "0.6.2", features = ["colored"] } fern = { version = "0.6.2", features = ["colored"] }
log = { workspace = true } log = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
signal-hook = "0.3.15" tokio = { workspace = true, features = ["rt", "macros", "signal"] }
tokio = { version = "1.28.2", features = ["rt", "macros"] }
ksni = {version = "0.2.0", optional = true} ksni = {version = "0.2.0", optional = true}
aw-server = { git = "https://github.com/2e3s/aw-server-rust", optional = true, rev = "1c23a55" } aw-server = { git = "https://github.com/2e3s/aw-server-rust", optional = true, rev = "1c23a55" }

View File

@ -2,27 +2,25 @@ mod menu;
mod server; mod server;
pub use menu::Tray; pub use menu::Tray;
use std::{ use std::path::PathBuf;
path::PathBuf, use tokio::sync::mpsc::UnboundedSender;
sync::{atomic::AtomicBool, Arc},
};
pub async fn run( pub async fn run(
host: String, host: String,
port: u32, port: u32,
config_file: PathBuf, config_file: PathBuf,
no_tray: bool, no_tray: bool,
is_stopped: Arc<AtomicBool>, shutdown_sender: UnboundedSender<()>,
) { ) {
if !no_tray { if !no_tray {
let service = ksni::TrayService::new(Tray { let service = ksni::TrayService::new(Tray {
server_host: host, server_host: host,
server_port: port, server_port: port,
config_file, config_file,
is_stopped: Arc::clone(&is_stopped), shutdown_sender,
}); });
service.spawn(); service.spawn();
} }
server::run(port, is_stopped).await; server::run(port).await;
} }

View File

@ -1,17 +1,13 @@
use std::{ use std::path::PathBuf;
path::PathBuf,
sync::{ use tokio::sync::mpsc::UnboundedSender;
atomic::{AtomicBool, Ordering},
Arc,
},
};
#[derive(Debug)] #[derive(Debug)]
pub struct Tray { pub struct Tray {
pub server_host: String, pub server_host: String,
pub server_port: u32, pub server_port: u32,
pub config_file: PathBuf, pub config_file: PathBuf,
pub is_stopped: Arc<AtomicBool>, pub shutdown_sender: UnboundedSender<()>,
} }
impl ksni::Tray for Tray { impl ksni::Tray for Tray {
@ -59,10 +55,10 @@ impl ksni::Tray for Tray {
label: "Exit".into(), label: "Exit".into(),
icon_name: "application-exit".into(), icon_name: "application-exit".into(),
activate: { activate: {
let is_stopped = Arc::clone(&self.is_stopped); let shutdown_sender = self.shutdown_sender.clone();
Box::new(move |_| { Box::new(move |_| {
is_stopped.store(true, Ordering::Relaxed); shutdown_sender.send(()).unwrap();
}) })
}, },
..Default::default() ..Default::default()

View File

@ -1,12 +1,8 @@
use anyhow::anyhow; use anyhow::anyhow;
use aw_server::endpoints::{build_rocket, embed_asset_resolver}; use aw_server::endpoints::{build_rocket, embed_asset_resolver};
use std::sync::{ use std::sync::Mutex;
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use tokio::time::{sleep, Duration};
pub async fn run(port: u32, is_stopped: Arc<AtomicBool>) { pub async fn run(port: u32) {
let db_path = aw_server::dirs::db_path(false) let db_path = aw_server::dirs::db_path(false)
.map_err(|_| anyhow!("DB path is not found")) .map_err(|_| anyhow!("DB path is not found"))
.unwrap() .unwrap()
@ -24,21 +20,5 @@ pub async fn run(port: u32, is_stopped: Arc<AtomicBool>) {
asset_resolver: embed_asset_resolver!("$AW_WEBUI_DIST"), asset_resolver: embed_asset_resolver!("$AW_WEBUI_DIST"),
device_id, device_id,
}; };
let server = build_rocket(server_state, config).launch(); build_rocket(server_state, config).launch().await.unwrap();
let check = async {
loop {
if is_stopped.load(Ordering::Relaxed) {
warn!("Received an exit signal, stopping the server");
break;
}
sleep(Duration::from_secs(1)).await;
}
};
tokio::select! (
r = server => {r.unwrap();},
_ = check => {},
);
} }

View File

@ -8,17 +8,14 @@ extern crate log;
mod bundle; mod bundle;
mod config; mod config;
use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use watchers::run_first_supported; use tokio::signal::unix::{signal, SignalKind};
use watchers::ReportClient; #[cfg(feature = "bundle")]
use tokio::sync::mpsc;
use watchers::{run_first_supported, ReportClient, WatcherType};
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
let is_stopped = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(signal_hook::consts::SIGTERM, Arc::clone(&is_stopped))?;
signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&is_stopped))?;
let config = config::from_cli()?; let config = config::from_cli()?;
#[cfg(feature = "bundle")] #[cfg(feature = "bundle")]
let no_tray = config.no_tray; let no_tray = config.no_tray;
@ -40,28 +37,49 @@ async fn main() -> anyhow::Result<()> {
"Window polling period: {} seconds", "Window polling period: {} seconds",
config.poll_time_window.as_secs() config.poll_time_window.as_secs()
); );
#[cfg(feature = "bundle")]
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
#[cfg(feature = "bundle")]
let bundle_handle = tokio::spawn(bundle::run(
config.host.clone(),
config.port,
config_file,
no_tray,
shutdown_send,
));
let client = ReportClient::new(config)?; let client = Arc::new(ReportClient::new(config).await?);
let client = Arc::new(client);
let idle_handler = run_first_supported(watchers::IDLE, &client, Arc::clone(&is_stopped)); let idle_future = run_first_supported(Arc::clone(&client), &WatcherType::Idle);
let active_window_handler = let active_window_future = run_first_supported(Arc::clone(&client), &WatcherType::ActiveWindow);
run_first_supported(watchers::ACTIVE_WINDOW, &client, Arc::clone(&is_stopped)); let sigterm = async {
signal(SignalKind::terminate()).unwrap().recv().await;
warn!("Caught SIGTERM, shutting down...");
};
let sigint = async {
signal(SignalKind::interrupt()).unwrap().recv().await;
warn!("Caught SIGINT, shutting down...");
};
#[cfg(not(feature = "bundle"))] #[cfg(not(feature = "bundle"))]
{ {
tokio::select!( tokio::select!(
_ = idle_handler => Ok(()), _ = tokio::spawn(idle_future) => Ok(()),
_ = active_window_handler => Ok(()), _ = tokio::spawn(active_window_future) => Ok(()),
_ = sigterm => Ok(()),
_ = sigint => Ok(()),
) )
} }
#[cfg(feature = "bundle")] #[cfg(feature = "bundle")]
{ {
tokio::select!( tokio::select!(
_ = idle_handler => Ok(()), _ = bundle_handle => Ok(()),
_ = active_window_handler => Ok(()), _ = tokio::spawn(idle_future) => Ok(()),
_ = bundle::run(client.config.host.clone(), client.config.port, config_file, no_tray, Arc::clone(&is_stopped)) => Ok(()), _ = tokio::spawn(active_window_future) => Ok(()),
_ = sigterm => Ok(()),
_ = sigint => Ok(()),
_ = shutdown_recv.recv() => Ok(()),
) )
} }
} }

View File

@ -13,7 +13,7 @@ path = "src/lib.rs"
rstest = "0.17.0" rstest = "0.17.0"
[dependencies] [dependencies]
aw-client-rust = { git = "https://github.com/ActivityWatch/aw-server-rust" } aw-client-rust = { git = "https://github.com/ActivityWatch/aw-server-rust", rev = "d4253ed" }
wayland-client = "0.30.1" wayland-client = "0.30.1"
wayland-scanner = "0.30" wayland-scanner = "0.30"
wayland-backend = "0.1" wayland-backend = "0.1"
@ -29,6 +29,8 @@ regex = "1.8.1"
gethostname = "0.4.1" gethostname = "0.4.1"
log = { workspace = true } log = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
async-trait = "0.1.68"
tokio = { workspace = true, features = ["time", "sync"] }
[features] [features]
default = ["gnome", "kwin_window"] default = ["gnome", "kwin_window"]

View File

@ -7,6 +7,4 @@ mod watchers;
pub use crate::report_client::ReportClient; pub use crate::report_client::ReportClient;
pub use crate::watchers::run_first_supported; pub use crate::watchers::run_first_supported;
pub use crate::watchers::Watcher; pub use crate::watchers::WatcherType;
pub use crate::watchers::ACTIVE_WINDOW;
pub use crate::watchers::IDLE;

View File

@ -12,15 +12,15 @@ pub struct ReportClient {
} }
impl ReportClient { impl ReportClient {
pub fn new(config: Config) -> anyhow::Result<Self> { pub async fn new(config: Config) -> anyhow::Result<Self> {
let client = AwClient::new(&config.host, &config.port.to_string(), "awatcher"); let client = AwClient::new(&config.host, &config.port.to_string(), "awatcher");
let hostname = gethostname::gethostname().into_string().unwrap(); let hostname = gethostname::gethostname().into_string().unwrap();
let idle_bucket_name = format!("aw-watcher-afk_{hostname}"); let idle_bucket_name = format!("aw-watcher-afk_{hostname}");
let active_window_bucket_name = format!("aw-watcher-window_{hostname}"); let active_window_bucket_name = format!("aw-watcher-window_{hostname}");
if !config.no_server { if !config.no_server {
Self::create_bucket(&client, &idle_bucket_name, "afkstatus")?; Self::create_bucket(&client, &idle_bucket_name, "afkstatus").await?;
Self::create_bucket(&client, &active_window_bucket_name, "currentwindow")?; Self::create_bucket(&client, &active_window_bucket_name, "currentwindow").await?;
} }
Ok(Self { Ok(Self {
@ -31,7 +31,7 @@ impl ReportClient {
}) })
} }
pub fn ping( pub async fn ping(
&self, &self,
is_idle: bool, is_idle: bool,
timestamp: DateTime<Utc>, timestamp: DateTime<Utc>,
@ -57,10 +57,11 @@ impl ReportClient {
let pulsetime = (self.config.idle_timeout + self.config.poll_time_idle).as_secs_f64(); let pulsetime = (self.config.idle_timeout + self.config.poll_time_idle).as_secs_f64();
self.client self.client
.heartbeat(&self.idle_bucket_name, &event, pulsetime) .heartbeat(&self.idle_bucket_name, &event, pulsetime)
.await
.with_context(|| "Failed to send heartbeat") .with_context(|| "Failed to send heartbeat")
} }
pub fn send_active_window(&self, app_id: &str, title: &str) -> anyhow::Result<()> { pub async fn send_active_window(&self, app_id: &str, title: &str) -> anyhow::Result<()> {
let mut data = Map::new(); let mut data = Map::new();
let replacement = self.config.window_data_replacement(app_id, title); let replacement = self.config.window_data_replacement(app_id, title);
@ -97,16 +98,18 @@ impl ReportClient {
let interval_margin = self.config.poll_time_idle.as_secs_f64() + 1.0; let interval_margin = self.config.poll_time_idle.as_secs_f64() + 1.0;
self.client self.client
.heartbeat(&self.active_window_bucket_name, &event, interval_margin) .heartbeat(&self.active_window_bucket_name, &event, interval_margin)
.await
.with_context(|| "Failed to send heartbeat for active window") .with_context(|| "Failed to send heartbeat for active window")
} }
fn create_bucket( async fn create_bucket(
client: &AwClient, client: &AwClient,
bucket_name: &str, bucket_name: &str,
bucket_type: &str, bucket_type: &str,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
client client
.create_bucket_simple(bucket_name, bucket_type) .create_bucket_simple(bucket_name, bucket_type)
.await
.with_context(|| format!("Failed to create bucket {bucket_name}")) .with_context(|| format!("Failed to create bucket {bucket_name}"))
} }
} }

View File

@ -14,15 +14,9 @@ mod x11_screensaver_idle;
mod x11_window; mod x11_window;
use crate::{config::Config, report_client::ReportClient}; use crate::{config::Config, report_client::ReportClient};
use std::{ use async_trait::async_trait;
fmt::Display, use std::{fmt::Display, sync::Arc};
sync::{ use tokio::time;
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
pub enum WatcherType { pub enum WatcherType {
Idle, Idle,
@ -30,10 +24,10 @@ pub enum WatcherType {
} }
impl WatcherType { impl WatcherType {
fn sleep_time(&self, config: &Config) -> Duration { fn sleep_time(&self, config: &Config) -> time::Duration {
match self { match self {
WatcherType::Idle => config.poll_time_idle, WatcherType::Idle => config.poll_time_idle,
WatcherType::ActiveWindow => config.poll_time_idle, WatcherType::ActiveWindow => config.poll_time_window,
} }
} }
} }
@ -47,97 +41,69 @@ impl Display for WatcherType {
} }
} }
#[async_trait]
pub trait Watcher: Send { pub trait Watcher: Send {
fn new(client: &Arc<ReportClient>) -> anyhow::Result<Self> async fn new(client: &Arc<ReportClient>) -> anyhow::Result<Self>
where where
Self: Sized; Self: Sized;
fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()>; async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()>;
} }
type BoxedWatcher = Box<dyn Watcher>; macro_rules! watch {
type WatcherConstructors = [( ($client:expr, $watcher_type:expr, $watcher_struct:ty) => {
&'static str, match <$watcher_struct>::new($client).await {
WatcherType,
fn(&Arc<ReportClient>) -> anyhow::Result<BoxedWatcher>,
)];
pub fn filter_first_supported(
watcher_constructors: &'static WatcherConstructors,
client: &Arc<ReportClient>,
) -> Option<(&'static WatcherType, BoxedWatcher)> {
watcher_constructors
.iter()
.find_map(|(name, watcher_type, watcher)| match watcher(client) {
Ok(watcher) => { Ok(watcher) => {
info!("Selected {name} as {watcher_type} watcher"); info!(
Some((watcher_type, watcher)) "Selected {} as {} watcher",
stringify!($watcher_struct),
$watcher_type
);
return Some(Box::new(watcher));
} }
Err(e) => { Err(e) => {
debug!("{name} cannot run: {e}"); debug!("{} cannot run: {e}", stringify!($watcher_struct));
None
} }
}) };
}
async fn run_watcher(
watcher: &mut Box<dyn Watcher>,
watcher_type: &WatcherType,
client: &Arc<ReportClient>,
is_stopped: Arc<AtomicBool>,
) {
info!("Starting {watcher_type} watcher");
loop {
if is_stopped.load(Ordering::Relaxed) {
warn!("Received an exit signal, shutting down {watcher_type}");
break;
}
if let Err(e) = watcher.run_iteration(client) {
error!("Error on {watcher_type} iteration: {e}");
}
thread::sleep(watcher_type.sleep_time(&client.config));
}
}
pub async fn run_first_supported(
watcher_constructors: &'static WatcherConstructors,
client: &Arc<ReportClient>,
is_stopped: Arc<AtomicBool>,
) -> bool {
let supported_watcher = filter_first_supported(watcher_constructors, client);
if let Some((watcher_type, mut watcher)) = supported_watcher {
let thread_client = Arc::clone(client);
run_watcher(&mut watcher, watcher_type, &thread_client, is_stopped).await;
true
} else {
false
}
}
macro_rules! watcher {
($watcher_struct:ty, $watcher_type:expr) => {
(stringify!($watcher_struct), $watcher_type, |client| {
Ok(Box::new(<$watcher_struct>::new(client)?))
})
}; };
} }
pub const IDLE: &WatcherConstructors = &[ async fn filter_first_supported(
watcher!(wl_kwin_idle::IdleWatcher, WatcherType::Idle), client: &Arc<ReportClient>,
watcher!(x11_screensaver_idle::IdleWatcher, WatcherType::Idle), watcher_type: &WatcherType,
#[cfg(feature = "gnome")] ) -> Option<Box<dyn Watcher>> {
watcher!(gnome_idle::IdleWatcher, WatcherType::Idle), match watcher_type {
]; WatcherType::Idle => {
watch!(client, watcher_type, wl_kwin_idle::IdleWatcher);
watch!(client, watcher_type, x11_screensaver_idle::IdleWatcher);
#[cfg(feature = "gnome")]
watch!(client, watcher_type, gnome_idle::IdleWatcher);
}
WatcherType::ActiveWindow => {
watch!(client, watcher_type, wl_foreign_toplevel::WindowWatcher);
// XWayland gives _NET_WM_NAME on some windows in KDE, but not on others
#[cfg(feature = "kwin_window")]
watch!(client, watcher_type, kwin_window::WindowWatcher);
watch!(client, watcher_type, x11_window::WindowWatcher);
#[cfg(feature = "gnome")]
watch!(client, watcher_type, gnome_window::WindowWatcher);
}
};
pub const ACTIVE_WINDOW: &WatcherConstructors = &[ None
watcher!( }
wl_foreign_toplevel::WindowWatcher,
WatcherType::ActiveWindow pub async fn run_first_supported(client: Arc<ReportClient>, watcher_type: &WatcherType) -> bool {
), let supported_watcher = filter_first_supported(&client, watcher_type).await;
// XWayland gives _NET_WM_NAME on some windows in KDE, but not on others if let Some(mut watcher) = supported_watcher {
#[cfg(feature = "kwin_window")] info!("Starting {watcher_type} watcher");
watcher!(kwin_window::WindowWatcher, WatcherType::ActiveWindow), loop {
watcher!(x11_window::WindowWatcher, WatcherType::ActiveWindow), if let Err(e) = watcher.run_iteration(&client).await {
#[cfg(feature = "gnome")] error!("Error on {watcher_type} iteration: {e}");
watcher!(gnome_window::WindowWatcher, WatcherType::ActiveWindow), }
]; time::sleep(watcher_type.sleep_time(&client.config)).await;
}
}
false
}

View File

@ -1,6 +1,7 @@
use super::{idle, Watcher}; use super::{idle, Watcher};
use crate::report_client::ReportClient; use crate::report_client::ReportClient;
use anyhow::Context; use anyhow::Context;
use async_trait::async_trait;
use std::sync::Arc; use std::sync::Arc;
use zbus::blocking::Connection; use zbus::blocking::Connection;
@ -25,8 +26,9 @@ impl idle::SinceLastInput for IdleWatcher {
} }
} }
#[async_trait]
impl Watcher for IdleWatcher { impl Watcher for IdleWatcher {
fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> { async fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> {
let mut watcher = Self { let mut watcher = Self {
dbus_connection: Connection::session()?, dbus_connection: Connection::session()?,
is_idle: false, is_idle: false,
@ -36,8 +38,8 @@ impl Watcher for IdleWatcher {
Ok(watcher) Ok(watcher)
} }
fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> {
self.is_idle = idle::ping_since_last_input(self, self.is_idle, client)?; self.is_idle = idle::ping_since_last_input(self, self.is_idle, client).await?;
Ok(()) Ok(())
} }

View File

@ -1,8 +1,9 @@
use crate::report_client::ReportClient; use crate::report_client::ReportClient;
use anyhow::Context; use anyhow::Context;
use async_trait::async_trait;
use serde::Deserialize; use serde::Deserialize;
use std::sync::Arc; use std::sync::Arc;
use zbus::blocking::Connection; use zbus::Connection;
use super::Watcher; use super::Watcher;
@ -19,14 +20,17 @@ struct WindowData {
} }
impl WindowWatcher { impl WindowWatcher {
fn get_window_data(&self) -> anyhow::Result<WindowData> { async fn get_window_data(&self) -> anyhow::Result<WindowData> {
let call_response = self.dbus_connection.call_method( let call_response = self
Some("org.gnome.Shell"), .dbus_connection
"/org/gnome/shell/extensions/FocusedWindow", .call_method(
Some("org.gnome.shell.extensions.FocusedWindow"), Some("org.gnome.Shell"),
"Get", "/org/gnome/shell/extensions/FocusedWindow",
&(), Some("org.gnome.shell.extensions.FocusedWindow"),
); "Get",
&(),
)
.await;
match call_response { match call_response {
Ok(json) => { Ok(json) => {
@ -48,8 +52,8 @@ impl WindowWatcher {
} }
} }
fn send_active_window(&mut self, client: &ReportClient) -> anyhow::Result<()> { async fn send_active_window(&mut self, client: &ReportClient) -> anyhow::Result<()> {
let data = self.get_window_data(); let data = self.get_window_data().await;
if let Err(e) = data { if let Err(e) = data {
if e.to_string().contains("Object does not exist at path") { if e.to_string().contains("Object does not exist at path") {
trace!("The extension seems to have stopped"); trace!("The extension seems to have stopped");
@ -70,14 +74,16 @@ impl WindowWatcher {
client client
.send_active_window(&self.last_app_id, &self.last_title) .send_active_window(&self.last_app_id, &self.last_title)
.await
.with_context(|| "Failed to send heartbeat for active window") .with_context(|| "Failed to send heartbeat for active window")
} }
} }
#[async_trait]
impl Watcher for WindowWatcher { impl Watcher for WindowWatcher {
fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> { async fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> {
let watcher = Self { let watcher = Self {
dbus_connection: Connection::session()?, dbus_connection: Connection::session().await?,
last_app_id: String::new(), last_app_id: String::new(),
last_title: String::new(), last_title: String::new(),
}; };
@ -85,7 +91,7 @@ impl Watcher for WindowWatcher {
Ok(watcher) Ok(watcher)
} }
fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> {
self.send_active_window(client) self.send_active_window(client).await
} }
} }

View File

@ -6,7 +6,7 @@ pub trait SinceLastInput {
fn seconds_since_input(&mut self) -> anyhow::Result<u32>; fn seconds_since_input(&mut self) -> anyhow::Result<u32>;
} }
pub fn ping_since_last_input( pub async fn ping_since_last_input(
watcher: &mut impl SinceLastInput, watcher: &mut impl SinceLastInput,
is_idle: bool, is_idle: bool,
client: &Arc<ReportClient>, client: &Arc<ReportClient>,
@ -24,24 +24,28 @@ pub fn ping_since_last_input(
if is_idle && u64::from(seconds_since_input) < client.config.idle_timeout.as_secs() { if is_idle && u64::from(seconds_since_input) < client.config.idle_timeout.as_secs() {
debug!("No longer idle"); debug!("No longer idle");
client.ping(is_idle, last_input, duration_zero)?; client.ping(is_idle, last_input, duration_zero).await?;
is_idle_again = false; is_idle_again = false;
// ping with timestamp+1ms with the next event (to ensure the latest event gets retrieved by get_event) // ping with timestamp+1ms with the next event (to ensure the latest event gets retrieved by get_event)
client.ping(is_idle, last_input + duration_1ms, duration_zero)?; client
.ping(is_idle, last_input + duration_1ms, duration_zero)
.await?;
} else if !is_idle && u64::from(seconds_since_input) >= client.config.idle_timeout.as_secs() { } else if !is_idle && u64::from(seconds_since_input) >= client.config.idle_timeout.as_secs() {
debug!("Idle again"); debug!("Idle again");
client.ping(is_idle, last_input, duration_zero)?; client.ping(is_idle, last_input, duration_zero).await?;
is_idle_again = true; is_idle_again = true;
// ping with timestamp+1ms with the next event (to ensure the latest event gets retrieved by get_event) // ping with timestamp+1ms with the next event (to ensure the latest event gets retrieved by get_event)
client.ping(is_idle, last_input + duration_1ms, time_since_input)?; client
.ping(is_idle, last_input + duration_1ms, time_since_input)
.await?;
} else { } else {
// Send a heartbeat if no state change was made // Send a heartbeat if no state change was made
if is_idle { if is_idle {
trace!("Reporting as idle"); trace!("Reporting as idle");
client.ping(is_idle, last_input, time_since_input)?; client.ping(is_idle, last_input, time_since_input).await?;
} else { } else {
trace!("Reporting as not idle"); trace!("Reporting as not idle");
client.ping(is_idle, last_input, duration_zero)?; client.ping(is_idle, last_input, duration_zero).await?;
} }
} }

View File

@ -6,10 +6,12 @@
use super::Watcher; use super::Watcher;
use crate::report_client::ReportClient; use crate::report_client::ReportClient;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use async_trait::async_trait;
use std::env::temp_dir; use std::env::temp_dir;
use std::path::Path; use std::path::Path;
use std::sync::{mpsc::channel, Arc, Mutex}; use std::sync::{mpsc::channel, Arc};
use std::thread; use std::thread;
use tokio::sync::Mutex;
use zbus::blocking::{Connection, ConnectionBuilder}; use zbus::blocking::{Connection, ConnectionBuilder};
use zbus::dbus_interface; use zbus::dbus_interface;
@ -112,14 +114,15 @@ impl Drop for KWinScript {
} }
} }
fn send_active_window( async fn send_active_window(
client: &ReportClient, client: &ReportClient,
active_window: &Arc<Mutex<ActiveWindow>>, active_window: &Arc<Mutex<ActiveWindow>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let active_window = active_window.lock().expect("Lock cannot be acquired"); let active_window = active_window.lock().await;
client client
.send_active_window(&active_window.resource_class, &active_window.caption) .send_active_window(&active_window.resource_class, &active_window.caption)
.await
.with_context(|| "Failed to send heartbeat for active window") .with_context(|| "Failed to send heartbeat for active window")
} }
@ -135,14 +138,14 @@ struct ActiveWindowInterface {
#[dbus_interface(name = "com._2e3s.Awatcher")] #[dbus_interface(name = "com._2e3s.Awatcher")]
impl ActiveWindowInterface { impl ActiveWindowInterface {
fn notify_active_window( async fn notify_active_window(
&mut self, &mut self,
caption: String, caption: String,
resource_class: String, resource_class: String,
resource_name: String, resource_name: String,
) { ) {
debug!("Active window class: \"{resource_class}\", name: \"{resource_name}\", caption: \"{caption}\""); debug!("Active window class: \"{resource_class}\", name: \"{resource_name}\", caption: \"{caption}\"");
let mut active_window = self.active_window.lock().unwrap(); let mut active_window = self.active_window.lock().await;
active_window.caption = caption; active_window.caption = caption;
active_window.resource_class = resource_class; active_window.resource_class = resource_class;
active_window.resource_name = resource_name; active_window.resource_name = resource_name;
@ -155,8 +158,9 @@ pub struct WindowWatcher {
_kwin_script: KWinScript, _kwin_script: KWinScript,
} }
#[async_trait]
impl Watcher for WindowWatcher { impl Watcher for WindowWatcher {
fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> { async fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> {
let mut kwin_script = KWinScript::new(Connection::session()?); let mut kwin_script = KWinScript::new(Connection::session()?);
if kwin_script.is_loaded()? { if kwin_script.is_loaded()? {
debug!("KWin script is already loaded, unloading"); debug!("KWin script is already loaded, unloading");
@ -201,7 +205,7 @@ impl Watcher for WindowWatcher {
}) })
} }
fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> {
send_active_window(client, &self.active_window) send_active_window(client, &self.active_window).await
} }
} }

View File

@ -8,6 +8,7 @@ use super::wl_connection::WlEventConnection;
use super::{wl_connection::subscribe_state, Watcher}; use super::{wl_connection::subscribe_state, Watcher};
use crate::report_client::ReportClient; use crate::report_client::ReportClient;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use async_trait::async_trait;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use wayland_client::{ use wayland_client::{
@ -117,7 +118,7 @@ pub struct WindowWatcher {
} }
impl WindowWatcher { impl WindowWatcher {
fn send_active_window(&self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn send_active_window(&self, client: &Arc<ReportClient>) -> anyhow::Result<()> {
let active_window_id = self let active_window_id = self
.toplevel_state .toplevel_state
.current_window_id .current_window_id
@ -133,12 +134,14 @@ impl WindowWatcher {
client client
.send_active_window(&active_window.app_id, &active_window.title) .send_active_window(&active_window.app_id, &active_window.title)
.await
.with_context(|| "Failed to send heartbeat for active window") .with_context(|| "Failed to send heartbeat for active window")
} }
} }
#[async_trait]
impl Watcher for WindowWatcher { impl Watcher for WindowWatcher {
fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> { async fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> {
let mut connection: WlEventConnection<ToplevelState> = WlEventConnection::connect()?; let mut connection: WlEventConnection<ToplevelState> = WlEventConnection::connect()?;
connection.get_foreign_toplevel_manager()?; connection.get_foreign_toplevel_manager()?;
@ -155,12 +158,12 @@ impl Watcher for WindowWatcher {
}) })
} }
fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> {
self.connection self.connection
.event_queue .event_queue
.roundtrip(&mut self.toplevel_state) .roundtrip(&mut self.toplevel_state)
.map_err(|e| anyhow!("Event queue is not processed: {e}"))?; .map_err(|e| anyhow!("Event queue is not processed: {e}"))?;
self.send_active_window(client) self.send_active_window(client).await
} }
} }

View File

@ -3,6 +3,7 @@ use super::wl_connection::{subscribe_state, WlEventConnection};
use super::Watcher; use super::Watcher;
use crate::report_client::ReportClient; use crate::report_client::ReportClient;
use anyhow::anyhow; use anyhow::anyhow;
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc}; use chrono::{DateTime, Duration, Utc};
use std::sync::Arc; use std::sync::Arc;
use wayland_client::{ use wayland_client::{
@ -52,7 +53,7 @@ impl IdleState {
debug!("Resumed"); debug!("Resumed");
} }
fn send_ping(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn send_ping(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> {
let now = Utc::now(); let now = Utc::now();
if !self.is_idle { if !self.is_idle {
self.last_input_time = now; self.last_input_time = now;
@ -61,30 +62,42 @@ impl IdleState {
if self.is_changed { if self.is_changed {
let result = if self.is_idle { let result = if self.is_idle {
debug!("Reporting as changed to idle"); debug!("Reporting as changed to idle");
client.ping(false, self.last_input_time, Duration::zero())?; client
client.ping( .ping(false, self.last_input_time, Duration::zero())
true, .await?;
self.last_input_time + Duration::milliseconds(1), client
Duration::zero(), .ping(
) true,
self.last_input_time + Duration::milliseconds(1),
Duration::zero(),
)
.await
} else { } else {
debug!("Reporting as no longer idle"); debug!("Reporting as no longer idle");
client.ping(true, self.last_input_time, Duration::zero())?; client
client.ping( .ping(true, self.last_input_time, Duration::zero())
false, .await?;
self.last_input_time + Duration::milliseconds(1), client
Duration::zero(), .ping(
) false,
self.last_input_time + Duration::milliseconds(1),
Duration::zero(),
)
.await
}; };
self.is_changed = false; self.is_changed = false;
result result
} else if self.is_idle { } else if self.is_idle {
trace!("Reporting as idle"); trace!("Reporting as idle");
client.ping(true, self.last_input_time, now - self.last_input_time) client
.ping(true, self.last_input_time, now - self.last_input_time)
.await
} else { } else {
trace!("Reporting as not idle"); trace!("Reporting as not idle");
client.ping(false, self.last_input_time, Duration::zero()) client
.ping(false, self.last_input_time, Duration::zero())
.await
} }
} }
} }
@ -116,8 +129,9 @@ pub struct IdleWatcher {
idle_state: IdleState, idle_state: IdleState,
} }
#[async_trait]
impl Watcher for IdleWatcher { impl Watcher for IdleWatcher {
fn new(client: &Arc<ReportClient>) -> anyhow::Result<Self> { async fn new(client: &Arc<ReportClient>) -> anyhow::Result<Self> {
let mut connection: WlEventConnection<IdleState> = WlEventConnection::connect()?; let mut connection: WlEventConnection<IdleState> = WlEventConnection::connect()?;
connection.get_kwin_idle()?; connection.get_kwin_idle()?;
@ -132,12 +146,12 @@ impl Watcher for IdleWatcher {
}) })
} }
fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> {
self.connection self.connection
.event_queue .event_queue
.roundtrip(&mut self.idle_state) .roundtrip(&mut self.idle_state)
.map_err(|e| anyhow!("Event queue is not processed: {e}"))?; .map_err(|e| anyhow!("Event queue is not processed: {e}"))?;
self.idle_state.send_ping(client) self.idle_state.send_ping(client).await
} }
} }

View File

@ -1,3 +1,5 @@
use async_trait::async_trait;
use super::{idle, x11_connection::X11Client, Watcher}; use super::{idle, x11_connection::X11Client, Watcher};
use crate::report_client::ReportClient; use crate::report_client::ReportClient;
use std::sync::Arc; use std::sync::Arc;
@ -13,8 +15,9 @@ impl idle::SinceLastInput for IdleWatcher {
} }
} }
#[async_trait]
impl Watcher for IdleWatcher { impl Watcher for IdleWatcher {
fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> { async fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> {
let mut client = X11Client::new()?; let mut client = X11Client::new()?;
// Check if screensaver extension is supported // Check if screensaver extension is supported
@ -26,8 +29,8 @@ impl Watcher for IdleWatcher {
}) })
} }
fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> {
self.is_idle = idle::ping_since_last_input(self, self.is_idle, client)?; self.is_idle = idle::ping_since_last_input(self, self.is_idle, client).await?;
Ok(()) Ok(())
} }

View File

@ -1,6 +1,7 @@
use super::{x11_connection::X11Client, Watcher}; use super::{x11_connection::X11Client, Watcher};
use crate::report_client::ReportClient; use crate::report_client::ReportClient;
use anyhow::Context; use anyhow::Context;
use async_trait::async_trait;
use std::sync::Arc; use std::sync::Arc;
pub struct WindowWatcher { pub struct WindowWatcher {
@ -10,7 +11,7 @@ pub struct WindowWatcher {
} }
impl WindowWatcher { impl WindowWatcher {
fn send_active_window(&mut self, client: &ReportClient) -> anyhow::Result<()> { async fn send_active_window(&mut self, client: &ReportClient) -> anyhow::Result<()> {
let data = self.client.active_window_data()?; let data = self.client.active_window_data()?;
if data.app_id != self.last_app_id || data.title != self.last_title { if data.app_id != self.last_app_id || data.title != self.last_title {
@ -24,12 +25,14 @@ impl WindowWatcher {
client client
.send_active_window(&self.last_app_id, &self.last_title) .send_active_window(&self.last_app_id, &self.last_title)
.await
.with_context(|| "Failed to send heartbeat for active window") .with_context(|| "Failed to send heartbeat for active window")
} }
} }
#[async_trait]
impl Watcher for WindowWatcher { impl Watcher for WindowWatcher {
fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> { async fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> {
let mut client = X11Client::new()?; let mut client = X11Client::new()?;
client.active_window_data()?; client.active_window_data()?;
@ -40,7 +43,7 @@ impl Watcher for WindowWatcher {
}) })
} }
fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> {
self.send_active_window(client) self.send_active_window(client).await
} }
} }