Skip to content

Commit

Permalink
Merge pull request #1106 from get10101/fix/tx-db-register
Browse files Browse the repository at this point in the history
Ensure that broadcast TXs are always stored
  • Loading branch information
luckysori authored Aug 18, 2023
2 parents a4005f5 + ee2d5e8 commit 0dfe631
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 88 deletions.
20 changes: 10 additions & 10 deletions coordinator/src/db/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,21 @@ pub(crate) fn upsert(tx: Transaction, conn: &mut PgConnection) -> Result<()> {
impl From<ln_dlc_node::transaction::Transaction> for Transaction {
fn from(value: ln_dlc_node::transaction::Transaction) -> Self {
Transaction {
txid: value.txid.to_string(),
fee: value.fee as i64,
created_at: value.created_at,
updated_at: value.updated_at,
txid: value.txid().to_string(),
fee: value.fee() as i64,
created_at: value.created_at(),
updated_at: value.updated_at(),
}
}
}

impl From<Transaction> for ln_dlc_node::transaction::Transaction {
fn from(value: Transaction) -> Self {
ln_dlc_node::transaction::Transaction {
txid: Txid::from_str(&value.txid).expect("valid txid"),
fee: value.fee as u64,
created_at: value.created_at,
updated_at: value.updated_at,
}
ln_dlc_node::transaction::Transaction::new(
Txid::from_str(&value.txid).expect("valid txid"),
value.fee as u64,
value.created_at,
value.updated_at,
)
}
}
146 changes: 134 additions & 12 deletions crates/ln-dlc-node/src/ldk_node_wallet.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::fee_rate_estimator::EstimateFeeRate;
use crate::node::Storage;
use anyhow::bail;
use anyhow::Context;
use anyhow::Error;
Expand Down Expand Up @@ -40,6 +41,7 @@ where
settings: RwLock<WalletSettings>,
fee_rate_estimator: Arc<F>,
locked_outpoints: Mutex<Vec<OutPoint>>,
node_storage: Arc<dyn Storage + Send + Sync + 'static>,
}

#[derive(Clone, Debug, Default)]
Expand All @@ -53,7 +55,12 @@ where
B: Blockchain,
F: EstimateFeeRate,
{
pub(crate) fn new(blockchain: B, wallet: bdk::Wallet<D>, fee_rate_estimator: Arc<F>) -> Self {
pub(crate) fn new(
blockchain: B,
wallet: bdk::Wallet<D>,
fee_rate_estimator: Arc<F>,
node_storage: Arc<dyn Storage + Send + Sync + 'static>,
) -> Self {
let inner = Mutex::new(wallet);
let settings = RwLock::new(WalletSettings::default());

Expand All @@ -63,6 +70,7 @@ where
settings,
fee_rate_estimator,
locked_outpoints: Mutex::new(vec![]),
node_storage,
}
}

Expand Down Expand Up @@ -226,9 +234,7 @@ where
psbt.extract_tx()
};

self.broadcast_transaction(&tx);

let txid = tx.txid();
let txid = self.broadcast_transaction(&tx)?;

if let Some(amount_sats) = amount_msat_or_drain {
tracing::info!(
Expand Down Expand Up @@ -270,6 +276,23 @@ where
let transaction_details = wallet_lock.get_tx(txid, false)?;
Ok(transaction_details)
}

#[autometrics]
pub fn broadcast_transaction(&self, tx: &Transaction) -> Result<Txid> {
let txid = tx.txid();

tracing::info!(%txid, raw_tx = %serialize_hex(&tx), "Broadcasting transaction");

self.blockchain
.broadcast(tx)
.with_context(|| format!("Failed to broadcast transaction {txid}"))?;

self.node_storage
.upsert_transaction(tx.into())
.with_context(|| format!("Failed to store transaction {txid}"))?;

Ok(txid)
}
}

impl<D, B, F> BroadcasterInterface for Wallet<D, B, F>
Expand All @@ -279,18 +302,18 @@ where
F: EstimateFeeRate,
{
fn broadcast_transaction(&self, tx: &Transaction) {
let txid = tx.txid();

tracing::info!(%txid, raw_tx = %serialize_hex(&tx), "Broadcasting transaction");

if let Err(err) = self.blockchain.broadcast(tx) {
tracing::error!("Failed to broadcast transaction: {err:#}");
if let Err(e) = self.broadcast_transaction(tx) {
tracing::error!(
txid = %tx.txid(),
"Error when broadcasting transaction: {e:#}"
);
}
}
}

#[cfg(test)]
pub mod tests {
use super::*;
use crate::fee_rate_estimator::EstimateFeeRate;
use crate::ldk_node_wallet::Wallet;
use anyhow::Result;
Expand Down Expand Up @@ -326,7 +349,12 @@ pub mod tests {
async fn wallet_with_two_utxo_should_be_able_to_fund_twice_but_not_three_times() {
let mut rng = thread_rng();
let test_wallet = new_test_wallet(&mut rng, Amount::from_btc(1.0).unwrap(), 2).unwrap();
let wallet = Wallet::new(DummyEsplora, test_wallet, Arc::new(DummyFeeRateEstimator));
let wallet = Wallet::new(
DummyEsplora,
test_wallet,
Arc::new(DummyFeeRateEstimator),
Arc::new(DummyNodeStorage),
);

let _ = wallet
.create_funding_transaction(
Expand Down Expand Up @@ -394,7 +422,6 @@ pub mod tests {
Ok(wallet)
}

struct DummyEsplora;
struct DummyFeeRateEstimator;

impl EstimateFeeRate for DummyFeeRateEstimator {
Expand All @@ -403,6 +430,8 @@ pub mod tests {
}
}

struct DummyEsplora;

impl WalletSync for DummyEsplora {
fn wallet_setup<D: BatchDatabase>(
&self,
Expand Down Expand Up @@ -444,4 +473,97 @@ pub mod tests {
unimplemented!()
}
}

struct DummyNodeStorage;

impl Storage for DummyNodeStorage {
fn insert_payment(
&self,
_payment_hash: lightning::ln::PaymentHash,
_info: crate::PaymentInfo,
) -> Result<()> {
unimplemented!();
}

fn merge_payment(
&self,
_payment_hash: &lightning::ln::PaymentHash,
_flow: crate::PaymentFlow,
_amt_msat: crate::MillisatAmount,
_htlc_status: crate::HTLCStatus,
_preimage: Option<lightning::ln::PaymentPreimage>,
_secret: Option<lightning::ln::PaymentSecret>,
) -> Result<()> {
unimplemented!();
}

fn get_payment(
&self,
_payment_hash: &lightning::ln::PaymentHash,
) -> Result<Option<(lightning::ln::PaymentHash, crate::PaymentInfo)>> {
unimplemented!();
}

fn all_payments(&self) -> Result<Vec<(lightning::ln::PaymentHash, crate::PaymentInfo)>> {
unimplemented!();
}

fn insert_spendable_output(
&self,
_descriptor: lightning::chain::keysinterface::SpendableOutputDescriptor,
) -> Result<()> {
unimplemented!();
}

fn get_spendable_output(
&self,
_outpoint: &lightning::chain::transaction::OutPoint,
) -> Result<Option<lightning::chain::keysinterface::SpendableOutputDescriptor>> {
unimplemented!();
}

fn delete_spendable_output(
&self,
_outpoint: &lightning::chain::transaction::OutPoint,
) -> Result<()> {
unimplemented!();
}

fn all_spendable_outputs(
&self,
) -> Result<Vec<lightning::chain::keysinterface::SpendableOutputDescriptor>> {
unimplemented!();
}

fn upsert_channel(&self, _channel: crate::channel::Channel) -> Result<()> {
unimplemented!();
}

fn get_channel(&self, _user_channel_id: &str) -> Result<Option<crate::channel::Channel>> {
unimplemented!();
}

fn get_channel_by_fake_scid(
&self,
_fake_scid: crate::channel::FakeScid,
) -> Result<Option<crate::channel::Channel>> {
unimplemented!();
}

fn all_non_pending_channels(&self) -> Result<Vec<crate::channel::Channel>> {
unimplemented!();
}

fn upsert_transaction(&self, _transaction: crate::transaction::Transaction) -> Result<()> {
unimplemented!();
}

fn get_transaction(&self, _txid: &str) -> Result<Option<crate::transaction::Transaction>> {
unimplemented!();
}

fn all_transactions_without_fees(&self) -> Result<Vec<crate::transaction::Transaction>> {
unimplemented!();
}
}
}
23 changes: 9 additions & 14 deletions crates/ln-dlc-node/src/ln_dlc_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,12 @@ use parking_lot::RwLock;
use rust_bitcoin_coin_selection::select_coins;
use simple_wallet::WalletStorage;
use std::sync::Arc;
use time::OffsetDateTime;

/// This is a wrapper type introduced to be able to implement traits from `rust-dlc` on the
/// `ldk_node::LightningWallet`.
pub struct LnDlcWallet {
ln_wallet: Arc<ldk_node_wallet::Wallet<sled::Tree, EsploraBlockchain, FeeRateEstimator>>,
storage: Arc<SledStorageProvider>,
node_storage: Arc<dyn Storage + Send + Sync + 'static>,
secp: Secp256k1<All>,
seed: Bip39Seed,
network: Network,
Expand Down Expand Up @@ -74,6 +72,7 @@ impl LnDlcWallet {
blockchain,
on_chain_wallet,
fee_rate_estimator,
node_storage,
));

let last_unused_address = wallet
Expand All @@ -87,7 +86,6 @@ impl LnDlcWallet {
seed,
network,
address_cache: RwLock::new(last_unused_address),
node_storage,
}
}

Expand Down Expand Up @@ -142,7 +140,9 @@ impl LnDlcWallet {

impl Blockchain for LnDlcWallet {
fn send_transaction(&self, transaction: &Transaction) -> Result<(), Error> {
self.ln_wallet.broadcast_transaction(transaction);
self.ln_wallet
.broadcast_transaction(transaction)
.map_err(|e| Error::WalletError(e.into()))?;

Ok(())
}
Expand Down Expand Up @@ -309,16 +309,11 @@ impl dlc_manager::Wallet for LnDlcWallet {
impl BroadcasterInterface for LnDlcWallet {
#[autometrics]
fn broadcast_transaction(&self, tx: &Transaction) {
self.ln_wallet.broadcast_transaction(tx);
let transaction = crate::transaction::Transaction {
txid: tx.txid(),
fee: 0,
created_at: OffsetDateTime::now_utc(),
updated_at: OffsetDateTime::now_utc(),
};

if let Err(e) = self.node_storage.upsert_transaction(transaction) {
tracing::error!("Failed to create shadow transaction. Error: {e:#}");
if let Err(e) = self.ln_wallet.broadcast_transaction(tx) {
tracing::error!(
txid = %tx.txid(),
"Error when broadcasting transaction: {e:#}"
);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/ln-dlc-node/src/node/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl Storage for InMemoryStore {
// Transaction

fn upsert_transaction(&self, transaction: Transaction) -> Result<()> {
let txid = transaction.txid.to_string();
let txid = transaction.txid().to_string();
self.transactions_lock().insert(txid, transaction);
Ok(())
}
Expand All @@ -257,7 +257,7 @@ impl Storage for InMemoryStore {
Ok(self
.transactions_lock()
.values()
.filter(|t| t.fee == 0)
.filter(|t| t.fee() == 0)
.cloned()
.collect())
}
Expand Down
25 changes: 12 additions & 13 deletions crates/ln-dlc-node/src/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::ln_dlc_wallet::LnDlcWallet;
use crate::node::ChannelManager;
use crate::node::Storage;
use anyhow::Result;
use bdk::TransactionDetails;
use dlc_manager::subchannel::LNChannelManager;
use std::sync::Arc;
use time::OffsetDateTime;

pub struct Shadow<S> {
storage: Arc<S>,
Expand Down Expand Up @@ -51,18 +51,17 @@ where
let transactions = self.storage.all_transactions_without_fees()?;
tracing::debug!("Syncing {} shadow transactions", transactions.len());

for mut transaction in transactions.into_iter() {
let transaction_details = self
.ln_dlc_wallet
.inner()
.get_transaction(&transaction.txid)?;

transaction.fee = transaction_details
.map(|d| d.fee.unwrap_or_default())
.unwrap_or_default();
transaction.updated_at = OffsetDateTime::now_utc();

self.storage.upsert_transaction(transaction)?;
for transaction in transactions.iter() {
let txid = transaction.txid();
match self.ln_dlc_wallet.inner().get_transaction(&txid) {
Ok(Some(TransactionDetails { fee: Some(fee), .. })) => {
self.storage.upsert_transaction(transaction.with_fee(fee))?;
}
Ok(_) => {}
Err(e) => {
tracing::warn!(%txid, "Failed to get transaction details: {e:#}");
}
};
}
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async fn open_jit_channel() {
.get_transaction(&channel.funding_txid.unwrap().to_string())
.unwrap()
.unwrap();
assert!(transaction.fee > 0);
assert!(transaction.fee() > 0);
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
Loading

0 comments on commit 0dfe631

Please sign in to comment.