diff --git a/coordinator/migrations/2023-07-12-040736_spendable_outputs/down.sql b/coordinator/migrations/2023-07-12-040736_spendable_outputs/down.sql new file mode 100644 index 000000000..8a69f3ccc --- /dev/null +++ b/coordinator/migrations/2023-07-12-040736_spendable_outputs/down.sql @@ -0,0 +1 @@ +DROP TABLE "spendable_outputs"; diff --git a/coordinator/migrations/2023-07-12-040736_spendable_outputs/up.sql b/coordinator/migrations/2023-07-12-040736_spendable_outputs/up.sql new file mode 100644 index 000000000..59b479738 --- /dev/null +++ b/coordinator/migrations/2023-07-12-040736_spendable_outputs/up.sql @@ -0,0 +1,8 @@ +CREATE TABLE "spendable_outputs" ( + id SERIAL PRIMARY KEY NOT NULL, + -- hex encoded + txid TEXT NOT NULL, + vout int NOT NULL, + -- hex representation of LDK's own encoding + descriptor TEXT NOT NULL +); diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index c8ad4ea8a..b1518156f 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -16,7 +16,6 @@ use coordinator::settings::Settings; use diesel::r2d2; use diesel::r2d2::ConnectionManager; use diesel::PgConnection; -use ln_dlc_node::node::InMemoryStore; use ln_dlc_node::seed::Bip39Seed; use rand::thread_rng; use rand::RngCore; @@ -77,11 +76,20 @@ async fn main() -> Result<()> { let settings = Settings::new(&data_dir).await; + // set up database connection pool + let manager = ConnectionManager::::new(opts.database.clone()); + let pool = r2d2::Pool::builder() + .build(manager) + .expect("Failed to create pool."); + + let mut conn = pool.get()?; + run_migration(&mut conn); + let node = Arc::new(ln_dlc_node::node::Node::new_coordinator( "10101.finance", network, data_dir.as_path(), - InMemoryStore::default(), + coordinator::node::storage::NodeStorage::new(pool.clone()), address, SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), address.port()), opts.p2p_announcement_addresses(), @@ -92,15 +100,6 @@ async fn main() -> Result<()> { opts.get_oracle_info(), )?); - // set up database connection pool - let manager = ConnectionManager::::new(opts.database); - let pool = r2d2::Pool::builder() - .build(manager) - .expect("Failed to create pool."); - - let mut conn = pool.get()?; - run_migration(&mut conn); - let node = Node::new(node, pool.clone()); node.update_settings(settings.as_node_settings()).await; diff --git a/coordinator/src/db/mod.rs b/coordinator/src/db/mod.rs index 209bd29b1..cfd3b38c5 100644 --- a/coordinator/src/db/mod.rs +++ b/coordinator/src/db/mod.rs @@ -1,4 +1,5 @@ pub mod custom_types; pub mod positions; +pub mod spendable_outputs; pub mod trades; pub mod user; diff --git a/coordinator/src/db/spendable_outputs.rs b/coordinator/src/db/spendable_outputs.rs new file mode 100644 index 000000000..195b6e963 --- /dev/null +++ b/coordinator/src/db/spendable_outputs.rs @@ -0,0 +1,102 @@ +use crate::schema::spendable_outputs; +use anyhow::anyhow; +use anyhow::Result; +use autometrics::autometrics; +use bitcoin::hashes::hex::FromHex; +use bitcoin::hashes::hex::ToHex; +use diesel::prelude::*; +use lightning::chain::keysinterface::DelayedPaymentOutputDescriptor; +use lightning::chain::keysinterface::SpendableOutputDescriptor; +use lightning::chain::keysinterface::StaticPaymentOutputDescriptor; +use lightning::chain::transaction::OutPoint; +use lightning::util::ser::Readable; +use lightning::util::ser::Writeable; + +#[autometrics] +pub(crate) fn insert( + conn: &mut PgConnection, + output: SpendableOutputDescriptor, +) -> QueryResult<()> { + diesel::insert_into(spendable_outputs::table) + .values(NewSpendableOutput::from(output)) + .execute(conn)?; + + Ok(()) +} + +#[autometrics] +pub fn get( + conn: &mut PgConnection, + outpoint: &OutPoint, +) -> Result> { + let output: Option = spendable_outputs::table + .filter(spendable_outputs::txid.eq(outpoint.txid.to_string())) + .first(conn) + .optional()?; + + let output = output + .map(|output| anyhow::Ok(output.try_into()?)) + .transpose()?; + + Ok(output) +} + +#[autometrics] +pub fn get_all(conn: &mut PgConnection) -> Result> { + let outputs: Vec = spendable_outputs::table.load(conn)?; + outputs + .into_iter() + .map(SpendableOutputDescriptor::try_from) + .collect() +} + +#[derive(Insertable, Debug, Clone)] +#[diesel(table_name = spendable_outputs)] +struct NewSpendableOutput { + txid: String, + vout: i32, + descriptor: String, +} + +#[derive(Queryable, Debug, Clone)] +#[diesel(table_name = spendable_outputs)] +struct SpendableOutput { + #[diesel(column_name = "id")] + _id: i32, + #[diesel(column_name = "txid")] + _txid: String, + #[diesel(column_name = "vout")] + _vout: i32, + descriptor: String, +} + +impl From for NewSpendableOutput { + fn from(descriptor: SpendableOutputDescriptor) -> Self { + use SpendableOutputDescriptor::*; + let outpoint = match &descriptor { + StaticOutput { outpoint, .. } => outpoint, + DelayedPaymentOutput(DelayedPaymentOutputDescriptor { outpoint, .. }) => outpoint, + StaticPaymentOutput(StaticPaymentOutputDescriptor { outpoint, .. }) => outpoint, + }; + + let descriptor = descriptor.encode().to_hex(); + + Self { + txid: outpoint.txid.to_string(), + vout: outpoint.index as i32, + descriptor, + } + } +} + +impl TryFrom for SpendableOutputDescriptor { + type Error = anyhow::Error; + + fn try_from(value: SpendableOutput) -> Result { + let bytes = Vec::from_hex(&value.descriptor)?; + let descriptor = Self::read(&mut lightning::io::Cursor::new(bytes)) + .map_err(|e| anyhow!("Failed to decode spendable output descriptor: {e}"))?; + + Ok(descriptor) + } +} diff --git a/coordinator/src/metrics.rs b/coordinator/src/metrics.rs index 8b2ffa9da..db010abbc 100644 --- a/coordinator/src/metrics.rs +++ b/coordinator/src/metrics.rs @@ -1,8 +1,8 @@ use crate::db; +use crate::node::storage::NodeStorage; use crate::node::Node; use lazy_static::lazy_static; use lightning::ln::channelmanager::ChannelDetails; -use ln_dlc_node::node::InMemoryStore; use opentelemetry::global; use opentelemetry::metrics::Meter; use opentelemetry::metrics::ObservableGauge; @@ -181,7 +181,7 @@ fn channel_metrics(cx: &Context, channels: Vec) { } } -fn node_metrics(cx: &Context, inner_node: Arc>) { +fn node_metrics(cx: &Context, inner_node: Arc>) { let connected_peers = inner_node.list_peers().len(); CONNECTED_PEERS.observe(cx, connected_peers as u64, &[]); let offchain = inner_node.get_ldk_balance(); diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index a8060b0b1..9fcf00692 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -1,5 +1,6 @@ use crate::db; use crate::db::trades::Trade; +use crate::node::storage::NodeStorage; use crate::position::models::NewPosition; use crate::position::models::Position; use anyhow::bail; @@ -29,7 +30,6 @@ use lightning::ln::channelmanager::ChannelDetails; use lightning_invoice::Invoice; use ln_dlc_node::node::dlc_message_name; use ln_dlc_node::node::sub_channel_message_name; -use ln_dlc_node::node::InMemoryStore; use ln_dlc_node::WalletSettings; use ln_dlc_node::CONTRACT_TX_FEE_RATE; use rust_decimal::prelude::ToPrimitive; @@ -45,6 +45,7 @@ use trade::Direction; pub mod connection; pub mod order_matching_fee; +pub mod storage; /// The leverage used by the coordinator for all trades. const COORDINATOR_LEVERAGE: f32 = 1.0; @@ -77,14 +78,14 @@ impl Default for NodeSettings { #[derive(Clone)] pub struct Node { - pub inner: Arc>, + pub inner: Arc>, pub pool: Pool>, pub settings: Arc>, } impl Node { pub fn new( - inner: Arc>, + inner: Arc>, pool: Pool>, ) -> Self { Self { diff --git a/coordinator/src/node/connection.rs b/coordinator/src/node/connection.rs index 1a0c2810b..c239f0b3c 100644 --- a/coordinator/src/node/connection.rs +++ b/coordinator/src/node/connection.rs @@ -1,6 +1,5 @@ use autometrics::autometrics; use lightning::ln::msgs::NetAddress; -use ln_dlc_node::node::InMemoryStore; use ln_dlc_node::node::Node; use ln_dlc_node::node::NodeInfo; use rand::seq::SliceRandom; @@ -11,9 +10,11 @@ use std::sync::Arc; use std::time::Duration; use tokio::task::spawn_blocking; +use crate::node::storage::NodeStorage; + #[autometrics] pub async fn keep_public_channel_peers_connected( - node: Arc>, + node: Arc>, check_interval: Duration, ) { loop { @@ -28,7 +29,7 @@ pub async fn keep_public_channel_peers_connected( } } -fn reconnect_to_disconnected_public_channel_peers(node: Arc>) { +fn reconnect_to_disconnected_public_channel_peers(node: Arc>) { let connected_peers = node .peer_manager .get_peer_node_ids() diff --git a/coordinator/src/node/storage.rs b/coordinator/src/node/storage.rs new file mode 100644 index 000000000..b53df1b21 --- /dev/null +++ b/coordinator/src/node/storage.rs @@ -0,0 +1,75 @@ +use anyhow::Result; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; +use lightning::chain::keysinterface::SpendableOutputDescriptor; +use lightning::chain::transaction::OutPoint; +use lightning::ln::PaymentHash; +use lightning::ln::PaymentPreimage; +use lightning::ln::PaymentSecret; +use ln_dlc_node::node; +use ln_dlc_node::HTLCStatus; +use ln_dlc_node::MillisatAmount; +use ln_dlc_node::PaymentFlow; +use ln_dlc_node::PaymentInfo; + +#[derive(Clone)] +pub struct NodeStorage { + pool: Pool>, +} + +impl NodeStorage { + pub fn new(pool: Pool>) -> Self { + Self { pool } + } +} + +impl node::Storage for NodeStorage { + // Payments + + fn insert_payment(&self, _payment_hash: PaymentHash, _info: PaymentInfo) -> Result<()> { + todo!() + } + fn merge_payment( + &self, + _payment_hash: &PaymentHash, + _flow: PaymentFlow, + _amt_msat: MillisatAmount, + _htlc_status: HTLCStatus, + _preimage: Option, + _secret: Option, + ) -> Result<()> { + todo!() + } + fn get_payment( + &self, + _payment_hash: &PaymentHash, + ) -> Result> { + todo!() + } + fn all_payments(&self) -> Result> { + todo!() + } + + // Spendable outputs + + fn insert_spendable_output(&self, output: SpendableOutputDescriptor) -> Result<()> { + let mut conn = self.pool.get()?; + crate::db::spendable_outputs::insert(&mut conn, output)?; + + Ok(()) + } + + fn get_spendable_output( + &self, + outpoint: &OutPoint, + ) -> Result> { + let mut conn = self.pool.get()?; + crate::db::spendable_outputs::get(&mut conn, outpoint) + } + + fn all_spendable_outputs(&self) -> Result> { + let mut conn = self.pool.get()?; + crate::db::spendable_outputs::get_all(&mut conn) + } +} diff --git a/coordinator/src/schema.rs b/coordinator/src/schema.rs index 289436619..5b4771b70 100644 --- a/coordinator/src/schema.rs +++ b/coordinator/src/schema.rs @@ -60,6 +60,15 @@ diesel::table! { } } +diesel::table! { + spendable_outputs (id) { + id -> Int4, + txid -> Text, + vout -> Int4, + descriptor -> Text, + } +} + diesel::table! { use diesel::sql_types::*; use super::sql_types::ContractSymbolType; @@ -91,4 +100,4 @@ diesel::table! { diesel::joinable!(trades -> positions (position_id)); -diesel::allow_tables_to_appear_in_same_query!(orders, positions, trades, users,); +diesel::allow_tables_to_appear_in_same_query!(orders, positions, spendable_outputs, trades, users,); diff --git a/crates/ln-dlc-node/src/node/mod.rs b/crates/ln-dlc-node/src/node/mod.rs index 497f124f9..a1f19ec21 100644 --- a/crates/ln-dlc-node/src/node/mod.rs +++ b/crates/ln-dlc-node/src/node/mod.rs @@ -34,6 +34,7 @@ use lightning::routing::gossip::P2PGossipSync; use lightning::routing::router::DefaultRouter; use lightning::routing::utxo::UtxoLookup; use lightning::util::config::UserConfig; +use lightning::util::events::Event; use lightning_background_processor::process_events_async; use lightning_background_processor::GossipSync; use lightning_persister::FilesystemPersister; @@ -74,7 +75,6 @@ pub use channel_manager::ChannelManager; pub use dlc_channel::dlc_message_name; pub use dlc_channel::sub_channel_message_name; pub use invoice::HTLCStatus; -use lightning::util::events::Event; pub use storage::InMemoryStore; pub use storage::Storage; pub use sub_channel_manager::SubChannelManager;