Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feat/get-signer-utxo-pg
Browse files Browse the repository at this point in the history
  • Loading branch information
matteojug committed Oct 1, 2024
2 parents 4e3d223 + 9637ead commit a93cab4
Show file tree
Hide file tree
Showing 14 changed files with 667 additions and 168 deletions.
4 changes: 2 additions & 2 deletions signer/src/bitcoin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ impl BitcoinInteract for ApiFallbackClient<BitcoinCoreClient> {
.await
}

fn get_tx(&self, txid: &Txid) -> Result<Option<GetTxResponse>, Error> {
async fn get_tx(&self, txid: &Txid) -> Result<Option<GetTxResponse>, Error> {
self.get_client().get_tx(txid)
}

fn get_tx_info(
async fn get_tx_info(
&self,
txid: &Txid,
block_hash: &BlockHash,
Expand Down
6 changes: 3 additions & 3 deletions signer/src/bitcoin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ pub mod zmq;

/// Represents the ability to interact with the bitcoin blockchain
#[cfg_attr(any(test, feature = "testing"), mockall::automock())]
pub trait BitcoinInteract {
pub trait BitcoinInteract: Sync + Send {
/// Get block
fn get_block(
&self,
block_hash: &BlockHash,
) -> impl Future<Output = Result<Option<bitcoin::Block>, Error>> + Send;

/// get tx
fn get_tx(&self, txid: &Txid) -> Result<Option<GetTxResponse>, Error>;
fn get_tx(&self, txid: &Txid) -> impl Future<Output = Result<Option<GetTxResponse>, Error>>;

/// get tx info
fn get_tx_info(
&self,
txid: &Txid,
block_hash: &BlockHash,
) -> Result<Option<BitcoinTxInfo>, Error>;
) -> impl Future<Output = Result<Option<BitcoinTxInfo>, Error>>;

/// Estimate fee rate
// This should be implemented with the help of the `fees::EstimateFees` trait
Expand Down
4 changes: 2 additions & 2 deletions signer/src/bitcoin/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,11 @@ impl BitcoinInteract for BitcoinCoreClient {
self.get_block(block_hash)
}

fn get_tx(&self, txid: &Txid) -> Result<Option<GetTxResponse>, Error> {
async fn get_tx(&self, txid: &Txid) -> Result<Option<GetTxResponse>, Error> {
self.get_tx(txid)
}

fn get_tx_info(
async fn get_tx_info(
&self,
txid: &Txid,
block_hash: &BlockHash,
Expand Down
16 changes: 11 additions & 5 deletions signer/src/block_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! - Set aggregate key transactions

use std::collections::HashMap;
use std::future::Future;

use crate::bitcoin::BitcoinInteract;
use crate::context::Context;
Expand Down Expand Up @@ -74,12 +75,12 @@ pub struct Deposit {
}

impl DepositRequestValidator for CreateDepositRequest {
fn validate<C>(&self, client: &C) -> Result<Deposit, Error>
async fn validate<C>(&self, client: &C) -> Result<Deposit, Error>
where
C: BitcoinInteract,
{
// Fetch the transaction from either a block or from the mempool
let Some(response) = client.get_tx(&self.outpoint.txid)? else {
let Some(response) = client.get_tx(&self.outpoint.txid).await? else {
return Err(Error::BitcoinTxMissing(self.outpoint.txid));
};

Expand All @@ -98,7 +99,7 @@ pub trait DepositRequestValidator {
/// This function fetches the transaction using the given client and
/// checks that the transaction has been submitted. The transaction
/// need not be confirmed.
fn validate<C>(&self, client: &C) -> Result<Deposit, Error>
fn validate<C>(&self, client: &C) -> impl Future<Output = Result<Deposit, Error>>
where
C: BitcoinInteract;
}
Expand Down Expand Up @@ -158,6 +159,7 @@ where
for request in deposit_requests {
let deposit = request
.validate(&self.context.get_bitcoin_client())
.await
.inspect_err(|error| tracing::warn!(%error, "could not validate deposit request"));

if let Ok(deposit) = deposit {
Expand Down Expand Up @@ -799,11 +801,15 @@ mod tests {
}

impl BitcoinInteract for TestHarness {
fn get_tx(&self, txid: &bitcoin::Txid) -> Result<Option<GetTxResponse>, Error> {
async fn get_tx(&self, txid: &bitcoin::Txid) -> Result<Option<GetTxResponse>, Error> {
Ok(self.deposits.get(txid).cloned())
}

fn get_tx_info(&self, _: &Txid, _: &BlockHash) -> Result<Option<BitcoinTxInfo>, Error> {
async fn get_tx_info(
&self,
_: &Txid,
_: &BlockHash,
) -> Result<Option<BitcoinTxInfo>, Error> {
unimplemented!()
}

Expand Down
10 changes: 5 additions & 5 deletions signer/src/context/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! messaging via the [`Context`].

/// Signals that can be sent within the signer binary.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum SignerSignal {
/// Send a command to the application.
Command(SignerCommand),
Expand All @@ -11,14 +11,14 @@ pub enum SignerSignal {
}

/// Commands that can be sent on the signalling channel.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum SignerCommand {
/// Signals to the application to publish a message to the P2P network.
P2PPublish(crate::network::Msg),
}

/// Events that can be received on the signalling channel.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum SignerEvent {
/// Signals that a P2P event has occurred.
P2P(P2PEvent),
Expand All @@ -27,7 +27,7 @@ pub enum SignerEvent {
}

/// Events that can be triggered from the P2P network.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum P2PEvent {
/// Signals to the application that the P2P publish failed for the given message.
PublishFailure(crate::network::MsgId),
Expand All @@ -41,7 +41,7 @@ pub enum P2PEvent {
}

/// Events that can be triggered from the transaction signer.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum TxSignerEvent {
/// Received a deposit decision
ReceivedDepositDecision,
Expand Down
79 changes: 79 additions & 0 deletions signer/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,82 @@ where
self.bitcoin_client.clone()
}
}

#[cfg(test)]
mod tests {
use std::sync::{
atomic::{AtomicU8, Ordering},
Arc,
};

use tokio::sync::Notify;

use crate::{
config::Settings,
context::{Context as _, SignerEvent, SignerSignal},
storage::in_memory::Store,
testing::NoopSignerContext,
};

/// This test shows that cloning a context and signalling on the original
/// context will also signal on the cloned context. But it also demonstrates
/// that there can be timing issues (particularly in tests) when signalling
/// across threads/clones, and shows how to handle that.
#[tokio::test]
async fn context_clone_signalling_works() {
// Create a context.
let context = NoopSignerContext::init(
Settings::new_from_default_config().unwrap(),
Store::new_shared(),
)
.unwrap();

// Clone the context.
let context_clone = context.clone();

// Get the receiver from the cloned context.
let mut cloned_receiver = context_clone.get_signal_receiver();

// Create a counter to track how many signals are received and some
// Notify channels so that we ensure we don't hit timing issues.
let recv_count = Arc::new(AtomicU8::new(0));
let task_started = Arc::new(Notify::new());
let task_completed = Arc::new(Notify::new());

// Spawn a task that will receive a signal (and clone values that will
// be used in the `move` closure). We will receive on the cloned context.
let task_started_clone = Arc::clone(&task_started);
let task_completed_clone = Arc::clone(&task_completed);
let recv_count_clone = Arc::clone(&recv_count);
tokio::spawn(async move {
task_started_clone.notify_one();
let signal = cloned_receiver.recv().await.unwrap();

assert_eq!(
signal,
SignerSignal::Event(SignerEvent::BitcoinBlockObserved)
);

recv_count_clone.fetch_add(1, Ordering::Relaxed);
task_completed_clone.notify_one();
});

// This wait is needed to ensure that the `recv_task` is started and
// the receiver subscribed before we send the signal. Otherwise, the
// signal may be sent before the receiver is ready to receive it,
// failing the test.
task_started.notified().await;

// Signal the original context.
context
.signal(SignerEvent::BitcoinBlockObserved.into())
.unwrap();

// This wait is needed to ensure that the below `abort()` doesn't
// kill the task before it has a chance to update `recv_count`.
task_completed.notified().await;

// Ensure that the signal was received.
assert_eq!(recv_count.load(std::sync::atomic::Ordering::Relaxed), 1);
}
}
22 changes: 19 additions & 3 deletions signer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;

use axum::routing::get;
use axum::routing::post;
Expand All @@ -17,8 +18,10 @@ use signer::context::SignerContext;
use signer::emily_client::EmilyClient;
use signer::error::Error;
use signer::network::libp2p::SignerSwarmBuilder;
use signer::network::P2PNetwork;
use signer::stacks::api::StacksClient;
use signer::storage::postgres::PgStore;
use signer::transaction_coordinator;
use signer::util::ApiFallbackClient;
use tokio::signal;

Expand Down Expand Up @@ -83,6 +86,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
run_checked(run_stacks_event_observer, &context),
run_checked(run_libp2p_swarm, &context),
run_checked(run_block_observer, &context),
run_checked(run_transaction_coordinator, &context),
);

Ok(())
Expand Down Expand Up @@ -155,7 +159,6 @@ async fn run_shutdown_signal_watcher(ctx: impl Context) -> Result<(), Error> {
tracing::info!(signal = "Ctrl+C", "received termination signal");
}
}

}
}

Expand Down Expand Up @@ -261,6 +264,19 @@ async fn run_transaction_signer(_ctx: impl Context) -> Result<(), Error> {
}

#[allow(dead_code)] // Remove when implemented
async fn run_transaction_coordinator(_ctx: impl Context) -> Result<(), Error> {
todo!()
async fn run_transaction_coordinator(ctx: impl Context) -> Result<(), Error> {
let config = ctx.config().clone();
let network = P2PNetwork::new(&ctx);

let coord = transaction_coordinator::TxCoordinatorEventLoop {
network,
context: ctx,
context_window: 10000,
private_key: config.signer.private_key,
signing_round_max_duration: Duration::from_secs(10),
threshold: 2,
bitcoin_network: config.signer.network.into(),
};

coord.run().await
}
14 changes: 12 additions & 2 deletions signer/src/testing/api_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use url::Url;
use crate::bitcoin::rpc::BitcoinTxInfo;
use crate::bitcoin::rpc::GetTxResponse;
use crate::bitcoin::BitcoinInteract;
use crate::bitcoin::MockBitcoinInteract;
use crate::blocklist_client::BlocklistChecker;
use crate::emily_client::EmilyInteract;
use crate::error::Error;
Expand All @@ -24,10 +25,11 @@ impl TryFrom<&[Url]> for NoopApiClient {

/// Noop implementation of the BitcoinInteract trait.
impl BitcoinInteract for NoopApiClient {
fn get_tx(&self, _: &bitcoin::Txid) -> Result<Option<GetTxResponse>, Error> {
async fn get_tx(&self, _: &bitcoin::Txid) -> Result<Option<GetTxResponse>, Error> {
unimplemented!()
}
fn get_tx_info(

async fn get_tx_info(
&self,
_: &bitcoin::Txid,
_: &bitcoin::BlockHash,
Expand Down Expand Up @@ -134,3 +136,11 @@ impl BlocklistChecker for NoopApiClient {
todo!()
}
}

impl TryFrom<&[Url]> for MockBitcoinInteract {
type Error = Error;

fn try_from(_: &[Url]) -> Result<Self, Self::Error> {
Ok(Self::default())
}
}
Loading

0 comments on commit a93cab4

Please sign in to comment.