Skip to content

Commit

Permalink
Backoff (#106)
Browse files Browse the repository at this point in the history
* add more loggings

* use backoff strategy
  • Loading branch information
xlc authored Oct 3, 2023
1 parent 28ec7a1 commit c1b259f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 15 deletions.
1 change: 1 addition & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
reorder_imports = true
max_width = 100

ignore = [
"vendor/*",
Expand Down
10 changes: 6 additions & 4 deletions src/extensions/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ pub(crate) fn validate_new_head(
) -> anyhow::Result<()> {
if let Some((current_hash, current_number)) = tx.borrow().as_ref() {
if *current_number > number {
return Err(anyhow::Error::msg(format!("Head number is not increasing, current_number: {current_number} new_number: {number}")));
return Err(anyhow::Error::msg(format!(
"Head number is not increasing, current_number: {current_number} new_number: {number}"
)));
}

if *current_number == number && current_hash != hash {
return Err(anyhow::Error::msg(
format!("Head number is the same but hash is not matching, current_hash: {current_hash} new_hash: {hash}")
));
return Err(anyhow::Error::msg(format!(
"Head number is the same but hash is not matching, current_hash: {current_hash} new_hash: {hash}"
)));
}
}

Expand Down
67 changes: 56 additions & 11 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::sync::{atomic::AtomicUsize, Arc};
use std::{
sync::{
atomic::{AtomicU32, AtomicUsize},
Arc,
},
time::Duration,
};

use anyhow::anyhow;
use async_trait::async_trait;
Expand Down Expand Up @@ -101,8 +107,12 @@ impl Client {
tokio::spawn(async move {
let tx = tx2;

let connect_backoff_counter = Arc::new(AtomicU32::new(0));
let request_backoff_counter = Arc::new(AtomicU32::new(0));

let current_endpoint = AtomicUsize::new(0);

let connect_backoff_counter2 = connect_backoff_counter.clone();
let build_ws = || async {
let build = || {
let current_endpoint =
Expand Down Expand Up @@ -131,6 +141,7 @@ impl Client {
let ws2 = ws.clone();

tracing::info!("Endpoint connected");
connect_backoff_counter2.store(0, std::sync::atomic::Ordering::Relaxed);

tokio::spawn(async move {
ws2.on_disconnect().await;
Expand All @@ -143,8 +154,7 @@ impl Client {
}
Err((e, url)) => {
tracing::warn!("Unable to connect to endpoint: '{url}' error: {e}");
// TODO: use a backoff strategy
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
tokio::time::sleep(get_backoff_time(&connect_backoff_counter2)).await;
}
}
}
Expand All @@ -154,6 +164,7 @@ impl Client {

let handle_message = |message: Message, ws: Arc<WsClient>| {
let tx = tx.clone();
let request_backoff_counter = request_backoff_counter.clone();

tokio::spawn(async move {
match message {
Expand All @@ -165,6 +176,9 @@ impl Client {
let result = ws.request(&method, params.clone()).await;
match result {
result @ Ok(_) => {
request_backoff_counter
.store(0, std::sync::atomic::Ordering::Relaxed);

if let Err(e) = response.send(result) {
tracing::warn!("Failed to send response: {:?}", e);
}
Expand Down Expand Up @@ -196,9 +210,8 @@ impl Client {
Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
// TODO: use a backoff strategy
tokio::time::sleep(std::time::Duration::from_millis(
200,
tokio::time::sleep(get_backoff_time(
&request_backoff_counter,
))
.await;

Expand Down Expand Up @@ -236,6 +249,9 @@ impl Client {
ws.subscribe(&subscribe, params.clone(), &unsubscribe).await;
match result {
result @ Ok(_) => {
request_backoff_counter
.store(0, std::sync::atomic::Ordering::Relaxed);

if let Err(e) = response.send(result) {
tracing::warn!("Failed to send response: {:?}", e);
}
Expand Down Expand Up @@ -268,9 +284,8 @@ impl Client {
Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
// TODO: use a backoff strategy
tokio::time::sleep(std::time::Duration::from_millis(
200,
tokio::time::sleep(get_backoff_time(
&request_backoff_counter,
))
.await;

Expand Down Expand Up @@ -310,8 +325,7 @@ impl Client {
tokio::select! {
_ = disconnect_rx.recv() => {
tracing::info!("Disconnected from endpoint");
// TODO: use a backoff strategy
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(get_backoff_time(&connect_backoff_counter)).await;
ws = build_ws().await;
}
message = rx.recv() => {
Expand Down Expand Up @@ -386,3 +400,34 @@ impl Client {
.map_err(|_| ())
}
}

fn get_backoff_time(counter: &Arc<AtomicU32>) -> Duration {
let min_time = 100u64;
let step = 100u64;
let max_count = 10u32;

let backoff_count = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

let backoff_count = backoff_count.min(max_count) as u64;
let backoff_time = backoff_count * backoff_count * step;

Duration::from_millis(backoff_time + min_time)
}

#[test]
fn test_get_backoff_time() {
let counter = Arc::new(AtomicU32::new(0));

let mut times = Vec::new();

for _ in 0..12 {
times.push(get_backoff_time(&counter));
}

let times = times.into_iter().map(|t| t.as_millis()).collect::<Vec<_>>();

assert_eq!(
times,
vec![100, 200, 500, 1000, 1700, 2600, 3700, 5000, 6500, 8200, 10100, 10100]
);
}

0 comments on commit c1b259f

Please sign in to comment.