diff --git a/ampd/src/asyncutil/future.rs b/ampd/src/asyncutil/future.rs index a90b4c4b4..645ff6b43 100644 --- a/ampd/src/asyncutil/future.rs +++ b/ampd/src/asyncutil/future.rs @@ -1,102 +1,53 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; use std::time::Duration; -use futures::{Future, FutureExt}; +use futures::Future; use tokio::time; -pub fn with_retry( - future: F, - policy: RetryPolicy, -) -> impl Future> +pub async fn with_retry(mut future: F, policy: RetryPolicy) -> Result where - F: Fn() -> Fut, + F: FnMut() -> Fut, Fut: Future>, { - RetriableFuture::new(future, policy) -} - -pub enum RetryPolicy { - RepeatConstant { sleep: Duration, max_attempts: u64 }, -} - -struct RetriableFuture -where - F: Fn() -> Fut, - Fut: Future>, -{ - future: F, - inner: Pin>, - policy: RetryPolicy, - err_count: u64, -} - -impl Unpin for RetriableFuture -where - F: Fn() -> Fut, - Fut: Future>, -{ -} - -impl RetriableFuture -where - F: Fn() -> Fut, - Fut: Future>, -{ - fn new(get_future: F, policy: RetryPolicy) -> Self { - let future = get_future(); - - Self { - future: get_future, - inner: Box::pin(future), - policy, - err_count: 0, + let mut attempt_count = 0u64; + loop { + match future().await { + Ok(result) => return Ok(result), + Err(err) => { + attempt_count = attempt_count.saturating_add(1); + + match enact_policy(attempt_count, policy).await { + PolicyAction::Retry => continue, + PolicyAction::Abort => return Err(err), + } + } } } +} - fn handle_err( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - error: Err, - ) -> Poll> { - self.err_count = self.err_count.saturating_add(1); - - match self.policy { - RetryPolicy::RepeatConstant { - sleep, - max_attempts, - } => { - if self.err_count >= max_attempts { - return Poll::Ready(Err(error)); - } - - self.inner = Box::pin((self.future)()); - - let waker = cx.waker().clone(); - tokio::spawn(time::sleep(sleep).then(|_| async { - waker.wake(); - })); - - Poll::Pending +async fn enact_policy(attempt_count: u64, policy: 
RetryPolicy) -> PolicyAction { + match policy { + RetryPolicy::RepeatConstant { + sleep, + max_attempts, + } => { + if attempt_count >= max_attempts { + PolicyAction::Abort + } else { + time::sleep(sleep).await; + PolicyAction::Retry } } } } -impl Future for RetriableFuture -where - F: Fn() -> Fut, - Fut: Future>, -{ - type Output = Result; +enum PolicyAction { + Retry, + Abort, +} - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.inner.as_mut().poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)), - Poll::Ready(Err(error)) => self.handle_err(cx, error), - } - } +#[derive(Copy, Clone)] +pub enum RetryPolicy { + RepeatConstant { sleep: Duration, max_attempts: u64 }, } #[cfg(test)] diff --git a/ampd/src/broadcaster/confirm_tx.rs b/ampd/src/broadcaster/confirm_tx.rs index 7499af9e3..a0d0c8fdc 100644 --- a/ampd/src/broadcaster/confirm_tx.rs +++ b/ampd/src/broadcaster/confirm_tx.rs @@ -1,17 +1,17 @@ -use std::time::Duration; +use std::sync::Arc; use axelar_wasm_std::FnExt; use cosmrs::proto::cosmos::tx::v1beta1::{GetTxRequest, GetTxResponse}; -use error_stack::{report, Report, Result}; +use error_stack::{bail, Report, Result}; use futures::{StreamExt, TryFutureExt}; use thiserror::Error; use tokio::sync::{mpsc, Mutex}; -use tokio::time; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::error; use super::cosmos; +use crate::asyncutil::future::{with_retry, RetryPolicy}; #[derive(Debug, PartialEq)] pub enum TxStatus { @@ -53,19 +53,12 @@ pub enum Error { SendTxRes(#[from] Box>), } -enum ConfirmationResult { - Confirmed(Box), - NotFound, - GRPCError(Status), -} - pub struct TxConfirmer where T: cosmos::BroadcastClient, { client: T, - sleep: Duration, - max_attempts: u32, + retry_policy: RetryPolicy, tx_hash_receiver: mpsc::Receiver, tx_res_sender: mpsc::Sender, } @@ -76,15 +69,13 @@ where { pub fn new( client: T, - sleep: Duration, - max_attempts: u32, + retry_policy: 
RetryPolicy, tx_hash_receiver: mpsc::Receiver, tx_res_sender: mpsc::Sender, ) -> Self { Self { client, - sleep, - max_attempts, + retry_policy, tx_hash_receiver, tx_res_sender, } @@ -93,23 +84,19 @@ where pub async fn run(self) -> Result<(), Error> { let Self { client, - sleep, - max_attempts, + retry_policy, tx_hash_receiver, tx_res_sender, } = self; let limit = tx_hash_receiver.capacity(); - let client = Mutex::new(client); + let client = Arc::new(Mutex::new(client)); + let mut tx_hash_stream = ReceiverStream::new(tx_hash_receiver) .map(|tx_hash| { - confirm_tx(&client, tx_hash, sleep, max_attempts).and_then(|tx| async { - tx_res_sender - .send(tx) - .await - .map_err(Box::new) - .map_err(Into::into) - .map_err(Report::new) - }) + // multiple instances of confirm_tx can be spawned due to buffer_unordered, + // so we need to clone the client to avoid a deadlock + confirm_tx_with_retry(client.clone(), tx_hash, retry_policy) + .and_then(|tx| async { send_response(&tx_res_sender, tx).await }) }) .buffer_unordered(limit); @@ -121,50 +108,63 @@ where } } -async fn confirm_tx( - client: &Mutex, +async fn confirm_tx_with_retry( + client: Arc>, tx_hash: String, - sleep: Duration, - attempts: u32, -) -> Result -where - T: cosmos::BroadcastClient, -{ - for i in 0..attempts { - let req = GetTxRequest { - hash: tx_hash.clone(), - }; - - match client.lock().await.tx(req).await.then(evaluate_tx_response) { - ConfirmationResult::Confirmed(tx) => return Ok(*tx), - ConfirmationResult::NotFound if i == attempts.saturating_sub(1) => { - return Err(report!(Error::Confirmation { tx_hash })) - } - ConfirmationResult::GRPCError(status) if i == attempts.saturating_sub(1) => { - return Err(report!(Error::Grpc { status, tx_hash })) - } - _ => time::sleep(sleep).await, - } - } + retry_policy: RetryPolicy, +) -> Result { + with_retry(|| confirm_tx(client.clone(), tx_hash.clone()), retry_policy).await +} - unreachable!("confirmation loop should have returned by now") +// due to limitations of 
lambdas and lifetime issues this needs to be a separate function +async fn confirm_tx( + client: Arc>, + tx_hash: String, +) -> Result { + let req = GetTxRequest { + hash: tx_hash.clone(), + }; + + client + .lock() + .await + .tx(req) + .await + .then(evaluate_tx_response(tx_hash)) } fn evaluate_tx_response( - response: core::result::Result, -) -> ConfirmationResult { - match response { - Err(status) => ConfirmationResult::GRPCError(status), + tx_hash: String, +) -> impl Fn(core::result::Result) -> Result { + move |response| match response { + Err(status) => bail!(Error::Grpc { + status, + tx_hash: tx_hash.clone() + }), Ok(GetTxResponse { tx_response: None, .. - }) => ConfirmationResult::NotFound, + }) => bail!(Error::Confirmation { + tx_hash: tx_hash.clone() + }), Ok(GetTxResponse { tx_response: Some(response), .. - }) => ConfirmationResult::Confirmed(Box::new(response.into())), + }) => Ok(response.into()), } } +async fn send_response( + tx_res_sender: &mpsc::Sender, + tx: TxResponse, +) -> Result<(), Error> { + tx_res_sender + .send(tx) + .await + .map_err(Box::new) + .map_err(Into::into) + .map_err(Report::new) +} + #[cfg(test)] mod test { use std::time::Duration; @@ -175,6 +175,7 @@ mod test { use tokio::test; use super::{Error, TxConfirmer, TxResponse, TxStatus}; + use crate::asyncutil::future::RetryPolicy; use crate::broadcaster::cosmos::MockBroadcastClient; #[test] @@ -205,8 +206,10 @@ mod test { let tx_confirmer = TxConfirmer::new( client, - sleep, - max_attempts, + RetryPolicy::RepeatConstant { + sleep, + max_attempts, + }, tx_confirmer_receiver, tx_res_sender, ); @@ -252,8 +255,10 @@ mod test { let tx_confirmer = TxConfirmer::new( client, - sleep, - max_attempts, + RetryPolicy::RepeatConstant { + sleep, + max_attempts, + }, tx_confirmer_receiver, tx_res_sender, ); @@ -291,8 +296,10 @@ mod test { let tx_confirmer = TxConfirmer::new( client, - sleep, - max_attempts, + RetryPolicy::RepeatConstant { + sleep, + max_attempts, + }, tx_confirmer_receiver, 
tx_res_sender, ); @@ -330,8 +337,10 @@ mod test { let tx_confirmer = TxConfirmer::new( client, - sleep, - max_attempts, + RetryPolicy::RepeatConstant { + sleep, + max_attempts, + }, tx_confirmer_receiver, tx_res_sender, ); diff --git a/ampd/src/commands/mod.rs b/ampd/src/commands/mod.rs index 3ed6b3276..257946ded 100644 --- a/ampd/src/commands/mod.rs +++ b/ampd/src/commands/mod.rs @@ -5,12 +5,15 @@ use cosmrs::proto::cosmos::base::abci::v1beta1::TxResponse; use cosmrs::proto::cosmos::tx::v1beta1::service_client::ServiceClient; use cosmrs::proto::Any; use cosmrs::AccountId; -use error_stack::{Result, ResultExt}; +use error_stack::{report, FutureExt, Result, ResultExt}; +use futures::TryFutureExt; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::{Receiver, Sender}; use valuable::Valuable; +use crate::asyncutil::future::RetryPolicy; use crate::broadcaster::Broadcaster; -use crate::config::Config as AmpdConfig; +use crate::config::{Config as AmpdConfig, Config}; use crate::tofnd::grpc::{Multisig, MultisigClient}; use crate::types::{PublicKey, TMAddress}; use crate::{broadcaster, tofnd, Error, PREFIX}; @@ -75,13 +78,46 @@ async fn broadcast_tx( tx: Any, pub_key: PublicKey, ) -> Result { + let (confirmation_sender, mut confirmation_receiver) = tokio::sync::mpsc::channel(1); + let (hash_to_confirm_sender, hash_to_confirm_receiver) = tokio::sync::mpsc::channel(1); + + let mut broadcaster = instantiate_broadcaster( + config, + pub_key, + hash_to_confirm_receiver, + confirmation_sender, + ) + .await?; + + broadcaster + .broadcast(vec![tx]) + .change_context(Error::Broadcaster) + .and_then(|response| { + hash_to_confirm_sender + .send(response.txhash) + .change_context(Error::Broadcaster) + }) + .await?; + + confirmation_receiver + .recv() + .await + .ok_or(report!(Error::TxConfirmation)) + .map(|tx| tx.response) +} + +async fn instantiate_broadcaster( + config: Config, + pub_key: PublicKey, + tx_hashes_to_confirm: Receiver, + confirmed_txs: Sender, +) -> Result { 
let AmpdConfig { tm_grpc, broadcast, tofnd_config, .. } = config; - let service_client = ServiceClient::connect(tm_grpc.to_string()) .await .change_context(Error::Connection) @@ -99,7 +135,20 @@ async fn broadcast_tx( .change_context(Error::Connection) .attach_printable(tofnd_config.url)?; - broadcaster::UnvalidatedBasicBroadcaster::builder() + broadcaster::confirm_tx::TxConfirmer::new( + service_client.clone(), + RetryPolicy::RepeatConstant { + sleep: broadcast.tx_fetch_interval, + max_attempts: broadcast.tx_fetch_max_retries.saturating_add(1).into(), + }, + tx_hashes_to_confirm, + confirmed_txs, + ) + .run() + .await + .change_context(Error::TxConfirmation)?; + + let basic_broadcaster = broadcaster::UnvalidatedBasicBroadcaster::builder() .client(service_client) .signer(multisig_client) .auth_query_client(auth_query_client) @@ -110,8 +159,6 @@ async fn broadcast_tx( .build() .validate_fee_denomination() .await - .change_context(Error::Broadcaster)? - .broadcast(vec![tx]) - .await - .change_context(Error::Broadcaster) + .change_context(Error::Broadcaster)?; + Ok(basic_broadcaster) } diff --git a/ampd/src/event_processor.rs b/ampd/src/event_processor.rs index 204b0c64f..c0d3e3437 100644 --- a/ampd/src/event_processor.rs +++ b/ampd/src/event_processor.rs @@ -86,8 +86,10 @@ where &handler, &broadcaster, event, - event_processor_config.retry_delay, - event_processor_config.retry_max_attempts, + RetryPolicy::RepeatConstant { + sleep: event_processor_config.retry_delay, + max_attempts: event_processor_config.retry_max_attempts, + }, ) .await?; } @@ -110,23 +112,14 @@ async fn handle_event( handler: &H, broadcaster: &B, event: &Event, - handle_sleep_duration: Duration, - handle_max_attempts: u64, + retry_policy: RetryPolicy, ) -> Result<(), Error> where H: EventHandler, B: BroadcasterClient, { // if handlers run into errors we log them and then move on to the next event - match future::with_retry( - || handler.handle(event), - RetryPolicy::RepeatConstant { - sleep: 
handle_sleep_duration, - max_attempts: handle_max_attempts, - }, - ) - .await - { + match future::with_retry(|| handler.handle(event), retry_policy).await { Ok(msgs) => { for msg in msgs { broadcaster diff --git a/ampd/src/lib.rs b/ampd/src/lib.rs index e409a658c..89ca29a83 100644 --- a/ampd/src/lib.rs +++ b/ampd/src/lib.rs @@ -2,7 +2,6 @@ use std::time::Duration; use asyncutil::task::{CancellableTask, TaskError, TaskGroup}; use block_height_monitor::BlockHeightMonitor; -use broadcaster::confirm_tx::TxConfirmer; use broadcaster::Broadcaster; use cosmrs::proto::cosmos::auth::v1beta1::query_client::QueryClient as AuthQueryClient; use cosmrs::proto::cosmos::bank::v1beta1::query_client::QueryClient as BankQueryClient; @@ -49,6 +48,9 @@ mod url; pub use grpc::{client, proto}; +use crate::asyncutil::future::RetryPolicy; +use crate::broadcaster::confirm_tx::TxConfirmer; + const PREFIX: &str = "axelar"; const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(3); @@ -183,8 +185,8 @@ where let (event_publisher, event_subscriber) = event_sub::EventPublisher::new(tm_client, event_buffer_cap); - let (tx_confirmer_sender, tx_confirmer_receiver) = mpsc::channel(1000); - let (tx_res_sender, tx_res_receiver) = mpsc::channel(1000); + let (tx_hash_sender, tx_hash_receiver) = mpsc::channel(1000); + let (tx_response_sender, tx_response_receiver) = mpsc::channel(1000); let event_processor = TaskGroup::new(); let broadcaster = QueuedBroadcaster::new( @@ -192,15 +194,17 @@ where broadcast_cfg.batch_gas_limit, broadcast_cfg.queue_cap, interval(broadcast_cfg.broadcast_interval), - tx_confirmer_sender, - tx_res_receiver, + tx_hash_sender, + tx_response_receiver, ); let tx_confirmer = TxConfirmer::new( service_client, - broadcast_cfg.tx_fetch_interval, - broadcast_cfg.tx_fetch_max_retries.saturating_add(1), - tx_confirmer_receiver, - tx_res_sender, + RetryPolicy::RepeatConstant { + sleep: broadcast_cfg.tx_fetch_interval, + max_attempts: 
broadcast_cfg.tx_fetch_max_retries.saturating_add(1).into(), + }, + tx_hash_receiver, + tx_response_sender, ); Self { @@ -443,7 +447,7 @@ where .change_context(Error::EventProcessor) })) .add_task(CancellableTask::create(|_| { - tx_confirmer.run().change_context(Error::TxConfirmer) + tx_confirmer.run().change_context(Error::TxConfirmation) })) .add_task(CancellableTask::create(|_| { broadcaster.run().change_context(Error::Broadcaster) @@ -461,8 +465,8 @@ pub enum Error { EventProcessor, #[error("broadcaster failed")] Broadcaster, - #[error("tx confirmer failed")] - TxConfirmer, + #[error("tx confirmation failed")] + TxConfirmation, #[error("tofnd failed")] Tofnd, #[error("connection failed")] diff --git a/contracts/multisig/src/contract.rs b/contracts/multisig/src/contract.rs index 7ef4af54c..b30683d52 100644 --- a/contracts/multisig/src/contract.rs +++ b/contracts/multisig/src/contract.rs @@ -857,7 +857,7 @@ mod tests { let res = query(deps.as_ref(), mock_env(), msg); assert!(res.is_ok()); - let query_res: Multisig = from_json(&res.unwrap()).unwrap(); + let query_res: Multisig = from_json(res.unwrap()).unwrap(); let session = SIGNING_SESSIONS .load(deps.as_ref().storage, session_id.into()) .unwrap(); @@ -939,7 +939,7 @@ mod tests { for (addr, _, _) in &expected_pub_keys { let res = query_registered_public_key(deps.as_ref(), addr.clone(), key_type); assert!(res.is_ok()); - ret_pub_keys.push(from_json(&res.unwrap()).unwrap()); + ret_pub_keys.push(from_json(res.unwrap()).unwrap()); } assert_eq!( expected_pub_keys diff --git a/contracts/nexus-gateway/Cargo.toml b/contracts/nexus-gateway/Cargo.toml index 04ca59083..2e7d7ca53 100644 --- a/contracts/nexus-gateway/Cargo.toml +++ b/contracts/nexus-gateway/Cargo.toml @@ -12,6 +12,9 @@ crate-type = ["cdylib", "rlib"] name = "nexus-gateway-schema" path = "src/bin/schema.rs" +[features] +library = [] + [dependencies] axelar-wasm-std = { workspace = true, features = ["derive"] } cosmwasm-schema = { workspace = true } diff --git 
a/contracts/voting-verifier/src/contract.rs b/contracts/voting-verifier/src/contract.rs index a7f8ccaab..805b4f423 100644 --- a/contracts/voting-verifier/src/contract.rs +++ b/contracts/voting-verifier/src/contract.rs @@ -764,7 +764,7 @@ mod test { // check status corresponds to votes let statuses: Vec = from_json( - &query( + query( deps.as_ref(), mock_env(), QueryMsg::MessagesStatus(messages.clone()),