Skip to content

Commit

Permalink
fix(ampd): resolve deadlock for ampd commands with single broadcast (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
cgorenflo authored Sep 12, 2024
1 parent 830d291 commit e852ac8
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 26 deletions.
10 changes: 7 additions & 3 deletions ampd/src/broadcaster/confirm_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use thiserror::Error;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::error;
use tracing::{debug, error, trace};

use super::cosmos;
use crate::asyncutil::future::{with_retry, RetryPolicy};
Expand Down Expand Up @@ -81,15 +81,19 @@ where
client,
retry_policy,
} = self;
let limit = tx_hash_receiver.capacity();
let limit = tx_hash_receiver.max_capacity();
let client = Arc::new(Mutex::new(client));

debug!(limit, "starting confirmation");

let mut tx_hash_stream = ReceiverStream::new(tx_hash_receiver)
.map(|tx_hash| {
.map(|tx_hash| async {
trace!(tx_hash, "handling confirmation");
// multiple instances of confirm_tx can be spawned due to buffer_unordered,
// so we need to clone the client to avoid a deadlock
confirm_tx_with_retry(client.clone(), tx_hash, retry_policy)
.and_then(|tx| async { send_response(&tx_response_sender, tx).await })
.await
})
.buffer_unordered(limit);

Expand Down
23 changes: 17 additions & 6 deletions ampd/src/broadcaster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use cosmrs::{Amount, Coin, Denom, Gas};
use dec_coin::DecCoin;
use error_stack::{ensure, report, FutureExt, Result, ResultExt};
use futures::TryFutureExt;
use itertools::Itertools;
use k256::sha2::{Digest, Sha256};
use mockall::automock;
use num_traits::{cast, Zero};
Expand Down Expand Up @@ -62,6 +63,8 @@ pub enum Error {
MalformedResponse { query: String },
#[error("address {address} is unknown, please make sure it is funded")]
AccountNotFound { address: TMAddress },
#[error("transaction not accepted by the node")]
TxNotAccepted,
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
Expand Down Expand Up @@ -216,9 +219,10 @@ where

async fn broadcast(&mut self, msgs: Vec<Any>) -> Result<TxResponse, Error> {
let (acc_number, acc_sequence) = self.acc_number_and_sequence().await?;
let fee = self.estimate_fee(msgs.clone(), acc_sequence).await?;
let tx = Tx::builder()
.msgs(msgs.clone())
.fee(self.estimate_fee(msgs, acc_sequence).await?)
.msgs(msgs)
.fee(fee.clone())
.pub_key(self.pub_key.1)
.acc_sequence(acc_sequence)
.build()
Expand Down Expand Up @@ -252,11 +256,18 @@ where
.broadcast_tx(tx)
.change_context(Error::Broadcast)
.await?;
let TxResponse {
txhash: tx_hash, ..
} = &response;

info!(tx_hash, "broadcasted transaction");
info!(
tx_hash = response.txhash,
acc_number,
acc_sequence = self.acc_sequence,
fee.amount = fee.amount.iter().map(Coin::to_string).join(", "),
?fee.gas_limit,
?response,
"transaction was broadcast"
);

ensure!(response.code == 0, Error::TxNotAccepted);

self.acc_sequence.replace(
acc_sequence
Expand Down
32 changes: 15 additions & 17 deletions ampd/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use cosmrs::AccountId;
use error_stack::{report, FutureExt, Result, ResultExt};
use futures::TryFutureExt;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, Sender};
use tonic::transport::Channel;
use valuable::Valuable;

use crate::asyncutil::future::RetryPolicy;
use crate::broadcaster::confirm_tx::TxConfirmer;
use crate::broadcaster::Broadcaster;
use crate::config::{Config as AmpdConfig, Config};
use crate::tofnd::grpc::{Multisig, MultisigClient};
Expand Down Expand Up @@ -81,13 +82,7 @@ async fn broadcast_tx(
let (confirmation_sender, mut confirmation_receiver) = tokio::sync::mpsc::channel(1);
let (hash_to_confirm_sender, hash_to_confirm_receiver) = tokio::sync::mpsc::channel(1);

let mut broadcaster = instantiate_broadcaster(
config,
pub_key,
hash_to_confirm_receiver,
confirmation_sender,
)
.await?;
let (mut broadcaster, confirmer) = instantiate_broadcaster(config, pub_key).await?;

broadcaster
.broadcast(vec![tx])
Expand All @@ -99,6 +94,14 @@ async fn broadcast_tx(
})
.await?;

// drop the sender so the confirmer doesn't wait for more txs
drop(hash_to_confirm_sender);

confirmer
.run(hash_to_confirm_receiver, confirmation_sender)
.change_context(Error::TxConfirmation)
.await?;

confirmation_receiver
.recv()
.await
Expand All @@ -109,9 +112,7 @@ async fn broadcast_tx(
async fn instantiate_broadcaster(
config: Config,
pub_key: PublicKey,
tx_hashes_to_confirm: Receiver<String>,
confirmed_txs: Sender<broadcaster::confirm_tx::TxResponse>,
) -> Result<impl Broadcaster, Error> {
) -> Result<(impl Broadcaster, TxConfirmer<ServiceClient<Channel>>), Error> {
let AmpdConfig {
tm_grpc,
broadcast,
Expand All @@ -135,16 +136,13 @@ async fn instantiate_broadcaster(
.change_context(Error::Connection)
.attach_printable(tofnd_config.url)?;

broadcaster::confirm_tx::TxConfirmer::new(
let confirmer = TxConfirmer::new(
service_client.clone(),
RetryPolicy::RepeatConstant {
sleep: broadcast.tx_fetch_interval,
max_attempts: broadcast.tx_fetch_max_retries.saturating_add(1).into(),
},
)
.run(tx_hashes_to_confirm, confirmed_txs)
.await
.change_context(Error::TxConfirmation)?;
);

let basic_broadcaster = broadcaster::UnvalidatedBasicBroadcaster::builder()
.client(service_client)
Expand All @@ -158,5 +156,5 @@ async fn instantiate_broadcaster(
.validate_fee_denomination()
.await
.change_context(Error::Broadcaster)?;
Ok(basic_broadcaster)
Ok((basic_broadcaster, confirmer))
}

0 comments on commit e852ac8

Please sign in to comment.