From 5ab74ba8a52676933256e4f786dba9fac6db6ee7 Mon Sep 17 00:00:00 2001 From: t-bast Date: Wed, 4 Sep 2024 17:37:23 +0200 Subject: [PATCH] Add liquidity purchases to the `AuditDb` Whenever liquidity is purchased, we store it in the `AuditDb`. This lets node operators gather useful statistics on their peers, and which ones are actively using the liquidity that is purchased. We store minimal information about the liquidity ads itself to be more easily compatible with potential changes in the spec. --- .../acinq/eclair/channel/ChannelEvents.scala | 15 ++-- .../fr/acinq/eclair/channel/fsm/Channel.scala | 4 +- .../channel/fund/InteractiveTxBuilder.scala | 35 +++++++++- .../scala/fr/acinq/eclair/db/AuditDb.scala | 4 ++ .../fr/acinq/eclair/db/DbEventHandler.scala | 3 + .../fr/acinq/eclair/db/DualDatabases.scala | 10 +++ .../fr/acinq/eclair/db/pg/PgAuditDb.scala | 69 ++++++++++++++++++- .../eclair/db/sqlite/SqliteAuditDb.scala | 65 ++++++++++++++++- .../channel/InteractiveTxBuilderSpec.scala | 12 +++- .../fr/acinq/eclair/db/AuditDbSpec.scala | 34 ++++++++- 10 files changed, 237 insertions(+), 14 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala index ff36bd5fb1..a138cfc1a3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala @@ -18,12 +18,11 @@ package fr.acinq.eclair.channel import akka.actor.ActorRef import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction, TxId} import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.channel.Helpers.Closing.ClosingType -import fr.acinq.eclair.io.Peer.OpenChannelResponse -import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate} -import fr.acinq.eclair.{BlockHeight, Features, ShortChannelId} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, LiquidityAds} +import fr.acinq.eclair.{BlockHeight, Features, MilliSatoshi, ShortChannelId} /** * Created by PM on 17/08/2016. @@ -79,6 +78,14 @@ case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) ext case class ChannelSignatureReceived(channel: ActorRef, commitments: Commitments) extends ChannelEvent +case class LiquidityPurchase(fundingTxId: TxId, fundingTxIndex: Long, isBuyer: Boolean, amount: Satoshi, fees: LiquidityAds.Fees, capacity: Satoshi, localContribution: Satoshi, remoteContribution: Satoshi, localBalance: MilliSatoshi, remoteBalance: MilliSatoshi, outgoingHtlcCount: Long, incomingHtlcCount: Long) { + val previousCapacity: Satoshi = capacity - localContribution - remoteContribution + val previousLocalBalance: MilliSatoshi = if (isBuyer) localBalance - localContribution + fees.total else localBalance - localContribution - fees.total + val previousRemoteBalance: MilliSatoshi = if (isBuyer) remoteBalance - remoteContribution - fees.total else remoteBalance - remoteContribution + fees.total +} + +case class ChannelLiquidityPurchased(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, purchase: LiquidityPurchase) extends ChannelEvent + case class ChannelErrorOccurred(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, error: ChannelError, isFatal: Boolean) extends ChannelEvent // NB: the fee should be set to 0 when we're not paying it. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index f6abe66821..ca6c7448e0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -982,7 +982,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with sessionId, nodeParams, fundingParams, channelParams = d.commitments.params, - purpose = InteractiveTxBuilder.SpliceTx(parentCommitment), + purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes), localPushAmount = spliceAck.pushAmount, remotePushAmount = msg.pushAmount, liquidityPurchase_opt = willFund_opt.map(_.purchase), wallet @@ -1029,7 +1029,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with sessionId, nodeParams, fundingParams, channelParams = d.commitments.params, - purpose = InteractiveTxBuilder.SpliceTx(parentCommitment), + purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes), localPushAmount = cmd.pushAmount, remotePushAmount = msg.pushAmount, liquidityPurchase_opt = liquidityPurchase_opt, wallet diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala index 60795b02f4..4eb1c7d1b4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala @@ -16,6 +16,8 @@ package fr.acinq.eclair.channel.fund +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.adapter.TypedActorRefOps import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer} import akka.actor.typed.{ActorRef, Behavior} import akka.event.LoggingAdapter @@ -163,6 +165,8 @@ object InteractiveTxBuilder { def previousFundingAmount: Satoshi def localCommitIndex: Long def remoteCommitIndex: Long + def localNextHtlcId: Long + def remoteNextHtlcId: Long def remotePerCommitmentPoint: PublicKey def commitTxFeerate: FeeratePerKw def fundingTxIndex: Long @@ -175,15 +179,19 @@ object InteractiveTxBuilder { override val previousFundingAmount: Satoshi = 0 sat override val localCommitIndex: Long = 0 override val remoteCommitIndex: Long = 0 + override val localNextHtlcId: Long = 0 + override val remoteNextHtlcId: Long = 0 override val fundingTxIndex: Long = 0 override val localHtlcs: Set[DirectedHtlc] = Set.empty } - case class SpliceTx(parentCommitment: Commitment) extends Purpose { + case class SpliceTx(parentCommitment: Commitment, changes: CommitmentChanges) extends Purpose { override val previousLocalBalance: MilliSatoshi = parentCommitment.localCommit.spec.toLocal override val previousRemoteBalance: MilliSatoshi = parentCommitment.remoteCommit.spec.toLocal override val previousFundingAmount: Satoshi = parentCommitment.capacity override val localCommitIndex: Long = parentCommitment.localCommit.index override val remoteCommitIndex: Long = parentCommitment.remoteCommit.index + override val localNextHtlcId: Long = changes.localNextHtlcId + override val remoteNextHtlcId: Long = changes.remoteNextHtlcId override val remotePerCommitmentPoint: PublicKey = parentCommitment.remoteCommit.remotePerCommitmentPoint override val commitTxFeerate: FeeratePerKw = parentCommitment.localCommit.spec.commitTxFeerate override val fundingTxIndex: Long = parentCommitment.fundingTxIndex + 1 @@ -199,6 +207,8 @@ object InteractiveTxBuilder { override val previousFundingAmount: Satoshi = (previousLocalBalance + previousRemoteBalance).truncateToSatoshi override val localCommitIndex: Long = replacedCommitment.localCommit.index override val remoteCommitIndex: Long = replacedCommitment.remoteCommit.index + override val localNextHtlcId: Long = 0 + override val remoteNextHtlcId: Long = 0 override val remotePerCommitmentPoint: PublicKey = replacedCommitment.remoteCommit.remotePerCommitmentPoint override val commitTxFeerate: FeeratePerKw = replacedCommitment.localCommit.spec.commitTxFeerate override val fundingTxIndex: Long = replacedCommitment.fundingTxIndex @@ -792,6 +802,29 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon Behaviors.receiveMessagePartial { case SignTransactionResult(signedTx) => log.info(s"interactive-tx txid=${signedTx.txId} partially signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", signedTx.tx.localInputs.length, signedTx.tx.remoteInputs.length, signedTx.tx.localOutputs.length, signedTx.tx.remoteOutputs.length) + // At this point, we're not completely sure that the transaction will succeed: if our peer doesn't send their + // commit_sig, the transaction will be aborted. But it's a best effort, because after sending our commit_sig, + // we won't store details about the liquidity purchase so we'll be unable to emit that event later. Even after + // fully signing the transaction, it may be double-spent by a force-close, which would invalidate it as well. + // The right solution is to check confirmations on the funding transaction before considering that a liquidity + // purchase is completed, which is what we do in our AuditDb. + liquidityPurchase_opt.foreach { p => + val purchase = LiquidityPurchase( + fundingTxId = signedTx.txId, + fundingTxIndex = purpose.fundingTxIndex, + isBuyer = fundingParams.isInitiator, + amount = p.amount, + fees = p.fees, + capacity = fundingParams.fundingAmount, + localContribution = fundingParams.localContribution, + remoteContribution = fundingParams.remoteContribution, + localBalance = localCommit.spec.toLocal, + remoteBalance = localCommit.spec.toRemote, + outgoingHtlcCount = purpose.localNextHtlcId, + incomingHtlcCount = purpose.remoteNextHtlcId, + ) + context.system.eventStream ! EventStream.Publish(ChannelLiquidityPurchased(replyTo.toClassic, channelParams.channelId, remoteNodeId, purchase)) + } replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig) Behaviors.stopped case WalletFailure(t) => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala index fee50e7e15..7a49307da6 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala @@ -34,6 +34,8 @@ trait AuditDb { def add(paymentRelayed: PaymentRelayed): Unit + def add(liquidityPurchase: ChannelLiquidityPurchased): Unit + def add(txPublished: TransactionPublished): Unit def add(txConfirmed: TransactionConfirmed): Unit @@ -52,6 +54,8 @@ trait AuditDb { def listRelayed(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentRelayed] + def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] + def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee] def stats(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[Stats] diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index 0a66c56b43..437ff06799 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -45,6 +45,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL context.system.eventStream.subscribe(self, classOf[PaymentFailed]) context.system.eventStream.subscribe(self, classOf[PaymentReceived]) context.system.eventStream.subscribe(self, classOf[PaymentRelayed]) + context.system.eventStream.subscribe(self, classOf[ChannelLiquidityPurchased]) context.system.eventStream.subscribe(self, classOf[TransactionPublished]) context.system.eventStream.subscribe(self, classOf[TransactionConfirmed]) context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred]) @@ -92,6 +93,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL } auditDb.add(e) + case e: ChannelLiquidityPurchased => auditDb.add(e) + case e: TransactionPublished => log.info(s"paying mining fee=${e.miningFee} for txid=${e.tx.txid} desc=${e.desc}") auditDb.add(e) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index c2e04179d4..917ae810ea 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -151,6 +151,11 @@ case class DualAuditDb(primary: AuditDb, secondary: AuditDb) extends AuditDb { primary.add(paymentRelayed) } + override def add(liquidityPurchase: ChannelLiquidityPurchased): Unit = { + runAsync(secondary.add(liquidityPurchase)) + primary.add(liquidityPurchase) + } + override def add(txPublished: TransactionPublished): Unit = { runAsync(secondary.add(txPublished)) primary.add(txPublished) @@ -196,6 +201,11 @@ case class DualAuditDb(primary: AuditDb, secondary: AuditDb) extends AuditDb { primary.listRelayed(from, to, paginated_opt) } + override def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = { + runAsync(secondary.listLiquidityPurchases(remoteNodeId)) + primary.listLiquidityPurchases(remoteNodeId) + } + override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[AuditDb.NetworkFee] = { runAsync(secondary.listNetworkFees(from, to)) primary.listNetworkFees(from, to) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala index b5a3a7dbc1..82cab5d8c6 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala @@ -26,6 +26,7 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey +import fr.acinq.eclair.wire.protocol.LiquidityAds import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, Paginated, TimestampMilli} import grizzled.slf4j.Logging @@ -36,7 +37,7 @@ import javax.sql.DataSource object PgAuditDb { val DB_NAME = "audit" - val CURRENT_VERSION = 12 + val CURRENT_VERSION = 13 } class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { @@ -114,6 +115,11 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON audit.transactions_published(channel_id)") } + def migration1213(statement: Statement): Unit = { + statement.executeUpdate("CREATE TABLE IF NOT EXISTS audit.liquidity_purchases (tx_id TEXT NOT NULL, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat BIGINT NOT NULL, mining_fee_sat BIGINT NOT NULL, service_fee_sat BIGINT NOT NULL, funding_tx_index BIGINT NOT NULL, capacity_sat BIGINT NOT NULL, local_contribution_sat BIGINT NOT NULL, remote_contribution_sat BIGINT NOT NULL, local_balance_msat BIGINT NOT NULL, remote_balance_msat BIGINT NOT NULL, outgoing_htlc_count BIGINT NOT NULL, incoming_htlc_count BIGINT NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL, confirmed_at TIMESTAMP WITH TIME ZONE)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS liquidity_purchases_node_id_idx ON audit.liquidity_purchases(node_id)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA audit") @@ -125,6 +131,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.executeUpdate("CREATE TABLE audit.channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)") statement.executeUpdate("CREATE TABLE audit.channel_updates (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL, cltv_expiry_delta BIGINT NOT NULL, htlc_minimum_msat BIGINT NOT NULL, htlc_maximum_msat BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)") statement.executeUpdate("CREATE TABLE audit.path_finding_metrics (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, status TEXT NOT NULL, duration_ms BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, is_mpp BOOLEAN NOT NULL, experiment_name TEXT NOT NULL, recipient_node_id TEXT NOT NULL, payment_hash TEXT, routing_hints JSONB)") + statement.executeUpdate("CREATE TABLE audit.liquidity_purchases (tx_id TEXT NOT NULL, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat BIGINT NOT NULL, mining_fee_sat BIGINT NOT NULL, service_fee_sat BIGINT NOT NULL, funding_tx_index BIGINT NOT NULL, capacity_sat BIGINT NOT NULL, local_contribution_sat BIGINT NOT NULL, remote_contribution_sat BIGINT NOT NULL, local_balance_msat BIGINT NOT NULL, remote_balance_msat BIGINT NOT NULL, outgoing_htlc_count BIGINT NOT NULL, incoming_htlc_count BIGINT NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL, confirmed_at TIMESTAMP WITH TIME ZONE)") statement.executeUpdate("CREATE TABLE audit.transactions_published (tx_id TEXT NOT NULL PRIMARY KEY, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, mining_fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)") statement.executeUpdate("CREATE TABLE audit.transactions_confirmed (tx_id TEXT NOT NULL PRIMARY KEY, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)") @@ -146,10 +153,11 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX metrics_name_idx ON audit.path_finding_metrics(experiment_name)") statement.executeUpdate("CREATE INDEX metrics_recipient_idx ON audit.path_finding_metrics(recipient_node_id)") statement.executeUpdate("CREATE INDEX metrics_hash_idx ON audit.path_finding_metrics(payment_hash)") + statement.executeUpdate("CREATE INDEX liquidity_purchases_node_id_idx ON audit.liquidity_purchases(node_id)") statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON audit.transactions_published(channel_id)") statement.executeUpdate("CREATE INDEX transactions_published_timestamp_idx ON audit.transactions_published(timestamp)") statement.executeUpdate("CREATE INDEX transactions_confirmed_timestamp_idx ON audit.transactions_confirmed(timestamp)") - case Some(v@(4 | 5 | 6 | 7 | 8 | 9 | 10 | 11)) => + case Some(v@(4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 5) { migration45(statement) @@ -175,6 +183,9 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { if (v < 12) { migration1112(statement) } + if (v < 13) { + migration1213(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -264,6 +275,30 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { } } + override def add(e: ChannelLiquidityPurchased): Unit = withMetrics("audit/add-liquidity-purchase", DbBackends.Postgres) { + inTransaction { pg => + using(pg.prepareStatement("INSERT INTO audit.liquidity_purchases VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)")) { statement => + statement.setString(1, e.purchase.fundingTxId.value.toHex) + statement.setString(2, e.channelId.toHex) + statement.setString(3, e.remoteNodeId.toHex) + statement.setBoolean(4, e.purchase.isBuyer) + statement.setLong(5, e.purchase.amount.toLong) + statement.setLong(6, e.purchase.fees.miningFee.toLong) + statement.setLong(7, e.purchase.fees.serviceFee.toLong) + statement.setLong(8, e.purchase.fundingTxIndex) + statement.setLong(9, e.purchase.capacity.toLong) + statement.setLong(10, e.purchase.localContribution.toLong) + statement.setLong(11, e.purchase.remoteContribution.toLong) + statement.setLong(12, e.purchase.localBalance.toLong) + statement.setLong(13, e.purchase.remoteBalance.toLong) + statement.setLong(14, e.purchase.outgoingHtlcCount) + statement.setLong(15, e.purchase.incomingHtlcCount) + statement.setTimestamp(16, Timestamp.from(Instant.now())) + statement.executeUpdate() + } + } + } + override def add(e: TransactionPublished): Unit = withMetrics("audit/add-transaction-published", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("INSERT INTO audit.transactions_published VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING")) { statement => @@ -287,6 +322,12 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.setTimestamp(4, Timestamp.from(Instant.now())) statement.executeUpdate() } + using(pg.prepareStatement("UPDATE audit.liquidity_purchases SET confirmed_at=? WHERE node_id=? AND tx_id=?")) { statement => + statement.setTimestamp(1, Timestamp.from(Instant.now())) + statement.setString(2, e.remoteNodeId.toHex) + statement.setString(3, e.tx.txid.value.toHex) + statement.executeUpdate() + } } } @@ -462,6 +503,30 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { } } + override def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = withMetrics("audit/list-liquidity-purchases", DbBackends.Postgres) { + inTransaction { pg => + using(pg.prepareStatement("SELECT * FROM audit.liquidity_purchases WHERE node_id=? AND confirmed_at IS NOT NULL")) { statement => + statement.setString(1, remoteNodeId.toHex) + statement.executeQuery().map { rs => + LiquidityPurchase( + fundingTxId = TxId(rs.getByteVector32FromHex("tx_id")), + fundingTxIndex = rs.getLong("funding_tx_index"), + isBuyer = rs.getBoolean("is_buyer"), + amount = Satoshi(rs.getLong("amount_sat")), + fees = LiquidityAds.Fees(miningFee = Satoshi(rs.getLong("mining_fee_sat")), serviceFee = Satoshi(rs.getLong("service_fee_sat"))), + capacity = Satoshi(rs.getLong("capacity_sat")), + localContribution = Satoshi(rs.getLong("local_contribution_sat")), + remoteContribution = Satoshi(rs.getLong("remote_contribution_sat")), + localBalance = MilliSatoshi(rs.getLong("local_balance_msat")), + remoteBalance = MilliSatoshi(rs.getLong("remote_balance_msat")), + outgoingHtlcCount = rs.getLong("outgoing_htlc_count"), + incomingHtlcCount = rs.getLong("incoming_htlc_count") + ) + }.toSeq + } + } + } + override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee] = inTransaction { pg => using(pg.prepareStatement("SELECT * FROM audit.transactions_confirmed INNER JOIN audit.transactions_published ON audit.transactions_published.tx_id = audit.transactions_confirmed.tx_id WHERE audit.transactions_confirmed.timestamp BETWEEN ? and ? ORDER BY audit.transactions_confirmed.timestamp")) { statement => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala index d511768da6..6869a75621 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala @@ -26,6 +26,7 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey +import fr.acinq.eclair.wire.protocol.LiquidityAds import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, Paginated, TimestampMilli} import grizzled.slf4j.Logging @@ -34,7 +35,7 @@ import java.util.UUID object SqliteAuditDb { val DB_NAME = "audit" - val CURRENT_VERSION = 9 + val CURRENT_VERSION = 10 } class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { @@ -114,6 +115,11 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON transactions_published(channel_id)") } + def migration910(statement: Statement): Unit = { + statement.executeUpdate("CREATE TABLE IF NOT EXISTS liquidity_purchases (tx_id BLOB NOT NULL, channel_id BLOB NOT NULL, node_id BLOB NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat INTEGER NOT NULL, mining_fee_sat INTEGER NOT NULL, service_fee_sat INTEGER NOT NULL, funding_tx_index INTEGER NOT NULL, capacity_sat INTEGER NOT NULL, local_contribution_sat INTEGER NOT NULL, remote_contribution_sat INTEGER NOT NULL, local_balance_msat INTEGER NOT NULL, remote_balance_msat INTEGER NOT NULL, outgoing_htlc_count INTEGER NOT NULL, incoming_htlc_count INTEGER NOT NULL, created_at INTEGER NOT NULL, confirmed_at INTEGER)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS liquidity_purchases_node_id_idx ON liquidity_purchases(node_id)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, recipient_amount_msat INTEGER NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, recipient_node_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") @@ -124,6 +130,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { statement.executeUpdate("CREATE TABLE channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE channel_updates (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL, cltv_expiry_delta INTEGER NOT NULL, htlc_minimum_msat INTEGER NOT NULL, htlc_maximum_msat INTEGER NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE path_finding_metrics (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, status TEXT NOT NULL, duration_ms INTEGER NOT NULL, timestamp INTEGER NOT NULL, is_mpp INTEGER NOT NULL, experiment_name TEXT NOT NULL, recipient_node_id BLOB NOT NULL)") + statement.executeUpdate("CREATE TABLE liquidity_purchases (tx_id BLOB NOT NULL, channel_id BLOB NOT NULL, node_id BLOB NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat INTEGER NOT NULL, mining_fee_sat INTEGER NOT NULL, service_fee_sat INTEGER NOT NULL, funding_tx_index INTEGER NOT NULL, capacity_sat INTEGER NOT NULL, local_contribution_sat INTEGER NOT NULL, remote_contribution_sat INTEGER NOT NULL, local_balance_msat INTEGER NOT NULL, remote_balance_msat INTEGER NOT NULL, outgoing_htlc_count INTEGER NOT NULL, incoming_htlc_count INTEGER NOT NULL, created_at INTEGER NOT NULL, confirmed_at INTEGER)") statement.executeUpdate("CREATE TABLE transactions_published (tx_id BLOB NOT NULL PRIMARY KEY, channel_id BLOB NOT NULL, node_id BLOB NOT NULL, mining_fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE transactions_confirmed (tx_id BLOB NOT NULL PRIMARY KEY, channel_id BLOB NOT NULL, node_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") @@ -142,10 +149,11 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX metrics_timestamp_idx ON path_finding_metrics(timestamp)") statement.executeUpdate("CREATE INDEX metrics_mpp_idx ON path_finding_metrics(is_mpp)") statement.executeUpdate("CREATE INDEX metrics_name_idx ON path_finding_metrics(experiment_name)") + statement.executeUpdate("CREATE INDEX liquidity_purchases_node_id_idx ON liquidity_purchases(node_id)") statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON transactions_published(channel_id)") statement.executeUpdate("CREATE INDEX transactions_published_timestamp_idx ON transactions_published(timestamp)") statement.executeUpdate("CREATE INDEX transactions_confirmed_timestamp_idx ON transactions_confirmed(timestamp)") - case Some(v@(1 | 2 | 3 | 4 | 5 | 6 | 7 | 8)) => + case Some(v@(1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { migration12(statement) @@ -171,6 +179,9 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { if (v < 9) { migration89(statement) } + if (v < 10) { + migration910(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -252,6 +263,28 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { } } + override def add(e: ChannelLiquidityPurchased): Unit = withMetrics("audit/add-liquidity-purchase", DbBackends.Sqlite) { + using(sqlite.prepareStatement("INSERT INTO liquidity_purchases VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)")) { statement => + statement.setBytes(1, e.purchase.fundingTxId.value.toArray) + statement.setBytes(2, e.channelId.toArray) + statement.setBytes(3, e.remoteNodeId.value.toArray) + statement.setBoolean(4, e.purchase.isBuyer) + statement.setLong(5, e.purchase.amount.toLong) + statement.setLong(6, e.purchase.fees.miningFee.toLong) + statement.setLong(7, e.purchase.fees.serviceFee.toLong) + statement.setLong(8, e.purchase.fundingTxIndex) + statement.setLong(9, e.purchase.capacity.toLong) + statement.setLong(10, e.purchase.localContribution.toLong) + statement.setLong(11, e.purchase.remoteContribution.toLong) + statement.setLong(12, e.purchase.localBalance.toLong) + statement.setLong(13, e.purchase.remoteBalance.toLong) + statement.setLong(14, e.purchase.outgoingHtlcCount) + statement.setLong(15, e.purchase.incomingHtlcCount) + statement.setLong(16, TimestampMilli.now().toLong) + statement.executeUpdate() + } + } + override def add(e: TransactionPublished): Unit = withMetrics("audit/add-transaction-published", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT OR IGNORE INTO transactions_published VALUES (?, ?, ?, ?, ?, ?)")) { statement => statement.setBytes(1, e.tx.txid.value.toArray) @@ -272,6 +305,12 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { statement.setLong(4, TimestampMilli.now().toLong) statement.executeUpdate() } + using(sqlite.prepareStatement("UPDATE liquidity_purchases SET confirmed_at=? WHERE node_id=? AND tx_id=?")) { statement => + statement.setLong(1, TimestampMilli.now().toLong) + statement.setBytes(2, e.remoteNodeId.value.toArray) + statement.setBytes(3, e.tx.txid.value.toArray) + statement.executeUpdate() + } } override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error", DbBackends.Sqlite) { @@ -432,6 +471,28 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { } } + override def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = withMetrics("audit/list-liquidity-purchases", DbBackends.Sqlite) { + using(sqlite.prepareStatement("SELECT * FROM liquidity_purchases WHERE node_id=? AND confirmed_at IS NOT NULL")) { statement => + statement.setBytes(1, remoteNodeId.value.toArray) + statement.executeQuery().map { rs => + LiquidityPurchase( + fundingTxId = TxId(rs.getByteVector32("tx_id")), + fundingTxIndex = rs.getLong("funding_tx_index"), + isBuyer = rs.getBoolean("is_buyer"), + amount = Satoshi(rs.getLong("amount_sat")), + fees = LiquidityAds.Fees(miningFee = Satoshi(rs.getLong("mining_fee_sat")), serviceFee = Satoshi(rs.getLong("service_fee_sat"))), + capacity = Satoshi(rs.getLong("capacity_sat")), + localContribution = Satoshi(rs.getLong("local_contribution_sat")), + remoteContribution = Satoshi(rs.getLong("remote_contribution_sat")), + localBalance = MilliSatoshi(rs.getLong("local_balance_msat")), + remoteBalance = MilliSatoshi(rs.getLong("remote_balance_msat")), + outgoingHtlcCount = rs.getLong("outgoing_htlc_count"), + incomingHtlcCount = rs.getLong("incoming_htlc_count") + ) + }.toSeq + } + } + override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee] = using(sqlite.prepareStatement("SELECT * FROM transactions_confirmed INNER JOIN transactions_published ON transactions_published.tx_id = transactions_confirmed.tx_id WHERE transactions_confirmed.timestamp >= ? AND transactions_confirmed.timestamp < ? ORDER BY transactions_confirmed.timestamp")) { statement => statement.setLong(1, from.toLong) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala index ba5811e307..ba0f2c24e8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala @@ -139,7 +139,7 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit def spawnTxBuilderSpliceAlice(fundingParams: InteractiveTxParams, commitment: Commitment, wallet: OnChainWallet, liquidityPurchase_opt: Option[LiquidityAds.Purchase] = None): ActorRef[InteractiveTxBuilder.Command] = system.spawnAnonymous(InteractiveTxBuilder( ByteVector32.Zeroes, nodeParamsA, fundingParams, channelParamsA, - SpliceTx(commitment), + SpliceTx(commitment, CommitmentChanges.init()), 0 msat, 0 msat, liquidityPurchase_opt, wallet)) @@ -167,7 +167,7 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit def spawnTxBuilderSpliceBob(fundingParams: InteractiveTxParams, commitment: Commitment, wallet: OnChainWallet, liquidityPurchase_opt: Option[LiquidityAds.Purchase] = None): ActorRef[InteractiveTxBuilder.Command] = system.spawnAnonymous(InteractiveTxBuilder( ByteVector32.Zeroes, nodeParamsB, fundingParams, channelParamsB, - SpliceTx(commitment), + SpliceTx(commitment, CommitmentChanges.init()), 0 msat, 0 msat, liquidityPurchase_opt, wallet)) @@ -1651,6 +1651,9 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit walletA.publishTransaction(txA1.signedTx).pipeTo(probe.ref) probe.expectMsg(txA1.txId) + val eventListener = TestProbe() + system.eventStream.subscribe(eventListener.ref, classOf[ChannelLiquidityPurchased]) + // Alice initiates a splice that is only funded by Bob, because she is purchasing liquidity. val purchase = LiquidityAds.Purchase.Standard(50_000 sat, LiquidityAds.Fees(1000 sat, 1500 sat), LiquidityAds.PaymentDetails.FromChannelBalance) // Alice pays fees for the common fields of the transaction, by decreasing her balance in the shared output. @@ -1695,6 +1698,11 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit assert(targetFeerate * 0.9 <= spliceTxA1.feerate && spliceTxA1.feerate <= targetFeerate * 1.25) walletA.publishTransaction(spliceTxA1.signedTx).pipeTo(probe.ref) probe.expectMsg(spliceTxA1.txId) + + val event = eventListener.expectMsgType[ChannelLiquidityPurchased] + assert(event.purchase.fees == purchase.fees) + assert(event.purchase.fundingTxIndex == 1) + assert(event.purchase.fundingTxId == spliceTxA1.txId) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala index d79665a0a0..ed49e9c75a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala @@ -32,7 +32,7 @@ import fr.acinq.eclair.payment.Bolt11Invoice.ExtraHop import fr.acinq.eclair.payment._ import fr.acinq.eclair.router.Announcements import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey -import fr.acinq.eclair.wire.protocol.Error +import fr.acinq.eclair.wire.protocol.{Error, LiquidityAds} import org.scalatest.Tag import org.scalatest.funsuite.AnyFunSuite import scodec.bits.HexStringSyntax @@ -132,6 +132,38 @@ class AuditDbSpec extends AnyFunSuite { } } + test("add/list liquidity events") { + forAllDbs { dbs => + val db = dbs.audit + val (nodeId1, nodeId2) = (randomKey().publicKey, randomKey().publicKey) + val confirmedFundingTx = Transaction(2, Nil, Seq(TxOut(150_000 sat, Script.pay2wpkh(randomKey().publicKey))), 0) + val unconfirmedFundingTx = Transaction(2, Nil, Seq(TxOut(100_000 sat, Script.pay2wpkh(randomKey().publicKey))), 0) + val e1a = ChannelLiquidityPurchased(null, randomBytes32(), nodeId1, LiquidityPurchase(confirmedFundingTx.txid, 3, isBuyer = true, 250_000 sat, LiquidityAds.Fees(2_000 sat, 3_000 sat), 750_000 sat, 50_000 sat, 300_000 sat, 400_000_000 msat, 350_000_000 msat, 7, 11)) + val e1b = ChannelLiquidityPurchased(null, randomBytes32(), nodeId1, LiquidityPurchase(confirmedFundingTx.txid, 7, isBuyer = false, 50_000 sat, LiquidityAds.Fees(300 sat, 700 sat), 500_000 sat, 50_000 sat, 0 sat, 250_000_000 msat, 250_000_000 msat, 0, 0)) + val e1c = ChannelLiquidityPurchased(null, e1b.channelId, nodeId1, LiquidityPurchase(confirmedFundingTx.txid, 0, isBuyer = false, 150_000 sat, LiquidityAds.Fees(500 sat, 1_500 sat), 250_000 sat, 150_000 sat, -100_000 sat, 200_000_000 msat, 50_000_000 msat, 47, 45)) + val e1d = ChannelLiquidityPurchased(null, randomBytes32(), nodeId1, LiquidityPurchase(unconfirmedFundingTx.txid, 22, isBuyer = true, 250_000 sat, LiquidityAds.Fees(4_000 sat, 1_000 sat), 450_000 sat, -50_000 sat, 250_000 sat, 150_000_000 msat, 300_000_000 msat, 3, 3)) + val e2a = ChannelLiquidityPurchased(null, randomBytes32(), nodeId2, LiquidityPurchase(confirmedFundingTx.txid, 453, isBuyer = false, 200_000 sat, LiquidityAds.Fees(1_000 sat, 1_000 sat), 300_000 sat, 250_000 sat, 0 sat, 270_000_000 msat, 30_000_000 msat, 113, 0)) + val e2b = ChannelLiquidityPurchased(null, randomBytes32(), nodeId2, LiquidityPurchase(unconfirmedFundingTx.txid, 1, isBuyer = false, 200_000 sat, LiquidityAds.Fees(1_000 sat, 1_000 sat), 300_000 sat, 250_000 sat, -10_000 sat, 250_000_000 msat, 50_000_000 msat, 0, 113)) + + db.add(e1a) + db.add(e1b) + db.add(e1c) + db.add(e1d) + db.add(e2a) + db.add(e2b) + + // The liquidity purchase is confirmed only once the corresponding transaction confirms. + assert(db.listLiquidityPurchases(nodeId1).isEmpty) + assert(db.listLiquidityPurchases(nodeId2).isEmpty) + + db.add(TransactionConfirmed(randomBytes32(), nodeId1, confirmedFundingTx)) + db.add(TransactionConfirmed(randomBytes32(), nodeId2, confirmedFundingTx)) + + assert(db.listLiquidityPurchases(nodeId1).toSet == Set(e1a, e1b, e1c).map(_.purchase)) + assert(db.listLiquidityPurchases(nodeId2) == Seq(e2a.purchase)) + } + } + test("stats") { forAllDbs { dbs => val db = dbs.audit