diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala index ff36bd5fb1..a138cfc1a3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala @@ -18,12 +18,11 @@ package fr.acinq.eclair.channel import akka.actor.ActorRef import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction, TxId} import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.channel.Helpers.Closing.ClosingType -import fr.acinq.eclair.io.Peer.OpenChannelResponse -import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate} -import fr.acinq.eclair.{BlockHeight, Features, ShortChannelId} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, LiquidityAds} +import fr.acinq.eclair.{BlockHeight, Features, MilliSatoshi, ShortChannelId} /** * Created by PM on 17/08/2016. @@ -79,6 +78,14 @@ case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) ext case class ChannelSignatureReceived(channel: ActorRef, commitments: Commitments) extends ChannelEvent +case class LiquidityPurchase(fundingTxId: TxId, fundingTxIndex: Long, isBuyer: Boolean, amount: Satoshi, fees: LiquidityAds.Fees, capacity: Satoshi, localContribution: Satoshi, remoteContribution: Satoshi, localBalance: MilliSatoshi, remoteBalance: MilliSatoshi, outgoingHtlcCount: Long, incomingHtlcCount: Long) { + val previousCapacity: Satoshi = capacity - localContribution - remoteContribution + val previousLocalBalance: MilliSatoshi = if (isBuyer) localBalance - localContribution + fees.total else localBalance - localContribution - fees.total + val previousRemoteBalance: MilliSatoshi = if (isBuyer) remoteBalance - remoteContribution - fees.total else remoteBalance - remoteContribution + fees.total +} + +case class ChannelLiquidityPurchased(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, purchase: LiquidityPurchase) extends ChannelEvent + case class ChannelErrorOccurred(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, error: ChannelError, isFatal: Boolean) extends ChannelEvent // NB: the fee should be set to 0 when we're not paying it. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index f6abe66821..ca6c7448e0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -982,7 +982,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with sessionId, nodeParams, fundingParams, channelParams = d.commitments.params, - purpose = InteractiveTxBuilder.SpliceTx(parentCommitment), + purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes), localPushAmount = spliceAck.pushAmount, remotePushAmount = msg.pushAmount, liquidityPurchase_opt = willFund_opt.map(_.purchase), wallet @@ -1029,7 +1029,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with sessionId, nodeParams, fundingParams, channelParams = d.commitments.params, - purpose = InteractiveTxBuilder.SpliceTx(parentCommitment), + purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes), localPushAmount = cmd.pushAmount, remotePushAmount = msg.pushAmount, liquidityPurchase_opt = liquidityPurchase_opt, wallet diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala index 60795b02f4..4eb1c7d1b4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala @@ -16,6 +16,8 @@ package fr.acinq.eclair.channel.fund +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.adapter.TypedActorRefOps import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer} import akka.actor.typed.{ActorRef, Behavior} import akka.event.LoggingAdapter @@ -163,6 +165,8 @@ object InteractiveTxBuilder { def previousFundingAmount: Satoshi def localCommitIndex: Long def remoteCommitIndex: Long + def localNextHtlcId: Long + def remoteNextHtlcId: Long def remotePerCommitmentPoint: PublicKey def commitTxFeerate: FeeratePerKw def fundingTxIndex: Long @@ -175,15 +179,19 @@ object InteractiveTxBuilder { override val previousFundingAmount: Satoshi = 0 sat override val localCommitIndex: Long = 0 override val remoteCommitIndex: Long = 0 + override val localNextHtlcId: Long = 0 + override val remoteNextHtlcId: Long = 0 override val fundingTxIndex: Long = 0 override val localHtlcs: Set[DirectedHtlc] = Set.empty } - case class SpliceTx(parentCommitment: Commitment) extends Purpose { + case class SpliceTx(parentCommitment: Commitment, changes: CommitmentChanges) extends Purpose { override val previousLocalBalance: MilliSatoshi = parentCommitment.localCommit.spec.toLocal override val previousRemoteBalance: MilliSatoshi = parentCommitment.remoteCommit.spec.toLocal override val previousFundingAmount: Satoshi = parentCommitment.capacity override val localCommitIndex: Long = parentCommitment.localCommit.index override val remoteCommitIndex: Long = parentCommitment.remoteCommit.index + override val localNextHtlcId: Long = changes.localNextHtlcId + override val remoteNextHtlcId: Long = changes.remoteNextHtlcId override val remotePerCommitmentPoint: PublicKey = parentCommitment.remoteCommit.remotePerCommitmentPoint override val commitTxFeerate: FeeratePerKw = parentCommitment.localCommit.spec.commitTxFeerate override val fundingTxIndex: Long = parentCommitment.fundingTxIndex + 1 @@ -199,6 +207,8 @@ object InteractiveTxBuilder { override val previousFundingAmount: Satoshi = (previousLocalBalance + previousRemoteBalance).truncateToSatoshi override val localCommitIndex: Long = replacedCommitment.localCommit.index override val remoteCommitIndex: Long = replacedCommitment.remoteCommit.index + override val localNextHtlcId: Long = 0 + override val remoteNextHtlcId: Long = 0 override val remotePerCommitmentPoint: PublicKey = replacedCommitment.remoteCommit.remotePerCommitmentPoint override val commitTxFeerate: FeeratePerKw = replacedCommitment.localCommit.spec.commitTxFeerate override val fundingTxIndex: Long = replacedCommitment.fundingTxIndex @@ -792,6 +802,29 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon Behaviors.receiveMessagePartial { case SignTransactionResult(signedTx) => log.info(s"interactive-tx txid=${signedTx.txId} partially signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", signedTx.tx.localInputs.length, signedTx.tx.remoteInputs.length, signedTx.tx.localOutputs.length, signedTx.tx.remoteOutputs.length) + // At this point, we're not completely sure that the transaction will succeed: if our peer doesn't send their + // commit_sig, the transaction will be aborted. But it's a best effort, because after sending our commit_sig, + // we won't store details about the liquidity purchase so we'll be unable to emit that event later. Even after + // fully signing the transaction, it may be double-spent by a force-close, which would invalidate it as well. + // The right solution is to check confirmations on the funding transaction before considering that a liquidity + // purchase is completed, which is what we do in our AuditDb. + liquidityPurchase_opt.foreach { p => + val purchase = LiquidityPurchase( + fundingTxId = signedTx.txId, + fundingTxIndex = purpose.fundingTxIndex, + isBuyer = fundingParams.isInitiator, + amount = p.amount, + fees = p.fees, + capacity = fundingParams.fundingAmount, + localContribution = fundingParams.localContribution, + remoteContribution = fundingParams.remoteContribution, + localBalance = localCommit.spec.toLocal, + remoteBalance = localCommit.spec.toRemote, + outgoingHtlcCount = purpose.localNextHtlcId, + incomingHtlcCount = purpose.remoteNextHtlcId, + ) + context.system.eventStream ! EventStream.Publish(ChannelLiquidityPurchased(replyTo.toClassic, channelParams.channelId, remoteNodeId, purchase)) + } replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig) Behaviors.stopped case WalletFailure(t) => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala index 9713cfbf1b..bf0007a15f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala @@ -44,6 +44,7 @@ trait Databases { def peers: PeersDb def payments: PaymentsDb def pendingCommands: PendingCommandsDb + def liquidity: LiquidityDb //@formatter:on } @@ -60,6 +61,7 @@ object Databases extends Logging { } case class SqliteDatabases private(network: SqliteNetworkDb, + liquidity: SqliteLiquidityDb, audit: SqliteAuditDb, channels: SqliteChannelsDb, peers: SqlitePeersDb, @@ -78,6 +80,7 @@ object Databases extends Logging { jdbcUrlFile_opt.foreach(checkIfDatabaseUrlIsUnchanged("sqlite", _)) SqliteDatabases( network = new SqliteNetworkDb(networkJdbc), + liquidity = new SqliteLiquidityDb(eclairJdbc), audit = new SqliteAuditDb(auditJdbc), channels = new SqliteChannelsDb(eclairJdbc), peers = new SqlitePeersDb(eclairJdbc), @@ -89,6 +92,7 @@ object Databases extends Logging { } case class PostgresDatabases private(network: PgNetworkDb, + liquidity: PgLiquidityDb, audit: PgAuditDb, channels: PgChannelsDb, peers: PgPeersDb, @@ -106,8 +110,7 @@ object Databases extends Logging { auditRelayedMaxAge: FiniteDuration, localChannelsMinCount: Int, networkNodesMinCount: Int, - networkChannelsMinCount: Int - ) + networkChannelsMinCount: Int) def apply(hikariConfig: HikariConfig, instanceId: UUID, @@ -149,6 +152,7 @@ object Databases extends Logging { val databases = PostgresDatabases( network = new PgNetworkDb, + liquidity = new PgLiquidityDb, audit = new PgAuditDb, channels = new PgChannelsDb, peers = new PgPeersDb, @@ -160,7 +164,7 @@ object Databases extends Logging { readOnlyUser_opt.foreach { readOnlyUser => PgUtils.inTransaction { connection => using(connection.createStatement()) { statement => - val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: Nil + val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: "liquidity" :: Nil schemas.foreach { schema => logger.info(s"granting read-only access to user=$readOnlyUser schema=$schema") statement.executeUpdate(s"GRANT USAGE ON SCHEMA $schema TO $readOnlyUser") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index 0a66c56b43..f74918980a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -38,6 +38,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL private val auditDb: AuditDb = nodeParams.db.audit private val channelsDb: ChannelsDb = nodeParams.db.channels + private val liquidityDb: LiquidityDb = nodeParams.db.liquidity context.spawn(Behaviors.supervise(RevokedHtlcInfoCleaner(channelsDb, nodeParams.revokedHtlcInfoCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "revoked-htlc-info-cleaner") @@ -45,6 +46,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL context.system.eventStream.subscribe(self, classOf[PaymentFailed]) context.system.eventStream.subscribe(self, classOf[PaymentReceived]) context.system.eventStream.subscribe(self, classOf[PaymentRelayed]) + context.system.eventStream.subscribe(self, classOf[ChannelLiquidityPurchased]) context.system.eventStream.subscribe(self, classOf[TransactionPublished]) context.system.eventStream.subscribe(self, classOf[TransactionConfirmed]) context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred]) @@ -92,11 +94,15 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL } auditDb.add(e) + case e: ChannelLiquidityPurchased => liquidityDb.addPurchase(e) + case e: TransactionPublished => log.info(s"paying mining fee=${e.miningFee} for txid=${e.tx.txid} desc=${e.desc}") auditDb.add(e) - case e: TransactionConfirmed => auditDb.add(e) + case e: TransactionConfirmed => + liquidityDb.setConfirmed(e.remoteNodeId, e.tx.txid) + auditDb.add(e) case e: ChannelErrorOccurred => // first pattern matching level is to ignore some errors, second level is to separate between different kind of errors diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index c2e04179d4..755430bf00 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -30,16 +30,12 @@ import scala.util.{Failure, Success, Try} case class DualDatabases(primary: Databases, secondary: Databases) extends Databases with FileBackup { override val network: NetworkDb = DualNetworkDb(primary.network, secondary.network) - override val audit: AuditDb = DualAuditDb(primary.audit, secondary.audit) - override val channels: ChannelsDb = DualChannelsDb(primary.channels, secondary.channels) - override val peers: PeersDb = DualPeersDb(primary.peers, secondary.peers) - override val payments: PaymentsDb = DualPaymentsDb(primary.payments, secondary.payments) - override val pendingCommands: PendingCommandsDb = DualPendingCommandsDb(primary.pendingCommands, secondary.pendingCommands) + override val liquidity: LiquidityDb = DualLiquidityDb(primary.liquidity, secondary.liquidity) /** if one of the database supports file backup, we use it */ override def backup(backupFile: File): Unit = (primary, secondary) match { @@ -411,3 +407,24 @@ case class DualPendingCommandsDb(primary: PendingCommandsDb, secondary: PendingC primary.listSettlementCommands() } } + +case class DualLiquidityDb(primary: LiquidityDb, secondary: LiquidityDb) extends LiquidityDb { + + private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-liquidity").build())) + + override def addPurchase(liquidityPurchase: ChannelLiquidityPurchased): Unit = { + runAsync(secondary.addPurchase(liquidityPurchase)) + primary.addPurchase(liquidityPurchase) + } + + override def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit = { + runAsync(secondary.setConfirmed(remoteNodeId, txId)) + primary.setConfirmed(remoteNodeId, txId) + } + + override def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = { + runAsync(secondary.listPurchases(remoteNodeId)) + primary.listPurchases(remoteNodeId) + } + +} \ No newline at end of file diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/LiquidityDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/LiquidityDb.scala new file mode 100644 index 0000000000..e156b5121e --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/LiquidityDb.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2024 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.TxId +import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, LiquidityPurchase} + +/** + * Created by t-bast on 13/09/2024. + */ + +trait LiquidityDb { + + def addPurchase(liquidityPurchase: ChannelLiquidityPurchased): Unit + + def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit + + def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgLiquidityDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgLiquidityDb.scala new file mode 100644 index 0000000000..37e742d354 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgLiquidityDb.scala @@ -0,0 +1,121 @@ +/* + * Copyright 2024 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db.pg + +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{Satoshi, TxId} +import fr.acinq.eclair.MilliSatoshi +import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, LiquidityPurchase} +import fr.acinq.eclair.db.LiquidityDb +import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends +import fr.acinq.eclair.wire.protocol.LiquidityAds +import grizzled.slf4j.Logging + +import java.sql.Timestamp +import java.time.Instant +import javax.sql.DataSource + +/** + * Created by t-bast on 13/09/2024. + */ + +object PgLiquidityDb { + val DB_NAME = "liquidity" + val CURRENT_VERSION = 1 +} + +class PgLiquidityDb(implicit ds: DataSource) extends LiquidityDb with Logging { + + import PgUtils._ + import ExtendedResultSet._ + import PgLiquidityDb._ + + inTransaction { pg => + using(pg.createStatement()) { statement => + getVersion(statement, DB_NAME) match { + case None => + statement.executeUpdate("CREATE SCHEMA liquidity") + statement.executeUpdate("CREATE TABLE liquidity.purchases (tx_id TEXT NOT NULL, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat BIGINT NOT NULL, mining_fee_sat BIGINT NOT NULL, service_fee_sat BIGINT NOT NULL, funding_tx_index BIGINT NOT NULL, capacity_sat BIGINT NOT NULL, local_contribution_sat BIGINT NOT NULL, remote_contribution_sat BIGINT NOT NULL, local_balance_msat BIGINT NOT NULL, remote_balance_msat BIGINT NOT NULL, outgoing_htlc_count BIGINT NOT NULL, incoming_htlc_count BIGINT NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL, confirmed_at TIMESTAMP WITH TIME ZONE)") + statement.executeUpdate("CREATE INDEX liquidity_purchases_node_id_idx ON liquidity.purchases(node_id)") + case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do + case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") + } + setVersion(statement, DB_NAME, CURRENT_VERSION) + } + } + + override def addPurchase(e: ChannelLiquidityPurchased): Unit = withMetrics("liquidity/add-purchase", DbBackends.Postgres) { + inTransaction { pg => + using(pg.prepareStatement("INSERT INTO liquidity.purchases VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)")) { statement => + statement.setString(1, e.purchase.fundingTxId.value.toHex) + statement.setString(2, e.channelId.toHex) + statement.setString(3, e.remoteNodeId.toHex) + statement.setBoolean(4, e.purchase.isBuyer) + statement.setLong(5, e.purchase.amount.toLong) + statement.setLong(6, e.purchase.fees.miningFee.toLong) + statement.setLong(7, e.purchase.fees.serviceFee.toLong) + statement.setLong(8, e.purchase.fundingTxIndex) + statement.setLong(9, e.purchase.capacity.toLong) + statement.setLong(10, e.purchase.localContribution.toLong) + statement.setLong(11, e.purchase.remoteContribution.toLong) + statement.setLong(12, e.purchase.localBalance.toLong) + statement.setLong(13, e.purchase.remoteBalance.toLong) + statement.setLong(14, e.purchase.outgoingHtlcCount) + statement.setLong(15, e.purchase.incomingHtlcCount) + statement.setTimestamp(16, Timestamp.from(Instant.now())) + statement.executeUpdate() + } + } + } + + override def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit = withMetrics("liquidity/set-confirmed", DbBackends.Postgres) { + inTransaction { pg => + using(pg.prepareStatement("UPDATE liquidity.purchases SET confirmed_at=? WHERE node_id=? AND tx_id=?")) { statement => + statement.setTimestamp(1, Timestamp.from(Instant.now())) + statement.setString(2, remoteNodeId.toHex) + statement.setString(3, txId.value.toHex) + statement.executeUpdate() + } + } + } + + override def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = withMetrics("liquidity/list-purchases", DbBackends.Postgres) { + inTransaction { pg => + using(pg.prepareStatement("SELECT * FROM liquidity.purchases WHERE node_id=? AND confirmed_at IS NOT NULL")) { statement => + statement.setString(1, remoteNodeId.toHex) + statement.executeQuery().map { rs => + LiquidityPurchase( + fundingTxId = TxId(rs.getByteVector32FromHex("tx_id")), + fundingTxIndex = rs.getLong("funding_tx_index"), + isBuyer = rs.getBoolean("is_buyer"), + amount = Satoshi(rs.getLong("amount_sat")), + fees = LiquidityAds.Fees(miningFee = Satoshi(rs.getLong("mining_fee_sat")), serviceFee = Satoshi(rs.getLong("service_fee_sat"))), + capacity = Satoshi(rs.getLong("capacity_sat")), + localContribution = Satoshi(rs.getLong("local_contribution_sat")), + remoteContribution = Satoshi(rs.getLong("remote_contribution_sat")), + localBalance = MilliSatoshi(rs.getLong("local_balance_msat")), + remoteBalance = MilliSatoshi(rs.getLong("remote_balance_msat")), + outgoingHtlcCount = rs.getLong("outgoing_htlc_count"), + incomingHtlcCount = rs.getLong("incoming_htlc_count") + ) + }.toSeq + } + } + } + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteLiquidityDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteLiquidityDb.scala new file mode 100644 index 0000000000..0fb51de127 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteLiquidityDb.scala @@ -0,0 +1,110 @@ +/* + * Copyright 2024 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db.sqlite + +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{Satoshi, TxId} +import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, LiquidityPurchase} +import fr.acinq.eclair.db.LiquidityDb +import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends +import fr.acinq.eclair.wire.protocol.LiquidityAds +import fr.acinq.eclair.{MilliSatoshi, TimestampMilli} +import grizzled.slf4j.Logging + +import java.sql.Connection + +/** + * Created by t-bast on 13/09/2024. + */ + +object SqliteLiquidityDb { + val DB_NAME = "liquidity" + val CURRENT_VERSION = 1 +} + +class SqliteLiquidityDb(val sqlite: Connection) extends LiquidityDb with Logging { + + import SqliteUtils._ + import ExtendedResultSet._ + import SqliteLiquidityDb._ + + using(sqlite.createStatement(), inTransaction = true) { statement => + getVersion(statement, DB_NAME) match { + case None => + statement.executeUpdate("CREATE TABLE liquidity_purchases (tx_id BLOB NOT NULL, channel_id BLOB NOT NULL, node_id BLOB NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat INTEGER NOT NULL, mining_fee_sat INTEGER NOT NULL, service_fee_sat INTEGER NOT NULL, funding_tx_index INTEGER NOT NULL, capacity_sat INTEGER NOT NULL, local_contribution_sat INTEGER NOT NULL, remote_contribution_sat INTEGER NOT NULL, local_balance_msat INTEGER NOT NULL, remote_balance_msat INTEGER NOT NULL, outgoing_htlc_count INTEGER NOT NULL, incoming_htlc_count INTEGER NOT NULL, created_at INTEGER NOT NULL, confirmed_at INTEGER)") + statement.executeUpdate("CREATE INDEX liquidity_purchases_node_id_idx ON liquidity_purchases(node_id)") + case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do + case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") + } + setVersion(statement, DB_NAME, CURRENT_VERSION) + } + + override def addPurchase(e: ChannelLiquidityPurchased): Unit = withMetrics("liquidity/add-purchase", DbBackends.Sqlite) { + using(sqlite.prepareStatement("INSERT INTO liquidity_purchases VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)")) { statement => + statement.setBytes(1, e.purchase.fundingTxId.value.toArray) + statement.setBytes(2, e.channelId.toArray) + statement.setBytes(3, e.remoteNodeId.value.toArray) + statement.setBoolean(4, e.purchase.isBuyer) + statement.setLong(5, e.purchase.amount.toLong) + statement.setLong(6, e.purchase.fees.miningFee.toLong) + statement.setLong(7, e.purchase.fees.serviceFee.toLong) + statement.setLong(8, e.purchase.fundingTxIndex) + statement.setLong(9, e.purchase.capacity.toLong) + statement.setLong(10, e.purchase.localContribution.toLong) + statement.setLong(11, e.purchase.remoteContribution.toLong) + statement.setLong(12, e.purchase.localBalance.toLong) + statement.setLong(13, e.purchase.remoteBalance.toLong) + statement.setLong(14, e.purchase.outgoingHtlcCount) + statement.setLong(15, e.purchase.incomingHtlcCount) + statement.setLong(16, TimestampMilli.now().toLong) + statement.executeUpdate() + } + } + + override def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit = withMetrics("liquidity/set-confirmed", DbBackends.Sqlite) { + using(sqlite.prepareStatement("UPDATE liquidity_purchases SET confirmed_at=? WHERE node_id=? AND tx_id=?")) { statement => + statement.setLong(1, TimestampMilli.now().toLong) + statement.setBytes(2, remoteNodeId.value.toArray) + statement.setBytes(3, txId.value.toArray) + statement.executeUpdate() + } + } + + override def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = withMetrics("liquidity/list-purchases", DbBackends.Sqlite) { + using(sqlite.prepareStatement("SELECT * FROM liquidity_purchases WHERE node_id=? AND confirmed_at IS NOT NULL")) { statement => + statement.setBytes(1, remoteNodeId.value.toArray) + statement.executeQuery().map { rs => + LiquidityPurchase( + fundingTxId = TxId(rs.getByteVector32("tx_id")), + fundingTxIndex = rs.getLong("funding_tx_index"), + isBuyer = rs.getBoolean("is_buyer"), + amount = Satoshi(rs.getLong("amount_sat")), + fees = LiquidityAds.Fees(miningFee = Satoshi(rs.getLong("mining_fee_sat")), serviceFee = Satoshi(rs.getLong("service_fee_sat"))), + capacity = Satoshi(rs.getLong("capacity_sat")), + localContribution = Satoshi(rs.getLong("local_contribution_sat")), + remoteContribution = Satoshi(rs.getLong("remote_contribution_sat")), + localBalance = MilliSatoshi(rs.getLong("local_balance_msat")), + remoteBalance = MilliSatoshi(rs.getLong("remote_balance_msat")), + outgoingHtlcCount = rs.getLong("outgoing_htlc_count"), + incomingHtlcCount = rs.getLong("incoming_htlc_count") + ) + }.toSeq + } + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala index 2b25448ce4..304afa9b48 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala @@ -34,6 +34,7 @@ sealed trait TestDatabases extends Databases { override def peers: PeersDb = db.peers override def payments: PaymentsDb = db.payments override def pendingCommands: PendingCommandsDb = db.pendingCommands + override def liquidity: LiquidityDb = db.liquidity def close(): Unit // @formatter:on } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala index ba5811e307..ba0f2c24e8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala @@ -139,7 +139,7 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit def spawnTxBuilderSpliceAlice(fundingParams: InteractiveTxParams, commitment: Commitment, wallet: OnChainWallet, liquidityPurchase_opt: Option[LiquidityAds.Purchase] = None): ActorRef[InteractiveTxBuilder.Command] = system.spawnAnonymous(InteractiveTxBuilder( ByteVector32.Zeroes, nodeParamsA, fundingParams, channelParamsA, - SpliceTx(commitment), + SpliceTx(commitment, CommitmentChanges.init()), 0 msat, 0 msat, liquidityPurchase_opt, wallet)) @@ -167,7 +167,7 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit def spawnTxBuilderSpliceBob(fundingParams: InteractiveTxParams, commitment: Commitment, wallet: OnChainWallet, liquidityPurchase_opt: Option[LiquidityAds.Purchase] = None): ActorRef[InteractiveTxBuilder.Command] = system.spawnAnonymous(InteractiveTxBuilder( ByteVector32.Zeroes, nodeParamsB, fundingParams, channelParamsB, - SpliceTx(commitment), + SpliceTx(commitment, CommitmentChanges.init()), 0 msat, 0 msat, liquidityPurchase_opt, wallet)) @@ -1651,6 +1651,9 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit walletA.publishTransaction(txA1.signedTx).pipeTo(probe.ref) probe.expectMsg(txA1.txId) + val eventListener = TestProbe() + system.eventStream.subscribe(eventListener.ref, classOf[ChannelLiquidityPurchased]) + // Alice initiates a splice that is only funded by Bob, because she is purchasing liquidity. val purchase = LiquidityAds.Purchase.Standard(50_000 sat, LiquidityAds.Fees(1000 sat, 1500 sat), LiquidityAds.PaymentDetails.FromChannelBalance) // Alice pays fees for the common fields of the transaction, by decreasing her balance in the shared output. @@ -1695,6 +1698,11 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit assert(targetFeerate * 0.9 <= spliceTxA1.feerate && spliceTxA1.feerate <= targetFeerate * 1.25) walletA.publishTransaction(spliceTxA1.signedTx).pipeTo(probe.ref) probe.expectMsg(spliceTxA1.txId) + + val event = eventListener.expectMsgType[ChannelLiquidityPurchased] + assert(event.purchase.fees == purchase.fees) + assert(event.purchase.fundingTxIndex == 1) + assert(event.purchase.fundingTxId == spliceTxA1.txId) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/LiquidityDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/LiquidityDbSpec.scala new file mode 100644 index 0000000000..a9b7a3a604 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/LiquidityDbSpec.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2024 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import fr.acinq.bitcoin.scalacompat.{SatoshiLong, TxId} +import fr.acinq.eclair.TestDatabases.forAllDbs +import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, LiquidityPurchase} +import fr.acinq.eclair.wire.protocol.LiquidityAds +import fr.acinq.eclair.{MilliSatoshiLong, randomBytes32, randomKey} +import org.scalatest.funsuite.AnyFunSuite + +class LiquidityDbSpec extends AnyFunSuite { + + test("add/list liquidity purchases") { + forAllDbs { dbs => + val db = dbs.liquidity + val (nodeId1, nodeId2) = (randomKey().publicKey, randomKey().publicKey) + val confirmedFundingTxId = TxId(randomBytes32()) + val unconfirmedFundingTxId = TxId(randomBytes32()) + val e1a = ChannelLiquidityPurchased(null, randomBytes32(), nodeId1, LiquidityPurchase(confirmedFundingTxId, 3, isBuyer = true, 250_000 sat, LiquidityAds.Fees(2_000 sat, 3_000 sat), 750_000 sat, 50_000 sat, 300_000 sat, 400_000_000 msat, 350_000_000 msat, 7, 11)) + val e1b = ChannelLiquidityPurchased(null, randomBytes32(), nodeId1, LiquidityPurchase(confirmedFundingTxId, 7, isBuyer = false, 50_000 sat, LiquidityAds.Fees(300 sat, 700 sat), 500_000 sat, 50_000 sat, 0 sat, 250_000_000 msat, 250_000_000 msat, 0, 0)) + val e1c = ChannelLiquidityPurchased(null, e1b.channelId, nodeId1, LiquidityPurchase(confirmedFundingTxId, 0, isBuyer = false, 150_000 sat, LiquidityAds.Fees(500 sat, 1_500 sat), 250_000 sat, 150_000 sat, -100_000 sat, 200_000_000 msat, 50_000_000 msat, 47, 45)) + val e1d = ChannelLiquidityPurchased(null, randomBytes32(), nodeId1, LiquidityPurchase(unconfirmedFundingTxId, 22, isBuyer = true, 250_000 sat, LiquidityAds.Fees(4_000 sat, 1_000 sat), 450_000 sat, -50_000 sat, 250_000 sat, 150_000_000 msat, 300_000_000 msat, 3, 3)) + val e2a = ChannelLiquidityPurchased(null, randomBytes32(), nodeId2, LiquidityPurchase(confirmedFundingTxId, 453, isBuyer = false, 200_000 sat, LiquidityAds.Fees(1_000 sat, 1_000 sat), 300_000 sat, 250_000 sat, 0 sat, 270_000_000 msat, 30_000_000 msat, 113, 0)) + val e2b = ChannelLiquidityPurchased(null, randomBytes32(), nodeId2, LiquidityPurchase(unconfirmedFundingTxId, 1, isBuyer = false, 200_000 sat, LiquidityAds.Fees(1_000 sat, 1_000 sat), 300_000 sat, 250_000 sat, -10_000 sat, 250_000_000 msat, 50_000_000 msat, 0, 113)) + + db.addPurchase(e1a) + db.addPurchase(e1b) + db.addPurchase(e1c) + db.addPurchase(e1d) + db.addPurchase(e2a) + db.addPurchase(e2b) + + // The liquidity purchase is confirmed only once the corresponding transaction confirms. + assert(db.listPurchases(nodeId1).isEmpty) + assert(db.listPurchases(nodeId2).isEmpty) + + db.setConfirmed(nodeId1, confirmedFundingTxId) + db.setConfirmed(nodeId2, confirmedFundingTxId) + + assert(db.listPurchases(nodeId1).toSet == Set(e1a, e1b, e1c).map(_.purchase)) + assert(db.listPurchases(nodeId2) == Seq(e2a.purchase)) + } + } + +}