Skip to content

Commit

Permalink
Merge pull request #1332 from get10101/feat/async-app-wallet-refresh
Browse files Browse the repository at this point in the history
Ensure app wallet balance and history appear ready ASAP
  • Loading branch information
luckysori authored Sep 20, 2023
2 parents bebf296 + 8fb0324 commit 843f205
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 73 deletions.
2 changes: 2 additions & 0 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ async fn main() -> Result<()> {
});
}

std::thread::spawn(node.inner.sync_on_chain_wallet_periodically());

tokio::spawn({
let node = node.clone();
async move {
Expand Down
10 changes: 9 additions & 1 deletion crates/ln-dlc-node/src/ln_dlc_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,15 @@ impl LnDlcWallet {
self.address_cache.read().clone()
}

pub fn update_address_cache(&self) -> Result<()> {
pub fn sync_and_update_address_cache(&self) -> Result<()> {
self.inner().sync()?;

self.update_address_cache()?;

Ok(())
}

fn update_address_cache(&self) -> Result<()> {
let address = self.inner().get_last_unused_address()?;
*self.address_cache.write() = address;

Expand Down
105 changes: 62 additions & 43 deletions crates/ln-dlc-node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,19 +432,14 @@ where
self.listen_address,
)];

std::thread::spawn(sync_on_chain_wallet_periodically(
self.settings.clone(),
self.wallet.clone(),
));

std::thread::spawn(shadow_sync_periodically(
self.settings.clone(),
self.storage.clone(),
self.wallet.clone(),
self.channel_manager.clone(),
));

tokio::spawn(lightning_wallet_sync(
tokio::spawn(periodic_lightning_wallet_sync(
self.channel_manager.clone(),
self.chain_monitor.clone(),
self.settings.clone(),
Expand Down Expand Up @@ -522,6 +517,42 @@ where
)
.await
}

/// Returns a closure which triggers an on-chain sync and subsequently updates the address
/// cache, at an interval.
///
/// The task will loop at an interval determined by the node's [`LnDlcNodeSettings`].
///
/// Suitable for daemons such as the coordinator and the maker.
pub fn sync_on_chain_wallet_periodically(&self) -> impl Fn() {
let handle = tokio::runtime::Handle::current();
let settings = self.settings.clone();
let ln_dlc_wallet = self.wallet.clone();
move || loop {
if let Err(e) = ln_dlc_wallet.sync_and_update_address_cache() {
tracing::error!("Failed on-chain sync: {e:#}");
}

let interval = handle.block_on(async {
let guard = settings.read().await;
guard.on_chain_sync_interval
});

std::thread::sleep(interval);
}
}

pub fn sync_on_chain_wallet(&self) -> Result<()> {
self.wallet.sync_and_update_address_cache()
}

pub fn sync_lightning_wallet(&self) -> Result<()> {
lightning_wallet_sync(
&self.channel_manager,
&self.chain_monitor,
&self.esplora_client,
)
}
}

async fn update_fee_rate_estimates(
Expand Down Expand Up @@ -580,26 +611,15 @@ fn spawn_background_processor(
remote_handle
}

async fn lightning_wallet_sync(
async fn periodic_lightning_wallet_sync(
channel_manager: Arc<ChannelManager>,
chain_monitor: Arc<ChainMonitor>,
settings: Arc<RwLock<LnDlcNodeSettings>>,
esplora_client: Arc<EsploraSyncClient<Arc<TracingLogger>>>,
) {
loop {
let now = Instant::now();
let confirmables = vec![
&*channel_manager as &(dyn Confirm + Sync + Send),
&*chain_monitor as &(dyn Confirm + Sync + Send),
];
match esplora_client.sync(confirmables) {
Ok(()) => tracing::info!(
"Background sync of Lightning wallet finished in {}ms.",
now.elapsed().as_millis()
),
Err(e) => {
tracing::error!("Background sync of Lightning wallet failed: {e:#}")
}
if let Err(e) = lightning_wallet_sync(&channel_manager, &chain_monitor, &esplora_client) {
tracing::error!("Background sync of Lightning wallet failed: {e:#}")
}

let interval = {
Expand All @@ -610,6 +630,28 @@ async fn lightning_wallet_sync(
}
}

fn lightning_wallet_sync(
channel_manager: &ChannelManager,
chain_monitor: &ChainMonitor,
esplora_client: &EsploraSyncClient<Arc<TracingLogger>>,
) -> Result<()> {
let now = Instant::now();
let confirmables = vec![
channel_manager as &(dyn Confirm + Sync + Send),
chain_monitor as &(dyn Confirm + Sync + Send),
];
esplora_client
.sync(confirmables)
.context("Lightning wallet sync failed")?;

tracing::info!(
"Lightning wallet sync finished in {}ms.",
now.elapsed().as_millis()
);

Ok(())
}

fn shadow_sync_periodically<S: Storage + Sync + Send + 'static>(
settings: Arc<RwLock<LnDlcNodeSettings>>,
node_storage: Arc<S>,
Expand All @@ -636,29 +678,6 @@ fn shadow_sync_periodically<S: Storage + Sync + Send + 'static>(
}
}

fn sync_on_chain_wallet_periodically(
settings: Arc<RwLock<LnDlcNodeSettings>>,
ln_dlc_wallet: Arc<LnDlcWallet>,
) -> impl Fn() {
let handle = tokio::runtime::Handle::current();
move || loop {
if let Err(e) = ln_dlc_wallet.inner().sync() {
tracing::error!("Failed on-chain sync: {e:#}");
}

if let Err(e) = ln_dlc_wallet.update_address_cache() {
tracing::warn!("Failed to update address cache: {e:#}");
}

let interval = handle.block_on(async {
let guard = settings.read().await;
guard.on_chain_sync_interval
});

std::thread::sleep(interval);
}
}

fn spawn_connection_management(
peer_manager: Arc<PeerManager>,
listen_address: SocketAddr,
Expand Down
2 changes: 2 additions & 0 deletions maker/src/bin/maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ async fn main() -> Result<()> {
let event_handler = EventHandler::new(node.clone());
let _running_node = node.start(event_handler)?;

std::thread::spawn(node.sync_on_chain_wallet_periodically());

let (health, health_tx) = health::Health::new();

let node_pubkey = node.info.pubkey;
Expand Down
16 changes: 7 additions & 9 deletions mobile/lib/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ class _TenTenOneAppState extends State<TenTenOneApp> {
final orderChangeNotifier = context.read<OrderChangeNotifier>();
final positionChangeNotifier = context.read<PositionChangeNotifier>();
final candlestickChangeNotifier = context.read<CandlestickChangeNotifier>();
final walletChangeNotifier = context.read<WalletChangeNotifier>();

try {
setupRustLogging();
Expand All @@ -269,16 +268,15 @@ class _TenTenOneAppState extends State<TenTenOneApp> {
await runBackend(config);
FLog.info(text: "Backend started");

await orderChangeNotifier.initialize();
await positionChangeNotifier.initialize();
await candlestickChangeNotifier.initialize();
orderChangeNotifier.initialize();
positionChangeNotifier.initialize();
candlestickChangeNotifier.initialize();

await logAppSettings(config);
logAppSettings(config);

final lastLogin = await rust.api.updateLastLogin();
FLog.debug(text: "Last login was at ${lastLogin.date}");

await walletChangeNotifier.refreshWalletInfo();
rust.api
.updateLastLogin()
.then((lastLogin) => FLog.debug(text: "Last login was at ${lastLogin.date}"));
} on FfiException catch (error) {
FLog.error(text: "Failed to initialise: Error: ${error.message}", exception: error);
} catch (error) {
Expand Down
84 changes: 64 additions & 20 deletions mobile/native/src/ln_dlc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub use channel_status::ChannelStatus;
const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_millis(200);
const UPDATE_WALLET_HISTORY_INTERVAL: Duration = Duration::from_secs(5);
const CHECK_OPEN_ORDERS_INTERVAL: Duration = Duration::from_secs(60);
const ON_CHAIN_SYNC_INTERVAL: Duration = Duration::from_secs(300);

/// The weight estimate of the funding transaction
///
Expand All @@ -89,12 +90,32 @@ pub const FUNDING_TX_WEIGHT_ESTIMATE: u64 = 220;
static NODE: Storage<Arc<Node>> = Storage::new();
static SEED: Storage<Bip39Seed> = Storage::new();

/// Trigger an on-chain sync followed by an update to the wallet balance and history.
///
/// We do not wait for the triggered task to finish, because the effect will be reflected
/// asynchronously on the UI.
pub async fn refresh_wallet_info() -> Result<()> {
let node = NODE.try_get().context("failed to get ln dlc node")?;
let wallet = node.inner.wallet();

spawn_blocking(move || wallet.sync()).await??;
keep_wallet_balance_and_history_up_to_date(node)?;
// Spawn into the blocking thread pool of the dedicated backend runtime to avoid blocking the UI
// thread.
let runtime = get_or_create_tokio_runtime()?;
runtime.spawn_blocking(move || {
if let Err(e) = wallet.sync() {
tracing::error!("Manually triggered on-chain sync failed: {e:#}");
}

if let Err(e) = node.inner.sync_lightning_wallet() {
tracing::error!("Manually triggered Lightning wallet sync failed: {e:#}");
}

if let Err(e) = keep_wallet_balance_and_history_up_to_date(node) {
tracing::error!("Failed to keep wallet history up to date: {e:#}");
}

anyhow::Ok(())
});

Ok(())
}
Expand Down Expand Up @@ -224,6 +245,47 @@ pub fn run(data_dir: String, seed_dir: String, runtime: &Runtime) -> Result<()>
let _running = node.start(event_handler)?;
let node = Arc::new(Node::new(node, _running));

// Refresh the wallet balance and history eagerly so that it can complete before the
// triggering the first on-chain sync. This ensures that the UI appears ready as soon as
// possible.
//
// TODO: This might not be necessary once we rewrite the on-chain wallet with bdk:1.0.0.
spawn_blocking({
let node = node.clone();
move || keep_wallet_balance_and_history_up_to_date(&node)
})
.await
.expect("task to complete")?;

runtime.spawn({
let node = node.clone();
async move {
loop {
tokio::time::sleep(UPDATE_WALLET_HISTORY_INTERVAL).await;

let node = node.clone();
if let Err(e) =
spawn_blocking(move || keep_wallet_balance_and_history_up_to_date(&node))
.await
.expect("To spawn blocking task")
{
tracing::error!("Failed to sync balance and wallet history: {e:#}");
}
}
}
});

std::thread::spawn({
let node = node.clone();
move || loop {
if let Err(e) = node.inner.sync_on_chain_wallet() {
tracing::error!("Failed on-chain sync: {e:#}");
}

std::thread::sleep(ON_CHAIN_SYNC_INTERVAL);
}
});

runtime.spawn({
let node = node.clone();
async move { node.listen_for_lightning_events(event_receiver).await }
Expand All @@ -247,24 +309,6 @@ pub fn run(data_dir: String, seed_dir: String, runtime: &Runtime) -> Result<()>
}
});

runtime.spawn({
let node = node.clone();
async move {
loop {
let node = node.clone();
if let Err(e) =
spawn_blocking(move || keep_wallet_balance_and_history_up_to_date(&node))
.await
.expect("To spawn blocking task")
{
tracing::error!("Failed to sync balance and wallet history: {e:#}");
}

tokio::time::sleep(UPDATE_WALLET_HISTORY_INTERVAL).await;
}
}
});

runtime.spawn(async move {
loop {
if let Err(e) = spawn_blocking(order::handler::check_open_orders)
Expand Down

0 comments on commit 843f205

Please sign in to comment.