Replace blocking zbus to async

This commit is contained in:
Demmie 2023-07-02 21:45:13 -04:00
parent 51deffabc4
commit 589d5e0cd6
No known key found for this signature in database
GPG Key ID: B06DAA3D432C6E9A
4 changed files with 63 additions and 38 deletions

View File

@ -3,15 +3,16 @@ use crate::report_client::ReportClient;
use anyhow::Context; use anyhow::Context;
use async_trait::async_trait; use async_trait::async_trait;
use std::sync::Arc; use std::sync::Arc;
use zbus::blocking::Connection; use zbus::Connection;
pub struct IdleWatcher { pub struct IdleWatcher {
dbus_connection: Connection, dbus_connection: Connection,
is_idle: bool, is_idle: bool,
} }
#[async_trait]
impl idle::SinceLastInput for IdleWatcher { impl idle::SinceLastInput for IdleWatcher {
fn seconds_since_input(&mut self) -> anyhow::Result<u32> { async fn seconds_since_input(&mut self) -> anyhow::Result<u32> {
let ms = self let ms = self
.dbus_connection .dbus_connection
.call_method( .call_method(
@ -20,7 +21,8 @@ impl idle::SinceLastInput for IdleWatcher {
Some("org.gnome.Mutter.IdleMonitor"), Some("org.gnome.Mutter.IdleMonitor"),
"GetIdletime", "GetIdletime",
&(), &(),
)? )
.await?
.body::<u64>()?; .body::<u64>()?;
u32::try_from(ms / 1000).with_context(|| format!("Number {ms} is invalid")) u32::try_from(ms / 1000).with_context(|| format!("Number {ms} is invalid"))
} }
@ -30,10 +32,10 @@ impl idle::SinceLastInput for IdleWatcher {
impl Watcher for IdleWatcher { impl Watcher for IdleWatcher {
async fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> { async fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> {
let mut watcher = Self { let mut watcher = Self {
dbus_connection: Connection::session()?, dbus_connection: Connection::session().await?,
is_idle: false, is_idle: false,
}; };
idle::SinceLastInput::seconds_since_input(&mut watcher)?; idle::SinceLastInput::seconds_since_input(&mut watcher).await?;
Ok(watcher) Ok(watcher)
} }

View File

@ -1,9 +1,11 @@
use crate::report_client::ReportClient; use crate::report_client::ReportClient;
use async_trait::async_trait;
use chrono::{Duration, Utc}; use chrono::{Duration, Utc};
use std::sync::Arc; use std::sync::Arc;
#[async_trait]
pub trait SinceLastInput { pub trait SinceLastInput {
fn seconds_since_input(&mut self) -> anyhow::Result<u32>; async fn seconds_since_input(&mut self) -> anyhow::Result<u32>;
} }
pub async fn ping_since_last_input( pub async fn ping_since_last_input(
@ -16,7 +18,7 @@ pub async fn ping_since_last_input(
let duration_1ms: Duration = Duration::milliseconds(1); let duration_1ms: Duration = Duration::milliseconds(1);
let duration_zero: Duration = Duration::zero(); let duration_zero: Duration = Duration::zero();
let seconds_since_input = watcher.seconds_since_input()?; let seconds_since_input = watcher.seconds_since_input().await?;
let now = Utc::now(); let now = Utc::now();
let time_since_input = Duration::seconds(i64::from(seconds_since_input)); let time_since_input = Duration::seconds(i64::from(seconds_since_input));
let last_input = now - time_since_input; let last_input = now - time_since_input;

View File

@ -12,8 +12,8 @@ use std::path::Path;
use std::sync::{mpsc::channel, Arc}; use std::sync::{mpsc::channel, Arc};
use std::thread; use std::thread;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use zbus::blocking::{Connection, ConnectionBuilder};
use zbus::dbus_interface; use zbus::dbus_interface;
use zbus::{Connection, ConnectionBuilder};
const KWIN_SCRIPT_NAME: &str = "activity_watcher"; const KWIN_SCRIPT_NAME: &str = "activity_watcher";
const KWIN_SCRIPT: &str = include_str!("kwin_window.js"); const KWIN_SCRIPT: &str = include_str!("kwin_window.js");
@ -31,20 +31,19 @@ impl KWinScript {
} }
} }
fn load(&mut self) -> anyhow::Result<()> { async fn load(&mut self) -> anyhow::Result<()> {
let path = temp_dir().join("kwin_window.js"); let path = temp_dir().join("kwin_window.js");
std::fs::write(&path, KWIN_SCRIPT).unwrap(); std::fs::write(&path, KWIN_SCRIPT).unwrap();
let result = self let number = self.get_registered_number(&path).await?;
.get_registered_number(&path) let result = self.start(number).await;
.and_then(|number| self.start(number));
std::fs::remove_file(&path)?; std::fs::remove_file(&path)?;
self.is_loaded = true; self.is_loaded = true;
result result
} }
fn is_loaded(&self) -> anyhow::Result<bool> { async fn is_loaded(&self) -> anyhow::Result<bool> {
self.dbus_connection self.dbus_connection
.call_method( .call_method(
Some("org.kde.KWin"), Some("org.kde.KWin"),
@ -52,12 +51,13 @@ impl KWinScript {
Some("org.kde.kwin.Scripting"), Some("org.kde.kwin.Scripting"),
"isScriptLoaded", "isScriptLoaded",
&KWIN_SCRIPT_NAME, &KWIN_SCRIPT_NAME,
)? )
.await?
.body::<bool>() .body::<bool>()
.map_err(std::convert::Into::into) .map_err(std::convert::Into::into)
} }
fn get_registered_number(&self, path: &Path) -> anyhow::Result<i32> { async fn get_registered_number(&self, path: &Path) -> anyhow::Result<i32> {
let temp_path = path let temp_path = path
.to_str() .to_str()
.ok_or(anyhow!("Temporary file path is not valid"))?; .ok_or(anyhow!("Temporary file path is not valid"))?;
@ -70,12 +70,13 @@ impl KWinScript {
"loadScript", "loadScript",
// since OsStr does not implement zvariant::Type, the temp-path must be valid utf-8 // since OsStr does not implement zvariant::Type, the temp-path must be valid utf-8
&(temp_path, KWIN_SCRIPT_NAME), &(temp_path, KWIN_SCRIPT_NAME),
)? )
.await?
.body::<i32>() .body::<i32>()
.map_err(std::convert::Into::into) .map_err(std::convert::Into::into)
} }
fn unload(&self) -> anyhow::Result<bool> { async fn unload(&self) -> anyhow::Result<bool> {
self.dbus_connection self.dbus_connection
.call_method( .call_method(
Some("org.kde.KWin"), Some("org.kde.KWin"),
@ -83,12 +84,13 @@ impl KWinScript {
Some("org.kde.kwin.Scripting"), Some("org.kde.kwin.Scripting"),
"unloadScript", "unloadScript",
&KWIN_SCRIPT_NAME, &KWIN_SCRIPT_NAME,
)? )
.await?
.body::<bool>() .body::<bool>()
.map_err(std::convert::Into::into) .map_err(std::convert::Into::into)
} }
fn start(&self, script_number: i32) -> anyhow::Result<()> { async fn start(&self, script_number: i32) -> anyhow::Result<()> {
debug!("Starting KWin script {script_number}"); debug!("Starting KWin script {script_number}");
self.dbus_connection self.dbus_connection
.call_method( .call_method(
@ -98,6 +100,7 @@ impl KWinScript {
"run", "run",
&(), &(),
) )
.await
.with_context(|| "Error on starting the script")?; .with_context(|| "Error on starting the script")?;
Ok(()) Ok(())
} }
@ -106,10 +109,16 @@ impl KWinScript {
impl Drop for KWinScript { impl Drop for KWinScript {
fn drop(&mut self) { fn drop(&mut self) {
if self.is_loaded { if self.is_loaded {
debug!("Unloading KWin script"); tokio::runtime::Builder::new_current_thread()
if let Err(e) = self.unload() { .enable_all()
error!("Problem during stopping KWin script: {e}"); .build()
}; .unwrap()
.block_on(async {
debug!("Unloading KWin script");
if let Err(e) = self.unload().await {
error!("Problem during stopping KWin script: {e}");
};
});
} }
} }
} }
@ -161,12 +170,12 @@ pub struct WindowWatcher {
#[async_trait] #[async_trait]
impl Watcher for WindowWatcher { impl Watcher for WindowWatcher {
async fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> { async fn new(_: &Arc<ReportClient>) -> anyhow::Result<Self> {
let mut kwin_script = KWinScript::new(Connection::session()?); let mut kwin_script = KWinScript::new(Connection::session().await?);
if kwin_script.is_loaded()? { if kwin_script.is_loaded().await? {
debug!("KWin script is already loaded, unloading"); debug!("KWin script is already loaded, unloading");
kwin_script.unload()?; kwin_script.unload().await?;
} }
kwin_script.load().unwrap(); kwin_script.load().await.unwrap();
let active_window = Arc::new(Mutex::new(ActiveWindow { let active_window = Arc::new(Mutex::new(ActiveWindow {
caption: String::new(), caption: String::new(),
@ -179,21 +188,32 @@ impl Watcher for WindowWatcher {
let (tx, rx) = channel(); let (tx, rx) = channel();
thread::spawn(move || { thread::spawn(move || {
let result = (|| { async fn get_connection(
active_window_interface: ActiveWindowInterface,
) -> zbus::Result<Connection> {
ConnectionBuilder::session()? ConnectionBuilder::session()?
.name("com._2e3s.Awatcher")? .name("com._2e3s.Awatcher")?
.serve_at("/com/_2e3s/Awatcher", active_window_interface)? .serve_at("/com/_2e3s/Awatcher", active_window_interface)?
.build() .build()
})(); .await
match result {
Ok(connection) => {
tx.send(None).unwrap();
loop {
connection.monitor_activity().wait();
}
}
Err(e) => tx.send(Some(e)),
} }
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
match get_connection(active_window_interface).await {
Ok(connection) => {
tx.send(None).unwrap();
loop {
connection.monitor_activity().wait();
}
}
Err(e) => tx.send(Some(e)),
}
})
.unwrap();
}); });
if let Some(error) = rx.recv().unwrap() { if let Some(error) = rx.recv().unwrap() {
panic!("Failed to run a DBus interface: {error}"); panic!("Failed to run a DBus interface: {error}");

View File

@ -9,8 +9,9 @@ pub struct IdleWatcher {
is_idle: bool, is_idle: bool,
} }
#[async_trait]
impl idle::SinceLastInput for IdleWatcher { impl idle::SinceLastInput for IdleWatcher {
fn seconds_since_input(&mut self) -> anyhow::Result<u32> { async fn seconds_since_input(&mut self) -> anyhow::Result<u32> {
self.client.seconds_since_last_input() self.client.seconds_since_last_input()
} }
} }