Add basic async support over threads

This commit is contained in:
Demmie 2023-06-17 02:14:09 -04:00
parent 5685631868
commit 790277071b
No known key found for this signature in database
GPG Key ID: B06DAA3D432C6E9A
6 changed files with 109 additions and 133 deletions

View File

@ -33,19 +33,19 @@ fern = { version = "0.6.2", features = ["colored"] }
log = { workspace = true } log = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
signal-hook = "0.3.15" signal-hook = "0.3.15"
tokio = { version = "1.28.2", features = ["rt", "macros"] }
ksni = {version = "0.2.0", optional = true} ksni = {version = "0.2.0", optional = true}
aw-server = { git = "https://github.com/2e3s/aw-server-rust", optional = true, rev = "1c23a55" } aw-server = { git = "https://github.com/2e3s/aw-server-rust", optional = true, rev = "1c23a55" }
aw-datastore = { git = "https://github.com/2e3s/aw-server-rust", optional = true, rev = "1c23a55" } aw-datastore = { git = "https://github.com/2e3s/aw-server-rust", optional = true, rev = "1c23a55" }
open = { version = "4.1.0", optional = true } open = { version = "4.1.0", optional = true }
rust-embed = { version = "6.6.1", features = ["interpolate-folder-path"] } rust-embed = { version = "6.6.1", features = ["interpolate-folder-path"], optional = true }
tokio = { version = "1.28.2", optional = true }
[features] [features]
default = ["gnome", "kwin_window"] default = ["gnome", "kwin_window"]
gnome = ["watchers/gnome"] gnome = ["watchers/gnome"]
kwin_window = ["watchers/kwin_window"] kwin_window = ["watchers/kwin_window"]
bundle = ["ksni", "tokio", "aw-server", "aw-datastore", "open"] bundle = ["ksni", "aw-server", "aw-datastore", "open", "rust-embed"]
[package.metadata.deb] [package.metadata.deb]
features = ["bundle"] features = ["bundle"]

View File

@ -6,19 +6,23 @@ use std::{
path::PathBuf, path::PathBuf,
sync::{atomic::AtomicBool, Arc}, sync::{atomic::AtomicBool, Arc},
}; };
use watchers::config::Config;
pub fn run(config: &Config, config_file: PathBuf, no_tray: bool, is_stopped: Arc<AtomicBool>) { pub async fn run(
host: String,
port: u32,
config_file: PathBuf,
no_tray: bool,
is_stopped: Arc<AtomicBool>,
) {
if !no_tray { if !no_tray {
let service = ksni::TrayService::new(Tray { let service = ksni::TrayService::new(Tray {
server_host: config.host.clone(), server_host: host,
server_port: config.port, server_port: port,
config_file, config_file,
is_stopped: Arc::clone(&is_stopped), is_stopped: Arc::clone(&is_stopped),
}); });
service.spawn(); service.spawn();
} }
let port = config.port; server::run(port, is_stopped).await;
server::run(port, is_stopped);
} }

View File

@ -6,47 +6,39 @@ use std::sync::{
}; };
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
pub fn run(port: u32, is_stopped: Arc<AtomicBool>) { pub async fn run(port: u32, is_stopped: Arc<AtomicBool>) {
std::thread::spawn(move || { let db_path = aw_server::dirs::db_path(false)
let db_path = aw_server::dirs::db_path(false) .map_err(|_| anyhow!("DB path is not found"))
.map_err(|_| anyhow!("DB path is not found")) .unwrap()
.unwrap() .to_str()
.to_str() .unwrap()
.unwrap() .to_string();
.to_string(); let device_id = aw_server::device_id::get_device_id();
let device_id = aw_server::device_id::get_device_id(); let mut config = aw_server::config::create_config(false);
let mut config = aw_server::config::create_config(false); config.address = "127.0.0.1".to_string();
config.address = "127.0.0.1".to_string(); config.port = u16::try_from(port).unwrap();
config.port = u16::try_from(port).unwrap();
let legacy_import = false; let legacy_import = false;
let server_state = aw_server::endpoints::ServerState { let server_state = aw_server::endpoints::ServerState {
datastore: Mutex::new(aw_datastore::Datastore::new(db_path, legacy_import)), datastore: Mutex::new(aw_datastore::Datastore::new(db_path, legacy_import)),
asset_resolver: embed_asset_resolver!("$AW_WEBUI_DIST"), asset_resolver: embed_asset_resolver!("$AW_WEBUI_DIST"),
device_id, device_id,
}; };
let server = build_rocket(server_state, config).launch(); let server = build_rocket(server_state, config).launch();
let check = async { let check = async {
loop { loop {
if is_stopped.load(Ordering::Relaxed) { if is_stopped.load(Ordering::Relaxed) {
warn!("Received an exit signal, stopping the server"); warn!("Received an exit signal, stopping the server");
break; break;
}
sleep(Duration::from_secs(1)).await;
} }
};
tokio::runtime::Builder::new_current_thread() sleep(Duration::from_secs(1)).await;
.enable_all() }
.build() };
.unwrap()
.block_on(async { tokio::select! (
tokio::select! ( r = server => {r.unwrap();},
r = server => {r.unwrap();}, _ = check => {},
_ = check => {}, );
);
});
});
} }

View File

@ -10,10 +10,11 @@ mod config;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use watchers::ConstructorFilter; use watchers::run_first_supported;
use watchers::ReportClient; use watchers::ReportClient;
fn main() -> anyhow::Result<()> { #[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let is_stopped = Arc::new(AtomicBool::new(false)); let is_stopped = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(signal_hook::consts::SIGTERM, Arc::clone(&is_stopped))?; signal_hook::flag::register(signal_hook::consts::SIGTERM, Arc::clone(&is_stopped))?;
signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&is_stopped))?; signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&is_stopped))?;
@ -40,33 +41,27 @@ fn main() -> anyhow::Result<()> {
config.poll_time_window.as_secs() config.poll_time_window.as_secs()
); );
#[cfg(feature = "bundle")]
bundle::run(&config, config_file, no_tray, Arc::clone(&is_stopped));
let client = ReportClient::new(config)?; let client = ReportClient::new(config)?;
let client = Arc::new(client); let client = Arc::new(client);
let mut thread_handlers = Vec::new(); let idle_handler = run_first_supported(watchers::IDLE, &client, Arc::clone(&is_stopped));
let active_window_handler =
run_first_supported(watchers::ACTIVE_WINDOW, &client, Arc::clone(&is_stopped));
if let Some(idle_handler) = watchers::IDLE.run_first_supported(&client, Arc::clone(&is_stopped)) #[cfg(not(feature = "bundle"))]
{ {
thread_handlers.push(idle_handler); tokio::select!(
} else { _ = idle_handler => Ok(()),
warn!("No supported idle handler is found"); _ = active_window_handler => Ok(()),
)
} }
if let Some(active_window_handler) = #[cfg(feature = "bundle")]
watchers::ACTIVE_WINDOW.run_first_supported(&client, is_stopped)
{ {
thread_handlers.push(active_window_handler); tokio::select!(
} else { _ = idle_handler => Ok(()),
warn!("No supported active window handler is found"); _ = active_window_handler => Ok(()),
_ = bundle::run(client.config.host.clone(), client.config.port, config_file, no_tray, Arc::clone(&is_stopped)) => Ok(()),
)
} }
for handler in thread_handlers {
if handler.join().is_err() {
error!("Thread failed with error");
}
}
Ok(())
} }

View File

@ -6,7 +6,7 @@ mod report_client;
mod watchers; mod watchers;
pub use crate::report_client::ReportClient; pub use crate::report_client::ReportClient;
pub use crate::watchers::ConstructorFilter; pub use crate::watchers::run_first_supported;
pub use crate::watchers::Watcher; pub use crate::watchers::Watcher;
pub use crate::watchers::ACTIVE_WINDOW; pub use crate::watchers::ACTIVE_WINDOW;
pub use crate::watchers::IDLE; pub use crate::watchers::IDLE;

View File

@ -20,7 +20,7 @@ use std::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc,
}, },
thread::{self, JoinHandle}, thread,
time::Duration, time::Duration,
}; };
@ -52,25 +52,6 @@ pub trait Watcher: Send {
where where
Self: Sized; Self: Sized;
fn run(
&mut self,
watcher_type: &WatcherType,
client: &Arc<ReportClient>,
is_stopped: Arc<AtomicBool>,
) {
info!("Starting {watcher_type} watcher");
loop {
if is_stopped.load(Ordering::Relaxed) {
warn!("Received an exit signal, shutting down {watcher_type}");
break;
}
if let Err(e) = self.run_iteration(client) {
error!("Error on {watcher_type} iteration: {e}");
}
thread::sleep(watcher_type.sleep_time(&client.config));
}
}
fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()>; fn run_iteration(&mut self, client: &Arc<ReportClient>) -> anyhow::Result<()>;
} }
@ -81,51 +62,55 @@ type WatcherConstructors = [(
fn(&Arc<ReportClient>) -> anyhow::Result<BoxedWatcher>, fn(&Arc<ReportClient>) -> anyhow::Result<BoxedWatcher>,
)]; )];
pub trait ConstructorFilter { pub fn filter_first_supported(
fn filter_first_supported( watcher_constructors: &'static WatcherConstructors,
&self, client: &Arc<ReportClient>,
client: &Arc<ReportClient>, ) -> Option<(&'static WatcherType, BoxedWatcher)> {
) -> Option<(&WatcherType, BoxedWatcher)>; watcher_constructors
.iter()
fn run_first_supported( .find_map(|(name, watcher_type, watcher)| match watcher(client) {
&'static self, Ok(watcher) => {
client: &Arc<ReportClient>, info!("Selected {name} as {watcher_type} watcher");
is_stopped: Arc<AtomicBool>, Some((watcher_type, watcher))
) -> Option<JoinHandle<()>>; }
Err(e) => {
debug!("{name} cannot run: {e}");
None
}
})
} }
impl ConstructorFilter for WatcherConstructors { async fn run_watcher(
fn filter_first_supported( watcher: &mut Box<dyn Watcher>,
&self, watcher_type: &WatcherType,
client: &Arc<ReportClient>, client: &Arc<ReportClient>,
) -> Option<(&WatcherType, BoxedWatcher)> { is_stopped: Arc<AtomicBool>,
self.iter() ) {
.find_map(|(name, watcher_type, watcher)| match watcher(client) { info!("Starting {watcher_type} watcher");
Ok(watcher) => { loop {
info!("Selected {name} as {watcher_type} watcher"); if is_stopped.load(Ordering::Relaxed) {
Some((watcher_type, watcher)) warn!("Received an exit signal, shutting down {watcher_type}");
} break;
Err(e) => {
debug!("{name} cannot run: {e}");
None
}
})
}
fn run_first_supported(
&'static self,
client: &Arc<ReportClient>,
is_stopped: Arc<AtomicBool>,
) -> Option<JoinHandle<()>> {
let idle_watcher = self.filter_first_supported(client);
if let Some((watcher_type, mut watcher)) = idle_watcher {
let thread_client = Arc::clone(client);
let idle_handler =
thread::spawn(move || watcher.run(watcher_type, &thread_client, is_stopped));
Some(idle_handler)
} else {
None
} }
if let Err(e) = watcher.run_iteration(client) {
error!("Error on {watcher_type} iteration: {e}");
}
thread::sleep(watcher_type.sleep_time(&client.config));
}
}
pub async fn run_first_supported(
watcher_constructors: &'static WatcherConstructors,
client: &Arc<ReportClient>,
is_stopped: Arc<AtomicBool>,
) -> bool {
let supported_watcher = filter_first_supported(watcher_constructors, client);
if let Some((watcher_type, mut watcher)) = supported_watcher {
let thread_client = Arc::clone(client);
run_watcher(&mut watcher, watcher_type, &thread_client, is_stopped).await;
true
} else {
false
} }
} }