Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge issue43 hotfix #55

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions teos/src/api/internal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
use tonic::{Code, Request, Response, Status};
use triggered::Trigger;

Expand All @@ -22,7 +23,7 @@ pub struct InternalAPI {
/// A [Watcher] instance.
watcher: Arc<Watcher>,
/// A flag that indicates wether bitcoind is reachable or not.
bitcoind_reachable: Arc<(Mutex<bool>, Condvar)>,
bitcoind_reachable: Arc<(Mutex<bool>, Notify)>,
/// A signal indicating the tower is shuting down.
shutdown_trigger: Trigger,
}
Expand All @@ -31,7 +32,7 @@ impl InternalAPI {
/// Creates a new [InternalAPI] instance.
pub fn new(
watcher: Arc<Watcher>,
bitcoind_reachable: Arc<(Mutex<bool>, Condvar)>,
bitcoind_reachable: Arc<(Mutex<bool>, Notify)>,
shutdown_trigger: Trigger,
) -> Self {
Self {
Expand Down Expand Up @@ -107,7 +108,7 @@ impl PublicTowerServices for Arc<InternalAPI> {
.watcher
.add_appointment(appointment, req_data.signature)
{
Ok((receipt, available_slots, subscription_expiry)) => {
Ok((receipt, available_slots, subscription_expiry, _)) => {
Ok(Response::new(msgs::AddAppointmentResponse {
locator: locator.serialize(),
start_block: receipt.start_block(),
Expand Down
559 changes: 337 additions & 222 deletions teos/src/carrier.rs

Large diffs are not rendered by default.

38 changes: 21 additions & 17 deletions teos/src/chain_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
//!

use std::ops::Deref;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex};
use std::time;
use tokio::sync::Notify;
use tokio::time::timeout;
use triggered::Listener;

Expand Down Expand Up @@ -35,7 +36,7 @@ where
/// A signal from the main thread indicating the tower is shuting down.
shutdown_signal: Listener,
/// A flag that indicates wether bitcoind is reachable or not.
bitcoind_reachable: Arc<(Mutex<bool>, Condvar)>,
bitcoind_reachable: Arc<(Mutex<bool>, Notify)>,
}

impl<'a, P, C, L> ChainMonitor<'a, P, C, L>
Expand All @@ -52,7 +53,7 @@ where
dbm: Arc<Mutex<DBM>>,
polling_delta_sec: u16,
shutdown_signal: Listener,
bitcoind_reachable: Arc<(Mutex<bool>, Condvar)>,
bitcoind_reachable: Arc<(Mutex<bool>, Notify)>,
) -> ChainMonitor<'a, P, C, L> {
ChainMonitor {
spv_client,
Expand Down Expand Up @@ -95,7 +96,7 @@ where
}
}
*reachable.lock().unwrap() = true;
notifier.notify_all();
notifier.notify_waiters();
}
Err(e) => match e.kind() {
BlockSourceErrorKind::Persistent => {
Expand Down Expand Up @@ -133,7 +134,6 @@ mod tests {
use std::cell::RefCell;
use std::collections::HashSet;
use std::iter::FromIterator;
use std::thread;

use bitcoin::network::constants::Network;
use bitcoin::BlockHash;
Expand Down Expand Up @@ -181,7 +181,7 @@ mod tests {
let poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let cache = &mut UnboundedCache::new();
let spv_client = SpvClient::new(tip, poller, cache, &listener);
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new()));

let mut cm =
ChainMonitor::new(spv_client, tip, dbm, 1, shutdown_signal, bitcoind_reachable).await;
Expand All @@ -205,7 +205,7 @@ mod tests {
let poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let cache = &mut UnboundedCache::new();
let spv_client = SpvClient::new(old_tip, poller, cache, &listener);
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new()));

let mut cm = ChainMonitor::new(
spv_client,
Expand Down Expand Up @@ -244,7 +244,7 @@ mod tests {
let poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let cache = &mut UnboundedCache::new();
let spv_client = SpvClient::new(best_tip, poller, cache, &listener);
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new()));

let mut cm = ChainMonitor::new(
spv_client,
Expand Down Expand Up @@ -286,7 +286,7 @@ mod tests {
let poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let cache = &mut UnboundedCache::new();
let spv_client = SpvClient::new(old_best, poller, cache, &listener);
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new()));

let mut cm = ChainMonitor::new(
spv_client,
Expand Down Expand Up @@ -325,7 +325,7 @@ mod tests {
let poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let cache = &mut UnboundedCache::new();
let spv_client = SpvClient::new(tip, poller, cache, &listener);
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoind_reachable = Arc::new((Mutex::new(true), Notify::new()));

let mut cm = ChainMonitor::new(
spv_client,
Expand All @@ -342,12 +342,13 @@ mod tests {
let (reachable, _) = &*bitcoind_reachable.clone();
assert!(!*reachable.lock().unwrap());

// Set a thread to block on bitcoind unreachable to check that it gets notified once bitcoind comes back online
let t = thread::spawn(move || {
let (lock, notifier) = &*bitcoind_reachable;
let mut reachable = lock.lock().unwrap();
while !*reachable {
reachable = notifier.wait(reachable).unwrap();
// Set an async task to block on bitcoind unreachable to check that it gets notified once bitcoind comes back online
let join_handle = tokio::spawn(async move {
let (lock, notify) = &*bitcoind_reachable;
let mut reachable = *lock.lock().unwrap();
while !reachable {
notify.notified().await;
reachable = *lock.lock().unwrap();
}
});

Expand All @@ -357,6 +358,9 @@ mod tests {
assert!(*reachable.lock().unwrap());

// This would hang if the cm didn't notify their subscribers about the bitcoind status, so it serves as out assert.
t.join().unwrap();
match join_handle.await {
Ok(_) => (),
Err(_) => assert!(false),
};
}
}
17 changes: 8 additions & 9 deletions teos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::fs;
use std::io::ErrorKind;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex};
use structopt::StructOpt;
use tokio::sync::Notify;
use tokio::task;
use tonic::transport::Server;

Expand Down Expand Up @@ -127,7 +128,7 @@ async fn main() {
{
Ok(client) => (
Arc::new(client),
Arc::new((Mutex::new(true), Condvar::new())),
Arc::new((Mutex::new(true), Notify::new())),
),
Err(e) => {
let e_msg = match e.kind() {
Expand All @@ -146,13 +147,11 @@ async fn main() {
} else {
""
};
let rpc = Arc::new(
Client::new(
&format!("{}{}:{}", schema, conf.btc_rpc_connect, conf.btc_rpc_port),
Auth::UserPass(conf.btc_rpc_user.clone(), conf.btc_rpc_password.clone()),
)
.unwrap(),
);
let rpc = Client::new(
&format!("{}{}:{}", schema, conf.btc_rpc_connect, conf.btc_rpc_port),
Auth::UserPass(conf.btc_rpc_user.clone(), conf.btc_rpc_password.clone()),
)
.unwrap();
let mut derefed = bitcoin_cli.deref();
// Load last known block from DB if found. Poll it from Bitcoind otherwise.
let tip = if let Ok(block_hash) = dbm.lock().unwrap().load_last_known_block() {
Expand Down
Loading