Skip to content

Commit

Permalink
fix: Ensure that broadcast TX is always stored
Browse files Browse the repository at this point in the history
We needed to align the implementations of the two traits where we
broadcast transactions. There were several places where we were not
storing broadcast transactions because of this.
  • Loading branch information
luckysori committed Aug 16, 2023
1 parent 179119c commit ee2d5e8
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 88 deletions.
20 changes: 10 additions & 10 deletions coordinator/src/db/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,21 @@ pub(crate) fn upsert(tx: Transaction, conn: &mut PgConnection) -> Result<()> {
impl From<ln_dlc_node::transaction::Transaction> for Transaction {
fn from(value: ln_dlc_node::transaction::Transaction) -> Self {
Transaction {
txid: value.txid.to_string(),
fee: value.fee as i64,
created_at: value.created_at,
updated_at: value.updated_at,
txid: value.txid().to_string(),
fee: value.fee() as i64,
created_at: value.created_at(),
updated_at: value.updated_at(),
}
}
}

impl From<Transaction> for ln_dlc_node::transaction::Transaction {
fn from(value: Transaction) -> Self {
ln_dlc_node::transaction::Transaction {
txid: Txid::from_str(&value.txid).expect("valid txid"),
fee: value.fee as u64,
created_at: value.created_at,
updated_at: value.updated_at,
}
ln_dlc_node::transaction::Transaction::new(
Txid::from_str(&value.txid).expect("valid txid"),
value.fee as u64,
value.created_at,
value.updated_at,
)
}
}
146 changes: 134 additions & 12 deletions crates/ln-dlc-node/src/ldk_node_wallet.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::fee_rate_estimator::EstimateFeeRate;
use crate::node::Storage;
use anyhow::bail;
use anyhow::Context;
use anyhow::Error;
Expand Down Expand Up @@ -40,6 +41,7 @@ where
settings: RwLock<WalletSettings>,
fee_rate_estimator: Arc<F>,
locked_outpoints: Mutex<Vec<OutPoint>>,
node_storage: Arc<dyn Storage + Send + Sync + 'static>,
}

#[derive(Clone, Debug, Default)]
Expand All @@ -53,7 +55,12 @@ where
B: Blockchain,
F: EstimateFeeRate,
{
pub(crate) fn new(blockchain: B, wallet: bdk::Wallet<D>, fee_rate_estimator: Arc<F>) -> Self {
pub(crate) fn new(
blockchain: B,
wallet: bdk::Wallet<D>,
fee_rate_estimator: Arc<F>,
node_storage: Arc<dyn Storage + Send + Sync + 'static>,
) -> Self {
let inner = Mutex::new(wallet);
let settings = RwLock::new(WalletSettings::default());

Expand All @@ -63,6 +70,7 @@ where
settings,
fee_rate_estimator,
locked_outpoints: Mutex::new(vec![]),
node_storage,
}
}

Expand Down Expand Up @@ -226,9 +234,7 @@ where
psbt.extract_tx()
};

self.broadcast_transaction(&tx);

let txid = tx.txid();
let txid = self.broadcast_transaction(&tx)?;

if let Some(amount_sats) = amount_msat_or_drain {
tracing::info!(
Expand Down Expand Up @@ -270,6 +276,23 @@ where
let transaction_details = wallet_lock.get_tx(txid, false)?;
Ok(transaction_details)
}

#[autometrics]
pub fn broadcast_transaction(&self, tx: &Transaction) -> Result<Txid> {
let txid = tx.txid();

tracing::info!(%txid, raw_tx = %serialize_hex(&tx), "Broadcasting transaction");

self.blockchain
.broadcast(tx)
.with_context(|| format!("Failed to broadcast transaction {txid}"))?;

self.node_storage
.upsert_transaction(tx.into())
.with_context(|| format!("Failed to store transaction {txid}"))?;

Ok(txid)
}
}

impl<D, B, F> BroadcasterInterface for Wallet<D, B, F>
Expand All @@ -279,18 +302,18 @@ where
F: EstimateFeeRate,
{
fn broadcast_transaction(&self, tx: &Transaction) {
let txid = tx.txid();

tracing::info!(%txid, raw_tx = %serialize_hex(&tx), "Broadcasting transaction");

if let Err(err) = self.blockchain.broadcast(tx) {
tracing::error!("Failed to broadcast transaction: {err:#}");
if let Err(e) = self.broadcast_transaction(tx) {
tracing::error!(
txid = %tx.txid(),
"Error when broadcasting transaction: {e:#}"
);
}
}
}

#[cfg(test)]
pub mod tests {
use super::*;
use crate::fee_rate_estimator::EstimateFeeRate;
use crate::ldk_node_wallet::Wallet;
use anyhow::Result;
Expand Down Expand Up @@ -326,7 +349,12 @@ pub mod tests {
async fn wallet_with_two_utxo_should_be_able_to_fund_twice_but_not_three_times() {
let mut rng = thread_rng();
let test_wallet = new_test_wallet(&mut rng, Amount::from_btc(1.0).unwrap(), 2).unwrap();
let wallet = Wallet::new(DummyEsplora, test_wallet, Arc::new(DummyFeeRateEstimator));
let wallet = Wallet::new(
DummyEsplora,
test_wallet,
Arc::new(DummyFeeRateEstimator),
Arc::new(DummyNodeStorage),
);

let _ = wallet
.create_funding_transaction(
Expand Down Expand Up @@ -394,7 +422,6 @@ pub mod tests {
Ok(wallet)
}

struct DummyEsplora;
struct DummyFeeRateEstimator;

impl EstimateFeeRate for DummyFeeRateEstimator {
Expand All @@ -403,6 +430,8 @@ pub mod tests {
}
}

struct DummyEsplora;

impl WalletSync for DummyEsplora {
fn wallet_setup<D: BatchDatabase>(
&self,
Expand Down Expand Up @@ -444,4 +473,97 @@ pub mod tests {
unimplemented!()
}
}

struct DummyNodeStorage;

impl Storage for DummyNodeStorage {
fn insert_payment(
&self,
_payment_hash: lightning::ln::PaymentHash,
_info: crate::PaymentInfo,
) -> Result<()> {
unimplemented!();
}

fn merge_payment(
&self,
_payment_hash: &lightning::ln::PaymentHash,
_flow: crate::PaymentFlow,
_amt_msat: crate::MillisatAmount,
_htlc_status: crate::HTLCStatus,
_preimage: Option<lightning::ln::PaymentPreimage>,
_secret: Option<lightning::ln::PaymentSecret>,
) -> Result<()> {
unimplemented!();
}

fn get_payment(
&self,
_payment_hash: &lightning::ln::PaymentHash,
) -> Result<Option<(lightning::ln::PaymentHash, crate::PaymentInfo)>> {
unimplemented!();
}

fn all_payments(&self) -> Result<Vec<(lightning::ln::PaymentHash, crate::PaymentInfo)>> {
unimplemented!();
}

fn insert_spendable_output(
&self,
_descriptor: lightning::chain::keysinterface::SpendableOutputDescriptor,
) -> Result<()> {
unimplemented!();
}

fn get_spendable_output(
&self,
_outpoint: &lightning::chain::transaction::OutPoint,
) -> Result<Option<lightning::chain::keysinterface::SpendableOutputDescriptor>> {
unimplemented!();
}

fn delete_spendable_output(
&self,
_outpoint: &lightning::chain::transaction::OutPoint,
) -> Result<()> {
unimplemented!();
}

fn all_spendable_outputs(
&self,
) -> Result<Vec<lightning::chain::keysinterface::SpendableOutputDescriptor>> {
unimplemented!();
}

fn upsert_channel(&self, _channel: crate::channel::Channel) -> Result<()> {
unimplemented!();
}

fn get_channel(&self, _user_channel_id: &str) -> Result<Option<crate::channel::Channel>> {
unimplemented!();
}

fn get_channel_by_fake_scid(
&self,
_fake_scid: crate::channel::FakeScid,
) -> Result<Option<crate::channel::Channel>> {
unimplemented!();
}

fn all_non_pending_channels(&self) -> Result<Vec<crate::channel::Channel>> {
unimplemented!();
}

fn upsert_transaction(&self, _transaction: crate::transaction::Transaction) -> Result<()> {
unimplemented!();
}

fn get_transaction(&self, _txid: &str) -> Result<Option<crate::transaction::Transaction>> {
unimplemented!();
}

fn all_transactions_without_fees(&self) -> Result<Vec<crate::transaction::Transaction>> {
unimplemented!();
}
}
}
23 changes: 9 additions & 14 deletions crates/ln-dlc-node/src/ln_dlc_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,12 @@ use parking_lot::RwLock;
use rust_bitcoin_coin_selection::select_coins;
use simple_wallet::WalletStorage;
use std::sync::Arc;
use time::OffsetDateTime;

/// This is a wrapper type introduced to be able to implement traits from `rust-dlc` on the
/// `ldk_node::LightningWallet`.
pub struct LnDlcWallet {
ln_wallet: Arc<ldk_node_wallet::Wallet<sled::Tree, EsploraBlockchain, FeeRateEstimator>>,
storage: Arc<SledStorageProvider>,
node_storage: Arc<dyn Storage + Send + Sync + 'static>,
secp: Secp256k1<All>,
seed: Bip39Seed,
network: Network,
Expand Down Expand Up @@ -74,6 +72,7 @@ impl LnDlcWallet {
blockchain,
on_chain_wallet,
fee_rate_estimator,
node_storage,
));

let last_unused_address = wallet
Expand All @@ -87,7 +86,6 @@ impl LnDlcWallet {
seed,
network,
address_cache: RwLock::new(last_unused_address),
node_storage,
}
}

Expand Down Expand Up @@ -142,7 +140,9 @@ impl LnDlcWallet {

impl Blockchain for LnDlcWallet {
fn send_transaction(&self, transaction: &Transaction) -> Result<(), Error> {
self.ln_wallet.broadcast_transaction(transaction);
self.ln_wallet
.broadcast_transaction(transaction)
.map_err(|e| Error::WalletError(e.into()))?;

Ok(())
}
Expand Down Expand Up @@ -309,16 +309,11 @@ impl dlc_manager::Wallet for LnDlcWallet {
impl BroadcasterInterface for LnDlcWallet {
#[autometrics]
fn broadcast_transaction(&self, tx: &Transaction) {
self.ln_wallet.broadcast_transaction(tx);
let transaction = crate::transaction::Transaction {
txid: tx.txid(),
fee: 0,
created_at: OffsetDateTime::now_utc(),
updated_at: OffsetDateTime::now_utc(),
};

if let Err(e) = self.node_storage.upsert_transaction(transaction) {
tracing::error!("Failed to create shadow transaction. Error: {e:#}");
if let Err(e) = self.ln_wallet.broadcast_transaction(tx) {
tracing::error!(
txid = %tx.txid(),
"Error when broadcasting transaction: {e:#}"
);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/ln-dlc-node/src/node/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl Storage for InMemoryStore {
// Transaction

fn upsert_transaction(&self, transaction: Transaction) -> Result<()> {
let txid = transaction.txid.to_string();
let txid = transaction.txid().to_string();
self.transactions_lock().insert(txid, transaction);
Ok(())
}
Expand All @@ -257,7 +257,7 @@ impl Storage for InMemoryStore {
Ok(self
.transactions_lock()
.values()
.filter(|t| t.fee == 0)
.filter(|t| t.fee() == 0)
.cloned()
.collect())
}
Expand Down
25 changes: 12 additions & 13 deletions crates/ln-dlc-node/src/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::ln_dlc_wallet::LnDlcWallet;
use crate::node::ChannelManager;
use crate::node::Storage;
use anyhow::Result;
use bdk::TransactionDetails;
use dlc_manager::subchannel::LNChannelManager;
use std::sync::Arc;
use time::OffsetDateTime;

pub struct Shadow<S> {
storage: Arc<S>,
Expand Down Expand Up @@ -51,18 +51,17 @@ where
let transactions = self.storage.all_transactions_without_fees()?;
tracing::debug!("Syncing {} shadow transactions", transactions.len());

for mut transaction in transactions.into_iter() {
let transaction_details = self
.ln_dlc_wallet
.inner()
.get_transaction(&transaction.txid)?;

transaction.fee = transaction_details
.map(|d| d.fee.unwrap_or_default())
.unwrap_or_default();
transaction.updated_at = OffsetDateTime::now_utc();

self.storage.upsert_transaction(transaction)?;
for transaction in transactions.iter() {
let txid = transaction.txid();
match self.ln_dlc_wallet.inner().get_transaction(&txid) {
Ok(Some(TransactionDetails { fee: Some(fee), .. })) => {
self.storage.upsert_transaction(transaction.with_fee(fee))?;
}
Ok(_) => {}
Err(e) => {
tracing::warn!(%txid, "Failed to get transaction details: {e:#}");
}
};
}
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async fn open_jit_channel() {
.get_transaction(&channel.funding_txid.unwrap().to_string())
.unwrap()
.unwrap();
assert!(transaction.fee > 0);
assert!(transaction.fee() > 0);
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
Loading

0 comments on commit ee2d5e8

Please sign in to comment.