Invert client dependency for idle watchers with basic tests

This commit is contained in:
Demmie 2024-06-25 02:16:53 -04:00
parent db3d14bf4a
commit 1b41533e1e
No known key found for this signature in database
GPG Key ID: B06DAA3D432C6E9A
10 changed files with 245 additions and 82 deletions

72
Cargo.lock generated
View File

@ -1084,6 +1084,12 @@ dependencies = [
"libloading", "libloading",
] ]
[[package]]
name = "downcast"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
[[package]] [[package]]
name = "downcast-rs" name = "downcast-rs"
version = "1.2.0" version = "1.2.0"
@ -1332,6 +1338,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "fragile"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.28" version = "0.3.28"
@ -2111,6 +2123,33 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "mockall"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48"
dependencies = [
"cfg-if 1.0.0",
"downcast",
"fragile",
"lazy_static",
"mockall_derive",
"predicates",
"predicates-tree",
]
[[package]]
name = "mockall_derive"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2"
dependencies = [
"cfg-if 1.0.0",
"proc-macro2",
"quote",
"syn 2.0.64",
]
[[package]] [[package]]
name = "mpsc_requests" name = "mpsc_requests"
version = "0.3.3" version = "0.3.3"
@ -2536,6 +2575,32 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "predicates"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8"
dependencies = [
"anstyle",
"predicates-core",
]
[[package]]
name = "predicates-core"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174"
[[package]]
name = "predicates-tree"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf"
dependencies = [
"predicates-core",
"termtree",
]
[[package]] [[package]]
name = "proc-macro-crate" name = "proc-macro-crate"
version = "3.1.0" version = "3.1.0"
@ -3476,6 +3541,12 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "termtree"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]] [[package]]
name = "textwrap" name = "textwrap"
version = "0.11.0" version = "0.11.0"
@ -4040,6 +4111,7 @@ dependencies = [
"dirs 5.0.1", "dirs 5.0.1",
"gethostname", "gethostname",
"log", "log",
"mockall",
"regex", "regex",
"rstest", "rstest",
"serde", "serde",

View File

@ -12,6 +12,7 @@ path = "src/lib.rs"
[dev-dependencies] [dev-dependencies]
rstest = "0.21.0" rstest = "0.21.0"
tempfile = "3.10.1" tempfile = "3.10.1"
mockall = "0.12.1"
[dependencies] [dependencies]
aw-client-rust = { git = "https://github.com/ActivityWatch/aw-server-rust", rev = "bb787fd" } aw-client-rust = { git = "https://github.com/ActivityWatch/aw-server-rust", rev = "bb787fd" }
@ -32,7 +33,7 @@ gethostname = "0.4.3"
log = { workspace = true } log = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
async-trait = "0.1.80" async-trait = "0.1.80"
tokio = { workspace = true, features = ["time", "sync"] } tokio = { workspace = true, features = ["time", "sync", "macros"] }
[features] [features]
default = ["gnome", "kwin_window"] default = ["gnome", "kwin_window"]

View File

@ -3,6 +3,7 @@ extern crate log;
pub mod config; pub mod config;
mod report_client; mod report_client;
mod subscriber;
mod watchers; mod watchers;
pub use crate::report_client::ReportClient; pub use crate::report_client::ReportClient;

View File

@ -1,5 +1,8 @@
use crate::subscriber::IdleSubscriber;
use super::config::Config; use super::config::Config;
use anyhow::Context; use anyhow::Context;
use async_trait::async_trait;
use aw_client_rust::{AwClient, Event as AwEvent}; use aw_client_rust::{AwClient, Event as AwEvent};
use chrono::{DateTime, TimeDelta, Utc}; use chrono::{DateTime, TimeDelta, Utc};
use serde_json::{Map, Value}; use serde_json::{Map, Value};
@ -151,3 +154,57 @@ impl ReportClient {
.with_context(|| format!("Failed to create bucket {bucket_name}")) .with_context(|| format!("Failed to create bucket {bucket_name}"))
} }
} }
#[async_trait]
impl IdleSubscriber for ReportClient {
async fn idle(
&self,
changed: bool,
last_input_time: DateTime<Utc>,
duration: TimeDelta,
) -> anyhow::Result<()> {
if changed {
debug!(
"Reporting as changed to idle for {} seconds since {}",
duration.num_seconds(),
last_input_time.format("%Y-%m-%d %H:%M:%S"),
);
self.ping(false, last_input_time, TimeDelta::zero()).await?;
// ping with timestamp+1ms with the next event (to ensure the latest event gets retrieved by get_event)
self.ping(true, last_input_time, duration + TimeDelta::milliseconds(1))
.await
} else {
trace!(
"Reporting as idle for {} seconds since {}",
duration.num_seconds(),
last_input_time.format("%Y-%m-%d %H:%M:%S"),
);
self.ping(true, last_input_time, duration).await
}
}
async fn non_idle(&self, changed: bool, last_input_time: DateTime<Utc>) -> anyhow::Result<()> {
if changed {
debug!(
"Reporting as no longer idle at {}",
last_input_time.format("%Y-%m-%d %H:%M:%S")
);
self.ping(true, last_input_time, TimeDelta::zero()).await?;
self.ping(
false,
last_input_time + TimeDelta::milliseconds(1),
TimeDelta::zero(),
)
.await
} else {
trace!(
"Reporting as not idle at {}",
last_input_time.format("%Y-%m-%d %H:%M:%S")
);
self.ping(false, last_input_time, TimeDelta::zero()).await
}
}
}

View File

@ -0,0 +1,14 @@
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
#[async_trait]
pub trait IdleSubscriber: Sync + Send {
async fn idle(
&self,
changed: bool,
last_input_time: DateTime<Utc>,
duration: TimeDelta,
) -> anyhow::Result<()>;
async fn non_idle(&self, changed: bool, last_input_time: DateTime<Utc>) -> anyhow::Result<()>;
}

View File

@ -35,7 +35,7 @@ impl Watcher for IdleWatcher {
load_watcher(|| async move { load_watcher(|| async move {
let mut watcher = Self { let mut watcher = Self {
dbus_connection: Connection::session().await?, dbus_connection: Connection::session().await?,
idle_state: idle::State::new(duration), idle_state: idle::State::new(duration, client.clone()),
}; };
watcher.seconds_since_input().await?; watcher.seconds_since_input().await?;
Ok(watcher) Ok(watcher)
@ -43,11 +43,9 @@ impl Watcher for IdleWatcher {
.await .await
} }
async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, _: &Arc<ReportClient>) -> anyhow::Result<()> {
let seconds = self.seconds_since_input().await?; let seconds = self.seconds_since_input().await?;
self.idle_state self.idle_state.send_with_last_input(seconds).await?;
.send_with_last_input(seconds, client)
.await?;
Ok(()) Ok(())
} }

View File

@ -1,4 +1,4 @@
use crate::report_client::ReportClient; use crate::subscriber::IdleSubscriber;
use chrono::{DateTime, TimeDelta, Utc}; use chrono::{DateTime, TimeDelta, Utc};
use std::{cmp::max, sync::Arc}; use std::{cmp::max, sync::Arc};
@ -8,13 +8,14 @@ pub struct State {
is_idle: bool, is_idle: bool,
is_changed: bool, is_changed: bool,
idle_timeout: TimeDelta, idle_timeout: TimeDelta,
subscriber: Arc<dyn IdleSubscriber>,
idle_start: Option<DateTime<Utc>>, idle_start: Option<DateTime<Utc>>,
idle_end: Option<DateTime<Utc>>, idle_end: Option<DateTime<Utc>>,
} }
impl State { impl State {
pub fn new(idle_timeout: TimeDelta) -> Self { pub fn new(idle_timeout: TimeDelta, subscriber: Arc<dyn IdleSubscriber>) -> Self {
Self { Self {
last_input_time: Utc::now(), last_input_time: Utc::now(),
changed_time: Utc::now(), changed_time: Utc::now(),
@ -23,6 +24,7 @@ impl State {
idle_timeout, idle_timeout,
idle_start: None, idle_start: None,
idle_end: None, idle_end: None,
subscriber,
} }
} }
@ -47,11 +49,7 @@ impl State {
// The logic is rewritten from the original Python code: // The logic is rewritten from the original Python code:
// https://github.com/ActivityWatch/aw-watcher-afk/blob/ef531605cd8238e00138bbb980e5457054e05248/aw_watcher_afk/afk.py#L73 // https://github.com/ActivityWatch/aw-watcher-afk/blob/ef531605cd8238e00138bbb980e5457054e05248/aw_watcher_afk/afk.py#L73
pub async fn send_with_last_input( pub async fn send_with_last_input(&mut self, seconds_since_input: u32) -> anyhow::Result<()> {
&mut self,
seconds_since_input: u32,
client: &Arc<ReportClient>,
) -> anyhow::Result<()> {
let now = Utc::now(); let now = Utc::now();
let time_since_input = TimeDelta::seconds(i64::from(seconds_since_input)); let time_since_input = TimeDelta::seconds(i64::from(seconds_since_input));
@ -69,10 +67,10 @@ impl State {
self.set_idle(true, now); self.set_idle(true, now);
} }
self.send_ping(now, client).await self.send_ping(now).await
} }
pub async fn send_reactive(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { pub async fn send_reactive(&mut self) -> anyhow::Result<()> {
let now = Utc::now(); let now = Utc::now();
if !self.is_idle { if !self.is_idle {
self.last_input_time = max(now - self.idle_timeout, self.changed_time); self.last_input_time = max(now - self.idle_timeout, self.changed_time);
@ -90,67 +88,79 @@ impl State {
} }
} }
self.send_ping(now, client).await self.send_ping(now).await
} }
async fn send_ping( async fn send_ping(&mut self, now: DateTime<Utc>) -> anyhow::Result<()> {
&mut self,
now: DateTime<Utc>,
client: &Arc<ReportClient>,
) -> anyhow::Result<()> {
if self.is_changed { if self.is_changed {
let result = if self.is_idle { if self.is_idle {
debug!( self.subscriber
"Reporting as changed to idle for {} seconds since {}", .idle(
(now - self.last_input_time).num_seconds(), self.is_changed,
self.last_input_time.format("%Y-%m-%d %H:%M:%S"), self.last_input_time,
); now - self.last_input_time,
client
.ping(false, self.last_input_time, TimeDelta::zero())
.await?;
// ping with timestamp+1ms with the next event (to ensure the latest event gets retrieved by get_event)
self.last_input_time += TimeDelta::milliseconds(1);
client
.ping(true, self.last_input_time, now - self.last_input_time)
.await
} else {
debug!(
"Reporting as no longer idle at {}",
self.last_input_time.format("%Y-%m-%d %H:%M:%S")
);
client
.ping(true, self.last_input_time, TimeDelta::zero())
.await?;
client
.ping(
false,
self.last_input_time + TimeDelta::milliseconds(1),
TimeDelta::zero(),
) )
.await .await?;
} else {
self.subscriber
.non_idle(self.is_changed, self.last_input_time)
.await?;
}; };
self.is_changed = false;
result
} else if self.is_idle { } else if self.is_idle {
trace!( self.subscriber
"Reporting as idle for {} seconds since {}", .idle(
(now - self.last_input_time).num_seconds(), self.is_changed,
self.last_input_time.format("%Y-%m-%d %H:%M:%S"), self.last_input_time,
); now - self.last_input_time,
client )
.ping(true, self.last_input_time, now - self.last_input_time) .await?;
.await
} else { } else {
trace!( self.subscriber
"Reporting as not idle at {}", .non_idle(self.is_changed, self.last_input_time)
self.last_input_time.format("%Y-%m-%d %H:%M:%S") .await?;
);
client
.ping(false, self.last_input_time, TimeDelta::zero())
.await
} }
self.is_changed = false;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use chrono::Duration;
use mockall::mock;
use rstest::rstest;
mock! {
pub Subscriber {}
#[async_trait]
impl IdleSubscriber for Subscriber {
async fn idle(&self, changed: bool, last_input_time: DateTime<Utc>, duration: TimeDelta) -> anyhow::Result<()>;
async fn non_idle(&self, changed: bool, last_input_time: DateTime<Utc>) -> anyhow::Result<()>;
}
}
#[rstest]
#[tokio::test]
async fn test_mark_not_idle() {
let subscriber = Arc::new(MockSubscriber::new());
let mut state = State::new(Duration::seconds(300), subscriber.clone());
state.mark_not_idle();
assert!(!state.is_idle);
assert!(state.is_changed);
}
#[rstest]
#[tokio::test]
async fn test_mark_idle() {
let subscriber = Arc::new(MockSubscriber::new());
let mut state = State::new(Duration::seconds(300), subscriber.clone());
state.mark_idle();
assert!(state.is_idle);
assert!(state.is_changed);
} }
} }

View File

@ -2,6 +2,7 @@ use super::idle;
use super::wl_connection::{subscribe_state, WlEventConnection}; use super::wl_connection::{subscribe_state, WlEventConnection};
use super::Watcher; use super::Watcher;
use crate::report_client::ReportClient; use crate::report_client::ReportClient;
use crate::subscriber::IdleSubscriber;
use anyhow::anyhow; use anyhow::anyhow;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::TimeDelta; use chrono::TimeDelta;
@ -28,10 +29,14 @@ impl Drop for WatcherState {
} }
impl WatcherState { impl WatcherState {
fn new(idle_notification: ExtIdleNotificationV1, idle_timeout: TimeDelta) -> Self { fn new(
idle_notification: ExtIdleNotificationV1,
idle_timeout: TimeDelta,
subscriber: Arc<dyn IdleSubscriber>,
) -> Self {
Self { Self {
idle_notification, idle_notification,
idle_state: idle::State::new(idle_timeout), idle_state: idle::State::new(idle_timeout, subscriber),
} }
} }
@ -85,6 +90,7 @@ impl Watcher for IdleWatcher {
.get_ext_idle_notification(timeout.unwrap()) .get_ext_idle_notification(timeout.unwrap())
.unwrap(), .unwrap(),
client.config.idle_timeout, client.config.idle_timeout,
client.clone(),
); );
connection connection
.event_queue .event_queue
@ -97,12 +103,12 @@ impl Watcher for IdleWatcher {
}) })
} }
async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, _: &Arc<ReportClient>) -> anyhow::Result<()> {
self.connection self.connection
.event_queue .event_queue
.roundtrip(&mut self.watcher_state) .roundtrip(&mut self.watcher_state)
.map_err(|e| anyhow!("Event queue is not processed: {e}"))?; .map_err(|e| anyhow!("Event queue is not processed: {e}"))?;
self.watcher_state.idle_state.send_reactive(client).await self.watcher_state.idle_state.send_reactive().await
} }
} }

View File

@ -2,6 +2,7 @@ use super::idle;
use super::wl_connection::{subscribe_state, WlEventConnection}; use super::wl_connection::{subscribe_state, WlEventConnection};
use super::Watcher; use super::Watcher;
use crate::report_client::ReportClient; use crate::report_client::ReportClient;
use crate::subscriber::IdleSubscriber;
use anyhow::anyhow; use anyhow::anyhow;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::TimeDelta; use chrono::TimeDelta;
@ -29,10 +30,14 @@ impl Drop for WatcherState {
} }
impl WatcherState { impl WatcherState {
fn new(kwin_idle_timeout: OrgKdeKwinIdleTimeout, idle_timeout: TimeDelta) -> Self { fn new(
kwin_idle_timeout: OrgKdeKwinIdleTimeout,
idle_timeout: TimeDelta,
subscriber: Arc<dyn IdleSubscriber>,
) -> Self {
Self { Self {
kwin_idle_timeout, kwin_idle_timeout,
idle_state: idle::State::new(idle_timeout), idle_state: idle::State::new(idle_timeout, subscriber),
} }
} }
@ -84,6 +89,7 @@ impl Watcher for IdleWatcher {
let mut watcher_state = WatcherState::new( let mut watcher_state = WatcherState::new(
connection.get_kwin_idle_timeout(timeout.unwrap()).unwrap(), connection.get_kwin_idle_timeout(timeout.unwrap()).unwrap(),
client.config.idle_timeout, client.config.idle_timeout,
client.clone(),
); );
connection connection
.event_queue .event_queue
@ -96,12 +102,12 @@ impl Watcher for IdleWatcher {
}) })
} }
async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, _: &Arc<ReportClient>) -> anyhow::Result<()> {
self.connection self.connection
.event_queue .event_queue
.roundtrip(&mut self.watcher_state) .roundtrip(&mut self.watcher_state)
.map_err(|e| anyhow!("Event queue is not processed: {e}"))?; .map_err(|e| anyhow!("Event queue is not processed: {e}"))?;
self.watcher_state.idle_state.send_reactive(client).await self.watcher_state.idle_state.send_reactive().await
} }
} }

View File

@ -25,15 +25,13 @@ impl Watcher for IdleWatcher {
Ok(IdleWatcher { Ok(IdleWatcher {
client, client,
idle_state: idle::State::new(report_client.config.idle_timeout), idle_state: idle::State::new(report_client.config.idle_timeout, report_client.clone()),
}) })
} }
async fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()> { async fn run_iteration(&mut self, _: &Arc<ReportClient>) -> anyhow::Result<()> {
let seconds = self.seconds_since_input().await?; let seconds = self.seconds_since_input().await?;
self.idle_state self.idle_state.send_with_last_input(seconds).await?;
.send_with_last_input(seconds, client)
.await?;
Ok(()) Ok(())
} }