From 3609d449c38398d779a6aa2597c112c94f03fa71 Mon Sep 17 00:00:00 2001 From: Bryan Chen Date: Tue, 3 Oct 2023 16:31:36 +1300 Subject: [PATCH 1/6] add more loggings --- src/extensions/client/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index 1692595..efcd237 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -130,8 +130,11 @@ impl Client { let ws = Arc::new(ws); let ws2 = ws.clone(); + tracing::info!("Endpoint connected"); + tokio::spawn(async move { ws2.on_disconnect().await; + tracing::info!("Endpoint disconnected"); if let Err(e) = disconnect_tx.send(()).await { tracing::warn!("Unable to send disconnect: {}", e); } @@ -315,6 +318,7 @@ impl Client { tracing::trace!("Received message {message:?}"); match message { Some(Message::RotateEndpoint) => { + tracing::info!("Rotate endpoint"); ws = build_ws().await; } Some(message) => handle_message(message, ws.clone()), From 46e4f209deff5f838b3a2dfed10fef5f29c63b49 Mon Sep 17 00:00:00 2001 From: Bryan Chen Date: Tue, 3 Oct 2023 17:13:35 +1300 Subject: [PATCH 2/6] use backoff strategy --- .rustfmt.toml | 1 + src/extensions/api/mod.rs | 10 +++--- src/extensions/client/mod.rs | 63 +++++++++++++++++++++++++++++------- 3 files changed, 59 insertions(+), 15 deletions(-) diff --git a/.rustfmt.toml b/.rustfmt.toml index f8aacc1..e0413b9 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -1,4 +1,5 @@ reorder_imports = true +max_width = 100 ignore = [ "vendor/*", diff --git a/src/extensions/api/mod.rs b/src/extensions/api/mod.rs index ea0751b..4a0e6a7 100644 --- a/src/extensions/api/mod.rs +++ b/src/extensions/api/mod.rs @@ -61,13 +61,15 @@ pub(crate) fn validate_new_head( ) -> anyhow::Result<()> { if let Some((current_hash, current_number)) = tx.borrow().as_ref() { if *current_number > number { - return Err(anyhow::Error::msg(format!("Head number is not increasing, current_number: {current_number} new_number: {number}"))); + return Err(anyhow::Error::msg(format!( + "Head number is not increasing, current_number: {current_number} new_number: {number}" + ))); } if *current_number == number && current_hash != hash { - return Err(anyhow::Error::msg( - format!("Head number is the same but hash is not matching, current_hash: {current_hash} new_hash: {hash}") - )); + return Err(anyhow::Error::msg(format!( + "Head number is the same but hash is not matching, current_hash: {current_hash} new_hash: {hash}" + ))); } } diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index efcd237..c4ebc8b 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -1,4 +1,10 @@ -use std::sync::{atomic::AtomicUsize, Arc}; +use std::{ + sync::{ + atomic::{AtomicU32, AtomicUsize}, + Arc, + }, + time::Duration, +}; use anyhow::anyhow; use async_trait::async_trait; @@ -101,6 +107,9 @@ impl Client { tokio::spawn(async move { let tx = tx2; + let connect_backoff_counter = Arc::new(AtomicU32::new(0)); + let subscribe_backoff_counter = Arc::new(AtomicU32::new(0)); + let current_endpoint = AtomicUsize::new(0); let build_ws = || async { @@ -131,6 +140,7 @@ impl Client { let ws2 = ws.clone(); tracing::info!("Endpoint connected"); + connect_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed); tokio::spawn(async move { ws2.on_disconnect().await; @@ -143,8 +153,7 @@ impl Client { } Err((e, url)) => { tracing::warn!("Unable to connect to endpoint: '{url}' error: {e}"); - // TODO: use a backoff strategy - tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + tokio::time::sleep(get_backoff_time(counter)).await; } } } @@ -154,6 +163,7 @@ impl Client { let handle_message = |message: Message, ws: Arc| { let tx = tx.clone(); + let connect_backoff_counter = connect_backoff_counter.clone(); tokio::spawn(async move { match message { @@ -196,9 +206,8 @@ impl Client { Error::Transport(_) | Error::RestartNeeded(_) | Error::MaxSlotsExceeded => { - // TODO: use a backoff strategy - tokio::time::sleep(std::time::Duration::from_millis( - 200, + tokio::time::sleep(get_backoff_time( + connect_backoff_counter, )) .await; @@ -236,6 +245,9 @@ impl Client { ws.subscribe(&subscribe, params.clone(), &unsubscribe).await; match result { result @ Ok(_) => { + subscribe_backoff_counter + .store(0, std::sync::atomic::Ordering::Relaxed); + if let Err(e) = response.send(result) { tracing::warn!("Failed to send response: {:?}", e); } @@ -268,9 +280,8 @@ impl Client { Error::Transport(_) | Error::RestartNeeded(_) | Error::MaxSlotsExceeded => { - // TODO: use a backoff strategy - tokio::time::sleep(std::time::Duration::from_millis( - 200, + tokio::time::sleep(get_backoff_time( + subscribe_backoff_counter, )) .await; @@ -310,8 +321,7 @@ impl Client { tokio::select! { _ = disconnect_rx.recv() => { tracing::info!("Disconnected from endpoint"); - // TODO: use a backoff strategy - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(get_backoff_time(counter)).await; ws = build_ws().await; } message = rx.recv() => { @@ -386,3 +396,34 @@ impl Client { .map_err(|_| ()) } } + +fn get_backoff_time(counter: Arc) -> Duration { + let min_time = 100u64; + let step = 100u64; + let max_count = 10u32; + + let backoff_count = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let backoff_count = backoff_count.min(max_count) as u64; + let backoff_time = backoff_count * backoff_count * step; + + Duration::from_millis(backoff_time + min_time) +} + +#[test] +fn test_get_backoff_time() { + let counter = Arc::new(AtomicU32::new(0)); + + let mut times = Vec::new(); + + for _ in 0..12 { + times.push(get_backoff_time(counter.clone())); + } + + let times = times.into_iter().map(|t| t.as_millis()).collect::>(); + + assert_eq!( + times, + vec![100, 200, 500, 1000, 1700, 2600, 3700, 5000, 6500, 8200, 10100, 10100] + ); +} From 84bc8628cced9197a6f9e31a3eaf280a98c4c5b0 Mon Sep 17 00:00:00 2001 From: Bryan Chen Date: Tue, 3 Oct 2023 17:22:24 +1300 Subject: [PATCH 3/6] update --- src/extensions/client/mod.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index eecc962..2409e10 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -112,6 +112,7 @@ impl Client { let current_endpoint = AtomicUsize::new(0); + let connect_backoff_counter2 = connect_backoff_counter.clone(); let build_ws = || async { let build = || { let current_endpoint = @@ -140,8 +141,7 @@ impl Client { let ws2 = ws.clone(); tracing::info!("Endpoint connected"); - - connect_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed); + connect_backoff_counter2.store(0, std::sync::atomic::Ordering::Relaxed); tokio::spawn(async move { ws2.on_disconnect().await; @@ -154,7 +154,7 @@ impl Client { } Err((e, url)) => { tracing::warn!("Unable to connect to endpoint: '{url}' error: {e}"); - tokio::time::sleep(get_backoff_time(counter)).await; + tokio::time::sleep(get_backoff_time(&connect_backoff_counter2)).await; } } } @@ -165,6 +165,7 @@ impl Client { let handle_message = |message: Message, ws: Arc| { let tx = tx.clone(); let connect_backoff_counter = connect_backoff_counter.clone(); + let subscribe_backoff_counter = subscribe_backoff_counter.clone(); tokio::spawn(async move { match message { @@ -208,7 +209,7 @@ impl Client { | Error::RestartNeeded(_) | Error::MaxSlotsExceeded => { tokio::time::sleep(get_backoff_time( - connect_backoff_counter, + &connect_backoff_counter, )) .await; @@ -282,7 +283,7 @@ impl Client { | Error::RestartNeeded(_) | Error::MaxSlotsExceeded => { tokio::time::sleep(get_backoff_time( - subscribe_backoff_counter, + &subscribe_backoff_counter, )) .await; @@ -322,7 +323,7 @@ impl Client { tokio::select! { _ = disconnect_rx.recv() => { tracing::info!("Disconnected from endpoint"); - tokio::time::sleep(get_backoff_time(counter)).await; + tokio::time::sleep(get_backoff_time(&connect_backoff_counter)).await; ws = build_ws().await; } message = rx.recv() => { @@ -398,7 +399,7 @@ impl Client { } } -fn get_backoff_time(counter: Arc) -> Duration { +fn get_backoff_time(counter: &Arc) -> Duration { let min_time = 100u64; let step = 100u64; let max_count = 10u32; @@ -418,7 +419,7 @@ fn test_get_backoff_time() { let mut times = Vec::new(); for _ in 0..12 { - times.push(get_backoff_time(counter.clone())); + times.push(get_backoff_time(&counter)); } let times = times.into_iter().map(|t| t.as_millis()).collect::>(); From 9c5cdba1ad5711bfaa081fd9b65d9c3818dcc9a1 Mon Sep 17 00:00:00 2001 From: Bryan Chen Date: Tue, 3 Oct 2023 17:24:09 +1300 Subject: [PATCH 4/6] fix --- src/extensions/client/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index 2409e10..10e590c 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -108,7 +108,7 @@ impl Client { let tx = tx2; let connect_backoff_counter = Arc::new(AtomicU32::new(0)); - let subscribe_backoff_counter = Arc::new(AtomicU32::new(0)); + let request_backoff_counter = Arc::new(AtomicU32::new(0)); let current_endpoint = AtomicUsize::new(0); @@ -165,7 +165,7 @@ impl Client { let handle_message = |message: Message, ws: Arc| { let tx = tx.clone(); let connect_backoff_counter = connect_backoff_counter.clone(); - let subscribe_backoff_counter = subscribe_backoff_counter.clone(); + let request_backoff_counter = request_backoff_counter.clone(); tokio::spawn(async move { match message { @@ -209,7 +209,7 @@ impl Client { | Error::RestartNeeded(_) | Error::MaxSlotsExceeded => { tokio::time::sleep(get_backoff_time( - &connect_backoff_counter, + &request_backoff_counter, )) .await; @@ -247,7 +247,7 @@ impl Client { ws.subscribe(&subscribe, params.clone(), &unsubscribe).await; match result { result @ Ok(_) => { - subscribe_backoff_counter + request_backoff_counter .store(0, std::sync::atomic::Ordering::Relaxed); if let Err(e) = response.send(result) { @@ -283,7 +283,7 @@ impl Client { | Error::RestartNeeded(_) | Error::MaxSlotsExceeded => { tokio::time::sleep(get_backoff_time( - &subscribe_backoff_counter, + &request_backoff_counter, )) .await; From 612bf935779c7bf0173299091bb9db3de51cf646 Mon Sep 17 00:00:00 2001 From: Bryan Chen Date: Tue, 3 Oct 2023 17:25:03 +1300 Subject: [PATCH 5/6] update --- src/extensions/client/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index 10e590c..673eff0 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -177,6 +177,9 @@ impl Client { let result = ws.request(&method, params.clone()).await; match result { result @ Ok(_) => { + request_backoff_counter + .store(0, std::sync::atomic::Ordering::Relaxed); + if let Err(e) = response.send(result) { tracing::warn!("Failed to send response: {:?}", e); } From f6c13ae360517fb8af0eb712290f86c01b67ad22 Mon Sep 17 00:00:00 2001 From: Bryan Chen Date: Tue, 3 Oct 2023 17:29:59 +1300 Subject: [PATCH 6/6] fix --- src/extensions/client/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index 673eff0..59b7271 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -164,7 +164,6 @@ impl Client { let handle_message = |message: Message, ws: Arc| { let tx = tx.clone(); - let connect_backoff_counter = connect_backoff_counter.clone(); let request_backoff_counter = request_backoff_counter.clone(); tokio::spawn(async move {