diff --git a/teos/src/api/internal.rs b/teos/src/api/internal.rs index 4d8f8951..520227b3 100644 --- a/teos/src/api/internal.rs +++ b/teos/src/api/internal.rs @@ -1,4 +1,5 @@ -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; +use tokio::sync::Notify; use tonic::{Code, Request, Response, Status}; use triggered::Trigger; @@ -22,7 +23,7 @@ pub struct InternalAPI { /// A [Watcher] instance. watcher: Arc, /// A flag that indicates wether bitcoind is reachable or not. - bitcoind_reachable: Arc<(Mutex, Condvar)>, + bitcoind_reachable: Arc<(Mutex, Notify)>, /// A signal indicating the tower is shuting down. shutdown_trigger: Trigger, } @@ -31,7 +32,7 @@ impl InternalAPI { /// Creates a new [InternalAPI] instance. pub fn new( watcher: Arc, - bitcoind_reachable: Arc<(Mutex, Condvar)>, + bitcoind_reachable: Arc<(Mutex, Notify)>, shutdown_trigger: Trigger, ) -> Self { Self { @@ -107,7 +108,7 @@ impl PublicTowerServices for Arc { .watcher .add_appointment(appointment, req_data.signature) { - Ok((receipt, available_slots, subscription_expiry)) => { + Ok((receipt, available_slots, subscription_expiry, _)) => { Ok(Response::new(msgs::AddAppointmentResponse { locator: locator.serialize(), start_block: receipt.start_block(), diff --git a/teos/src/carrier.rs b/teos/src/carrier.rs index 841abda7..b26c6bdd 100644 --- a/teos/src/carrier.rs +++ b/teos/src/carrier.rs @@ -1,7 +1,8 @@ //! Logic related to the Carrier, the component in charge or sending/requesting transaction data from/to `bitcoind`. use std::collections::HashMap; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; +use tokio::sync::Notify; use crate::responder::ConfirmationStatus; use crate::{errors, rpc_errors}; @@ -16,50 +17,56 @@ use bitcoincore_rpc::{ #[derive(Debug)] pub struct Carrier { /// The underlying bitcoin client used by the [Carrier]. - bitcoin_cli: Arc, + bitcoin_cli: Arc>, /// A flag that indicates wether bitcoind is reachable or not. - bitcoind_reachable: Arc<(Mutex, Condvar)>, + bitcoind_reachable: Arc<(Mutex, Notify)>, /// A map of receipts already issued by the [Carrier]. /// Used to prevent potentially re-sending the same transaction over and over. - issued_receipts: HashMap, + issued_receipts: Arc>>, /// The last known block header. - block_height: u32, + block_height: Arc>, } impl Carrier { /// Creates a new [Carrier] instance. pub fn new( - bitcoin_cli: Arc, - bitcoind_reachable: Arc<(Mutex, Condvar)>, + bitcoin_cli: BitcoindClient, + bitcoind_reachable: Arc<(Mutex, Notify)>, last_known_block_height: u32, ) -> Self { Carrier { - bitcoin_cli, + bitcoin_cli: Arc::new(Mutex::new(bitcoin_cli)), bitcoind_reachable, - issued_receipts: HashMap::new(), - block_height: last_known_block_height, + issued_receipts: Arc::new(Mutex::new(HashMap::new())), + block_height: Arc::new(Mutex::new(last_known_block_height)), } } /// Clears the receipts cached by the [Carrier]. Should be called periodically to prevent it from /// growing unbounded. - pub(crate) fn clear_receipts(&mut self) { - if !self.issued_receipts.is_empty() { - self.issued_receipts = HashMap::new() + pub(crate) fn clear_receipts(&self) { + let mut issued_receipts = self.issued_receipts.lock().unwrap(); + if !issued_receipts.is_empty() { + issued_receipts.clear(); } } /// Updates the last known block height by the [Carrier]. - pub(crate) fn update_height(&mut self, height: u32) { - self.block_height = height + pub(crate) fn update_height(&self, height: u32) { + *self.block_height.lock().unwrap() = height; + } + + /// Helper function to check carrier's status of bitcoind connection + fn is_bitcoind_reachable(&self) -> bool { + let (lock, _) = &*self.bitcoind_reachable; + *lock.lock().unwrap() } /// Hangs the process until bitcoind is reachable. If bitcoind is already reachable it just passes trough. - fn hang_until_bitcoind_reachable(&self) { - let (lock, notifier) = &*self.bitcoind_reachable; - let mut reachable = lock.lock().unwrap(); - while !*reachable { - reachable = notifier.wait(reachable).unwrap(); + async fn hang_until_bitcoind_reachable(&self) { + let notifier = &self.bitcoind_reachable.1; + while !self.is_bitcoind_reachable() { + notifier.notified().await; } } @@ -72,153 +79,200 @@ impl Carrier { /// Sends a [Transaction] to the Bitcoin network. /// /// Returns a [ConfirmationStatus] indicating whether the transaction was accepted by the node or not. - pub(crate) fn send_transaction(&mut self, tx: &Transaction) -> ConfirmationStatus { - self.hang_until_bitcoind_reachable(); - - if let Some(receipt) = self.issued_receipts.get(&tx.txid()) { - log::info!("Transaction already sent: {}", tx.txid()); - return *receipt; - } - - log::info!("Pushing transaction to the network: {}", tx.txid()); - let receipt = match self.bitcoin_cli.send_raw_transaction(tx) { - Ok(_) => { - // Here the transaction could, potentially, have been in mempool before the current height. - // This shouldn't really matter though. - log::info!("Transaction successfully delivered: {}", tx.txid()); - ConfirmationStatus::InMempoolSince(self.block_height) + pub(crate) async fn send_transaction(&self, tx: &Transaction) -> ConfirmationStatus { + let mut continue_looping = true; + let mut receipt: Option = None; + while continue_looping { + // We only need to loop once unless we have a bitcoind connectivity + // issue + continue_looping = false; + + // Wait until bitcoind is reachable + self.hang_until_bitcoind_reachable().await; + + if let Some(receipt) = self.issued_receipts.lock().unwrap().get(&tx.txid()) { + log::info!("Transaction already sent: {}", tx.txid()); + return *receipt; } - Err(JsonRpcError(RpcError(rpcerr))) => match rpcerr.code { - // Since we're pushing a raw transaction to the network we can face several rejections - rpc_errors::RPC_VERIFY_REJECTED => { - log::error!("Transaction couldn't be broadcast. {:?}", rpcerr); - ConfirmationStatus::Rejected(rpc_errors::RPC_VERIFY_REJECTED) - } - rpc_errors::RPC_VERIFY_ERROR => { - log::error!("Transaction couldn't be broadcast. {:?}", rpcerr); - ConfirmationStatus::Rejected(rpc_errors::RPC_VERIFY_ERROR) - } - rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN => { - log::info!( - "Transaction is already in the blockchain: {}. Getting confirmation count", - tx.txid() - ); - ConfirmationStatus::ConfirmedIn(self.get_tx_height(&tx.txid()).unwrap()) + log::info!("Pushing transaction to the network: {}", tx.txid()); + let send_raw_tx_option = self.bitcoin_cli.lock().unwrap().send_raw_transaction(tx); + match send_raw_tx_option { + Ok(_) => { + // Here the transaction could, potentially, have been in mempool before the current height. + // This shouldn't really matter though. + log::info!("Transaction successfully delivered: {}", tx.txid()); + receipt = Some(ConfirmationStatus::InMempoolSince( + *self.block_height.lock().unwrap(), + )); } - rpc_errors::RPC_DESERIALIZATION_ERROR => { - // Adding this here just for completeness. We should never end up here. The Carrier only sends txs handed by the Responder, - // who receives them from the Watcher, who checks that the tx can be properly deserialized. - log::info!("Transaction cannot be deserialized: {}", tx.txid()); - ConfirmationStatus::Rejected(rpc_errors::RPC_DESERIALIZATION_ERROR) + Err(JsonRpcError(RpcError(rpcerr))) => match rpcerr.code { + // Since we're pushing a raw transaction to the network we can face several rejections + rpc_errors::RPC_VERIFY_REJECTED => { + log::error!("Transaction couldn't be broadcast. {:?}", rpcerr); + receipt = Some(ConfirmationStatus::Rejected( + rpc_errors::RPC_VERIFY_REJECTED, + )); + } + rpc_errors::RPC_VERIFY_ERROR => { + log::error!("Transaction couldn't be broadcast. {:?}", rpcerr); + receipt = Some(ConfirmationStatus::Rejected(rpc_errors::RPC_VERIFY_ERROR)); + } + rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN => { + log::info!( + "Transaction is already in the blockchain: {}. Getting confirmation count", + tx.txid() + ); + + receipt = Some(ConfirmationStatus::ConfirmedIn( + self.get_tx_height(&tx.txid()).await.unwrap(), + )) + } + rpc_errors::RPC_DESERIALIZATION_ERROR => { + // Adding this here just for completeness. We should never end up here. The Carrier only sends txs handed by the Responder, + // who receives them from the Watcher, who checks that the tx can be properly deserialized. + log::info!("Transaction cannot be deserialized: {}", tx.txid()); + receipt = Some(ConfirmationStatus::Rejected( + rpc_errors::RPC_DESERIALIZATION_ERROR, + )); + } + _ => { + // If something else happens (unlikely but possible) log it so we can treat it in future releases. + log::error!( + "Unexpected rpc error when calling sendrawtransaction: {:?}", + rpcerr + ); + receipt = Some(ConfirmationStatus::Rejected( + errors::UNKNOWN_JSON_RPC_EXCEPTION, + )); + } + }, + Err(JsonRpcError(TransportError(_))) => { + // Connection refused, bitcoind is down. + log::error!("Connection lost with bitcoind, retrying request when possible"); + self.flag_bitcoind_unreachable(); + continue_looping = true; } - _ => { - // If something else happens (unlikely but possible) log it so we can treat it in future releases. - log::error!( - "Unexpected rpc error when calling sendrawtransaction: {:?}", - rpcerr - ); - ConfirmationStatus::Rejected(errors::UNKNOWN_JSON_RPC_EXCEPTION) + Err(e) => { + // TODO: This may need finer catching. + log::error!("Unexpected error when calling sendrawtransaction: {:?}", e); + receipt = Some(ConfirmationStatus::Rejected( + errors::UNKNOWN_JSON_RPC_EXCEPTION, + )); } - }, - Err(JsonRpcError(TransportError(_))) => { - // Connection refused, bitcoind is down. - log::error!("Connection lost with bitcoind, retrying request when possible"); - self.flag_bitcoind_unreachable(); - self.send_transaction(tx) - } - Err(e) => { - // TODO: This may need finer catching. - log::error!("Unexpected error when calling sendrawtransaction: {:?}", e); - ConfirmationStatus::Rejected(errors::UNKNOWN_JSON_RPC_EXCEPTION) - } - }; + }; + } - self.issued_receipts.insert(tx.txid(), receipt); + self.issued_receipts + .lock() + .unwrap() + .insert(tx.txid(), receipt.unwrap()); - receipt + receipt.unwrap() } /// Gets the block height at where a given [Transaction] was confirmed at (if any). - fn get_tx_height(&self, txid: &Txid) -> Option { - if let Some(block_hash) = self.get_block_hash_for_tx(txid) { - self.get_block_height(&block_hash) + async fn get_tx_height(&self, txid: &Txid) -> Option { + if let Some(block_hash) = self.get_block_hash_for_tx(txid).await { + self.get_block_height(&block_hash).await } else { None } } /// Queries the height of a given [Block](bitcoin::Block). Returns it if the block can be found. Returns [None] otherwise. - fn get_block_height(&self, block_hash: &BlockHash) -> Option { - self.hang_until_bitcoind_reachable(); - - match self.bitcoin_cli.get_block_header_info(block_hash) { - Ok(header_data) => Some(header_data.height as u32), - Err(JsonRpcError(RpcError(rpcerr))) => match rpcerr.code { - rpc_errors::RPC_INVALID_ADDRESS_OR_KEY => { - log::info!("Block not found: {}", block_hash); - None + async fn get_block_height(&self, block_hash: &BlockHash) -> Option { + self.hang_until_bitcoind_reachable().await; + + let mut continue_looping = true; + let mut block_height: Option = None; + while continue_looping { + // We only need to loop once unless we have a bitcoind connectivity + // issue + continue_looping = false; + + match self + .bitcoin_cli + .lock() + .unwrap() + .get_block_header_info(block_hash) + { + Ok(header_data) => block_height = Some(header_data.height as u32), + Err(JsonRpcError(RpcError(rpcerr))) => match rpcerr.code { + rpc_errors::RPC_INVALID_ADDRESS_OR_KEY => { + log::info!("Block not found: {}", block_hash); + } + e => { + log::error!("Unexpected error code when calling getblockheader: {}", e); + } + }, + Err(JsonRpcError(TransportError(_))) => { + // Connection refused, bitcoind is down. + log::error!("Connection lost with bitcoind, retrying request when possible"); + self.flag_bitcoind_unreachable(); + continue_looping = true; } - e => { - log::error!("Unexpected error code when calling getblockheader: {}", e); - None + // TODO: This may need finer catching. + Err(e) => { + log::error!("Unexpected JSONRPCError when calling getblockheader: {}", e); } - }, - Err(JsonRpcError(TransportError(_))) => { - // Connection refused, bitcoind is down. - log::error!("Connection lost with bitcoind, retrying request when possible"); - self.flag_bitcoind_unreachable(); - self.get_block_height(block_hash) - } - // TODO: This may need finer catching. - Err(e) => { - log::error!("Unexpected JSONRPCError when calling getblockheader: {}", e); - None } } + + block_height } /// Gets the block hash where a given [Transaction] was confirmed at (if any). - pub(crate) fn get_block_hash_for_tx(&self, txid: &Txid) -> Option { - self.hang_until_bitcoind_reachable(); - - match self.bitcoin_cli.get_raw_transaction_info(txid, None) { - Ok(tx_data) => tx_data.blockhash, - Err(JsonRpcError(RpcError(rpcerr))) => match rpcerr.code { - rpc_errors::RPC_INVALID_ADDRESS_OR_KEY => { - log::info!("Transaction not found in mempool nor blockchain: {}", txid); - None + pub(crate) async fn get_block_hash_for_tx(&self, txid: &Txid) -> Option { + self.hang_until_bitcoind_reachable().await; + + let mut continue_looping = true; + let mut block_hash: Option = None; + while continue_looping { + // We only need to loop once unless we have a bitcoind connectivity + // issue + continue_looping = false; + match self + .bitcoin_cli + .lock() + .unwrap() + .get_raw_transaction_info(txid, None) + { + Ok(tx_data) => block_hash = tx_data.blockhash, + Err(JsonRpcError(RpcError(rpcerr))) => match rpcerr.code { + rpc_errors::RPC_INVALID_ADDRESS_OR_KEY => { + log::info!("Transaction not found in mempool nor blockchain: {}", txid); + } + e => { + log::error!( + "Unexpected error code when calling getrawtransaction: {}", + e + ); + } + }, + Err(JsonRpcError(TransportError(_))) => { + // Connection refused, bitcoind is down. + log::error!("Connection lost with bitcoind, retrying request when possible"); + self.flag_bitcoind_unreachable(); + continue_looping = true; } - e => { + // TODO: This may need finer catching. + Err(e) => { log::error!( - "Unexpected error code when calling getrawtransaction: {}", + "Unexpected JSONRPCError when calling getrawtransaction: {}", e ); - None } - }, - Err(JsonRpcError(TransportError(_))) => { - // Connection refused, bitcoind is down. - log::error!("Connection lost with bitcoind, retrying request when possible"); - self.flag_bitcoind_unreachable(); - self.get_block_hash_for_tx(txid) - } - // TODO: This may need finer catching. - Err(e) => { - log::error!( - "Unexpected JSONRPCError when calling getrawtransaction: {}", - e - ); - None } } + + block_hash } } #[cfg(test)] mod tests { use super::*; - use std::thread; use crate::test_utils::{ get_random_tx, start_server, BitcoindMock, MockOptions, START_HEIGHT, TX_HEX, @@ -230,71 +284,93 @@ mod tests { impl Carrier { // Helper function to access issued_receipts in tests - pub(crate) fn get_issued_receipts(&mut self) -> &mut HashMap { - &mut self.issued_receipts + pub(crate) fn get_issued_receipts( + &self, + ) -> std::sync::MutexGuard<'_, HashMap> { + self.issued_receipts.lock().unwrap() } // Helper function to access height in tests pub(crate) fn get_height(&self) -> u32 { - self.block_height + *self.block_height.lock().unwrap() + } + + // Helper function to access bitcoind_reachable in tests + pub(crate) fn get_bitcoind_reachable(&self) -> Arc<(Mutex, Notify)> { + self.bitcoind_reachable.clone() + } + + // Helper function to update the bitcoin client in tests + pub(crate) fn update_bitcoind_cli(&self, bitcoin_cli: BitcoindClient) { + *self.bitcoin_cli.lock().unwrap() = bitcoin_cli; } } #[test] fn test_clear_receipts() { let bitcoind_mock = BitcoindMock::new(MockOptions::empty()); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); - let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); - - // Lets add some dummy data into the cache - for i in 0..10 { - carrier.issued_receipts.insert( - get_random_tx().txid(), - ConfirmationStatus::ConfirmedIn(start_height - i), - ); + let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); + { + // Lets add some dummy data into the cache + let mut issued_receipts = carrier.issued_receipts.lock().unwrap(); + for i in 0..10 { + issued_receipts.insert( + get_random_tx().txid(), + ConfirmationStatus::ConfirmedIn(start_height - i), + ); + } } // Check it empties on request - assert!(!carrier.issued_receipts.is_empty()); + assert!(!carrier.issued_receipts.lock().unwrap().is_empty()); carrier.clear_receipts(); - assert!(carrier.issued_receipts.is_empty()); + assert!(carrier.issued_receipts.lock().unwrap().is_empty()); } - #[test] - fn test_send_transaction_ok() { + #[tokio::test] + async fn test_send_transaction_ok() { let bitcoind_mock = BitcoindMock::new(MockOptions::empty()); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); - let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); + let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); - let r = carrier.send_transaction(&tx); + let r = carrier.send_transaction(&tx).await; assert_eq!(r, ConfirmationStatus::InMempoolSince(start_height)); // Check the receipt is on the cache - assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r); + assert_eq!( + carrier + .issued_receipts + .lock() + .unwrap() + .get(&tx.txid()) + .unwrap(), + &r + ); } - #[test] - fn test_send_transaction_verify_rejected() { + #[tokio::test] + async fn test_send_transaction_verify_rejected() { let bitcoind_mock = BitcoindMock::new(MockOptions::with_error( rpc_errors::RPC_VERIFY_REJECTED as i64, )); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); - let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); + let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); - let r = carrier.send_transaction(&tx); + let r = carrier.send_transaction(&tx).await; assert_eq!( r, @@ -302,21 +378,29 @@ mod tests { ); // Check the receipt is on the cache - assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r); + assert_eq!( + carrier + .issued_receipts + .lock() + .unwrap() + .get(&tx.txid()) + .unwrap(), + &r + ); } - #[test] - fn test_send_transaction_verify_error() { + #[tokio::test] + async fn test_send_transaction_verify_error() { let bitcoind_mock = BitcoindMock::new(MockOptions::with_error(rpc_errors::RPC_VERIFY_ERROR as i64)); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); - let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); + let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); - let r = carrier.send_transaction(&tx); + let r = carrier.send_transaction(&tx).await; assert_eq!( r, @@ -324,43 +408,59 @@ mod tests { ); // Check the receipt is on the cache - assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r); + assert_eq!( + carrier + .issued_receipts + .lock() + .unwrap() + .get(&tx.txid()) + .unwrap(), + &r + ); } - #[test] - fn test_send_transaction_verify_already_in_chain() { + #[tokio::test] + async fn test_send_transaction_verify_already_in_chain() { let bitcoind_mock = BitcoindMock::new(MockOptions::new( rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64, BlockHash::default(), START_HEIGHT, )); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); - let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); + let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); - let r = carrier.send_transaction(&tx); + let r = carrier.send_transaction(&tx).await; assert_eq!(r, ConfirmationStatus::ConfirmedIn(start_height)); // Check the receipt is on the cache - assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r); + assert_eq!( + carrier + .issued_receipts + .lock() + .unwrap() + .get(&tx.txid()) + .unwrap(), + &r + ); } - #[test] - fn test_send_transaction_unexpected_error() { + #[tokio::test] + async fn test_send_transaction_unexpected_error() { let bitcoind_mock = BitcoindMock::new(MockOptions::with_error(rpc_errors::RPC_MISC_ERROR as i64)); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); - let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); + let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); - let r = carrier.send_transaction(&tx); + let r = carrier.send_transaction(&tx).await; assert_eq!( r, @@ -368,124 +468,139 @@ mod tests { ); // Check the receipt is on the cache - assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r); + assert_eq!( + carrier + .issued_receipts + .lock() + .unwrap() + .get(&tx.txid()) + .unwrap(), + &r + ); } - #[test] - fn test_send_transaction_connection_error() { + #[tokio::test] + async fn test_send_transaction_connection_error() { // Try to connect to an offline bitcoind. let bitcoind_mock = BitcoindMock::new(MockOptions::empty()); - let bitcoind_reachable = Arc::new((Mutex::new(false), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(false), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; - let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable.clone(), start_height); + let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable.clone(), start_height); let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); let delay = std::time::Duration::new(3, 0); - thread::spawn(move || { - thread::sleep(delay); - let (reachable, notifier) = &*bitcoind_reachable; + let bitcoind_reachable_clone = bitcoind_reachable.clone(); + tokio::spawn(async move { + tokio::time::sleep(delay).await; + let (reachable, notifier) = &*bitcoind_reachable_clone; *reachable.lock().unwrap() = true; - notifier.notify_all(); + notifier.notify_waiters(); }); let before = std::time::Instant::now(); - carrier.send_transaction(&tx); + carrier.send_transaction(&tx).await; - // Check the request has hanged for ~delay + // Check the request has hanged for ~delay and bitcoind is now + // reachable (even though this test would not complete if + // bitcoind_reachable != true) assert_eq!( (std::time::Instant::now() - before).as_secs(), delay.as_secs() ); + assert!(*bitcoind_reachable.0.lock().unwrap()); } - #[test] - fn test_get_tx_height_ok() { + #[tokio::test] + async fn test_get_tx_height_ok() { let target_height = 21; let bitcoind_mock = BitcoindMock::new(MockOptions::with_block(BlockHash::default(), target_height)); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); let tx = consensus::deserialize::(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); assert_eq!( - carrier.get_tx_height(&tx.txid()), + carrier.get_tx_height(&tx.txid()).await, Some(target_height as u32) ); } - #[test] - fn test_get_tx_height_not_found() { + #[tokio::test] + async fn test_get_tx_height_not_found() { // Hee we are not testing the case where the block hash is unknown (which will also return None). This is because we only // learn block hashes from bitcoind, and once a block is known, it cannot disappear (ir can be disconnected, but not banish). let bitcoind_mock = BitcoindMock::new(MockOptions::empty()); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); let tx = consensus::deserialize::(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); - assert_eq!(carrier.get_tx_height(&tx.txid()), None); + assert_eq!(carrier.get_tx_height(&tx.txid()).await, None); } - #[test] - fn test_get_block_height_ok() { + #[tokio::test] + async fn test_get_block_height_ok() { let target_height = 21; let block_hash = BlockHash::default(); let bitcoind_mock = BitcoindMock::new(MockOptions::with_block(block_hash, target_height)); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); assert_eq!( - carrier.get_block_height(&block_hash), + carrier.get_block_height(&block_hash).await, Some(target_height as u32) ); } - #[test] - fn test_get_block_height_not_found() { + #[tokio::test] + async fn test_get_block_height_not_found() { let bitcoind_mock = BitcoindMock::new(MockOptions::empty()); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); - assert_eq!(carrier.get_block_height(&BlockHash::default()), None); + assert_eq!(carrier.get_block_height(&BlockHash::default()).await, None); } - #[test] - fn test_get_block_hash_for_tx_ok() { + #[tokio::test] + async fn test_get_block_hash_for_tx_ok() { let block_hash = BlockHash::default(); let bitcoind_mock = BitcoindMock::new(MockOptions::with_block(block_hash, 21)); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); let tx = consensus::deserialize::(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); - assert_eq!(carrier.get_block_hash_for_tx(&tx.txid()), Some(block_hash)); + assert_eq!( + carrier.get_block_hash_for_tx(&tx.txid()).await, + Some(block_hash) + ); } - #[test] - fn test_get_block_hash_for_tx_not_found() { + #[tokio::test] + async fn test_get_block_hash_for_tx_not_found() { let bitcoind_mock = BitcoindMock::new(MockOptions::empty()); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); let start_height = START_HEIGHT as u32; start_server(bitcoind_mock); let tx = consensus::deserialize::(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); - assert_eq!(carrier.get_block_hash_for_tx(&tx.txid()), None); + assert_eq!(carrier.get_block_hash_for_tx(&tx.txid()).await, None); } } diff --git a/teos/src/chain_monitor.rs b/teos/src/chain_monitor.rs index 9c9f2e68..984954d5 100644 --- a/teos/src/chain_monitor.rs +++ b/teos/src/chain_monitor.rs @@ -2,8 +2,9 @@ //! use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; use std::time; +use tokio::sync::Notify; use tokio::time::timeout; use triggered::Listener; @@ -35,7 +36,7 @@ where /// A signal from the main thread indicating the tower is shuting down. shutdown_signal: Listener, /// A flag that indicates wether bitcoind is reachable or not. - bitcoind_reachable: Arc<(Mutex, Condvar)>, + bitcoind_reachable: Arc<(Mutex, Notify)>, } impl<'a, P, C, L> ChainMonitor<'a, P, C, L> @@ -52,7 +53,7 @@ where dbm: Arc>, polling_delta_sec: u16, shutdown_signal: Listener, - bitcoind_reachable: Arc<(Mutex, Condvar)>, + bitcoind_reachable: Arc<(Mutex, Notify)>, ) -> ChainMonitor<'a, P, C, L> { ChainMonitor { spv_client, @@ -95,7 +96,7 @@ where } } *reachable.lock().unwrap() = true; - notifier.notify_all(); + notifier.notify_waiters(); } Err(e) => match e.kind() { BlockSourceErrorKind::Persistent => { @@ -133,7 +134,6 @@ mod tests { use std::cell::RefCell; use std::collections::HashSet; use std::iter::FromIterator; - use std::thread; use bitcoin::network::constants::Network; use bitcoin::BlockHash; @@ -181,7 +181,7 @@ mod tests { let poller = ChainPoller::new(&mut chain, Network::Bitcoin); let cache = &mut UnboundedCache::new(); let spv_client = SpvClient::new(tip, poller, cache, &listener); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); let mut cm = ChainMonitor::new(spv_client, tip, dbm, 1, shutdown_signal, bitcoind_reachable).await; @@ -205,7 +205,7 @@ mod tests { let poller = ChainPoller::new(&mut chain, Network::Bitcoin); let cache = &mut UnboundedCache::new(); let spv_client = SpvClient::new(old_tip, poller, cache, &listener); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); let mut cm = ChainMonitor::new( spv_client, @@ -244,7 +244,7 @@ mod tests { let poller = ChainPoller::new(&mut chain, Network::Bitcoin); let cache = &mut UnboundedCache::new(); let spv_client = SpvClient::new(best_tip, poller, cache, &listener); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); let mut cm = ChainMonitor::new( spv_client, @@ -286,7 +286,7 @@ mod tests { let poller = ChainPoller::new(&mut chain, Network::Bitcoin); let cache = &mut UnboundedCache::new(); let spv_client = SpvClient::new(old_best, poller, cache, &listener); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); let mut cm = ChainMonitor::new( spv_client, @@ -325,7 +325,7 @@ mod tests { let poller = ChainPoller::new(&mut chain, Network::Bitcoin); let cache = &mut UnboundedCache::new(); let spv_client = SpvClient::new(tip, poller, cache, &listener); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); let mut cm = ChainMonitor::new( spv_client, @@ -342,12 +342,13 @@ mod tests { let (reachable, _) = &*bitcoind_reachable.clone(); assert!(!*reachable.lock().unwrap()); - // Set a thread to block on bitcoind unreachable to check that it gets notified once bitcoind comes back online - let t = thread::spawn(move || { - let (lock, notifier) = &*bitcoind_reachable; - let mut reachable = lock.lock().unwrap(); - while !*reachable { - reachable = notifier.wait(reachable).unwrap(); + // Set an async task to block on bitcoind unreachable to check that it gets notified once bitcoind comes back online + let join_handle = tokio::spawn(async move { + let (lock, notify) = &*bitcoind_reachable; + let mut reachable = *lock.lock().unwrap(); + while !reachable { + notify.notified().await; + reachable = *lock.lock().unwrap(); } }); @@ -357,6 +358,9 @@ mod tests { assert!(*reachable.lock().unwrap()); // This would hang if the cm didn't notify their subscribers about the bitcoind status, so it serves as out assert. - t.join().unwrap(); + match join_handle.await { + Ok(_) => (), + Err(_) => assert!(false), + }; } } diff --git a/teos/src/main.rs b/teos/src/main.rs index e1564c75..ddd10c52 100644 --- a/teos/src/main.rs +++ b/teos/src/main.rs @@ -3,8 +3,9 @@ use std::fs; use std::io::ErrorKind; use std::ops::{Deref, DerefMut}; use std::str::FromStr; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; use structopt::StructOpt; +use tokio::sync::Notify; use tokio::task; use tonic::transport::Server; @@ -127,7 +128,7 @@ async fn main() { { Ok(client) => ( Arc::new(client), - Arc::new((Mutex::new(true), Condvar::new())), + Arc::new((Mutex::new(true), Notify::new())), ), Err(e) => { let e_msg = match e.kind() { @@ -146,13 +147,11 @@ async fn main() { } else { "" }; - let rpc = Arc::new( - Client::new( - &format!("{}{}:{}", schema, conf.btc_rpc_connect, conf.btc_rpc_port), - Auth::UserPass(conf.btc_rpc_user.clone(), conf.btc_rpc_password.clone()), - ) - .unwrap(), - ); + let rpc = Client::new( + &format!("{}{}:{}", schema, conf.btc_rpc_connect, conf.btc_rpc_port), + Auth::UserPass(conf.btc_rpc_user.clone(), conf.btc_rpc_password.clone()), + ) + .unwrap(); let mut derefed = bitcoin_cli.deref(); // Load last known block from DB if found. Poll it from Bitcoind otherwise. let tip = if let Ok(block_hash) = dbm.lock().unwrap().load_last_known_block() { diff --git a/teos/src/responder.rs b/teos/src/responder.rs index d5e7e94d..37b5717f 100644 --- a/teos/src/responder.rs +++ b/teos/src/responder.rs @@ -3,6 +3,7 @@ use std::collections::{HashMap, HashSet}; use std::iter::FromIterator; use std::sync::{Arc, Mutex}; +use tokio::task::JoinHandle; use bitcoin::consensus; use bitcoin::{BlockHeader, Transaction, Txid}; @@ -127,11 +128,11 @@ impl From for msgs::Tracker { pub struct Responder { /// A map holding a summary of every tracker ([TransactionTracker]) hold by the [Responder], identified by [UUID]. /// The identifiers match those used by the [Watcher](crate::watcher::Watcher). - trackers: Mutex>, + trackers: Arc>>, /// A map between [Txid]s and [UUID]s. - tx_tracker_map: Mutex>>, + tx_tracker_map: Arc>>>, /// A [Carrier] instance. Data is sent to the `bitcoind` through it. - carrier: Mutex, + carrier: Arc, /// A [Gatekeeper] instance. Data regarding users is requested to it. gatekeeper: Arc, /// A [DBM] (database manager) instance. Used to persist tracker data into disk. @@ -155,9 +156,9 @@ impl Responder { } Responder { - carrier: Mutex::new(carrier), - trackers: Mutex::new(trackers), - tx_tracker_map: Mutex::new(tx_tracker_map), + carrier: Arc::new(carrier), + trackers: Arc::new(Mutex::new(trackers)), + tx_tracker_map: Arc::new(Mutex::new(tx_tracker_map)), dbm, gatekeeper, } @@ -177,7 +178,7 @@ impl Responder { /// /// Breaches can either be added to the [Responder] in the form of a [TransactionTracker] if the [penalty transaction](Breach::penalty_tx) /// is accepted by the `bitcoind` or rejected otherwise. - pub(crate) fn handle_breach( + pub(crate) async fn handle_breach( &self, uuid: UUID, breach: Breach, @@ -189,11 +190,7 @@ impl Responder { return tracker.status; } - let status = self - .carrier - .lock() - .unwrap() - .send_transaction(&breach.penalty_tx); + let status = self.carrier.send_transaction(&breach.penalty_tx).await; if !matches!(status, ConfirmationStatus::Rejected { .. }) { self.add_tracker(uuid, breach, user_id, status); } @@ -365,38 +362,40 @@ impl Responder { /// otherwise, we could potentially try to rebroadcast again while processing the upcoming reorged blocks (if the tx hits [CONFIRMATIONS_BEFORE_RETRY]). /// /// Returns a tuple with two maps, one containing the trackers that where successfully rebroadcast and another one containing the ones that were rejected. - fn rebroadcast( - &self, + async fn rebroadcast( txs: HashMap)>, + trackers: Arc>>, + carrier: Arc, ) -> (HashMap, HashSet) { let mut accepted = HashMap::new(); let mut rejected = HashSet::new(); - let mut trackers = self.trackers.lock().unwrap(); - let mut carrier = self.carrier.lock().unwrap(); - for (uuid, (penalty_tx, dispute_tx)) in txs.into_iter() { let status = if let Some(dispute_tx) = dispute_tx { // The tracker was reorged out, and the dispute may potentially not be in the chain anymore. - if carrier.get_block_hash_for_tx(&dispute_tx.txid()).is_some() { + if carrier + .get_block_hash_for_tx(&dispute_tx.txid()) + .await + .is_some() + { // Dispute tx is on chain, so we only need to care about the penalty - carrier.send_transaction(&penalty_tx) + carrier.send_transaction(&penalty_tx).await } else { // Dispute tx has also been reorged out, meaning that both transactions need to be broadcast. // DISCUSS: For lightning transactions, if the dispute has been reorged the penalty cannot make it to the network. // If we keep this general, the dispute can simply be a trigger and the penalty doesn't necessarily have to spend from it. // We'll keel it lightning specific, at least for now. - let status = carrier.send_transaction(&dispute_tx); + let status = carrier.send_transaction(&dispute_tx).await; if let ConfirmationStatus::Rejected(e) = status { log::error!( - "Reorged dispute transaction rejected during rebroadcast: {} (reason: {:?})", - dispute_tx.txid(), - e - ); + "Reorged dispute transaction rejected during rebroadcast: {} (reason: {:?})", + dispute_tx.txid(), + e + ); status } else { // The dispute was accepted, so we can rebroadcast the penalty. - carrier.send_transaction(&penalty_tx) + carrier.send_transaction(&penalty_tx).await } } } else { @@ -405,7 +404,7 @@ impl Responder { "Penalty transaction has missed many confirmations: {}", penalty_tx.txid() ); - carrier.send_transaction(&penalty_tx) + carrier.send_transaction(&penalty_tx).await }; if let ConfirmationStatus::Rejected(_) = status { @@ -414,7 +413,7 @@ impl Responder { // Update the tracker if it gets accepted. This will also update the height (since when we are counting the tracker // to have been in mempool), so it resets the wait period instead of trying to rebroadcast every block. // DISCUSS: We may want to find another approach in the future for the InMempoool transactions. - trackers.get_mut(&uuid).unwrap().status = status; + trackers.lock().unwrap().get_mut(&uuid).unwrap().status = status; accepted.insert(uuid, status); } } @@ -428,8 +427,23 @@ impl Responder { /// /// Logs a different message depending on whether the trackers have been outdated or completed. fn delete_trackers_from_memory(&self, uuids: &HashSet, reason: DeletionReason) { - let mut trackers = self.trackers.lock().unwrap(); - let mut tx_tracker_map = self.tx_tracker_map.lock().unwrap(); + Responder::delete_trackers_from_memory_no_self( + uuids, + reason, + self.trackers.clone(), + self.tx_tracker_map.clone(), + ) + } + + /// See [Responder::delete_trackers_from_memory] + fn delete_trackers_from_memory_no_self( + uuids: &HashSet, + reason: DeletionReason, + trackers: Arc>>, + tx_tracker_map: Arc>>>, + ) { + let mut trackers_mutable = trackers.lock().unwrap(); + let mut tx_tracker_map_mutable = tx_tracker_map.lock().unwrap(); for uuid in uuids.iter() { match reason { DeletionReason::Completed => log::info!("Appointment completed. Penalty transaction was irrevocably confirmed: {}", uuid), @@ -437,19 +451,21 @@ impl Responder { DeletionReason::Rejected => log::info!("Appointment couldn't be completed. Either the dispute or the penalty txs where rejected during rebroadcast: {}", uuid), } - match trackers.remove(uuid) { + match trackers_mutable.remove(uuid) { Some(tracker) => { - let trackers = tx_tracker_map.get_mut(&tracker.penalty_txid).unwrap(); + let trackers_mutable = tx_tracker_map_mutable + .get_mut(&tracker.penalty_txid) + .unwrap(); - if trackers.len() == 1 { - tx_tracker_map.remove(&tracker.penalty_txid); + if trackers_mutable.len() == 1 { + tx_tracker_map_mutable.remove(&tracker.penalty_txid); log::info!( "No more trackers for penalty transaction: {}", tracker.penalty_txid ); } else { - trackers.remove(uuid); + trackers_mutable.remove(uuid); } } None => { @@ -469,29 +485,40 @@ impl Responder { updated_users: &HashMap, reason: DeletionReason, ) { - self.delete_trackers_from_memory(uuids, reason); - self.dbm - .lock() + Responder::delete_trackers_no_self( + uuids, + updated_users, + reason, + self.dbm.clone(), + self.trackers.clone(), + self.tx_tracker_map.clone(), + ) + } + + /// See [Responder::delete_trackers] + fn delete_trackers_no_self( + uuids: &HashSet, + updated_users: &HashMap, + reason: DeletionReason, + dbm: Arc>, + trackers: Arc>>, + tx_tracker_map: Arc>>>, + ) { + Responder::delete_trackers_from_memory_no_self(uuids, reason, trackers, tx_tracker_map); + dbm.lock() .unwrap() .batch_remove_appointments(uuids, updated_users); } -} -/// Listen implementation by the [Responder]. Handles monitoring and reorgs. -impl chain::Listen for Responder { - /// Handles the monitoring process by the [Responder]. - /// - /// Watching is performed in a per-block basis. A [TransactionTracker] is tracked until: - /// - It gets [irrevocably resolved](https://github.com/lightning/bolts/blob/master/05-onchain.md#general-nomenclature) or - /// - The user subscription expires - /// - The trackers becomes invalid (due to a reorg) - /// - /// Every time a block is received the tracking conditions are checked against the monitored [TransactionTracker]s and - /// data deletion is performed accordingly. Moreover, lack of confirmations is check for the tracked transactions and - /// rebroadcasting is performed for those that have missed too many. - fn block_connected(&self, block: &bitcoin::Block, height: u32) { + /// See [Responder::block_connected] + fn block_connected_helper( + &self, + block: &bitcoin::Block, + height: u32, + ) -> Option> { log::info!("New block received: {}", block.header.block_hash()); - self.carrier.lock().unwrap().update_height(height); + self.carrier.update_height(height); + let mut join_handle_option: Option> = None; if self.trackers.lock().unwrap().len() > 0 { // Complete those appointments that are due at this height @@ -521,34 +548,64 @@ impl chain::Listen for Responder { DeletionReason::Outdated, ); - // Rebroadcast those transactions that need to - let (_, rejected_trackers) = self.rebroadcast(self.get_txs_to_rebroadcast(height)); - // Delete trackers rejected during rebroadcast - let trackers_to_delete_gk = rejected_trackers - .iter() - .map(|uuid| (*uuid, self.trackers.lock().unwrap()[uuid].user_id)) - .collect(); - self.delete_trackers( - &rejected_trackers, - &self - .gatekeeper - .delete_appointments_from_memory(&trackers_to_delete_gk), - DeletionReason::Rejected, - ); + // Aynchronously rebroadcast those transactions that need to + let txs_to_rebroadcast = self.get_txs_to_rebroadcast(height); + let trackers = self.trackers.clone(); + let gatekeeper = self.gatekeeper.clone(); + let carrier = self.carrier.clone(); + let dbm = self.dbm.clone(); + let tx_tracker_map = self.tx_tracker_map.clone(); + join_handle_option = Some(tokio::spawn(async move { + let (_, rejected_trackers) = + Responder::rebroadcast(txs_to_rebroadcast, trackers.clone(), carrier.clone()) + .await; + // Delete trackers rejected during rebroadcast + let trackers_to_delete_gk = rejected_trackers + .iter() + .map(|uuid| (*uuid, trackers.lock().unwrap()[uuid].user_id)) + .collect(); + Responder::delete_trackers_no_self( + &rejected_trackers, + &gatekeeper.delete_appointments_from_memory(&trackers_to_delete_gk), + DeletionReason::Rejected, + dbm, + trackers.clone(), + tx_tracker_map, + ); - // Remove all receipts created in this block - self.carrier.lock().unwrap().clear_receipts(); + // Remove all receipts created in this block + carrier.clear_receipts(); - if self.trackers.lock().unwrap().is_empty() { - log::info!("No more pending trackers"); - } + if trackers.lock().unwrap().is_empty() { + log::info!("No more pending trackers"); + } + })); } + + join_handle_option + } +} + +/// Listen implementation by the [Responder]. Handles monitoring and reorgs. +impl chain::Listen for Responder { + /// Handles the monitoring process by the [Responder]. + /// + /// Watching is performed in a per-block basis. A [TransactionTracker] is tracked until: + /// - It gets [irrevocably resolved](https://github.com/lightning/bolts/blob/master/05-onchain.md#general-nomenclature) or + /// - The user subscription expires + /// - The trackers becomes invalid (due to a reorg) + /// + /// Every time a block is received the tracking conditions are checked against the monitored [TransactionTracker]s and + /// data deletion is performed accordingly. Moreover, lack of confirmations is check for the tracked transactions and + /// rebroadcasting is performed for those that have missed too many. + fn block_connected(&self, block: &bitcoin::Block, height: u32) { + self.block_connected_helper(block, height); } /// Handles reorgs in the [Responder]. fn block_disconnected(&self, header: &BlockHeader, height: u32) { log::warn!("Block disconnected: {}", header.block_hash()); - self.carrier.lock().unwrap().update_height(height); + self.carrier.update_height(height); for tracker in self.trackers.lock().unwrap().values_mut() { // The transaction has been unconfirmed. Flag it as reorged out so we can rebroadcast it. @@ -566,6 +623,7 @@ mod tests { use std::ops::Deref; use std::sync::{Arc, Mutex}; + use tokio::sync::Notify; use crate::dbm::{Error as DBError, DBM}; use crate::gatekeeper::UserInfo; @@ -589,8 +647,8 @@ mod tests { &self.trackers } - pub(crate) fn get_carrier(&self) -> &Mutex { - &self.carrier + pub(crate) fn get_carrier(&self) -> Arc { + self.carrier.clone() } pub(crate) fn add_random_tracker(&self, uuid: UUID, status: ConfirmationStatus) { @@ -626,7 +684,15 @@ mod tests { query: MockedServerQuery, ) -> Responder { let tip = chain.tip(); - Responder::new(create_carrier(query, tip.deref().height), gatekeeper, dbm) + Responder::new( + create_carrier( + query, + tip.deref().height, + Arc::new((Mutex::new(true), Notify::new())), + ), + gatekeeper, + dbm, + ) } fn init_responder_with_chain_and_dbm( @@ -722,8 +788,8 @@ mod tests { assert_eq!(responder, another_r); } - #[test] - fn test_handle_breach_delivered() { + #[tokio::test] + async fn test_handle_breach_delivered() { let start_height = START_HEIGHT as u32; let responder = init_responder(MockedServerQuery::Regular); @@ -735,7 +801,7 @@ mod tests { let penalty_txid = breach.penalty_tx.txid(); assert_eq!( - responder.handle_breach(uuid, breach, user_id), + responder.handle_breach(uuid, breach, user_id).await, ConfirmationStatus::InMempoolSince(start_height) ); assert!(responder.trackers.lock().unwrap().contains_key(&uuid)); @@ -753,7 +819,9 @@ mod tests { // passed twice, the receipt corresponding to the first breach will be handed back. let another_breach = get_random_breach(); assert_eq!( - responder.handle_breach(uuid, another_breach.clone(), user_id), + responder + .handle_breach(uuid, another_breach.clone(), user_id) + .await, ConfirmationStatus::InMempoolSince(start_height) ); @@ -769,8 +837,8 @@ mod tests { .contains_key(&another_breach.penalty_tx.txid())); } - #[test] - fn test_handle_breach_rejected() { + #[tokio::test] + async fn test_handle_breach_rejected() { let responder = init_responder(MockedServerQuery::Error( rpc_errors::RPC_VERIFY_ERROR as i64, )); @@ -781,7 +849,7 @@ mod tests { let penalty_txid = breach.penalty_tx.txid(); assert_eq!( - responder.handle_breach(uuid, breach, user_id), + responder.handle_breach(uuid, breach, user_id).await, ConfirmationStatus::Rejected(rpc_errors::RPC_VERIFY_ERROR) ); assert!(!responder.trackers.lock().unwrap().contains_key(&uuid)); @@ -1242,8 +1310,8 @@ mod tests { ); } - #[test] - fn test_rebroadcast_accepted() { + #[tokio::test] + async fn test_rebroadcast_accepted() { // This test positive rebroadcast cases, including reorgs. However, complex reorg logic is not tested here, it will need a // dedicated test (against bitcoind, not mocked). let responder = init_responder(MockedServerQuery::Regular); @@ -1300,15 +1368,19 @@ mod tests { } // Check all are accepted - let (accepted, rejected) = - responder.rebroadcast(responder.get_txs_to_rebroadcast(current_height)); + let (accepted, rejected) = Responder::rebroadcast( + responder.get_txs_to_rebroadcast(current_height), + responder.trackers.clone(), + responder.carrier.clone(), + ) + .await; let accepted_uuids: HashSet = accepted.keys().cloned().collect(); assert_eq!(accepted_uuids, need_rebroadcast); assert!(rejected.is_empty()); } - #[test] - fn test_rebroadcast_rejected() { + #[tokio::test] + async fn test_rebroadcast_rejected() { // This test negative rebroadcast cases, including reorgs. However, complex reorg logic is not tested here, it will need a // dedicated test (against bitcoind, not mocked). let responder = init_responder(MockedServerQuery::Error( @@ -1367,8 +1439,12 @@ mod tests { } // Check all are rejected - let (accepted, rejected) = - responder.rebroadcast(responder.get_txs_to_rebroadcast(current_height)); + let (accepted, rejected) = Responder::rebroadcast( + responder.get_txs_to_rebroadcast(current_height), + responder.trackers.clone(), + responder.carrier.clone(), + ) + .await; assert_eq!(rejected, need_rebroadcast); assert!(accepted.is_empty()); } @@ -1441,7 +1517,7 @@ mod tests { .unwrap(); // Delete trackers removes data from the trackers, tx_tracker_map maps, the database. The deletion of the later is - // better check in test_block_connected. Add data to the map first. + // better checked in test_block_connected. Add data to the map first. let mut all_trackers = HashSet::new(); let mut target_trackers = HashSet::new(); let mut uuid_txid_map = HashMap::new(); @@ -1553,8 +1629,8 @@ mod tests { } } - #[test] - fn test_block_connected() { + #[tokio::test] + async fn test_block_connected() { let dbm = Arc::new(Mutex::new(DBM::in_memory().unwrap())); let start_height = START_HEIGHT * 2; let mut chain = Blockchain::default().with_height(start_height); @@ -1708,30 +1784,27 @@ mod tests { // Add some dummy data in the cache to check that it gets cleared responder .carrier - .lock() - .unwrap() .get_issued_receipts() .insert(get_random_tx().txid(), ConfirmationStatus::ConfirmedIn(21)); // Connecting a block should trigger all the state transitions - responder.block_connected( + let join_handle_option = responder.block_connected_helper( &chain.generate(Some(just_confirmed_txs.clone())), chain.get_block_count(), ); + // Ensure a join handle was created, then join the async task + assert!(!join_handle_option.is_none()); + match tokio::join!(join_handle_option.unwrap()).0 { + Ok(_) => (), + Err(_) => assert!(false), + }; + // CARRIER CHECKS - assert!(responder - .carrier - .lock() - .unwrap() - .get_issued_receipts() - .is_empty()); + assert!(responder.carrier.get_issued_receipts().is_empty()); // Check that the carrier last_known_block_height has been updated - assert_eq!( - responder.carrier.lock().unwrap().get_height(), - target_block_height - ); + assert_eq!(responder.carrier.get_height(), target_block_height); // COMPLETED TRACKERS CHECKS // Data should have been removed @@ -1852,7 +1925,7 @@ mod tests { ); // Check that the carrier block_height has been updated - assert_eq!(responder.carrier.lock().unwrap().get_height(), i); + assert_eq!(responder.carrier.get_height(), i); } // Check that all reorged trackers are still reorged diff --git a/teos/src/test_utils.rs b/teos/src/test_utils.rs index 41123e46..91f99aee 100644 --- a/teos/src/test_utils.rs +++ b/teos/src/test_utils.rs @@ -10,8 +10,9 @@ use rand::Rng; use std::convert::TryInto; use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; use std::thread; +use tokio::sync::Notify; use jsonrpc_http_server::jsonrpc_core::error::ErrorCode as JsonRpcErrorCode; use jsonrpc_http_server::jsonrpc_core::{Error as JsonRpcError, IoHandler, Params, Value}; @@ -405,15 +406,41 @@ pub(crate) enum MockedServerQuery { Error(i64), } -pub(crate) fn create_carrier(query: MockedServerQuery, height: u32) -> Carrier { +pub(crate) fn start_bitcoind(query: MockedServerQuery) -> BitcoindClient { + // Create a new bitcoind client/server let bitcoind_mock = match query { MockedServerQuery::Regular => BitcoindMock::new(MockOptions::empty()), MockedServerQuery::Error(x) => BitcoindMock::new(MockOptions::with_error(x)), }; - let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); + let bitcoin_cli = BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap(); start_server(bitcoind_mock); + bitcoin_cli +} + +pub(crate) fn reset_carrier( + carrier: Arc, + query: MockedServerQuery, + height: u32, + bitcoind_reachable: bool, +) { + // Create a new bitcoind client/server + let bitcoin_cli = start_bitcoind(query); + + // Update the carrier with the new values + carrier.update_bitcoind_cli(bitcoin_cli); + carrier.update_height(height); + carrier.clear_receipts(); + let (lock, _) = &*carrier.get_bitcoind_reachable(); + let mut reachable = lock.lock().unwrap(); + *reachable = bitcoind_reachable; +} +pub(crate) fn create_carrier( + query: MockedServerQuery, + height: u32, + bitcoind_reachable: Arc<(Mutex, Notify)>, +) -> Carrier { + let bitcoin_cli = start_bitcoind(query); Carrier::new(bitcoin_cli, bitcoind_reachable, height) } @@ -423,8 +450,8 @@ pub(crate) fn create_responder( dbm: Arc>, server_url: &str, ) -> Responder { - let bitcoin_cli = Arc::new(BitcoindClient::new(server_url, Auth::None).unwrap()); - let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); + let bitcoin_cli = BitcoindClient::new(server_url, Auth::None).unwrap(); + let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new())); let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, tip.deref().height); Responder::new(carrier, gatekeeper, dbm) @@ -506,7 +533,7 @@ pub(crate) async fn create_api_with_config(api_config: ApiConfig) -> Arc>, + appointments: Arc>>, /// A map between [Locator]s (user identifiers for [Appointment]s) and [UUID]s (tower identifiers). - locator_uuid_map: Mutex>>, + locator_uuid_map: Arc>>>, /// A cache of the [Locator]s computed for the transactions in the last few blocks. - locator_cache: Mutex, + locator_cache: Arc>, /// A [Responder] instance. Data will be passed to it once triggered (if valid). responder: Arc, /// A [Gatekeeper] instance. Data regarding users is requested to it. @@ -280,9 +281,9 @@ impl Watcher { } Watcher { - appointments: Mutex::new(appointments), - locator_uuid_map: Mutex::new(locator_uuid_map), - locator_cache: Mutex::new(LocatorCache::new(last_n_blocks)), + appointments: Arc::new(Mutex::new(appointments)), + locator_uuid_map: Arc::new(Mutex::new(locator_uuid_map)), + locator_cache: Arc::new(Mutex::new(LocatorCache::new(last_n_blocks))), responder, gatekeeper, last_known_block_height: AtomicU32::new(last_known_block_height), @@ -322,7 +323,10 @@ impl Watcher { &self, appointment: Appointment, user_signature: String, - ) -> Result<(AppointmentReceipt, u32, u32), AddAppointmentFailure> { + ) -> Result< + (AppointmentReceipt, u32, u32, Option>), + AddAppointmentFailure + > { let user_id = self .gatekeeper .authenticate_user(&appointment.serialize(), &user_signature) @@ -335,12 +339,12 @@ impl Watcher { return Err(AddAppointmentFailure::SubscriptionExpired(expiry)); } - let extended_appointment = ExtendedAppointment::new( + let extended_appointment = Arc::new(ExtendedAppointment::new( appointment, user_id, user_signature, self.last_known_block_height.load(Ordering::Acquire), - ); + )); let uuid = UUID::new(extended_appointment.locator(), user_id); @@ -354,19 +358,54 @@ impl Watcher { .add_update_appointment(user_id, uuid, &extended_appointment) .map_err(|_| AddAppointmentFailure::NotEnoughSlots)?; - // FIXME: There's an edge case here if store_triggered_appointment is called and bitcoind is unreachable. - // This will hang, the request will timeout but be accepted. However, the user will not be handed the receipt. - // This could be fixed adding a thread to take care of storing while the main thread returns the receipt. - // Not fixing this atm since working with threads that call self.method is surprisingly non-trivial. - match self - .locator_cache - .lock() - .unwrap() - .get_tx(extended_appointment.locator()) + // Store the appointment. If this appointment has been triggered, we'll + // need to contact bitcoind which could be unreachable. This would cause + // our current thread to hang and prevent the user from receiving a + // receipt. To remedy this, we define store_triggered_appointment as an + // async task. + // + // Implementing this requires accessing the MutexGuard<> associated + // with the LocatorCache inside a scope which is not shared with the + // async call to store_triggered_appointment. This is so the Mutex<> + // is not shared across async calls. In addition, the &Transaction + // protected by the Mutex<> only has a liftime as long as the + // MutexGuard<>. As such, the &Transaction must be cloned in order to + // outlive the MutexGuard<>. + let mut get_tx_opt: Option = None; + { + match self + .locator_cache + .lock() + .unwrap() + .get_tx(extended_appointment.locator()) + { + Some(dispute_tx) => { + get_tx_opt = Some(dispute_tx.clone()); + } + None => {} + }; + } + let mut store_triggered_apt_task_handle: Option> = None; + match get_tx_opt { // Appointments that were triggered in blocks held in the cache Some(dispute_tx) => { - self.store_triggered_appointment(uuid, &extended_appointment, user_id, dispute_tx); + let extended_appointment_clone = extended_appointment.clone(); + let dbm = self.dbm.clone(); + let responder = self.responder.clone(); + store_triggered_apt_task_handle = Some( + tokio::spawn(async move { + Watcher::store_triggered_appointment( + uuid, + extended_appointment_clone, + user_id, + &dispute_tx, + dbm, + responder + ) + .await; + }) + ); } // Regular appointments that have not been triggered (or, at least, not recently) None => { @@ -375,12 +414,12 @@ impl Watcher { }; let mut receipt = AppointmentReceipt::new( - extended_appointment.user_signature, + extended_appointment.user_signature.clone(), extended_appointment.start_block, ); receipt.sign(&self.signing_key); - Ok((receipt, available_slots, expiry)) + Ok((receipt, available_slots, expiry, store_triggered_apt_task_handle)) } /// Stores an appointment in the [Watcher] memory and into the database (or updates it if it already exists). @@ -439,12 +478,13 @@ impl Watcher { /// /// If the appointment is rejected by the [Responder] (i.e. for being invalid), the data is wiped /// from the database but the slot is not freed. - fn store_triggered_appointment( - &self, + async fn store_triggered_appointment( uuid: UUID, - appointment: &ExtendedAppointment, + appointment: Arc, user_id: UserId, dispute_tx: &Transaction, + dbm: Arc>, + responder: Arc, ) -> TriggeredAppointment { log::info!( "Trigger for locator {} found in cache", @@ -454,22 +494,24 @@ impl Watcher { Ok(penalty_tx) => { // Data needs to be added the database straightaway since appointments are // FKs to trackers. If handle breach fails, data will be deleted later. - self.dbm + dbm .lock() .unwrap() - .store_appointment(uuid, appointment) + .store_appointment(uuid, &appointment) .unwrap(); - if let ConfirmationStatus::Rejected(reason) = self.responder.handle_breach( + if let ConfirmationStatus::Rejected(reason) = responder.handle_breach( uuid, Breach::new(dispute_tx.clone(), penalty_tx), user_id, - ) { + ) + .await + { // DISCUSS: We could either free the slots or keep it occupied as if this was misbehavior. // Keeping it for now. log::warn!("Appointment bounced in the Responder. Reason: {:?}", reason); - self.dbm.lock().unwrap().remove_appointment(uuid); + dbm.lock().unwrap().remove_appointment(uuid); TriggeredAppointment::Rejected } else { log::info!("Appointment went straight to the Responder"); @@ -634,8 +676,21 @@ impl Watcher { /// The appointments are deleted from the appointments and locator_uuid_map maps. /// Logs a different message depending on whether the appointments have been outdated, invalid, or accepted. fn delete_appointments_from_memory(&self, uuids: &HashSet, reason: DeletionReason) { - let mut appointments = self.appointments.lock().unwrap(); - let mut locator_uuid_map = self.locator_uuid_map.lock().unwrap(); + Watcher::delete_appointments_from_memory_no_self( + uuids, + reason, + self.appointments.clone(), + self.locator_uuid_map.clone()) + } + + fn delete_appointments_from_memory_no_self( + uuids: &HashSet, + reason: DeletionReason, + appointments: Arc>>, + locator_uuid_map: Arc>>>, + ) { + let mut appointments_mut = appointments.lock().unwrap(); + let mut locator_uuid_map_mut = locator_uuid_map.lock().unwrap(); for uuid in uuids { match reason { @@ -651,16 +706,16 @@ impl Watcher { log::info!("{} accepted by the Responder. Deleting appointment", uuid) } }; - match appointments.remove(uuid) { + match appointments_mut.remove(uuid) { Some(appointment) => { - let appointments = locator_uuid_map.get_mut(&appointment.locator).unwrap(); + let appointments_mut = locator_uuid_map_mut.get_mut(&appointment.locator).unwrap(); - if appointments.len() == 1 { - locator_uuid_map.remove(&appointment.locator); + if appointments_mut.len() == 1 { + locator_uuid_map_mut.remove(&appointment.locator); log::info!("No more appointments for locator: {}", appointment.locator); } else { - appointments.remove(uuid); + appointments_mut.remove(uuid); } } None => { @@ -673,13 +728,20 @@ impl Watcher { /// Deletes appointments from memory and the database. fn delete_appointments( - &self, uuids: &HashSet, updated_users: &HashMap, reason: DeletionReason, + dbm: Arc>, + appointments: Arc>>, + locator_uuid_map: Arc>>>, ) { - self.delete_appointments_from_memory(uuids, reason); - self.dbm + Watcher::delete_appointments_from_memory_no_self( + uuids, + reason, + appointments, + locator_uuid_map + ); + dbm .lock() .unwrap() .batch_remove_appointments(uuids, updated_users); @@ -765,22 +827,13 @@ impl Watcher { Ok((subscription_info, locators)) } -} -/// Listen implementation by the [Watcher]. Handles monitoring and reorgs. -impl chain::Listen for Watcher { - /// Handles the monitoring process by the [Watcher]. - /// - /// Watching is performed in a per-block basis. Therefore, a breach is only considered (and detected) if seen - /// in a block. - /// - /// Every time a new block is received a list of all potential locators is computed using the transaction data. - /// Then, the potential locators are checked against the data being monitored by the [Watcher] and passed to the - /// [Responder] if valid. Otherwise data is removed from the tower. - /// - /// This also takes care of updating the [LocatorCache] and removing outdated data from the [Watcher] when - /// told by the [Gatekeeper]. - fn block_connected(&self, block: &Block, height: u32) { + /// See [Watcher::block_connected] + fn block_connected_helper( + &self, + block: &Block, + height: u32 + ) -> Option> { log::info!("New block received: {}", block.header.block_hash()); let locator_tx_map = block @@ -794,6 +847,7 @@ impl chain::Listen for Watcher { .unwrap() .update(block.header, &locator_tx_map); + let mut join_handle_option: Option> = None; if !self.appointments.lock().unwrap().is_empty() { // Start by removing outdated data so it is not taken into account from this point on self.delete_appointments_from_memory( @@ -806,50 +860,87 @@ impl chain::Listen for Watcher { self.filter_breaches(self.get_breaches(locator_tx_map)); // Send data to the Responder - let mut appointments_to_delete = HashSet::from_iter(invalid_breaches.into_keys()); - let mut delivered_appointments = HashSet::new(); - for (uuid, breach) in valid_breaches { - log::info!( - "Notifying Responder and deleting appointment (uuid: {})", - uuid - ); + let responder = self.responder.clone(); + let appointments = self.appointments.clone(); + let locator_uuid_map = self.locator_uuid_map.clone(); + let gatekeeper = self.gatekeeper.clone(); + let dbm = self.dbm.clone(); + join_handle_option = Some(tokio::spawn(async move { + let mut appointments_to_delete = HashSet::from_iter( + invalid_breaches.into_keys()); + let mut delivered_appointments = HashSet::new(); + for (uuid, breach) in valid_breaches { + log::info!( + "Notifying Responder and deleting appointment (uuid: {})", + uuid + ); - if let ConfirmationStatus::Rejected(_) = self.responder.handle_breach( - uuid, - breach, - self.appointments.lock().unwrap()[&uuid].user_id, - ) { - appointments_to_delete.insert(uuid); - } else { - delivered_appointments.insert(uuid); + let appointment_summary = appointments.lock().unwrap()[&uuid].clone(); + if let ConfirmationStatus::Rejected(_) = responder.handle_breach( + uuid, + breach, + appointment_summary.user_id, + ) + .await + { + appointments_to_delete.insert(uuid); + } else { + delivered_appointments.insert(uuid); + } } - } - // Delete data - let appointments_to_delete_gatekeeper = { - let appointments = self.appointments.lock().unwrap(); - appointments_to_delete - .iter() - .map(|uuid| (*uuid, appointments[uuid].user_id)) - .collect() - }; - self.delete_appointments_from_memory(&delivered_appointments, DeletionReason::Accepted); - self.delete_appointments( - &appointments_to_delete, - &self - .gatekeeper - .delete_appointments_from_memory(&appointments_to_delete_gatekeeper), - DeletionReason::Invalid, - ); + // Delete data + let appointments_to_delete_gatekeeper = { + let appointments = appointments.lock().unwrap(); + appointments_to_delete + .iter() + .map(|uuid| (*uuid, appointments[uuid].user_id)) + .collect() + }; + Watcher::delete_appointments_from_memory_no_self( + &delivered_appointments, + DeletionReason::Accepted, + appointments.clone(), + locator_uuid_map.clone()); + Watcher::delete_appointments( + &appointments_to_delete, + &gatekeeper + .delete_appointments_from_memory(&appointments_to_delete_gatekeeper), + DeletionReason::Invalid, + dbm, + appointments.clone(), + locator_uuid_map + ); - if self.appointments.lock().unwrap().is_empty() { - log::info!("No more pending appointments"); - } + if appointments.lock().unwrap().is_empty() { + log::info!("No more pending appointments"); + } + })); } // Update last known block self.last_known_block_height .store(height, Ordering::Release); + + join_handle_option + } +} + +/// Listen implementation by the [Watcher]. Handles monitoring and reorgs. +impl chain::Listen for Watcher { + /// Handles the monitoring process by the [Watcher]. + /// + /// Watching is performed in a per-block basis. Therefore, a breach is only considered (and detected) if seen + /// in a block. + /// + /// Every time a new block is received a list of all potential locators is computed using the transaction data. + /// Then, the potential locators are checked against the data being monitored by the [Watcher] and passed to the + /// [Responder] if valid. Otherwise data is removed from the tower. + /// + /// This also takes care of updating the [LocatorCache] and removing outdated data from the [Watcher] when + /// told by the [Gatekeeper]. + fn block_connected(&self, block: &Block, height: u32) { + self.block_connected_helper(block, height); } /// Handle reorgs in the [Watcher]. @@ -873,10 +964,10 @@ mod tests { use crate::responder::ConfirmationStatus; use crate::rpc_errors; use crate::test_utils::{ - create_carrier, create_responder, create_watcher, generate_dummy_appointment, + create_responder, create_watcher, generate_dummy_appointment, generate_dummy_appointment_with_user, generate_uuid, get_last_n_blocks, get_random_breach, - get_random_tx, store_appointment_and_fks_to_db, BitcoindMock, Blockchain, MockOptions, - MockedServerQuery, DURATION, EXPIRY_DELTA, SLOTS, START_HEIGHT, + get_random_tx, reset_carrier, store_appointment_and_fks_to_db, BitcoindMock, Blockchain, + MockOptions, MockedServerQuery, DURATION, EXPIRY_DELTA, SLOTS, START_HEIGHT, }; use teos_common::cryptography::{get_random_bytes, get_random_keypair}; @@ -1145,10 +1236,11 @@ mod tests { // Add the appointment for a new user (twice so we can check that updates work) for _ in 0..2 { let user_sig = cryptography::sign(&appointment.serialize(), &user_sk).unwrap(); - let (receipt, slots, expiry) = watcher + let (receipt, slots, expiry, join_handle_option) = watcher .add_appointment(appointment.clone(), user_sig.clone()) .unwrap(); - + + assert!(join_handle_option.is_none()); assert_appointment_added(slots, SLOTS - 1, expiry, receipt, &user_sig, tower_id); } @@ -1158,10 +1250,11 @@ mod tests { watcher.register(user2_id).unwrap(); let user2_sig = cryptography::sign(&appointment.serialize(), &user2_sk).unwrap(); - let (receipt, slots, expiry) = watcher + let (receipt, slots, expiry, join_handle_option) = watcher .add_appointment(appointment.clone(), user2_sig.clone()) .unwrap(); + assert!(join_handle_option.is_none()); assert_appointment_added(slots, SLOTS - 1, expiry, receipt, &user2_sig, tower_id); // There should be now two appointments in the Watcher and the same locator should have two different uuids @@ -1183,9 +1276,11 @@ mod tests { let (uuid, triggered_appointment) = generate_dummy_appointment_with_user(user_id, None); let signature = cryptography::sign(&triggered_appointment.inner.serialize(), &user_sk).unwrap(); - watcher + let (_, _, _, join_handle_option) = watcher .add_appointment(triggered_appointment.inner.clone(), signature.clone()) .unwrap(); + + assert!(join_handle_option.is_none()); let breach = get_random_breach(); watcher.responder.add_tracker( @@ -1207,9 +1302,14 @@ mod tests { generate_dummy_appointment_with_user(user_id, Some(&dispute_tx.txid())); let user_sig = cryptography::sign(&appointment_in_cache.inner.serialize(), &user_sk).unwrap(); - let (receipt, slots, expiry) = watcher + let (receipt, slots, expiry, join_handle_option) = watcher .add_appointment(appointment_in_cache.inner.clone(), user_sig.clone()) .unwrap(); + assert!(!join_handle_option.is_none()); + match join_handle_option.unwrap().await { + Ok(_) => (), + Err(_) => assert!(false) + }; // The appointment should have been accepted, slots should have been decreased, and data should have been deleted from // the Watcher's memory. Moreover, a new tracker should be found in the Responder @@ -1240,9 +1340,14 @@ mod tests { invalid_appointment.inner.encrypted_blob.reverse(); let user_sig = cryptography::sign(&invalid_appointment.inner.serialize(), &user_sk).unwrap(); - let (receipt, slots, expiry) = watcher + let (receipt, slots, expiry, join_handle_option) = watcher .add_appointment(invalid_appointment.inner.clone(), user_sig.clone()) .unwrap(); + assert!(!join_handle_option.is_none()); + match join_handle_option.unwrap().await { + Ok(_) => (), + Err(_) => assert!(false) + }; assert_appointment_added(slots, SLOTS - 4, expiry, receipt, &user_sig, tower_id); assert_eq!(watcher.appointments.lock().unwrap().len(), 3); @@ -1259,17 +1364,24 @@ mod tests { // Transaction rejected // Update the Responder with a new Carrier - *watcher.responder.get_carrier().lock().unwrap() = create_carrier( + reset_carrier( + watcher.responder.get_carrier(), MockedServerQuery::Error(rpc_errors::RPC_VERIFY_ERROR as i64), chain.tip().deref().height, + true ); let dispute_tx = &tip_txs[tip_txs.len() - 2]; let invalid_appointment = generate_dummy_appointment(Some(&dispute_tx.txid())).inner; let user_sig = cryptography::sign(&invalid_appointment.serialize(), &user_sk).unwrap(); - let (receipt, slots, expiry) = watcher + let (receipt, slots, expiry, join_handle_option) = watcher .add_appointment(invalid_appointment, user_sig.clone()) .unwrap(); + assert!(!join_handle_option.is_none()); + match join_handle_option.unwrap().await { + Ok(_) => (), + Err(_) => assert!(false) + }; assert_appointment_added(slots, SLOTS - 4, expiry, receipt, &user_sig, tower_id); assert_eq!(watcher.appointments.lock().unwrap().len(), 3); @@ -1342,6 +1454,90 @@ mod tests { )); } + #[tokio::test] + async fn test_add_appointment_bitcoind_unreachable() { + // Create the blockchain and watcher. Immediately replace the carrier + // so we have control over whether bitcoind is reachable + let mut chain = Blockchain::default().with_height_and_txs(START_HEIGHT, 10); + let tip_txs = chain.blocks.last().unwrap().txdata.clone(); + let watcher = init_watcher(&mut chain).await; + let bitcoind_reachable = watcher + .responder + .get_carrier() + .get_bitcoind_reachable(); + + // Generate the tower id + let tower_id: UserId = UserId(PublicKey::from_secret_key( + &Secp256k1::new(), + &watcher.signing_key, + )); + + // Create and register a user + let (user_sk, user_pk) = get_random_keypair(); + let user_id = UserId(user_pk); + watcher.register(user_id).unwrap(); + + + // Set bitcoind to be unreachable + let (lock, _) = &*bitcoind_reachable; + *lock.lock().unwrap() = false; + + // Try to add an appointment which has a trigger in the cache + let dispute_tx = tip_txs.last().unwrap(); + let (uuid, appointment_in_cache) = + generate_dummy_appointment_with_user(user_id, Some(&dispute_tx.txid())); + let user_sig = + cryptography::sign(&appointment_in_cache.inner.serialize(), &user_sk).unwrap(); + let (receipt, slots, expiry, join_handle_option) = watcher + .add_appointment(appointment_in_cache.inner.clone(), user_sig.clone()) + .unwrap(); + assert!(!join_handle_option.is_none()); + + // Nothing shouldve been added + assert!(matches!( + watcher.dbm.lock().unwrap().load_appointment(uuid), + Err(DBError::NotFound) + )); + assert!(matches!( + watcher.dbm.lock().unwrap().load_tracker(uuid), + Err(DBError::NotFound) + )); + + // Set bitcoind back to reachable and join the on the add_appointment + // handle + let bitcoind_reachable_clone = bitcoind_reachable.clone(); + let handle = tokio::spawn(async move { + let (lock, notify) = &*bitcoind_reachable_clone; + let mut reachable = lock.lock().unwrap(); + *reachable = true; + notify.notify_waiters(); + }); + match tokio::join!(join_handle_option.unwrap(), handle).0 { + Ok(_) => (), + Err(_) => assert!(false) + }; + + // The appointment should have been accepted, slots should have been decreased, and data should have been deleted from + // the Watcher's memory. Moreover, a new tracker should be found in the Responder + assert_appointment_added(slots, SLOTS - 1, expiry, receipt, &user_sig, tower_id); + assert!(!watcher + .locator_uuid_map + .lock() + .unwrap() + .contains_key(&appointment_in_cache.locator())); + assert!(watcher.responder.has_tracker(uuid)); + + // Check data was added to the database + assert!(matches!( + watcher.dbm.lock().unwrap().load_appointment(uuid), + Ok(ExtendedAppointment { .. }) + )); + assert!(matches!( + watcher.dbm.lock().unwrap().load_tracker(uuid), + Ok(TransactionTracker { .. }) + )); + } + #[tokio::test] async fn test_store_appointment() { let mut chain = Blockchain::default().with_height(START_HEIGHT); @@ -1416,10 +1612,19 @@ mod tests { let dispute_tx = get_random_tx(); let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, Some(&dispute_tx.txid())); + let appointment_clone = Arc::new(appointment); // Valid triggered appointments should be accepted by the Responder assert_eq!( - watcher.store_triggered_appointment(uuid, &appointment, user_id, &dispute_tx), + Watcher::store_triggered_appointment( + uuid, + appointment_clone, + user_id, + &dispute_tx, + watcher.dbm.clone(), + watcher.responder.clone() + ) + .await, TriggeredAppointment::Accepted, ); // In this case the appointment is kept in the Responder and, therefore, in the database @@ -1431,15 +1636,26 @@ mod tests { // A properly formatted but invalid transaction should be rejected by the Responder // Update the Responder with a new Carrier that will reject the transaction - *watcher.responder.get_carrier().lock().unwrap() = create_carrier( + reset_carrier( + watcher.responder.get_carrier(), MockedServerQuery::Error(rpc_errors::RPC_VERIFY_ERROR as i64), chain.tip().deref().height, + true ); let dispute_tx = get_random_tx(); let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, Some(&dispute_tx.txid())); + let appointment_clone = Arc::new(appointment); assert_eq!( - watcher.store_triggered_appointment(uuid, &appointment, user_id, &dispute_tx), + Watcher::store_triggered_appointment( + uuid, + appointment_clone.clone(), + user_id, + &dispute_tx, + watcher.dbm.clone(), + watcher.responder.clone() + ) + .await, TriggeredAppointment::Rejected, ); // In this case the appointment is not kept in the Responder nor in the database @@ -1454,7 +1670,15 @@ mod tests { // (the same applies to invalid formatted transactions) let uuid = generate_uuid(); assert_eq!( - watcher.store_triggered_appointment(uuid, &appointment, user_id, &get_random_tx()), + Watcher::store_triggered_appointment( + uuid, + appointment_clone, + user_id, + &get_random_tx(), + watcher.dbm.clone(), + watcher.responder.clone() + ) + .await, TriggeredAppointment::Invalid, ); // The appointment is not kept anywhere @@ -1780,10 +2004,13 @@ mod tests { } // The deletion reason does not matter here, it only changes the logged message when deleting data - watcher.delete_appointments( + Watcher::delete_appointments( &target_appointments, &updated_users, DeletionReason::Accepted, + watcher.dbm.clone(), + watcher.appointments.clone(), + watcher.locator_uuid_map.clone() ); // Only appointments in the target_appointments map should have been removed from @@ -1861,7 +2088,11 @@ mod tests { watcher.last_known_block_height.load(Ordering::Relaxed), chain.get_block_count() ); - watcher.block_connected(&chain.generate(None), chain.get_block_count() as u32); + let join_handle_option = watcher.block_connected_helper( + &chain.generate(None), + chain.get_block_count() as u32 + ); + assert!(join_handle_option.is_none()); assert_eq!( watcher.last_known_block_height.load(Ordering::Relaxed), chain.get_block_count() @@ -1922,7 +2153,15 @@ mod tests { .contains_key(&uuid2) ); - watcher.block_connected(&chain.generate(None), chain.get_block_count()); + let join_handle_option = watcher.block_connected_helper( + &chain.generate(None), + chain.get_block_count() + ); + assert!(!join_handle_option.is_none()); + match join_handle_option.unwrap().await { + Ok(_) => (), + Err(_) => assert!(false) + }; assert!(!watcher.appointments.lock().unwrap().contains_key(&uuid1)); assert!(!watcher.locator_uuid_map.lock().unwrap()[&appointment.locator()].contains(&uuid1)); @@ -1959,10 +2198,15 @@ mod tests { assert!(watcher.appointments.lock().unwrap().contains_key(&uuid)); - watcher.block_connected( + let join_handle_option = watcher.block_connected_helper( &chain.generate(Some(vec![dispute_tx])), chain.get_block_count(), ); + assert!(!join_handle_option.is_none()); + match join_handle_option.unwrap().await { + Ok(_) => (), + Err(_) => assert!(false) + }; // Data should have been moved to the Responder and kept in the Gatekeeper, since it is still part of the system. assert!(!watcher.appointments.lock().unwrap().contains_key(&uuid)); @@ -1996,15 +2240,22 @@ mod tests { watcher.add_appointment(appointment.inner, sig).unwrap(); // Set the carrier response - *watcher.responder.get_carrier().lock().unwrap() = create_carrier( + reset_carrier( + watcher.responder.get_carrier(), MockedServerQuery::Error(rpc_errors::RPC_VERIFY_ERROR as i64), chain.tip().deref().height, + true ); - watcher.block_connected( + let join_handle_option = watcher.block_connected_helper( &chain.generate(Some(vec![dispute_tx])), chain.get_block_count(), ); + assert!(!join_handle_option.is_none()); + match join_handle_option.unwrap().await { + Ok(_) => (), + Err(_) => assert!(false) + }; // Data should not be in the Responder, in the Watcher nor in the Gatekeeper assert!(!watcher.appointments.lock().unwrap().contains_key(&uuid)); @@ -2041,10 +2292,15 @@ mod tests { .add_appointment(appointment.inner.clone(), sig) .unwrap(); - watcher.block_connected( + let join_handle_option = watcher.block_connected_helper( &chain.generate(Some(vec![dispute_tx])), chain.get_block_count(), ); + assert!(!join_handle_option.is_none()); + match join_handle_option.unwrap().await { + Ok(_) => (), + Err(_) => assert!(false) + }; // Data has been wiped since it was invalid assert!(!watcher.appointments.lock().unwrap().contains_key(&uuid));