From 9637ead80caa19ff0a1eec2bd5a9acf2d70ea28a Mon Sep 17 00:00:00 2001 From: Cyle Witruk <35576205+cylewitruk@users.noreply.github.com> Date: Tue, 1 Oct 2024 17:04:00 +0200 Subject: [PATCH] feat: run transaction coordinator (#591) --- signer/src/bitcoin/client.rs | 4 +- signer/src/bitcoin/mod.rs | 6 +- signer/src/bitcoin/rpc.rs | 4 +- signer/src/block_observer.rs | 16 +- signer/src/context/messaging.rs | 10 +- signer/src/context/mod.rs | 79 +++++ signer/src/main.rs | 22 +- signer/src/testing/api_clients.rs | 14 +- signer/src/testing/context.rs | 315 ++++++++++++++++++ signer/src/testing/mod.rs | 1 + signer/src/testing/transaction_coordinator.rs | 234 +++++++------ signer/src/testing/wsts.rs | 2 +- signer/src/transaction_coordinator.rs | 103 ++++-- 13 files changed, 648 insertions(+), 162 deletions(-) create mode 100644 signer/src/testing/context.rs diff --git a/signer/src/bitcoin/client.rs b/signer/src/bitcoin/client.rs index 265429ee..160e6a78 100644 --- a/signer/src/bitcoin/client.rs +++ b/signer/src/bitcoin/client.rs @@ -50,11 +50,11 @@ impl BitcoinInteract for ApiFallbackClient { .await } - fn get_tx(&self, txid: &Txid) -> Result, Error> { + async fn get_tx(&self, txid: &Txid) -> Result, Error> { self.get_client().get_tx(txid) } - fn get_tx_info( + async fn get_tx_info( &self, txid: &Txid, block_hash: &BlockHash, diff --git a/signer/src/bitcoin/mod.rs b/signer/src/bitcoin/mod.rs index bf8228fb..8f4dadc9 100644 --- a/signer/src/bitcoin/mod.rs +++ b/signer/src/bitcoin/mod.rs @@ -19,7 +19,7 @@ pub mod zmq; /// Represents the ability to interact with the bitcoin blockchain #[cfg_attr(any(test, feature = "testing"), mockall::automock())] -pub trait BitcoinInteract { +pub trait BitcoinInteract: Sync + Send { /// Get block fn get_block( &self, @@ -27,14 +27,14 @@ pub trait BitcoinInteract { ) -> impl Future, Error>> + Send; /// get tx - fn get_tx(&self, txid: &Txid) -> Result, Error>; + fn get_tx(&self, txid: &Txid) -> impl Future, Error>>; /// get tx info fn get_tx_info( &self, txid: &Txid, block_hash: &BlockHash, - ) -> Result, Error>; + ) -> impl Future, Error>>; /// Estimate fee rate // This should be implemented with the help of the `fees::EstimateFees` trait diff --git a/signer/src/bitcoin/rpc.rs b/signer/src/bitcoin/rpc.rs index d817b772..3a9259b3 100644 --- a/signer/src/bitcoin/rpc.rs +++ b/signer/src/bitcoin/rpc.rs @@ -327,11 +327,11 @@ impl BitcoinInteract for BitcoinCoreClient { self.get_block(block_hash) } - fn get_tx(&self, txid: &Txid) -> Result, Error> { + async fn get_tx(&self, txid: &Txid) -> Result, Error> { self.get_tx(txid) } - fn get_tx_info( + async fn get_tx_info( &self, txid: &Txid, block_hash: &BlockHash, diff --git a/signer/src/block_observer.rs b/signer/src/block_observer.rs index 1561a26f..420e8f1d 100644 --- a/signer/src/block_observer.rs +++ b/signer/src/block_observer.rs @@ -18,6 +18,7 @@ //! - Set aggregate key transactions use std::collections::HashMap; +use std::future::Future; use crate::bitcoin::BitcoinInteract; use crate::context::Context; @@ -74,12 +75,12 @@ pub struct Deposit { } impl DepositRequestValidator for CreateDepositRequest { - fn validate(&self, client: &C) -> Result + async fn validate(&self, client: &C) -> Result where C: BitcoinInteract, { // Fetch the transaction from either a block or from the mempool - let Some(response) = client.get_tx(&self.outpoint.txid)? else { + let Some(response) = client.get_tx(&self.outpoint.txid).await? else { return Err(Error::BitcoinTxMissing(self.outpoint.txid)); }; @@ -98,7 +99,7 @@ pub trait DepositRequestValidator { /// This function fetches the transaction using the given client and /// checks that the transaction has been submitted. The transaction /// need not be confirmed. - fn validate(&self, client: &C) -> Result + fn validate(&self, client: &C) -> impl Future> where C: BitcoinInteract; } @@ -158,6 +159,7 @@ where for request in deposit_requests { let deposit = request .validate(&self.context.get_bitcoin_client()) + .await .inspect_err(|error| tracing::warn!(%error, "could not validate deposit request")); if let Ok(deposit) = deposit { @@ -799,11 +801,15 @@ mod tests { } impl BitcoinInteract for TestHarness { - fn get_tx(&self, txid: &bitcoin::Txid) -> Result, Error> { + async fn get_tx(&self, txid: &bitcoin::Txid) -> Result, Error> { Ok(self.deposits.get(txid).cloned()) } - fn get_tx_info(&self, _: &Txid, _: &BlockHash) -> Result, Error> { + async fn get_tx_info( + &self, + _: &Txid, + _: &BlockHash, + ) -> Result, Error> { unimplemented!() } diff --git a/signer/src/context/messaging.rs b/signer/src/context/messaging.rs index 908b50d1..61b3e9fa 100644 --- a/signer/src/context/messaging.rs +++ b/signer/src/context/messaging.rs @@ -2,7 +2,7 @@ //! messaging via the [`Context`]. /// Signals that can be sent within the signer binary. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum SignerSignal { /// Send a command to the application. Command(SignerCommand), @@ -11,14 +11,14 @@ pub enum SignerSignal { } /// Commands that can be sent on the signalling channel. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum SignerCommand { /// Signals to the application to publish a message to the P2P network. P2PPublish(crate::network::Msg), } /// Events that can be received on the signalling channel. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum SignerEvent { /// Signals that a P2P event has occurred. P2P(P2PEvent), @@ -27,7 +27,7 @@ pub enum SignerEvent { } /// Events that can be triggered from the P2P network. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum P2PEvent { /// Signals to the application that the P2P publish failed for the given message. PublishFailure(crate::network::MsgId), @@ -41,7 +41,7 @@ pub enum P2PEvent { } /// Events that can be triggered from the transaction signer. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum TxSignerEvent { /// Received a deposit decision ReceivedDepositDecision, diff --git a/signer/src/context/mod.rs b/signer/src/context/mod.rs index 95da5edf..32ae5b34 100644 --- a/signer/src/context/mod.rs +++ b/signer/src/context/mod.rs @@ -148,3 +148,82 @@ where self.bitcoin_client.clone() } } + +#[cfg(test)] +mod tests { + use std::sync::{ + atomic::{AtomicU8, Ordering}, + Arc, + }; + + use tokio::sync::Notify; + + use crate::{ + config::Settings, + context::{Context as _, SignerEvent, SignerSignal}, + storage::in_memory::Store, + testing::NoopSignerContext, + }; + + /// This test shows that cloning a context and signalling on the original + /// context will also signal on the cloned context. But it also demonstrates + /// that there can be timing issues (particularly in tests) when signalling + /// across threads/clones, and shows how to handle that. + #[tokio::test] + async fn context_clone_signalling_works() { + // Create a context. + let context = NoopSignerContext::init( + Settings::new_from_default_config().unwrap(), + Store::new_shared(), + ) + .unwrap(); + + // Clone the context. + let context_clone = context.clone(); + + // Get the receiver from the cloned context. + let mut cloned_receiver = context_clone.get_signal_receiver(); + + // Create a counter to track how many signals are received and some + // Notify channels so that we ensure we don't hit timing issues. + let recv_count = Arc::new(AtomicU8::new(0)); + let task_started = Arc::new(Notify::new()); + let task_completed = Arc::new(Notify::new()); + + // Spawn a task that will receive a signal (and clone values that will + // be used in the `move` closure). We will receive on the cloned context. + let task_started_clone = Arc::clone(&task_started); + let task_completed_clone = Arc::clone(&task_completed); + let recv_count_clone = Arc::clone(&recv_count); + tokio::spawn(async move { + task_started_clone.notify_one(); + let signal = cloned_receiver.recv().await.unwrap(); + + assert_eq!( + signal, + SignerSignal::Event(SignerEvent::BitcoinBlockObserved) + ); + + recv_count_clone.fetch_add(1, Ordering::Relaxed); + task_completed_clone.notify_one(); + }); + + // This wait is needed to ensure that the `recv_task` is started and + // the receiver subscribed before we send the signal. Otherwise, the + // signal may be sent before the receiver is ready to receive it, + // failing the test. + task_started.notified().await; + + // Signal the original context. + context + .signal(SignerEvent::BitcoinBlockObserved.into()) + .unwrap(); + + // This wait is needed to ensure that the below `abort()` doesn't + // kill the task before it has a chance to update `recv_count`. + task_completed.notified().await; + + // Ensure that the signal was received. + assert_eq!(recv_count.load(std::sync::atomic::Ordering::Relaxed), 1); + } +} diff --git a/signer/src/main.rs b/signer/src/main.rs index b50ccd59..b94da82e 100644 --- a/signer/src/main.rs +++ b/signer/src/main.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::path::PathBuf; +use std::time::Duration; use axum::routing::get; use axum::routing::post; @@ -17,8 +18,10 @@ use signer::context::SignerContext; use signer::emily_client::EmilyClient; use signer::error::Error; use signer::network::libp2p::SignerSwarmBuilder; +use signer::network::P2PNetwork; use signer::stacks::api::StacksClient; use signer::storage::postgres::PgStore; +use signer::transaction_coordinator; use signer::util::ApiFallbackClient; use tokio::signal; @@ -83,6 +86,7 @@ async fn main() -> Result<(), Box> { run_checked(run_stacks_event_observer, &context), run_checked(run_libp2p_swarm, &context), run_checked(run_block_observer, &context), + run_checked(run_transaction_coordinator, &context), ); Ok(()) @@ -155,7 +159,6 @@ async fn run_shutdown_signal_watcher(ctx: impl Context) -> Result<(), Error> { tracing::info!(signal = "Ctrl+C", "received termination signal"); } } - } } @@ -261,6 +264,19 @@ async fn run_transaction_signer(_ctx: impl Context) -> Result<(), Error> { } #[allow(dead_code)] // Remove when implemented -async fn run_transaction_coordinator(_ctx: impl Context) -> Result<(), Error> { - todo!() +async fn run_transaction_coordinator(ctx: impl Context) -> Result<(), Error> { + let config = ctx.config().clone(); + let network = P2PNetwork::new(&ctx); + + let coord = transaction_coordinator::TxCoordinatorEventLoop { + network, + context: ctx, + context_window: 10000, + private_key: config.signer.private_key, + signing_round_max_duration: Duration::from_secs(10), + threshold: 2, + bitcoin_network: config.signer.network.into(), + }; + + coord.run().await } diff --git a/signer/src/testing/api_clients.rs b/signer/src/testing/api_clients.rs index 261e6f41..dc6355bd 100644 --- a/signer/src/testing/api_clients.rs +++ b/signer/src/testing/api_clients.rs @@ -5,6 +5,7 @@ use url::Url; use crate::bitcoin::rpc::BitcoinTxInfo; use crate::bitcoin::rpc::GetTxResponse; use crate::bitcoin::BitcoinInteract; +use crate::bitcoin::MockBitcoinInteract; use crate::blocklist_client::BlocklistChecker; use crate::emily_client::EmilyInteract; use crate::error::Error; @@ -24,10 +25,11 @@ impl TryFrom<&[Url]> for NoopApiClient { /// Noop implementation of the BitcoinInteract trait. impl BitcoinInteract for NoopApiClient { - fn get_tx(&self, _: &bitcoin::Txid) -> Result, Error> { + async fn get_tx(&self, _: &bitcoin::Txid) -> Result, Error> { unimplemented!() } - fn get_tx_info( + + async fn get_tx_info( &self, _: &bitcoin::Txid, _: &bitcoin::BlockHash, @@ -134,3 +136,11 @@ impl BlocklistChecker for NoopApiClient { todo!() } } + +impl TryFrom<&[Url]> for MockBitcoinInteract { + type Error = Error; + + fn try_from(_: &[Url]) -> Result { + Ok(Self::default()) + } +} diff --git a/signer/src/testing/context.rs b/signer/src/testing/context.rs new file mode 100644 index 00000000..0ab144ab --- /dev/null +++ b/signer/src/testing/context.rs @@ -0,0 +1,315 @@ +//! Test Context implementation + +use std::{ops::Deref, sync::Arc}; + +use bitcoin::Txid; +use tokio::sync::Mutex; + +use crate::{ + bitcoin::{rpc::GetTxResponse, BitcoinInteract, MockBitcoinInteract}, + config::Settings, + context::{Context, SignerContext}, + error::Error, + storage::in_memory::{SharedStore, Store}, +}; + +/// A [`Context`] which can be used for testing. +/// +/// This context is opinionated and uses a shared in-memory store and mocked +/// clients, which can be used to simulate different scenarios. +/// +/// This context also provides you raw access to both the inner [`SignerContext`] +/// as well as the different mocked clients, so you can modify their behavior as +/// needed. +#[derive(Clone)] +pub struct TestContext { + /// The inner [`SignerContext`] which this context wraps. + pub inner: SignerContext, + + /// The mocked bitcoin client. + pub bitcoin_client: BC, +} + +impl TestContext +where + BC: BitcoinInteract + Clone + Send + Sync, +{ + /// Create a new test context. + pub fn new(bitcoin_client: BC) -> Self { + let settings = Settings::new_from_default_config().unwrap(); + let store = Store::new_shared(); + + let context = SignerContext::new(settings, store, bitcoin_client.clone()); + + Self { inner: context, bitcoin_client } + } + + /// Get an instance of the inner bitcoin client. This will be a clone of the + /// + pub fn inner_bitcoin_client(&self) -> BC { + self.bitcoin_client.clone() + } +} + +impl TestContext> { + /// Execute a closure with a mutable reference to the inner mocked + /// bitcoin client. + pub async fn with_bitcoin_client(&mut self, f: F) + where + F: FnOnce(&mut MockBitcoinInteract), + { + let mut client = self.bitcoin_client.lock().await; + f(&mut client); + } +} + +impl Context for TestContext +where + BC: BitcoinInteract + Clone + Send + Sync, +{ + fn config(&self) -> &Settings { + self.inner.config() + } + + fn get_signal_receiver( + &self, + ) -> tokio::sync::broadcast::Receiver { + self.inner.get_signal_receiver() + } + + fn get_signal_sender(&self) -> tokio::sync::broadcast::Sender { + self.inner.get_signal_sender() + } + + fn signal(&self, signal: crate::context::SignerSignal) -> Result<(), Error> { + self.inner.signal(signal) + } + + fn get_termination_handle(&self) -> crate::context::TerminationHandle { + self.inner.get_termination_handle() + } + + fn get_storage(&self) -> impl crate::storage::DbRead + Clone + Sync + Send { + self.inner.get_storage() + } + + fn get_storage_mut( + &self, + ) -> impl crate::storage::DbRead + crate::storage::DbWrite + Clone + Sync + Send { + self.inner.get_storage_mut() + } + + fn get_bitcoin_client(&self) -> impl BitcoinInteract + Clone { + self.inner.get_bitcoin_client() + } +} + +/// A wrapper around a mock which can be cloned and shared between threads. +pub struct WrappedMock { + inner: Arc>, +} + +impl Clone for WrappedMock { + fn clone(&self) -> Self { + Self { inner: self.inner.clone() } + } +} + +impl WrappedMock { + /// Create a new wrapped mock. + pub fn new(mock: T) -> Self { + Self { + inner: Arc::new(Mutex::new(mock)), + } + } +} + +impl Deref for WrappedMock { + type Target = Mutex; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl Default for WrappedMock +where + T: Default, +{ + fn default() -> Self { + Self::new(T::default()) + } +} + +impl BitcoinInteract for WrappedMock { + async fn get_block( + &self, + block_hash: &bitcoin::BlockHash, + ) -> Result, Error> { + self.inner.lock().await.get_block(block_hash).await + } + + async fn get_tx(&self, txid: &Txid) -> Result, Error> { + self.inner.lock().await.get_tx(txid).await + } + + async fn get_tx_info( + &self, + txid: &bitcoin::Txid, + block_hash: &bitcoin::BlockHash, + ) -> Result, Error> { + self.inner.lock().await.get_tx_info(txid, block_hash).await + } + + async fn estimate_fee_rate(&self) -> Result { + self.inner.lock().await.estimate_fee_rate().await + } + + async fn get_last_fee( + &self, + utxo: bitcoin::OutPoint, + ) -> Result, Error> { + self.inner.lock().await.get_last_fee(utxo).await + } + + async fn broadcast_transaction(&self, tx: &bitcoin::Transaction) -> Result<(), Error> { + self.inner.lock().await.broadcast_transaction(tx).await + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::{ + atomic::{AtomicBool, AtomicU8, Ordering}, + Arc, + }, + time::Duration, + }; + + use tokio::sync::Notify; + + use crate::{ + bitcoin::MockBitcoinInteract, + context::{Context as _, SignerEvent, SignerSignal}, + testing::context::{TestContext, WrappedMock}, + }; + + /// This test ensures that the context can be cloned and signals can be sent + /// to both clones. + #[tokio::test] + async fn context_clone_signalling_works() { + let context = Arc::new(TestContext::new( + WrappedMock::::default(), + )); + let mut recv = context.get_signal_receiver(); + let recv_count = Arc::new(AtomicU8::new(0)); + + let recv1 = tokio::spawn(async move { + let signal = recv.recv().await.unwrap(); + assert_eq!( + signal, + SignerSignal::Event(SignerEvent::BitcoinBlockObserved) + ); + dbg!(&signal); + signal + }); + + let context_clone = Arc::clone(&context); + let recv_count_clone = Arc::clone(&recv_count); + let recv_task_started = Arc::new(AtomicBool::new(false)); + let recv_task_started_clone = Arc::clone(&recv_task_started); + let recv_signal_received = Arc::new(AtomicBool::new(false)); + let recv_signal_received_clone = Arc::clone(&recv_signal_received); + + let recv_task = tokio::spawn(async move { + let mut cloned_receiver = context_clone.get_signal_receiver(); + recv_task_started_clone.store(true, Ordering::Relaxed); + let signal = cloned_receiver.recv().await.unwrap(); + assert_eq!( + signal, + SignerSignal::Event(SignerEvent::BitcoinBlockObserved) + ); + recv_count_clone.fetch_add(1, Ordering::Relaxed); + recv_signal_received_clone.store(true, Ordering::Relaxed); + signal + }); + + while !recv_task_started.load(Ordering::Relaxed) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + + context + .signal(SignerEvent::BitcoinBlockObserved.into()) + .unwrap(); + + while !recv_signal_received.load(Ordering::Relaxed) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + + recv_task.abort(); + recv1.abort(); + + assert_eq!(recv_count.load(Ordering::Relaxed), 1); + } + + /// This test demonstrates that cloning a broadcast channel and subscribing to + /// it from multiple tasks works as expected (as according to the docs, but + /// there were some weird issues in some tests that behaved as-if the cloning + /// wasn't working as expected). + #[tokio::test] + async fn test_tokio_broadcast_clone_assumptions() { + let (tx1, mut rx1) = tokio::sync::broadcast::channel(100); + let tx2 = tx1.clone(); + let mut rx2 = tx2.subscribe(); + + assert_eq!(tx1.receiver_count(), 2); + + let count = Arc::new(AtomicU8::new(0)); + let count1 = Arc::clone(&count); + let count2 = Arc::clone(&count); + + let task1_started = Arc::new(Notify::new()); + let task1_started_clone = Arc::clone(&task1_started); + + let task1 = tokio::spawn(async move { + task1_started_clone.notify_one(); + + while let Ok(_) = rx2.recv().await { + count1.fetch_add(1, Ordering::Relaxed); + } + }); + + task1_started.notified().await; + + tx1.send(1).unwrap(); + + let task2_started = Arc::new(Notify::new()); + let task2_started_clone = Arc::clone(&task2_started); + + let task2 = tokio::spawn(async move { + task2_started_clone.notify_one(); + + while let Ok(_) = rx1.recv().await { + count2.fetch_add(1, Ordering::Relaxed); + } + }); + + task2_started.notified().await; + + tx2.send(2).unwrap(); + tx1.send(3).unwrap(); + tx1.send(4).unwrap(); + + // Just to ensure that the tasks have a chance to process the messages. + tokio::time::sleep(Duration::from_millis(100)).await; + + task1.abort(); + task2.abort(); + + // You might expect this to be 7 since we start the 2nd event loop + // after the first send, but the subscriptions are created at the + // beginning of this test, so the messages are buffered in the channel. + assert_eq!(count.load(Ordering::Relaxed), 8); + } +} diff --git a/signer/src/testing/mod.rs b/signer/src/testing/mod.rs index 2285fbd3..8c6c1331 100644 --- a/signer/src/testing/mod.rs +++ b/signer/src/testing/mod.rs @@ -3,6 +3,7 @@ #![allow(clippy::unwrap_in_result, clippy::unwrap_used, clippy::expect_used)] pub mod api_clients; +pub mod context; pub mod dummy; pub mod message; pub mod network; diff --git a/signer/src/testing/transaction_coordinator.rs b/signer/src/testing/transaction_coordinator.rs index dfa5f9b2..beb20330 100644 --- a/signer/src/testing/transaction_coordinator.rs +++ b/signer/src/testing/transaction_coordinator.rs @@ -1,17 +1,24 @@ //! Test utilities for the transaction coordinator use std::cell::RefCell; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; use std::time::Duration; use crate::bitcoin::utxo::SignerUtxo; +use crate::bitcoin::MockBitcoinInteract; +use crate::context::Context; +use crate::context::SignerEvent; use crate::error; use crate::keys; use crate::keys::PrivateKey; use crate::keys::PublicKey; use crate::keys::SignerScriptPubKey; use crate::network; -use crate::storage; use crate::storage::model; +use crate::storage::DbRead as _; +use crate::storage::DbWrite; use crate::testing; use crate::testing::storage::model::TestData; use crate::testing::wsts::SignerSet; @@ -20,6 +27,9 @@ use crate::transaction_coordinator; use rand::SeedableRng as _; use sha2::Digest as _; +use super::context::TestContext; +use super::context::WrappedMock; + const EMPTY_BITCOIN_TX: bitcoin::Transaction = bitcoin::Transaction { version: bitcoin::transaction::Version::ONE, lock_time: bitcoin::absolute::LockTime::ZERO, @@ -27,87 +37,68 @@ const EMPTY_BITCOIN_TX: bitcoin::Transaction = bitcoin::Transaction { output: vec![], }; -struct EventLoopHarness { - event_loop: EventLoop, - block_observer_notification_tx: tokio::sync::watch::Sender<()>, - storage: S, +struct EventLoopHarness { + event_loop: EventLoop, + context: C, + is_started: Arc, } -impl EventLoopHarness +impl EventLoopHarness where - S: storage::DbRead + storage::DbWrite + Clone + Send + 'static, - C: crate::bitcoin::BitcoinInteract + Send + 'static, + C: Context + 'static, { fn create( + context: C, network: network::in_memory::MpmcBroadcaster, - storage: S, - bitcoin_client: C, context_window: usize, private_key: PrivateKey, threshold: u16, ) -> Self { - let (block_observer_notification_tx, block_observer_notifications) = - tokio::sync::watch::channel(()); - Self { event_loop: transaction_coordinator::TxCoordinatorEventLoop { - storage: storage.clone(), + context: context.clone(), network, - block_observer_notifications, private_key, context_window, threshold, - bitcoin_client, bitcoin_network: bitcoin::Network::Testnet, signing_round_max_duration: Duration::from_secs(10), }, - block_observer_notification_tx, - storage, + context, + is_started: Arc::new(AtomicBool::new(false)), } } - pub fn start(self) -> RunningEventLoopHandle { - let block_observer_notification_tx = self.block_observer_notification_tx; - let join_handle = tokio::spawn(async { self.event_loop.run().await }); - let storage = self.storage; + pub async fn start(self) -> RunningEventLoopHandle { + let is_started = self.is_started.clone(); + let join_handle = tokio::spawn(async move { + is_started.store(true, Ordering::SeqCst); + self.event_loop.run().await + }); + + while !self.is_started.load(Ordering::SeqCst) { + tokio::time::sleep(Duration::from_millis(10)).await; + } RunningEventLoopHandle { + context: self.context.clone(), join_handle, - block_observer_notification_tx, - storage, } } } -type EventLoop = - transaction_coordinator::TxCoordinatorEventLoop; +type EventLoop = + transaction_coordinator::TxCoordinatorEventLoop; -struct RunningEventLoopHandle { +struct RunningEventLoopHandle { + context: C, join_handle: tokio::task::JoinHandle>, - block_observer_notification_tx: tokio::sync::watch::Sender<()>, - storage: S, -} - -impl RunningEventLoopHandle { - /// Stop event loop - pub async fn stop_event_loop(self) -> S { - // While this explicit drop isn't strictly necessary, it serves to clarify our intention. - drop(self.block_observer_notification_tx); - - tokio::time::timeout(Duration::from_secs(10), self.join_handle) - .await - .unwrap() - .expect("joining event loop failed") - .expect("event loop returned error"); - - self.storage - } } /// Test environment. -pub struct TestEnvironment { - /// Function to construct a storage instance - pub storage_constructor: C, +pub struct TestEnvironment { + /// Signer context + pub context: Context, /// Bitcoin context window pub context_window: usize, /// Num signers @@ -118,17 +109,15 @@ pub struct TestEnvironment { pub test_model_parameters: testing::storage::model::Params, } -impl TestEnvironment -where - C: FnMut() -> S, - S: storage::DbRead + storage::DbWrite + Clone + Send + 'static, -{ +impl TestEnvironment>> { /// Assert that a coordinator should be able to coordiante a signing round pub async fn assert_should_be_able_to_coordinate_signing_rounds(mut self) { + // Get a handle to our mocked bitcoin client. + let mock_bitcoin_client = self.context.inner_bitcoin_client(); + let mut rng = rand::rngs::StdRng::seed_from_u64(46); let network = network::in_memory::Network::new(); let signer_info = testing::wsts::generate_signer_info(&mut rng, self.num_signers); - let mut storage = (self.storage_constructor)(); let mut testing_signer_set = testing::wsts::SignerSet::new(&signer_info, self.signing_threshold as u32, || { @@ -136,7 +125,7 @@ where }); let (aggregate_key, bitcoin_chain_tip, mut test_data) = self - .prepare_database_and_run_dkg(&mut storage, &mut rng, &mut testing_signer_set) + .prepare_database_and_run_dkg(&mut rng, &mut testing_signer_set) .await; let original_test_data = test_data.clone(); @@ -151,77 +140,99 @@ where test_data.push_sbtc_txs(&bitcoin_chain_tip, vec![tx_1.clone()]); test_data.remove(original_test_data); - Self::write_test_data(&test_data, &mut storage).await; - - let mut mock_bitcoin_client = crate::bitcoin::MockBitcoinInteract::new(); - - mock_bitcoin_client - .expect_estimate_fee_rate() - .times(1) - .returning(|| Box::pin(async { Ok(1.3) })); - - mock_bitcoin_client - .expect_get_last_fee() - .once() - .returning(|_| Box::pin(async { Ok(None) })); - - // TODO: multiple transactions can be generated and keeping this - // too low will cause issues. Figure out why. - let (broadcasted_tx_sender, mut broadcasted_tx_receiver) = tokio::sync::mpsc::channel(100); + self.write_test_data(&test_data).await; + + self.context + .with_bitcoin_client(|client| { + client + .expect_estimate_fee_rate() + .times(1) + .returning(|| Box::pin(async { Ok(1.3) })); + + client + .expect_get_last_fee() + .once() + .returning(|_| Box::pin(async { Ok(None) })); + }) + .await; + // Create a channel to log all transactions broadcasted by the coordinator. + // The receiver is created by this method but not used as it is held as a + // handle to ensure that the channel is alive until the end of the test. + // This is because the coordinator will produce multiple transactions after + // the first, and it will panic trying to send to the channel if it is closed + // (even though we don't use those transactions). + let (broadcasted_transaction_tx, _broadcasted_transaction_rxeiver) = + tokio::sync::broadcast::channel(1); + + // This task logs all transactions broadcasted by the coordinator. + let mut wait_for_transaction_rx = broadcasted_transaction_tx.subscribe(); + let wait_for_transaction_task = + tokio::spawn(async move { wait_for_transaction_rx.recv().await }); + + // Setup the bitcoin client mock to broadcast the transaction to our + // channel. mock_bitcoin_client + .lock() + .await .expect_broadcast_transaction() .times(1..) .returning(move |tx| { let tx = tx.clone(); - let broadcasted_tx_sender = broadcasted_tx_sender.clone(); + let broadcasted_transaction_tx = broadcasted_transaction_tx.clone(); Box::pin(async move { - broadcasted_tx_sender + broadcasted_transaction_tx .send(tx) - .await .expect("Failed to send result"); Ok(()) }) }); + // Get the private key of the coordinator of the signer set. let private_key = Self::select_coordinator(&bitcoin_chain_tip.block_hash, &signer_info); + // Bootstrap the tx coordinator within an event loop harness. let event_loop_harness = EventLoopHarness::create( + self.context.clone(), network.connect(), - storage, - mock_bitcoin_client, self.context_window, private_key, self.signing_threshold, ); - let handle = event_loop_harness.start(); + // Start the tx coordinator run loop. + let handle = event_loop_harness.start().await; + // Start the in-memory signer set. let _signers_handle = tokio::spawn(async move { testing_signer_set .participate_in_signing_rounds_forever() .await }); + // Signal `BitcoinBlockObserved` to trigger the coordinator. handle - .block_observer_notification_tx - .send(()) - .expect("failed to send notification"); + .context + .signal(SignerEvent::BitcoinBlockObserved.into()) + .expect("failed to signal"); - let future = broadcasted_tx_receiver.recv(); - let broadcasted_tx = tokio::time::timeout(Duration::from_secs(10), future) + // Await the `wait_for_tx_task` to receive the first transaction broadcasted. + let broadcasted_tx = wait_for_transaction_task .await - .unwrap() - .unwrap(); + .expect("failed to receive message") + .expect("no message received"); + // Extract the first script pubkey from the broadcasted transaction. let first_script_pubkey = broadcasted_tx .tx_out(0) .expect("missing tx output") .script_pubkey .clone(); - handle.stop_event_loop().await; + // Stop the event loop + handle.join_handle.abort(); + // Perform assertions assert_eq!(first_script_pubkey, aggregate_key.signers_script_pubkey()); } @@ -230,7 +241,6 @@ where let mut rng = rand::rngs::StdRng::seed_from_u64(46); let network = network::in_memory::Network::new(); let signer_info = testing::wsts::generate_signer_info(&mut rng, self.num_signers); - let mut storage = (self.storage_constructor)(); let mut signer_set = testing::wsts::SignerSet::new(&signer_info, self.signing_threshold as u32, || { @@ -238,7 +248,7 @@ where }); let (aggregate_key, bitcoin_chain_tip, mut test_data) = self - .prepare_database_and_run_dkg(&mut storage, &mut rng, &mut signer_set) + .prepare_database_and_run_dkg(&mut rng, &mut signer_set) .await; let original_test_data = test_data.clone(); @@ -273,16 +283,20 @@ where }; test_data.remove(original_test_data); - Self::write_test_data(&test_data, &mut storage).await; + self.write_test_data(&test_data).await; - let chain_tip = storage + let chain_tip = self + .context + .get_storage() .get_bitcoin_canonical_chain_tip() .await .expect("storage failure") .expect("missing block"); assert_eq!(chain_tip, block_ref.block_hash); - let signer_utxo = storage + let signer_utxo = self + .context + .get_storage() .get_signer_utxo(&chain_tip, &aggregate_key) .await .unwrap() @@ -296,7 +310,6 @@ where let mut rng = rand::rngs::StdRng::seed_from_u64(46); let network = network::in_memory::Network::new(); let signer_info = testing::wsts::generate_signer_info(&mut rng, self.num_signers); - let mut storage = (self.storage_constructor)(); let mut signer_set = testing::wsts::SignerSet::new(&signer_info, self.signing_threshold as u32, || { @@ -304,7 +317,7 @@ where }); let (aggregate_key, bitcoin_chain_tip, test_data) = self - .prepare_database_and_run_dkg(&mut storage, &mut rng, &mut signer_set) + .prepare_database_and_run_dkg(&mut rng, &mut signer_set) .await; let original_test_data = test_data.clone(); @@ -362,7 +375,7 @@ where let mut test_data = test_data_rc.into_inner(); test_data.remove(original_test_data); - Self::write_test_data(&test_data, &mut storage).await; + self.write_test_data(&test_data).await; for (chain_tip, tx, amt) in [ (&block_a1, &tx_a1, 0xA1), @@ -379,7 +392,9 @@ where amount: amt, public_key: bitcoin::XOnlyPublicKey::from(aggregate_key), }; - let signer_utxo = storage + let signer_utxo = self + .context + .get_storage() .get_signer_utxo(&chain_tip.block_hash, &aggregate_key) .await .unwrap() @@ -393,7 +408,6 @@ where let mut rng = rand::rngs::StdRng::seed_from_u64(46); let network = network::in_memory::Network::new(); let signer_info = testing::wsts::generate_signer_info(&mut rng, self.num_signers); - let mut storage = (self.storage_constructor)(); let mut signer_set = testing::wsts::SignerSet::new(&signer_info, self.signing_threshold as u32, || { @@ -401,7 +415,7 @@ where }); let (aggregate_key, bitcoin_chain_tip, mut test_data) = self - .prepare_database_and_run_dkg(&mut storage, &mut rng, &mut signer_set) + .prepare_database_and_run_dkg(&mut rng, &mut signer_set) .await; let original_test_data = test_data.clone(); @@ -459,16 +473,20 @@ where }; test_data.remove(original_test_data); - Self::write_test_data(&test_data, &mut storage).await; + self.write_test_data(&test_data).await; - let chain_tip = storage + let chain_tip = self + .context + .get_storage() .get_bitcoin_canonical_chain_tip() .await .expect("storage failure") .expect("missing block"); assert_eq!(chain_tip, block_ref.block_hash); - let signer_utxo = storage + let signer_utxo = self + .context + .get_storage() .get_signer_utxo(&chain_tip, &aggregate_key) .await .unwrap() @@ -479,16 +497,17 @@ where async fn prepare_database_and_run_dkg( &mut self, - storage: &mut S, rng: &mut Rng, signer_set: &mut SignerSet, ) -> (keys::PublicKey, model::BitcoinBlockRef, TestData) where Rng: rand::CryptoRng + rand::RngCore, { + let storage = self.context.get_storage_mut(); + let signer_keys = signer_set.signer_keys(); let test_data = self.generate_test_data(rng, signer_keys); - Self::write_test_data(&test_data, storage).await; + self.write_test_data(&test_data).await; let bitcoin_chain_tip = storage .get_bitcoin_canonical_chain_tip() @@ -508,7 +527,12 @@ where signer_set.run_dkg(bitcoin_chain_tip, dkg_txid, rng).await; signer_set - .write_as_rotate_keys_tx(storage, &bitcoin_chain_tip, aggregate_key, rng) + .write_as_rotate_keys_tx( + &self.context.get_storage_mut(), + &bitcoin_chain_tip, + aggregate_key, + rng, + ) .await; let encrypted_dkg_shares = all_dkg_shares.first().unwrap(); @@ -521,8 +545,8 @@ where (aggregate_key, bitcoin_chain_tip_ref, test_data) } - async fn write_test_data(test_data: &TestData, storage: &mut S) { - test_data.write_to(storage).await; + async fn write_test_data(&self, test_data: &TestData) { + test_data.write_to(&self.context.get_storage_mut()).await; } fn generate_test_data(&self, rng: &mut R, signer_keys: Vec) -> TestData diff --git a/signer/src/testing/wsts.rs b/signer/src/testing/wsts.rs index da18140b..3ce11341 100644 --- a/signer/src/testing/wsts.rs +++ b/signer/src/testing/wsts.rs @@ -495,7 +495,7 @@ impl SignerSet { /// Dump the current signer set as a dummy rotate-keys transaction to the given storage pub async fn write_as_rotate_keys_tx( &self, - storage: &mut S, + storage: &S, chain_tip: &model::BitcoinBlockHash, aggregate_key: PublicKey, rng: &mut Rng, diff --git a/signer/src/transaction_coordinator.rs b/signer/src/transaction_coordinator.rs index 78f6fcc1..43be17ce 100644 --- a/signer/src/transaction_coordinator.rs +++ b/signer/src/transaction_coordinator.rs @@ -11,13 +11,14 @@ use sha2::Digest; use crate::bitcoin::utxo; use crate::bitcoin::BitcoinInteract; +use crate::context::{messaging::SignerEvent, messaging::SignerSignal, Context}; use crate::error::Error; use crate::keys::PrivateKey; use crate::keys::PublicKey; use crate::message; use crate::network; -use crate::storage; use crate::storage::model; +use crate::storage::DbRead as _; use crate::wsts_state_machine; use crate::ecdsa::SignEcdsa as _; @@ -93,15 +94,11 @@ use wsts::state_machine::coordinator::Coordinator as _; /// BST --> DONE /// ``` #[derive(Debug)] -pub struct TxCoordinatorEventLoop { +pub struct TxCoordinatorEventLoop { + /// The signer context. + pub context: Context, /// Interface to the signer network. pub network: Network, - /// Database connection. - pub storage: Storage, - /// Bitcoin client - pub bitcoin_client: BitcoinClient, - /// Notification receiver from the block observer. - pub block_observer_notifications: tokio::sync::watch::Receiver<()>, /// Private key of the coordinator for network communication. pub private_key: PrivateKey, /// The threshold for the signer @@ -114,25 +111,47 @@ pub struct TxCoordinatorEventLoop { pub signing_round_max_duration: std::time::Duration, } -impl TxCoordinatorEventLoop +impl TxCoordinatorEventLoop where + C: Context, N: network::MessageTransfer, - S: storage::DbRead + storage::DbWrite, - B: BitcoinInteract, { /// Run the coordinator event loop #[tracing::instrument(skip(self))] pub async fn run(mut self) -> Result<(), Error> { + tracing::info!("starting transaction coordinator event loop"); + let mut term = self.context.get_termination_handle(); + let mut signal_rx = self.context.get_signal_receiver(); + loop { - match self.block_observer_notifications.changed().await { - Ok(()) => self.process_new_blocks().await?, - Err(_) => { - tracing::info!("block observer notification channel closed"); + tokio::select! { + _ = term.wait_for_shutdown() => { + tracing::info!("received termination signal"); break; - } + }, + signal = signal_rx.recv() => match signal { + // We're only interested in block observer notifications, which + // is our trigger to do some work. + Ok(SignerSignal::Event(SignerEvent::BitcoinBlockObserved)) => { + tracing::debug!("received block observer notification"); + self.process_new_blocks().await?; + }, + // If we get an error receiving, + Err(error) => { + tracing::error!(?error, "error receiving signal; application is probably shutting down"); + break; + }, + // Otherwise, we've received some other signal that we're not interested + // in, so we just continue. + _ => { + tracing::warn!("ignoring signal"); + continue; + } + }, } } - tracing::info!("shutting down transaction coordinator event loop"); + + tracing::info!("transaction coordinator event loop is stopping"); Ok(()) } @@ -140,7 +159,8 @@ where #[tracing::instrument(skip(self))] async fn process_new_blocks(&mut self) -> Result<(), Error> { let bitcoin_chain_tip = self - .storage + .context + .get_storage() .get_bitcoin_canonical_chain_tip() .await? .ok_or(Error::NoChainTip)?; @@ -227,7 +247,7 @@ where mut transaction: utxo::UnsignedTransaction<'_>, ) -> Result<(), Error> { let mut coordinator_state_machine = wsts_state_machine::CoordinatorStateMachine::load( - &mut self.storage, + &mut self.context.get_storage_mut(), aggregate_key, signer_public_keys.clone(), self.threshold, @@ -294,7 +314,8 @@ where tx_in.witness = witness; }); - self.bitcoin_client + self.context + .get_bitcoin_client() .broadcast_transaction(&transaction.tx) .await?; @@ -401,16 +422,23 @@ where &mut self, aggregate_key: &PublicKey, ) -> Result { - let fee_rate = self.bitcoin_client.estimate_fee_rate().await?; - let Some(chain_tip) = self.storage.get_bitcoin_canonical_chain_tip().await? else { + let bitcoin_client = self.context.get_bitcoin_client(); + let fee_rate = bitcoin_client.estimate_fee_rate().await?; + let Some(chain_tip) = self + .context + .get_storage() + .get_bitcoin_canonical_chain_tip() + .await? + else { return Err(Error::NoChainTip); }; let utxo = self - .storage + .context + .get_storage() .get_signer_utxo(&chain_tip, aggregate_key) .await? .ok_or(Error::MissingSignerUtxo)?; - let last_fees = self.bitcoin_client.get_last_fee(utxo.outpoint).await?; + let last_fees = bitcoin_client.get_last_fee(utxo.outpoint).await?; Ok(utxo::SignerBtcState { fee_rate, @@ -441,12 +469,14 @@ where let threshold = self.threshold; let pending_deposit_requests = self - .storage + .context + .get_storage() .get_pending_accepted_deposit_requests(bitcoin_chain_tip, context_window, threshold) .await?; let pending_withdraw_requests = self - .storage + .context + .get_storage() .get_pending_accepted_withdrawal_requests(bitcoin_chain_tip, context_window, threshold) .await?; @@ -456,7 +486,8 @@ where for req in pending_deposit_requests { let votes = self - .storage + .context + .get_storage() .get_deposit_request_signer_votes(&req.txid, req.output_index, &aggregate_key) .await?; @@ -468,7 +499,8 @@ where for req in pending_withdraw_requests { let votes = self - .storage + .context + .get_storage() .get_withdrawal_request_signer_votes(&req.qualified_id(), &aggregate_key) .await?; @@ -497,7 +529,8 @@ where bitcoin_chain_tip: &model::BitcoinBlockHash, ) -> Result<(PublicKey, BTreeSet), Error> { let last_key_rotation = self - .storage + .context + .get_storage() .get_last_key_rotation(bitcoin_chain_tip) .await? .ok_or(Error::MissingKeyRotation)?; @@ -559,12 +592,12 @@ pub fn coordinator_public_key( #[cfg(test)] mod tests { - use crate::storage; + use crate::bitcoin::MockBitcoinInteract; use crate::testing; + use crate::testing::context::{TestContext, WrappedMock}; + use crate::testing::transaction_coordinator::TestEnvironment; - fn test_environment( - ) -> testing::transaction_coordinator::TestEnvironment storage::in_memory::SharedStore> - { + fn test_environment() -> TestEnvironment>> { let test_model_parameters = testing::storage::model::Params { num_bitcoin_blocks: 20, num_stacks_blocks_per_bitcoin_block: 3, @@ -573,8 +606,10 @@ mod tests { num_signers_per_request: 7, }; + let context = TestContext::new(WrappedMock::::default()); + testing::transaction_coordinator::TestEnvironment { - storage_constructor: storage::in_memory::Store::new_shared, + context, context_window: 5, num_signers: 7, signing_threshold: 5,