From c6a11992ccf22bcdc27aea7a8245b6b9e44f5b13 Mon Sep 17 00:00:00 2001 From: t-bast Date: Fri, 13 Sep 2024 16:27:19 +0200 Subject: [PATCH] Add liquidity purchases to the `AuditDb` Whenever liquidity is purchased, we store it in the `AuditDb`. This lets node operators gather useful statistics on their peers, and which ones are actively using the liquidity that is purchased. We store minimal information about the liquidity ads itself to be more easily compatible with potential changes in the spec. --- .../acinq/eclair/channel/ChannelEvents.scala | 15 ++- .../fr/acinq/eclair/channel/fsm/Channel.scala | 4 +- .../channel/fund/InteractiveTxBuilder.scala | 35 ++++- .../scala/fr/acinq/eclair/db/Databases.scala | 10 +- .../fr/acinq/eclair/db/DbEventHandler.scala | 8 +- .../fr/acinq/eclair/db/DualDatabases.scala | 27 +++- .../fr/acinq/eclair/db/LiquidityDb.scala | 35 +++++ .../fr/acinq/eclair/db/pg/PgLiquidityDb.scala | 121 ++++++++++++++++++ .../eclair/db/sqlite/SqliteLiquidityDb.scala | 110 ++++++++++++++++ .../scala/fr/acinq/eclair/TestDatabases.scala | 1 + .../channel/InteractiveTxBuilderSpec.scala | 12 +- .../fr/acinq/eclair/db/LiquidityDbSpec.scala | 60 +++++++++ 12 files changed, 420 insertions(+), 18 deletions(-) create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/db/LiquidityDb.scala create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgLiquidityDb.scala create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteLiquidityDb.scala create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/db/LiquidityDbSpec.scala diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala index ff36bd5fb1..a138cfc1a3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala @@ -18,12 +18,11 @@ package fr.acinq.eclair.channel import akka.actor.ActorRef import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction, TxId} import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.channel.Helpers.Closing.ClosingType -import fr.acinq.eclair.io.Peer.OpenChannelResponse -import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate} -import fr.acinq.eclair.{BlockHeight, Features, ShortChannelId} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, LiquidityAds} +import fr.acinq.eclair.{BlockHeight, Features, MilliSatoshi, ShortChannelId} /** * Created by PM on 17/08/2016. @@ -79,6 +78,14 @@ case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) ext case class ChannelSignatureReceived(channel: ActorRef, commitments: Commitments) extends ChannelEvent +case class LiquidityPurchase(fundingTxId: TxId, fundingTxIndex: Long, isBuyer: Boolean, amount: Satoshi, fees: LiquidityAds.Fees, capacity: Satoshi, localContribution: Satoshi, remoteContribution: Satoshi, localBalance: MilliSatoshi, remoteBalance: MilliSatoshi, outgoingHtlcCount: Long, incomingHtlcCount: Long) { + val previousCapacity: Satoshi = capacity - localContribution - remoteContribution + val previousLocalBalance: MilliSatoshi = if (isBuyer) localBalance - localContribution + fees.total else localBalance - localContribution - fees.total + val previousRemoteBalance: MilliSatoshi = if (isBuyer) remoteBalance - remoteContribution - fees.total else remoteBalance - remoteContribution + fees.total +} + +case class ChannelLiquidityPurchased(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, purchase: LiquidityPurchase) extends ChannelEvent + case class ChannelErrorOccurred(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, error: ChannelError, isFatal: Boolean) extends ChannelEvent // NB: the fee should be set to 0 when we're not paying it. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index f6abe66821..ca6c7448e0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -982,7 +982,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with sessionId, nodeParams, fundingParams, channelParams = d.commitments.params, - purpose = InteractiveTxBuilder.SpliceTx(parentCommitment), + purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes), localPushAmount = spliceAck.pushAmount, remotePushAmount = msg.pushAmount, liquidityPurchase_opt = willFund_opt.map(_.purchase), wallet @@ -1029,7 +1029,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with sessionId, nodeParams, fundingParams, channelParams = d.commitments.params, - purpose = InteractiveTxBuilder.SpliceTx(parentCommitment), + purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes), localPushAmount = cmd.pushAmount, remotePushAmount = msg.pushAmount, liquidityPurchase_opt = liquidityPurchase_opt, wallet diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala index 60795b02f4..4eb1c7d1b4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala @@ -16,6 +16,8 @@ package fr.acinq.eclair.channel.fund +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.adapter.TypedActorRefOps import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer} import akka.actor.typed.{ActorRef, Behavior} import akka.event.LoggingAdapter @@ -163,6 +165,8 @@ object InteractiveTxBuilder { def previousFundingAmount: Satoshi def localCommitIndex: Long def remoteCommitIndex: Long + def localNextHtlcId: Long + def remoteNextHtlcId: Long def remotePerCommitmentPoint: PublicKey def commitTxFeerate: FeeratePerKw def fundingTxIndex: Long @@ -175,15 +179,19 @@ object InteractiveTxBuilder { override val previousFundingAmount: Satoshi = 0 sat override val localCommitIndex: Long = 0 override val remoteCommitIndex: Long = 0 + override val localNextHtlcId: Long = 0 + override val remoteNextHtlcId: Long = 0 override val fundingTxIndex: Long = 0 override val localHtlcs: Set[DirectedHtlc] = Set.empty } - case class SpliceTx(parentCommitment: Commitment) extends Purpose { + case class SpliceTx(parentCommitment: Commitment, changes: CommitmentChanges) extends Purpose { override val previousLocalBalance: MilliSatoshi = parentCommitment.localCommit.spec.toLocal override val previousRemoteBalance: MilliSatoshi = parentCommitment.remoteCommit.spec.toLocal override val previousFundingAmount: Satoshi = parentCommitment.capacity override val localCommitIndex: Long = parentCommitment.localCommit.index override val remoteCommitIndex: Long = parentCommitment.remoteCommit.index + override val localNextHtlcId: Long = changes.localNextHtlcId + override val remoteNextHtlcId: Long = changes.remoteNextHtlcId override val remotePerCommitmentPoint: PublicKey = parentCommitment.remoteCommit.remotePerCommitmentPoint override val commitTxFeerate: FeeratePerKw = parentCommitment.localCommit.spec.commitTxFeerate override val fundingTxIndex: Long = parentCommitment.fundingTxIndex + 1 @@ -199,6 +207,8 @@ object InteractiveTxBuilder { override val previousFundingAmount: Satoshi = (previousLocalBalance + previousRemoteBalance).truncateToSatoshi override val localCommitIndex: Long = replacedCommitment.localCommit.index override val remoteCommitIndex: Long = replacedCommitment.remoteCommit.index + override val localNextHtlcId: Long = 0 + override val remoteNextHtlcId: Long = 0 override val remotePerCommitmentPoint: PublicKey = replacedCommitment.remoteCommit.remotePerCommitmentPoint override val commitTxFeerate: FeeratePerKw = replacedCommitment.localCommit.spec.commitTxFeerate override val fundingTxIndex: Long = replacedCommitment.fundingTxIndex @@ -792,6 +802,29 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon Behaviors.receiveMessagePartial { case SignTransactionResult(signedTx) => log.info(s"interactive-tx txid=${signedTx.txId} partially signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", signedTx.tx.localInputs.length, signedTx.tx.remoteInputs.length, signedTx.tx.localOutputs.length, signedTx.tx.remoteOutputs.length) + // At this point, we're not completely sure that the transaction will succeed: if our peer doesn't send their + // commit_sig, the transaction will be aborted. But it's a best effort, because after sending our commit_sig, + // we won't store details about the liquidity purchase so we'll be unable to emit that event later. Even after + // fully signing the transaction, it may be double-spent by a force-close, which would invalidate it as well. + // The right solution is to check confirmations on the funding transaction before considering that a liquidity + // purchase is completed, which is what we do in our AuditDb. + liquidityPurchase_opt.foreach { p => + val purchase = LiquidityPurchase( + fundingTxId = signedTx.txId, + fundingTxIndex = purpose.fundingTxIndex, + isBuyer = fundingParams.isInitiator, + amount = p.amount, + fees = p.fees, + capacity = fundingParams.fundingAmount, + localContribution = fundingParams.localContribution, + remoteContribution = fundingParams.remoteContribution, + localBalance = localCommit.spec.toLocal, + remoteBalance = localCommit.spec.toRemote, + outgoingHtlcCount = purpose.localNextHtlcId, + incomingHtlcCount = purpose.remoteNextHtlcId, + ) + context.system.eventStream ! EventStream.Publish(ChannelLiquidityPurchased(replyTo.toClassic, channelParams.channelId, remoteNodeId, purchase)) + } replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig) Behaviors.stopped case WalletFailure(t) => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala index 9713cfbf1b..bf0007a15f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala @@ -44,6 +44,7 @@ trait Databases { def peers: PeersDb def payments: PaymentsDb def pendingCommands: PendingCommandsDb + def liquidity: LiquidityDb //@formatter:on } @@ -60,6 +61,7 @@ object Databases extends Logging { } case class SqliteDatabases private(network: SqliteNetworkDb, + liquidity: SqliteLiquidityDb, audit: SqliteAuditDb, channels: SqliteChannelsDb, peers: SqlitePeersDb, @@ -78,6 +80,7 @@ object Databases extends Logging { jdbcUrlFile_opt.foreach(checkIfDatabaseUrlIsUnchanged("sqlite", _)) SqliteDatabases( network = new SqliteNetworkDb(networkJdbc), + liquidity = new SqliteLiquidityDb(eclairJdbc), audit = new SqliteAuditDb(auditJdbc), channels = new SqliteChannelsDb(eclairJdbc), peers = new SqlitePeersDb(eclairJdbc), @@ -89,6 +92,7 @@ object Databases extends Logging { } case class PostgresDatabases private(network: PgNetworkDb, + liquidity: PgLiquidityDb, audit: PgAuditDb, channels: PgChannelsDb, peers: PgPeersDb, @@ -106,8 +110,7 @@ object Databases extends Logging { auditRelayedMaxAge: FiniteDuration, localChannelsMinCount: Int, networkNodesMinCount: Int, - networkChannelsMinCount: Int - ) + networkChannelsMinCount: Int) def apply(hikariConfig: HikariConfig, instanceId: UUID, @@ -149,6 +152,7 @@ object Databases extends Logging { val databases = PostgresDatabases( network = new PgNetworkDb, + liquidity = new PgLiquidityDb, audit = new PgAuditDb, channels = new PgChannelsDb, peers = new PgPeersDb, @@ -160,7 +164,7 @@ object Databases extends Logging { readOnlyUser_opt.foreach { readOnlyUser => PgUtils.inTransaction { connection => using(connection.createStatement()) { statement => - val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: Nil + val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: "liquidity" :: Nil schemas.foreach { schema => logger.info(s"granting read-only access to user=$readOnlyUser schema=$schema") statement.executeUpdate(s"GRANT USAGE ON SCHEMA $schema TO $readOnlyUser") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index 0a66c56b43..f74918980a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -38,6 +38,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL private val auditDb: AuditDb = nodeParams.db.audit private val channelsDb: ChannelsDb = nodeParams.db.channels + private val liquidityDb: LiquidityDb = nodeParams.db.liquidity context.spawn(Behaviors.supervise(RevokedHtlcInfoCleaner(channelsDb, nodeParams.revokedHtlcInfoCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "revoked-htlc-info-cleaner") @@ -45,6 +46,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL context.system.eventStream.subscribe(self, classOf[PaymentFailed]) context.system.eventStream.subscribe(self, classOf[PaymentReceived]) context.system.eventStream.subscribe(self, classOf[PaymentRelayed]) + context.system.eventStream.subscribe(self, classOf[ChannelLiquidityPurchased]) context.system.eventStream.subscribe(self, classOf[TransactionPublished]) context.system.eventStream.subscribe(self, classOf[TransactionConfirmed]) context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred]) @@ -92,11 +94,15 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL } auditDb.add(e) + case e: ChannelLiquidityPurchased => liquidityDb.addPurchase(e) + case e: TransactionPublished => log.info(s"paying mining fee=${e.miningFee} for txid=${e.tx.txid} desc=${e.desc}") auditDb.add(e) - case e: TransactionConfirmed => auditDb.add(e) + case e: TransactionConfirmed => + liquidityDb.setConfirmed(e.remoteNodeId, e.tx.txid) + auditDb.add(e) case e: ChannelErrorOccurred => // first pattern matching level is to ignore some errors, second level is to separate between different kind of errors diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index c2e04179d4..755430bf00 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -30,16 +30,12 @@ import scala.util.{Failure, Success, Try} case class DualDatabases(primary: Databases, secondary: Databases) extends Databases with FileBackup { override val network: NetworkDb = DualNetworkDb(primary.network, secondary.network) - override val audit: AuditDb = DualAuditDb(primary.audit, secondary.audit) - override val channels: ChannelsDb = DualChannelsDb(primary.channels, secondary.channels) - override val peers: PeersDb = DualPeersDb(primary.peers, secondary.peers) - override val payments: PaymentsDb = DualPaymentsDb(primary.payments, secondary.payments) - override val pendingCommands: PendingCommandsDb = DualPendingCommandsDb(primary.pendingCommands, secondary.pendingCommands) + override val liquidity: LiquidityDb = DualLiquidityDb(primary.liquidity, secondary.liquidity) /** if one of the database supports file backup, we use it */ override def backup(backupFile: File): Unit = (primary, secondary) match { @@ -411,3 +407,24 @@ case class DualPendingCommandsDb(primary: PendingCommandsDb, secondary: PendingC primary.listSettlementCommands() } } + +case class DualLiquidityDb(primary: LiquidityDb, secondary: LiquidityDb) extends LiquidityDb { + + private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-liquidity").build())) + + override def addPurchase(liquidityPurchase: ChannelLiquidityPurchased): Unit = { + runAsync(secondary.addPurchase(liquidityPurchase)) + primary.addPurchase(liquidityPurchase) + } + + override def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit = { + runAsync(secondary.setConfirmed(remoteNodeId, txId)) + primary.setConfirmed(remoteNodeId, txId) + } + + override def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = { + runAsync(secondary.listPurchases(remoteNodeId)) + primary.listPurchases(remoteNodeId) + } + +} \ No newline at end of file diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/LiquidityDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/LiquidityDb.scala new file mode 100644 index 0000000000..e156b5121e --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/LiquidityDb.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2024 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.TxId +import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, LiquidityPurchase} + +/** + * Created by t-bast on 13/09/2024. + */ + +trait LiquidityDb { + + def addPurchase(liquidityPurchase: ChannelLiquidityPurchased): Unit + + def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit + + def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgLiquidityDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgLiquidityDb.scala new file mode 100644 index 0000000000..37e742d354 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgLiquidityDb.scala @@ -0,0 +1,121 @@ +/* + * Copyright 2024 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db.pg + +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{Satoshi, TxId} +import fr.acinq.eclair.MilliSatoshi +import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, LiquidityPurchase} +import fr.acinq.eclair.db.LiquidityDb +import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends +import fr.acinq.eclair.wire.protocol.LiquidityAds +import grizzled.slf4j.Logging + +import java.sql.Timestamp +import java.time.Instant +import javax.sql.DataSource + +/** + * Created by t-bast on 13/09/2024. + */ + +object PgLiquidityDb { + val DB_NAME = "liquidity" + val CURRENT_VERSION = 1 +} + +class PgLiquidityDb(implicit ds: DataSource) extends LiquidityDb with Logging { + + import PgUtils._ + import ExtendedResultSet._ + import PgLiquidityDb._ + + inTransaction { pg => + using(pg.createStatement()) { statement => + getVersion(statement, DB_NAME) match { + case None => + statement.executeUpdate("CREATE SCHEMA liquidity") + statement.executeUpdate("CREATE TABLE liquidity.purchases (tx_id TEXT NOT NULL, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat BIGINT NOT NULL, mining_fee_sat BIGINT NOT NULL, service_fee_sat BIGINT NOT NULL, funding_tx_index BIGINT NOT NULL, capacity_sat BIGINT NOT NULL, local_contribution_sat BIGINT NOT NULL, remote_contribution_sat BIGINT NOT NULL, local_balance_msat BIGINT NOT NULL, remote_balance_msat BIGINT NOT NULL, outgoing_htlc_count BIGINT NOT NULL, incoming_htlc_count BIGINT NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL, confirmed_at TIMESTAMP WITH TIME ZONE)") + statement.executeUpdate("CREATE INDEX liquidity_purchases_node_id_idx ON liquidity.purchases(node_id)") + case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do + case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") + } + setVersion(statement, DB_NAME, CURRENT_VERSION) + } + } + + override def addPurchase(e: ChannelLiquidityPurchased): Unit = withMetrics("liquidity/add-purchase", DbBackends.Postgres) { + inTransaction { pg => + using(pg.prepareStatement("INSERT INTO liquidity.purchases VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)")) { statement => + statement.setString(1, e.purchase.fundingTxId.value.toHex) + statement.setString(2, e.channelId.toHex) + statement.setString(3, e.remoteNodeId.toHex) + statement.setBoolean(4, e.purchase.isBuyer) + statement.setLong(5, e.purchase.amount.toLong) + statement.setLong(6, e.purchase.fees.miningFee.toLong) + statement.setLong(7, e.purchase.fees.serviceFee.toLong) + statement.setLong(8, e.purchase.fundingTxIndex) + statement.setLong(9, e.purchase.capacity.toLong) + statement.setLong(10, e.purchase.localContribution.toLong) + statement.setLong(11, e.purchase.remoteContribution.toLong) + statement.setLong(12, e.purchase.localBalance.toLong) + statement.setLong(13, e.purchase.remoteBalance.toLong) + statement.setLong(14, e.purchase.outgoingHtlcCount) + statement.setLong(15, e.purchase.incomingHtlcCount) + statement.setTimestamp(16, Timestamp.from(Instant.now())) + statement.executeUpdate() + } + } + } + + override def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit = withMetrics("liquidity/set-confirmed", DbBackends.Postgres) { + inTransaction { pg => + using(pg.prepareStatement("UPDATE liquidity.purchases SET confirmed_at=? WHERE node_id=? AND tx_id=?")) { statement => + statement.setTimestamp(1, Timestamp.from(Instant.now())) + statement.setString(2, remoteNodeId.toHex) + statement.setString(3, txId.value.toHex) + statement.executeUpdate() + } + } + } + + override def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = withMetrics("liquidity/list-purchases", DbBackends.Postgres) { + inTransaction { pg => + using(pg.prepareStatement("SELECT * FROM liquidity.purchases WHERE node_id=? AND confirmed_at IS NOT NULL")) { statement => + statement.setString(1, remoteNodeId.toHex) + statement.executeQuery().map { rs => + LiquidityPurchase( + fundingTxId = TxId(rs.getByteVector32FromHex("tx_id")), + fundingTxIndex = rs.getLong("funding_tx_index"), + isBuyer = rs.getBoolean("is_buyer"), + amount = Satoshi(rs.getLong("amount_sat")), + fees = LiquidityAds.Fees(miningFee = Satoshi(rs.getLong("mining_fee_sat")), serviceFee = Satoshi(rs.getLong("service_fee_sat"))), + capacity = Satoshi(rs.getLong("capacity_sat")), + localContribution = Satoshi(rs.getLong("local_contribution_sat")), + remoteContribution = Satoshi(rs.getLong("remote_contribution_sat")), + localBalance = MilliSatoshi(rs.getLong("local_balance_msat")), + remoteBalance = MilliSatoshi(rs.getLong("remote_balance_msat")), + outgoingHtlcCount = rs.getLong("outgoing_htlc_count"), + incomingHtlcCount = rs.getLong("incoming_htlc_count") + ) + }.toSeq + } + } + } + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteLiquidityDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteLiquidityDb.scala new file mode 100644 index 0000000000..0fb51de127 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteLiquidityDb.scala @@ -0,0 +1,110 @@ +/* + * Copyright 2024 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db.sqlite + +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{Satoshi, TxId} +import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, LiquidityPurchase} +import fr.acinq.eclair.db.LiquidityDb +import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends +import fr.acinq.eclair.wire.protocol.LiquidityAds +import fr.acinq.eclair.{MilliSatoshi, TimestampMilli} +import grizzled.slf4j.Logging + +import java.sql.Connection + +/** + * Created by t-bast on 13/09/2024. + */ + +object SqliteLiquidityDb { + val DB_NAME = "liquidity" + val CURRENT_VERSION = 1 +} + +class SqliteLiquidityDb(val sqlite: Connection) extends LiquidityDb with Logging { + + import SqliteUtils._ + import ExtendedResultSet._ + import SqliteLiquidityDb._ + + using(sqlite.createStatement(), inTransaction = true) { statement => + getVersion(statement, DB_NAME) match { + case None => + statement.executeUpdate("CREATE TABLE liquidity_purchases (tx_id BLOB NOT NULL, channel_id BLOB NOT NULL, node_id BLOB NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat INTEGER NOT NULL, mining_fee_sat INTEGER NOT NULL, service_fee_sat INTEGER NOT NULL, funding_tx_index INTEGER NOT NULL, capacity_sat INTEGER NOT NULL, local_contribution_sat INTEGER NOT NULL, remote_contribution_sat INTEGER NOT NULL, local_balance_msat INTEGER NOT NULL, remote_balance_msat INTEGER NOT NULL, outgoing_htlc_count INTEGER NOT NULL, incoming_htlc_count INTEGER NOT NULL, created_at INTEGER NOT NULL, confirmed_at INTEGER)") + statement.executeUpdate("CREATE INDEX liquidity_purchases_node_id_idx ON liquidity_purchases(node_id)") + case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do + case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") + } + setVersion(statement, DB_NAME, CURRENT_VERSION) + } + + override def addPurchase(e: ChannelLiquidityPurchased): Unit = withMetrics("liquidity/add-purchase", DbBackends.Sqlite) { + using(sqlite.prepareStatement("INSERT INTO liquidity_purchases VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)")) { statement => + statement.setBytes(1, e.purchase.fundingTxId.value.toArray) + statement.setBytes(2, e.channelId.toArray) + statement.setBytes(3, e.remoteNodeId.value.toArray) + statement.setBoolean(4, e.purchase.isBuyer) + statement.setLong(5, e.purchase.amount.toLong) + statement.setLong(6, e.purchase.fees.miningFee.toLong) + statement.setLong(7, e.purchase.fees.serviceFee.toLong) + statement.setLong(8, e.purchase.fundingTxIndex) + statement.setLong(9, e.purchase.capacity.toLong) + statement.setLong(10, e.purchase.localContribution.toLong) + statement.setLong(11, e.purchase.remoteContribution.toLong) + statement.setLong(12, e.purchase.localBalance.toLong) + statement.setLong(13, e.purchase.remoteBalance.toLong) + statement.setLong(14, e.purchase.outgoingHtlcCount) + statement.setLong(15, e.purchase.incomingHtlcCount) + statement.setLong(16, TimestampMilli.now().toLong) + statement.executeUpdate() + } + } + + override def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit = withMetrics("liquidity/set-confirmed", DbBackends.Sqlite) { + using(sqlite.prepareStatement("UPDATE liquidity_purchases SET confirmed_at=? WHERE node_id=? AND tx_id=?")) { statement => + statement.setLong(1, TimestampMilli.now().toLong) + statement.setBytes(2, remoteNodeId.value.toArray) + statement.setBytes(3, txId.value.toArray) + statement.executeUpdate() + } + } + + override def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = withMetrics("liquidity/list-purchases", DbBackends.Sqlite) { + using(sqlite.prepareStatement("SELECT * FROM liquidity_purchases WHERE node_id=? AND confirmed_at IS NOT NULL")) { statement => + statement.setBytes(1, remoteNodeId.value.toArray) + statement.executeQuery().map { rs => + LiquidityPurchase( + fundingTxId = TxId(rs.getByteVector32("tx_id")), + fundingTxIndex = rs.getLong("funding_tx_index"), + isBuyer = rs.getBoolean("is_buyer"), + amount = Satoshi(rs.getLong("amount_sat")), + fees = LiquidityAds.Fees(miningFee = Satoshi(rs.getLong("mining_fee_sat")), serviceFee = Satoshi(rs.getLong("service_fee_sat"))), + capacity = Satoshi(rs.getLong("capacity_sat")), + localContribution = Satoshi(rs.getLong("local_contribution_sat")), + remoteContribution = Satoshi(rs.getLong("remote_contribution_sat")), + localBalance = MilliSatoshi(rs.getLong("local_balance_msat")), + remoteBalance = MilliSatoshi(rs.getLong("remote_balance_msat")), + outgoingHtlcCount = rs.getLong("outgoing_htlc_count"), + incomingHtlcCount = rs.getLong("incoming_htlc_count") + ) + }.toSeq + } + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala index 2b25448ce4..304afa9b48 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala @@ -34,6 +34,7 @@ sealed trait TestDatabases extends Databases { override def peers: PeersDb = db.peers override def payments: PaymentsDb = db.payments override def pendingCommands: PendingCommandsDb = db.pendingCommands + override def liquidity: LiquidityDb = db.liquidity def close(): Unit // @formatter:on } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala index ba5811e307..ba0f2c24e8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala @@ -139,7 +139,7 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit def spawnTxBuilderSpliceAlice(fundingParams: InteractiveTxParams, commitment: Commitment, wallet: OnChainWallet, liquidityPurchase_opt: Option[LiquidityAds.Purchase] = None): ActorRef[InteractiveTxBuilder.Command] = system.spawnAnonymous(InteractiveTxBuilder( ByteVector32.Zeroes, nodeParamsA, fundingParams, channelParamsA, - SpliceTx(commitment), + SpliceTx(commitment, CommitmentChanges.init()), 0 msat, 0 msat, liquidityPurchase_opt, wallet)) @@ -167,7 +167,7 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit def spawnTxBuilderSpliceBob(fundingParams: InteractiveTxParams, commitment: Commitment, wallet: OnChainWallet, liquidityPurchase_opt: Option[LiquidityAds.Purchase] = None): ActorRef[InteractiveTxBuilder.Command] = system.spawnAnonymous(InteractiveTxBuilder( ByteVector32.Zeroes, nodeParamsB, fundingParams, channelParamsB, - SpliceTx(commitment), + SpliceTx(commitment, CommitmentChanges.init()), 0 msat, 0 msat, liquidityPurchase_opt, wallet)) @@ -1651,6 +1651,9 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit walletA.publishTransaction(txA1.signedTx).pipeTo(probe.ref) probe.expectMsg(txA1.txId) + val eventListener = TestProbe() + system.eventStream.subscribe(eventListener.ref, classOf[ChannelLiquidityPurchased]) + // Alice initiates a splice that is only funded by Bob, because she is purchasing liquidity. val purchase = LiquidityAds.Purchase.Standard(50_000 sat, LiquidityAds.Fees(1000 sat, 1500 sat), LiquidityAds.PaymentDetails.FromChannelBalance) // Alice pays fees for the common fields of the transaction, by decreasing her balance in the shared output. @@ -1695,6 +1698,11 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit assert(targetFeerate * 0.9 <= spliceTxA1.feerate && spliceTxA1.feerate <= targetFeerate * 1.25) walletA.publishTransaction(spliceTxA1.signedTx).pipeTo(probe.ref) probe.expectMsg(spliceTxA1.txId) + + val event = eventListener.expectMsgType[ChannelLiquidityPurchased] + assert(event.purchase.fees == purchase.fees) + assert(event.purchase.fundingTxIndex == 1) + assert(event.purchase.fundingTxId == spliceTxA1.txId) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/LiquidityDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/LiquidityDbSpec.scala new file mode 100644 index 0000000000..a9b7a3a604 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/LiquidityDbSpec.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2024 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import fr.acinq.bitcoin.scalacompat.{SatoshiLong, TxId} +import fr.acinq.eclair.TestDatabases.forAllDbs +import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, LiquidityPurchase} +import fr.acinq.eclair.wire.protocol.LiquidityAds +import fr.acinq.eclair.{MilliSatoshiLong, randomBytes32, randomKey} +import org.scalatest.funsuite.AnyFunSuite + +class LiquidityDbSpec extends AnyFunSuite { + + test("add/list liquidity purchases") { + forAllDbs { dbs => + val db = dbs.liquidity + val (nodeId1, nodeId2) = (randomKey().publicKey, randomKey().publicKey) + val confirmedFundingTxId = TxId(randomBytes32()) + val unconfirmedFundingTxId = TxId(randomBytes32()) + val e1a = ChannelLiquidityPurchased(null, randomBytes32(), nodeId1, LiquidityPurchase(confirmedFundingTxId, 3, isBuyer = true, 250_000 sat, LiquidityAds.Fees(2_000 sat, 3_000 sat), 750_000 sat, 50_000 sat, 300_000 sat, 400_000_000 msat, 350_000_000 msat, 7, 11)) + val e1b = ChannelLiquidityPurchased(null, randomBytes32(), nodeId1, LiquidityPurchase(confirmedFundingTxId, 7, isBuyer = false, 50_000 sat, LiquidityAds.Fees(300 sat, 700 sat), 500_000 sat, 50_000 sat, 0 sat, 250_000_000 msat, 250_000_000 msat, 0, 0)) + val e1c = ChannelLiquidityPurchased(null, e1b.channelId, nodeId1, LiquidityPurchase(confirmedFundingTxId, 0, isBuyer = false, 150_000 sat, LiquidityAds.Fees(500 sat, 1_500 sat), 250_000 sat, 150_000 sat, -100_000 sat, 200_000_000 msat, 50_000_000 msat, 47, 45)) + val e1d = ChannelLiquidityPurchased(null, randomBytes32(), nodeId1, LiquidityPurchase(unconfirmedFundingTxId, 22, isBuyer = true, 250_000 sat, LiquidityAds.Fees(4_000 sat, 1_000 sat), 450_000 sat, -50_000 sat, 250_000 sat, 150_000_000 msat, 300_000_000 msat, 3, 3)) + val e2a = ChannelLiquidityPurchased(null, randomBytes32(), nodeId2, LiquidityPurchase(confirmedFundingTxId, 453, isBuyer = false, 200_000 sat, LiquidityAds.Fees(1_000 sat, 1_000 sat), 300_000 sat, 250_000 sat, 0 sat, 270_000_000 msat, 30_000_000 msat, 113, 0)) + val e2b = ChannelLiquidityPurchased(null, randomBytes32(), nodeId2, LiquidityPurchase(unconfirmedFundingTxId, 1, isBuyer = false, 200_000 sat, LiquidityAds.Fees(1_000 sat, 1_000 sat), 300_000 sat, 250_000 sat, -10_000 sat, 250_000_000 msat, 50_000_000 msat, 0, 113)) + + db.addPurchase(e1a) + db.addPurchase(e1b) + db.addPurchase(e1c) + db.addPurchase(e1d) + db.addPurchase(e2a) + db.addPurchase(e2b) + + // The liquidity purchase is confirmed only once the corresponding transaction confirms. + assert(db.listPurchases(nodeId1).isEmpty) + assert(db.listPurchases(nodeId2).isEmpty) + + db.setConfirmed(nodeId1, confirmedFundingTxId) + db.setConfirmed(nodeId2, confirmedFundingTxId) + + assert(db.listPurchases(nodeId1).toSet == Set(e1a, e1b, e1c).map(_.purchase)) + assert(db.listPurchases(nodeId2) == Seq(e2a.purchase)) + } + } + +}