Backoff #106

Merged 7 commits on Oct 3, 2023
1 change: 1 addition & 0 deletions .rustfmt.toml
@@ -1,4 +1,5 @@
reorder_imports = true
max_width = 100

ignore = [
"vendor/*",
10 changes: 6 additions & 4 deletions src/extensions/api/mod.rs
@@ -61,13 +61,15 @@ pub(crate) fn validate_new_head(
) -> anyhow::Result<()> {
if let Some((current_hash, current_number)) = tx.borrow().as_ref() {
if *current_number > number {
return Err(anyhow::Error::msg(format!("Head number is not increasing, current_number: {current_number} new_number: {number}")));
return Err(anyhow::Error::msg(format!(
"Head number is not increasing, current_number: {current_number} new_number: {number}"
)));
}

if *current_number == number && current_hash != hash {
return Err(anyhow::Error::msg(
format!("Head number is the same but hash is not matching, current_hash: {current_hash} new_hash: {hash}")
));
return Err(anyhow::Error::msg(format!(
"Head number is the same but hash is not matching, current_hash: {current_hash} new_hash: {hash}"
)));
}
}

67 changes: 56 additions & 11 deletions src/extensions/client/mod.rs
@@ -1,4 +1,10 @@
use std::sync::{atomic::AtomicUsize, Arc};
use std::{
sync::{
atomic::{AtomicU32, AtomicUsize},
Arc,
},
time::Duration,
};

use anyhow::anyhow;
use async_trait::async_trait;
@@ -101,8 +107,12 @@ impl Client {
tokio::spawn(async move {
let tx = tx2;

let connect_backoff_counter = Arc::new(AtomicU32::new(0));
let request_backoff_counter = Arc::new(AtomicU32::new(0));

let current_endpoint = AtomicUsize::new(0);

let connect_backoff_counter2 = connect_backoff_counter.clone();
let build_ws = || async {
let build = || {
let current_endpoint =
@@ -131,6 +141,7 @@ impl Client {
let ws2 = ws.clone();

tracing::info!("Endpoint connected");
connect_backoff_counter2.store(0, std::sync::atomic::Ordering::Relaxed);

tokio::spawn(async move {
ws2.on_disconnect().await;
@@ -143,8 +154,7 @@
}
Err((e, url)) => {
tracing::warn!("Unable to connect to endpoint: '{url}' error: {e}");
// TODO: use a backoff strategy
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
tokio::time::sleep(get_backoff_time(&connect_backoff_counter2)).await;
}
}
}
@@ -154,6 +164,7 @@

let handle_message = |message: Message, ws: Arc<WsClient>| {
let tx = tx.clone();
let request_backoff_counter = request_backoff_counter.clone();

tokio::spawn(async move {
match message {
@@ -165,6 +176,9 @@
let result = ws.request(&method, params.clone()).await;
match result {
result @ Ok(_) => {
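// The request succeeded, so reset the request backoff counter.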
request_backoff_counter
.store(0, std::sync::atomic::Ordering::Relaxed);

if let Err(e) = response.send(result) {
tracing::warn!("Failed to send response: {:?}", e);
}
@@ -196,9 +210,8 @@
Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
// TODO: use a backoff strategy
tokio::time::sleep(std::time::Duration::from_millis(
200,
tokio::time::sleep(get_backoff_time(
&request_backoff_counter,
))
.await;

@@ -236,6 +249,9 @@
ws.subscribe(&subscribe, params.clone(), &unsubscribe).await;
match result {
result @ Ok(_) => {
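// The subscription succeeded, so reset the request backoff counter.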
request_backoff_counter
.store(0, std::sync::atomic::Ordering::Relaxed);

if let Err(e) = response.send(result) {
tracing::warn!("Failed to send response: {:?}", e);
}
@@ -268,9 +284,8 @@
Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
// TODO: use a backoff strategy
tokio::time::sleep(std::time::Duration::from_millis(
200,
tokio::time::sleep(get_backoff_time(
&request_backoff_counter,
))
.await;

@@ -310,8 +325,7 @@
tokio::select! {
_ = disconnect_rx.recv() => {
tracing::info!("Disconnected from endpoint");
// TODO: use a backoff strategy
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(get_backoff_time(&connect_backoff_counter)).await;
ws = build_ws().await;
}
message = rx.recv() => {
@@ -386,3 +400,34 @@ impl Client {
.map_err(|_| ())
}
}

fn get_backoff_time(counter: &Arc<AtomicU32>) -> Duration {
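// Quadratic backoff: the delay is min_time + step * min(attempts, max_count)^2,
// i.e. 100ms, 200ms, 500ms, 1s, ... capped at 10.1s after ten attempts.
// fetch_add returns the previous count, so the first call waits exactly min_time;
// callers store 0 back into the counter once a connect or request succeeds.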
let min_time = 100u64;
let step = 100u64;
let max_count = 10u32;

let backoff_count = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

let backoff_count = backoff_count.min(max_count) as u64;
let backoff_time = backoff_count * backoff_count * step;

Duration::from_millis(backoff_time + min_time)
}

#[test]
fn test_get_backoff_time() {
let counter = Arc::new(AtomicU32::new(0));

let mut times = Vec::new();

for _ in 0..12 {
times.push(get_backoff_time(&counter));
}

let times = times.into_iter().map(|t| t.as_millis()).collect::<Vec<_>>();

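// Expected delays follow 100 + 100 * n^2 ms; the last two entries are equal
// because the attempt count is capped at max_count.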
assert_eq!(
times,
vec![100, 200, 500, 1000, 1700, 2600, 3700, 5000, 6500, 8200, 10100, 10100]
);
}