Add retries for reqwest connect errors

This commit is contained in:
Demmie 2023-11-26 14:47:41 -05:00
parent c6a5df58d8
commit 70001c3523
No known key found for this signature in database
GPG Key ID: B06DAA3D432C6E9A
2 changed files with 45 additions and 8 deletions

View File

@ -3,7 +3,7 @@
use serde::{Deserialize, Serialize};
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::process::{Child, Command};
#[derive(Debug, Serialize, Deserialize, Default)]
struct Watchers {
@ -43,7 +43,7 @@ impl ExternalWatcher {
}
debug!("Starting an external watcher {}", self.name());
let command = Command::new(&self.path).stdout(Stdio::null()).spawn();
let command = Command::new(&self.path).spawn();
match command {
Ok(handle) => {

View File

@ -3,6 +3,7 @@ use anyhow::Context;
use aw_client_rust::{AwClient, Event as AwEvent};
use chrono::{DateTime, Duration, Utc};
use serde_json::{Map, Value};
use std::future::Future;
pub struct ReportClient {
pub client: AwClient,
@ -31,6 +32,32 @@ impl ReportClient {
})
}
async fn run_with_retries<F, Fut, T, E>(f: F) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: std::error::Error + Send + Sync + 'static,
{
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
let mut attempts = 0;
loop {
match f().await {
Ok(val) => return Ok(val),
Err(e)
if attempts < 3
&& e.to_string()
.contains("tcp connect error: Connection refused") =>
{
warn!("Failed to connect, retrying: {}", e);
attempts += 1;
interval.tick().await;
}
Err(e) => return Err(e),
}
}
}
pub async fn ping(
&self,
is_idle: bool,
@ -55,8 +82,13 @@ impl ReportClient {
}
let pulsetime = (self.config.idle_timeout + self.config.poll_time_idle).as_secs_f64();
self.client
.heartbeat(&self.idle_bucket_name, &event, pulsetime)
let request = || {
self.client
.heartbeat(&self.idle_bucket_name, &event, pulsetime)
};
Self::run_with_retries(request)
.await
.with_context(|| "Failed to send heartbeat")
}
@ -96,8 +128,12 @@ impl ReportClient {
}
let interval_margin = self.config.poll_time_window.as_secs_f64() + 1.0;
self.client
.heartbeat(&self.active_window_bucket_name, &event, interval_margin)
let request = || {
self.client
.heartbeat(&self.active_window_bucket_name, &event, interval_margin)
};
Self::run_with_retries(request)
.await
.with_context(|| "Failed to send heartbeat for active window")
}
@ -107,8 +143,9 @@ impl ReportClient {
bucket_name: &str,
bucket_type: &str,
) -> anyhow::Result<()> {
client
.create_bucket_simple(bucket_name, bucket_type)
let request = || client.create_bucket_simple(bucket_name, bucket_type);
Self::run_with_retries(request)
.await
.with_context(|| format!("Failed to create bucket {bucket_name}"))
}