diff --git a/.rustfmt.toml b/.rustfmt.toml index f8aacc1..e0413b9 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -1,4 +1,5 @@ reorder_imports = true +max_width = 100 ignore = [ "vendor/*", diff --git a/src/extensions/api/mod.rs b/src/extensions/api/mod.rs index ea0751b..4a0e6a7 100644 --- a/src/extensions/api/mod.rs +++ b/src/extensions/api/mod.rs @@ -61,13 +61,15 @@ pub(crate) fn validate_new_head( ) -> anyhow::Result<()> { if let Some((current_hash, current_number)) = tx.borrow().as_ref() { if *current_number > number { - return Err(anyhow::Error::msg(format!("Head number is not increasing, current_number: {current_number} new_number: {number}"))); + return Err(anyhow::Error::msg(format!( + "Head number is not increasing, current_number: {current_number} new_number: {number}" + ))); } if *current_number == number && current_hash != hash { - return Err(anyhow::Error::msg( - format!("Head number is the same but hash is not matching, current_hash: {current_hash} new_hash: {hash}") - )); + return Err(anyhow::Error::msg(format!( + "Head number is the same but hash is not matching, current_hash: {current_hash} new_hash: {hash}" + ))); } } diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index efcd237..59b7271 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -1,4 +1,10 @@ -use std::sync::{atomic::AtomicUsize, Arc}; +use std::{ + sync::{ + atomic::{AtomicU32, AtomicUsize}, + Arc, + }, + time::Duration, +}; use anyhow::anyhow; use async_trait::async_trait; @@ -101,8 +107,12 @@ impl Client { tokio::spawn(async move { let tx = tx2; + let connect_backoff_counter = Arc::new(AtomicU32::new(0)); + let request_backoff_counter = Arc::new(AtomicU32::new(0)); + let current_endpoint = AtomicUsize::new(0); + let connect_backoff_counter2 = connect_backoff_counter.clone(); let build_ws = || async { let build = || { let current_endpoint = @@ -131,6 +141,7 @@ impl Client { let ws2 = ws.clone(); tracing::info!("Endpoint connected"); + connect_backoff_counter2.store(0, std::sync::atomic::Ordering::Relaxed); tokio::spawn(async move { ws2.on_disconnect().await; @@ -143,8 +154,7 @@ impl Client { } Err((e, url)) => { tracing::warn!("Unable to connect to endpoint: '{url}' error: {e}"); - // TODO: use a backoff strategy - tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + tokio::time::sleep(get_backoff_time(&connect_backoff_counter2)).await; } } } @@ -154,6 +164,7 @@ impl Client { let handle_message = |message: Message, ws: Arc| { let tx = tx.clone(); + let request_backoff_counter = request_backoff_counter.clone(); tokio::spawn(async move { match message { @@ -165,6 +176,9 @@ impl Client { let result = ws.request(&method, params.clone()).await; match result { result @ Ok(_) => { + request_backoff_counter + .store(0, std::sync::atomic::Ordering::Relaxed); + if let Err(e) = response.send(result) { tracing::warn!("Failed to send response: {:?}", e); } @@ -196,9 +210,8 @@ impl Client { Error::Transport(_) | Error::RestartNeeded(_) | Error::MaxSlotsExceeded => { - // TODO: use a backoff strategy - tokio::time::sleep(std::time::Duration::from_millis( - 200, + tokio::time::sleep(get_backoff_time( + &request_backoff_counter, )) .await; @@ -236,6 +249,9 @@ impl Client { ws.subscribe(&subscribe, params.clone(), &unsubscribe).await; match result { result @ Ok(_) => { + request_backoff_counter + .store(0, std::sync::atomic::Ordering::Relaxed); + if let Err(e) = response.send(result) { tracing::warn!("Failed to send response: {:?}", e); } @@ -268,9 +284,8 @@ impl Client { Error::Transport(_) | Error::RestartNeeded(_) | Error::MaxSlotsExceeded => { - // TODO: use a backoff strategy - tokio::time::sleep(std::time::Duration::from_millis( - 200, + tokio::time::sleep(get_backoff_time( + &request_backoff_counter, )) .await; @@ -310,8 +325,7 @@ impl Client { tokio::select! { _ = disconnect_rx.recv() => { tracing::info!("Disconnected from endpoint"); - // TODO: use a backoff strategy - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(get_backoff_time(&connect_backoff_counter)).await; ws = build_ws().await; } message = rx.recv() => { @@ -386,3 +400,34 @@ impl Client { .map_err(|_| ()) } } + +fn get_backoff_time(counter: &Arc) -> Duration { + let min_time = 100u64; + let step = 100u64; + let max_count = 10u32; + + let backoff_count = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let backoff_count = backoff_count.min(max_count) as u64; + let backoff_time = backoff_count * backoff_count * step; + + Duration::from_millis(backoff_time + min_time) +} + +#[test] +fn test_get_backoff_time() { + let counter = Arc::new(AtomicU32::new(0)); + + let mut times = Vec::new(); + + for _ in 0..12 { + times.push(get_backoff_time(&counter)); + } + + let times = times.into_iter().map(|t| t.as_millis()).collect::>(); + + assert_eq!( + times, + vec![100, 200, 500, 1000, 1700, 2600, 3700, 5000, 6500, 8200, 10100, 10100] + ); +}