Skip to content

Commit

Permalink
fix(ampd): ensure that txs get confirmed when broadcast with an ampd …
Browse files Browse the repository at this point in the history
…subcommand (#597)
  • Loading branch information
cgorenflo authored Aug 23, 2024
1 parent 347cd15 commit 490f6f2
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 184 deletions.
117 changes: 34 additions & 83 deletions ampd/src/asyncutil/future.rs
Original file line number Diff line number Diff line change
@@ -1,102 +1,53 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::{Future, FutureExt};
use futures::Future;
use tokio::time;

pub fn with_retry<F, Fut, R, Err>(
future: F,
policy: RetryPolicy,
) -> impl Future<Output = Result<R, Err>>
pub async fn with_retry<F, Fut, R, Err>(mut future: F, policy: RetryPolicy) -> Result<R, Err>
where
F: Fn() -> Fut,
F: FnMut() -> Fut,
Fut: Future<Output = Result<R, Err>>,
{
RetriableFuture::new(future, policy)
}

pub enum RetryPolicy {
RepeatConstant { sleep: Duration, max_attempts: u64 },
}

struct RetriableFuture<F, Fut, R, Err>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<R, Err>>,
{
future: F,
inner: Pin<Box<Fut>>,
policy: RetryPolicy,
err_count: u64,
}

impl<F, Fut, R, Err> Unpin for RetriableFuture<F, Fut, R, Err>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<R, Err>>,
{
}

impl<F, Fut, R, Err> RetriableFuture<F, Fut, R, Err>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<R, Err>>,
{
fn new(get_future: F, policy: RetryPolicy) -> Self {
let future = get_future();

Self {
future: get_future,
inner: Box::pin(future),
policy,
err_count: 0,
let mut attempt_count = 0u64;
loop {
match future().await {
Ok(result) => return Ok(result),
Err(err) => {
attempt_count = attempt_count.saturating_add(1);

match enact_policy(attempt_count, policy).await {
PolicyAction::Retry => continue,
PolicyAction::Abort => return Err(err),
}
}
}
}
}

fn handle_err(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
error: Err,
) -> Poll<Result<R, Err>> {
self.err_count = self.err_count.saturating_add(1);

match self.policy {
RetryPolicy::RepeatConstant {
sleep,
max_attempts,
} => {
if self.err_count >= max_attempts {
return Poll::Ready(Err(error));
}

self.inner = Box::pin((self.future)());

let waker = cx.waker().clone();
tokio::spawn(time::sleep(sleep).then(|_| async {
waker.wake();
}));

Poll::Pending
async fn enact_policy(attempt_count: u64, policy: RetryPolicy) -> PolicyAction {
match policy {
RetryPolicy::RepeatConstant {
sleep,
max_attempts,
} => {
if attempt_count >= max_attempts {
PolicyAction::Abort
} else {
time::sleep(sleep).await;
PolicyAction::Retry
}
}
}
}

impl<F, Fut, R, Err> Future for RetriableFuture<F, Fut, R, Err>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<R, Err>>,
{
type Output = Result<R, Err>;
enum PolicyAction {
Retry,
Abort,
}

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner.as_mut().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)),
Poll::Ready(Err(error)) => self.handle_err(cx, error),
}
}
#[derive(Copy, Clone)]
pub enum RetryPolicy {
RepeatConstant { sleep: Duration, max_attempts: u64 },
}

#[cfg(test)]
Expand Down
139 changes: 74 additions & 65 deletions ampd/src/broadcaster/confirm_tx.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use std::time::Duration;
use std::sync::Arc;

use axelar_wasm_std::FnExt;
use cosmrs::proto::cosmos::tx::v1beta1::{GetTxRequest, GetTxResponse};
use error_stack::{report, Report, Result};
use error_stack::{bail, Report, Result};
use futures::{StreamExt, TryFutureExt};
use thiserror::Error;
use tokio::sync::{mpsc, Mutex};
use tokio::time;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::error;

use super::cosmos;
use crate::asyncutil::future::{with_retry, RetryPolicy};

#[derive(Debug, PartialEq)]
pub enum TxStatus {
Expand Down Expand Up @@ -53,19 +53,12 @@ pub enum Error {
SendTxRes(#[from] Box<mpsc::error::SendError<TxResponse>>),
}

enum ConfirmationResult {
Confirmed(Box<TxResponse>),
NotFound,
GRPCError(Status),
}

pub struct TxConfirmer<T>
where
T: cosmos::BroadcastClient,
{
client: T,
sleep: Duration,
max_attempts: u32,
retry_policy: RetryPolicy,
tx_hash_receiver: mpsc::Receiver<String>,
tx_res_sender: mpsc::Sender<TxResponse>,
}
Expand All @@ -76,15 +69,13 @@ where
{
pub fn new(
client: T,
sleep: Duration,
max_attempts: u32,
retry_policy: RetryPolicy,
tx_hash_receiver: mpsc::Receiver<String>,
tx_res_sender: mpsc::Sender<TxResponse>,
) -> Self {
Self {
client,
sleep,
max_attempts,
retry_policy,
tx_hash_receiver,
tx_res_sender,
}
Expand All @@ -93,23 +84,19 @@ where
pub async fn run(self) -> Result<(), Error> {
let Self {
client,
sleep,
max_attempts,
retry_policy,
tx_hash_receiver,
tx_res_sender,
} = self;
let limit = tx_hash_receiver.capacity();
let client = Mutex::new(client);
let client = Arc::new(Mutex::new(client));

let mut tx_hash_stream = ReceiverStream::new(tx_hash_receiver)
.map(|tx_hash| {
confirm_tx(&client, tx_hash, sleep, max_attempts).and_then(|tx| async {
tx_res_sender
.send(tx)
.await
.map_err(Box::new)
.map_err(Into::into)
.map_err(Report::new)
})
// multiple instances of confirm_tx can be spawned due to buffer_unordered,
// so we need to clone the client to avoid a deadlock
confirm_tx_with_retry(client.clone(), tx_hash, retry_policy)
.and_then(|tx| async { send_response(&tx_res_sender, tx).await })
})
.buffer_unordered(limit);

Expand All @@ -121,50 +108,63 @@ where
}
}

async fn confirm_tx<T>(
client: &Mutex<T>,
async fn confirm_tx_with_retry(
client: Arc<Mutex<impl cosmos::BroadcastClient>>,
tx_hash: String,
sleep: Duration,
attempts: u32,
) -> Result<TxResponse, Error>
where
T: cosmos::BroadcastClient,
{
for i in 0..attempts {
let req = GetTxRequest {
hash: tx_hash.clone(),
};

match client.lock().await.tx(req).await.then(evaluate_tx_response) {
ConfirmationResult::Confirmed(tx) => return Ok(*tx),
ConfirmationResult::NotFound if i == attempts.saturating_sub(1) => {
return Err(report!(Error::Confirmation { tx_hash }))
}
ConfirmationResult::GRPCError(status) if i == attempts.saturating_sub(1) => {
return Err(report!(Error::Grpc { status, tx_hash }))
}
_ => time::sleep(sleep).await,
}
}
retry_policy: RetryPolicy,
) -> Result<TxResponse, Error> {
with_retry(|| confirm_tx(client.clone(), tx_hash.clone()), retry_policy).await
}

unreachable!("confirmation loop should have returned by now")
// do to limitations of lambdas and lifetime issues this needs to be a separate function
async fn confirm_tx(
client: Arc<Mutex<impl cosmos::BroadcastClient>>,
tx_hash: String,
) -> Result<TxResponse, Error> {
let req = GetTxRequest {
hash: tx_hash.clone(),
};

client
.lock()
.await
.tx(req)
.await
.then(evaluate_tx_response(tx_hash))
}

fn evaluate_tx_response(
response: core::result::Result<GetTxResponse, Status>,
) -> ConfirmationResult {
match response {
Err(status) => ConfirmationResult::GRPCError(status),
tx_hash: String,
) -> impl Fn(core::result::Result<GetTxResponse, Status>) -> Result<TxResponse, Error> {
move |response| match response {
Err(status) => bail!(Error::Grpc {
status,
tx_hash: tx_hash.clone()
}),
Ok(GetTxResponse {
tx_response: None, ..
}) => ConfirmationResult::NotFound,
}) => bail!(Error::Confirmation {
tx_hash: tx_hash.clone()
}),
Ok(GetTxResponse {
tx_response: Some(response),
..
}) => ConfirmationResult::Confirmed(Box::new(response.into())),
}) => Ok(response.into()),
}
}

async fn send_response(
tx_res_sender: &mpsc::Sender<TxResponse>,
tx: TxResponse,
) -> Result<(), Error> {
tx_res_sender
.send(tx)
.await
.map_err(Box::new)
.map_err(Into::into)
.map_err(Report::new)
}

#[cfg(test)]
mod test {
use std::time::Duration;
Expand All @@ -175,6 +175,7 @@ mod test {
use tokio::test;

use super::{Error, TxConfirmer, TxResponse, TxStatus};
use crate::asyncutil::future::RetryPolicy;
use crate::broadcaster::cosmos::MockBroadcastClient;

#[test]
Expand Down Expand Up @@ -205,8 +206,10 @@ mod test {

let tx_confirmer = TxConfirmer::new(
client,
sleep,
max_attempts,
RetryPolicy::RepeatConstant {
sleep,
max_attempts,
},
tx_confirmer_receiver,
tx_res_sender,
);
Expand Down Expand Up @@ -252,8 +255,10 @@ mod test {

let tx_confirmer = TxConfirmer::new(
client,
sleep,
max_attempts,
RetryPolicy::RepeatConstant {
sleep,
max_attempts,
},
tx_confirmer_receiver,
tx_res_sender,
);
Expand Down Expand Up @@ -291,8 +296,10 @@ mod test {

let tx_confirmer = TxConfirmer::new(
client,
sleep,
max_attempts,
RetryPolicy::RepeatConstant {
sleep,
max_attempts,
},
tx_confirmer_receiver,
tx_res_sender,
);
Expand Down Expand Up @@ -330,8 +337,10 @@ mod test {

let tx_confirmer = TxConfirmer::new(
client,
sleep,
max_attempts,
RetryPolicy::RepeatConstant {
sleep,
max_attempts,
},
tx_confirmer_receiver,
tx_res_sender,
);
Expand Down
Loading

0 comments on commit 490f6f2

Please sign in to comment.