Skip to content

Commit

Permalink
feat(coordinator): Persist spendable outputs generated by LDK
Browse files Browse the repository at this point in the history
Fixes #845.
  • Loading branch information
luckysori committed Jul 13, 2023
1 parent a046cc9 commit 009837d
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE "spendable_outputs";
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE "spendable_outputs" (
id SERIAL PRIMARY KEY NOT NULL,
-- hex encoded
txid TEXT NOT NULL,
vout int NOT NULL,
-- hex representation of LDK's own encoding
descriptor TEXT NOT NULL
);
21 changes: 10 additions & 11 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use coordinator::settings::Settings;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use diesel::PgConnection;
use ln_dlc_node::node::InMemoryStore;
use ln_dlc_node::seed::Bip39Seed;
use rand::thread_rng;
use rand::RngCore;
Expand Down Expand Up @@ -77,11 +76,20 @@ async fn main() -> Result<()> {

let settings = Settings::new(&data_dir).await;

// set up database connection pool
let manager = ConnectionManager::<PgConnection>::new(opts.database.clone());
let pool = r2d2::Pool::builder()
.build(manager)
.expect("Failed to create pool.");

let mut conn = pool.get()?;
run_migration(&mut conn);

let node = Arc::new(ln_dlc_node::node::Node::new_coordinator(
"10101.finance",
network,
data_dir.as_path(),
InMemoryStore::default(),
coordinator::node::storage::NodeStorage::new(pool.clone()),
address,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), address.port()),
opts.p2p_announcement_addresses(),
Expand All @@ -92,15 +100,6 @@ async fn main() -> Result<()> {
opts.get_oracle_info(),
)?);

// set up database connection pool
let manager = ConnectionManager::<PgConnection>::new(opts.database);
let pool = r2d2::Pool::builder()
.build(manager)
.expect("Failed to create pool.");

let mut conn = pool.get()?;
run_migration(&mut conn);

let node = Node::new(node, pool.clone());
node.update_settings(settings.as_node_settings()).await;

Expand Down
1 change: 1 addition & 0 deletions coordinator/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod custom_types;
pub mod positions;
pub mod spendable_outputs;
pub mod trades;
pub mod user;
102 changes: 102 additions & 0 deletions coordinator/src/db/spendable_outputs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use crate::schema::spendable_outputs;
use anyhow::anyhow;
use anyhow::Result;
use autometrics::autometrics;
use bitcoin::hashes::hex::FromHex;
use bitcoin::hashes::hex::ToHex;
use diesel::prelude::*;
use lightning::chain::keysinterface::DelayedPaymentOutputDescriptor;
use lightning::chain::keysinterface::SpendableOutputDescriptor;
use lightning::chain::keysinterface::StaticPaymentOutputDescriptor;
use lightning::chain::transaction::OutPoint;
use lightning::util::ser::Readable;
use lightning::util::ser::Writeable;

#[autometrics]
pub(crate) fn insert(
conn: &mut PgConnection,
output: SpendableOutputDescriptor,
) -> QueryResult<()> {
diesel::insert_into(spendable_outputs::table)
.values(NewSpendableOutput::from(output))
.execute(conn)?;

Ok(())
}

#[autometrics]
pub fn get(
conn: &mut PgConnection,
outpoint: &OutPoint,
) -> Result<Option<SpendableOutputDescriptor>> {
let output: Option<SpendableOutput> = spendable_outputs::table
.filter(spendable_outputs::txid.eq(outpoint.txid.to_string()))
.first(conn)
.optional()?;

let output = output
.map(|output| anyhow::Ok(output.try_into()?))
.transpose()?;

Ok(output)
}

#[autometrics]
pub fn get_all(conn: &mut PgConnection) -> Result<Vec<SpendableOutputDescriptor>> {
let outputs: Vec<SpendableOutput> = spendable_outputs::table.load(conn)?;
outputs
.into_iter()
.map(SpendableOutputDescriptor::try_from)
.collect()
}

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = spendable_outputs)]
struct NewSpendableOutput {
txid: String,
vout: i32,
descriptor: String,
}

#[derive(Queryable, Debug, Clone)]
#[diesel(table_name = spendable_outputs)]
struct SpendableOutput {
#[diesel(column_name = "id")]
_id: i32,
#[diesel(column_name = "txid")]
_txid: String,
#[diesel(column_name = "vout")]
_vout: i32,
descriptor: String,
}

impl From<SpendableOutputDescriptor> for NewSpendableOutput {
fn from(descriptor: SpendableOutputDescriptor) -> Self {
use SpendableOutputDescriptor::*;
let outpoint = match &descriptor {
StaticOutput { outpoint, .. } => outpoint,
DelayedPaymentOutput(DelayedPaymentOutputDescriptor { outpoint, .. }) => outpoint,
StaticPaymentOutput(StaticPaymentOutputDescriptor { outpoint, .. }) => outpoint,
};

let descriptor = descriptor.encode().to_hex();

Self {
txid: outpoint.txid.to_string(),
vout: outpoint.index as i32,
descriptor,
}
}
}

impl TryFrom<SpendableOutput> for SpendableOutputDescriptor {
type Error = anyhow::Error;

fn try_from(value: SpendableOutput) -> Result<Self, Self::Error> {
let bytes = Vec::from_hex(&value.descriptor)?;
let descriptor = Self::read(&mut lightning::io::Cursor::new(bytes))
.map_err(|e| anyhow!("Failed to decode spendable output descriptor: {e}"))?;

Ok(descriptor)
}
}
4 changes: 2 additions & 2 deletions coordinator/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::db;
use crate::node::storage::NodeStorage;
use crate::node::Node;
use lazy_static::lazy_static;
use lightning::ln::channelmanager::ChannelDetails;
use ln_dlc_node::node::InMemoryStore;
use opentelemetry::global;
use opentelemetry::metrics::Meter;
use opentelemetry::metrics::ObservableGauge;
Expand Down Expand Up @@ -181,7 +181,7 @@ fn channel_metrics(cx: &Context, channels: Vec<ChannelDetails>) {
}
}

fn node_metrics(cx: &Context, inner_node: Arc<ln_dlc_node::node::Node<InMemoryStore>>) {
fn node_metrics(cx: &Context, inner_node: Arc<ln_dlc_node::node::Node<NodeStorage>>) {
let connected_peers = inner_node.list_peers().len();
CONNECTED_PEERS.observe(cx, connected_peers as u64, &[]);
let offchain = inner_node.get_ldk_balance();
Expand Down
7 changes: 4 additions & 3 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::db;
use crate::db::trades::Trade;
use crate::node::storage::NodeStorage;
use crate::position::models::NewPosition;
use crate::position::models::Position;
use anyhow::bail;
Expand Down Expand Up @@ -29,7 +30,6 @@ use lightning::ln::channelmanager::ChannelDetails;
use lightning_invoice::Invoice;
use ln_dlc_node::node::dlc_message_name;
use ln_dlc_node::node::sub_channel_message_name;
use ln_dlc_node::node::InMemoryStore;
use ln_dlc_node::WalletSettings;
use ln_dlc_node::CONTRACT_TX_FEE_RATE;
use rust_decimal::prelude::ToPrimitive;
Expand All @@ -45,6 +45,7 @@ use trade::Direction;

pub mod connection;
pub mod order_matching_fee;
pub mod storage;

/// The leverage used by the coordinator for all trades.
const COORDINATOR_LEVERAGE: f32 = 1.0;
Expand Down Expand Up @@ -77,14 +78,14 @@ impl Default for NodeSettings {

#[derive(Clone)]
pub struct Node {
pub inner: Arc<ln_dlc_node::node::Node<InMemoryStore>>,
pub inner: Arc<ln_dlc_node::node::Node<NodeStorage>>,
pub pool: Pool<ConnectionManager<PgConnection>>,
pub settings: Arc<RwLock<NodeSettings>>,
}

impl Node {
pub fn new(
inner: Arc<ln_dlc_node::node::Node<InMemoryStore>>,
inner: Arc<ln_dlc_node::node::Node<NodeStorage>>,
pool: Pool<ConnectionManager<PgConnection>>,
) -> Self {
Self {
Expand Down
7 changes: 4 additions & 3 deletions coordinator/src/node/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use autometrics::autometrics;
use lightning::ln::msgs::NetAddress;
use ln_dlc_node::node::InMemoryStore;
use ln_dlc_node::node::Node;
use ln_dlc_node::node::NodeInfo;
use rand::seq::SliceRandom;
Expand All @@ -11,9 +10,11 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::task::spawn_blocking;

use crate::node::storage::NodeStorage;

#[autometrics]
pub async fn keep_public_channel_peers_connected(
node: Arc<Node<InMemoryStore>>,
node: Arc<Node<NodeStorage>>,
check_interval: Duration,
) {
loop {
Expand All @@ -28,7 +29,7 @@ pub async fn keep_public_channel_peers_connected(
}
}

fn reconnect_to_disconnected_public_channel_peers(node: Arc<Node<InMemoryStore>>) {
fn reconnect_to_disconnected_public_channel_peers(node: Arc<Node<NodeStorage>>) {
let connected_peers = node
.peer_manager
.get_peer_node_ids()
Expand Down
75 changes: 75 additions & 0 deletions coordinator/src/node/storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use anyhow::Result;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::PgConnection;
use lightning::chain::keysinterface::SpendableOutputDescriptor;
use lightning::chain::transaction::OutPoint;
use lightning::ln::PaymentHash;
use lightning::ln::PaymentPreimage;
use lightning::ln::PaymentSecret;
use ln_dlc_node::node;
use ln_dlc_node::HTLCStatus;
use ln_dlc_node::MillisatAmount;
use ln_dlc_node::PaymentFlow;
use ln_dlc_node::PaymentInfo;

#[derive(Clone)]
pub struct NodeStorage {
pool: Pool<ConnectionManager<PgConnection>>,
}

impl NodeStorage {
pub fn new(pool: Pool<ConnectionManager<PgConnection>>) -> Self {
Self { pool }
}
}

impl node::Storage for NodeStorage {
// Payments

fn insert_payment(&self, _payment_hash: PaymentHash, _info: PaymentInfo) -> Result<()> {
todo!()
}
fn merge_payment(
&self,
_payment_hash: &PaymentHash,
_flow: PaymentFlow,
_amt_msat: MillisatAmount,
_htlc_status: HTLCStatus,
_preimage: Option<PaymentPreimage>,
_secret: Option<PaymentSecret>,
) -> Result<()> {
todo!()
}
fn get_payment(
&self,
_payment_hash: &PaymentHash,
) -> Result<Option<(PaymentHash, PaymentInfo)>> {
todo!()
}
fn all_payments(&self) -> Result<Vec<(PaymentHash, PaymentInfo)>> {
todo!()
}

// Spendable outputs

fn insert_spendable_output(&self, output: SpendableOutputDescriptor) -> Result<()> {
let mut conn = self.pool.get()?;
crate::db::spendable_outputs::insert(&mut conn, output)?;

Ok(())
}

fn get_spendable_output(
&self,
outpoint: &OutPoint,
) -> Result<Option<SpendableOutputDescriptor>> {
let mut conn = self.pool.get()?;
crate::db::spendable_outputs::get(&mut conn, outpoint)
}

fn all_spendable_outputs(&self) -> Result<Vec<SpendableOutputDescriptor>> {
let mut conn = self.pool.get()?;
crate::db::spendable_outputs::get_all(&mut conn)
}
}
11 changes: 10 additions & 1 deletion coordinator/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ diesel::table! {
}
}

diesel::table! {
spendable_outputs (id) {
id -> Int4,
txid -> Text,
vout -> Int4,
descriptor -> Text,
}
}

diesel::table! {
use diesel::sql_types::*;
use super::sql_types::ContractSymbolType;
Expand Down Expand Up @@ -91,4 +100,4 @@ diesel::table! {

diesel::joinable!(trades -> positions (position_id));

diesel::allow_tables_to_appear_in_same_query!(orders, positions, trades, users,);
diesel::allow_tables_to_appear_in_same_query!(orders, positions, spendable_outputs, trades, users,);
2 changes: 1 addition & 1 deletion crates/ln-dlc-node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use lightning::routing::gossip::P2PGossipSync;
use lightning::routing::router::DefaultRouter;
use lightning::routing::utxo::UtxoLookup;
use lightning::util::config::UserConfig;
use lightning::util::events::Event;
use lightning_background_processor::process_events_async;
use lightning_background_processor::GossipSync;
use lightning_persister::FilesystemPersister;
Expand Down Expand Up @@ -74,7 +75,6 @@ pub use channel_manager::ChannelManager;
pub use dlc_channel::dlc_message_name;
pub use dlc_channel::sub_channel_message_name;
pub use invoice::HTLCStatus;
use lightning::util::events::Event;
pub use storage::InMemoryStore;
pub use storage::Storage;
pub use sub_channel_manager::SubChannelManager;
Expand Down

0 comments on commit 009837d

Please sign in to comment.