Skip to content

Commit

Permalink
Add liquidity purchases to the AuditDb
Browse files Browse the repository at this point in the history
Whenever liquidity is purchased, we store it in the `AuditDb`. This lets
node operators gather useful statistics on their peers, and which ones
are actively using the liquidity that is purchased.

We store minimal information about the liquidity ads itself to be more
easily compatible with potential changes in the spec.
  • Loading branch information
t-bast committed Sep 24, 2024
1 parent 5b5452e commit c6a1199
Show file tree
Hide file tree
Showing 12 changed files with 420 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ package fr.acinq.eclair.channel

import akka.actor.ActorRef
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction, TxId}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.Helpers.Closing.ClosingType
import fr.acinq.eclair.io.Peer.OpenChannelResponse
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate}
import fr.acinq.eclair.{BlockHeight, Features, ShortChannelId}
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, LiquidityAds}
import fr.acinq.eclair.{BlockHeight, Features, MilliSatoshi, ShortChannelId}

/**
* Created by PM on 17/08/2016.
Expand Down Expand Up @@ -79,6 +78,14 @@ case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) ext

case class ChannelSignatureReceived(channel: ActorRef, commitments: Commitments) extends ChannelEvent

case class LiquidityPurchase(fundingTxId: TxId, fundingTxIndex: Long, isBuyer: Boolean, amount: Satoshi, fees: LiquidityAds.Fees, capacity: Satoshi, localContribution: Satoshi, remoteContribution: Satoshi, localBalance: MilliSatoshi, remoteBalance: MilliSatoshi, outgoingHtlcCount: Long, incomingHtlcCount: Long) {
val previousCapacity: Satoshi = capacity - localContribution - remoteContribution
val previousLocalBalance: MilliSatoshi = if (isBuyer) localBalance - localContribution + fees.total else localBalance - localContribution - fees.total
val previousRemoteBalance: MilliSatoshi = if (isBuyer) remoteBalance - remoteContribution - fees.total else remoteBalance - remoteContribution + fees.total
}

case class ChannelLiquidityPurchased(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, purchase: LiquidityPurchase) extends ChannelEvent

case class ChannelErrorOccurred(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, error: ChannelError, isFatal: Boolean) extends ChannelEvent

// NB: the fee should be set to 0 when we're not paying it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
sessionId,
nodeParams, fundingParams,
channelParams = d.commitments.params,
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment),
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes),
localPushAmount = spliceAck.pushAmount, remotePushAmount = msg.pushAmount,
liquidityPurchase_opt = willFund_opt.map(_.purchase),
wallet
Expand Down Expand Up @@ -1029,7 +1029,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
sessionId,
nodeParams, fundingParams,
channelParams = d.commitments.params,
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment),
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes),
localPushAmount = cmd.pushAmount, remotePushAmount = msg.pushAmount,
liquidityPurchase_opt = liquidityPurchase_opt,
wallet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package fr.acinq.eclair.channel.fund

import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior}
import akka.event.LoggingAdapter
Expand Down Expand Up @@ -163,6 +165,8 @@ object InteractiveTxBuilder {
def previousFundingAmount: Satoshi
def localCommitIndex: Long
def remoteCommitIndex: Long
def localNextHtlcId: Long
def remoteNextHtlcId: Long
def remotePerCommitmentPoint: PublicKey
def commitTxFeerate: FeeratePerKw
def fundingTxIndex: Long
Expand All @@ -175,15 +179,19 @@ object InteractiveTxBuilder {
override val previousFundingAmount: Satoshi = 0 sat
override val localCommitIndex: Long = 0
override val remoteCommitIndex: Long = 0
override val localNextHtlcId: Long = 0
override val remoteNextHtlcId: Long = 0
override val fundingTxIndex: Long = 0
override val localHtlcs: Set[DirectedHtlc] = Set.empty
}
case class SpliceTx(parentCommitment: Commitment) extends Purpose {
case class SpliceTx(parentCommitment: Commitment, changes: CommitmentChanges) extends Purpose {
override val previousLocalBalance: MilliSatoshi = parentCommitment.localCommit.spec.toLocal
override val previousRemoteBalance: MilliSatoshi = parentCommitment.remoteCommit.spec.toLocal
override val previousFundingAmount: Satoshi = parentCommitment.capacity
override val localCommitIndex: Long = parentCommitment.localCommit.index
override val remoteCommitIndex: Long = parentCommitment.remoteCommit.index
override val localNextHtlcId: Long = changes.localNextHtlcId
override val remoteNextHtlcId: Long = changes.remoteNextHtlcId
override val remotePerCommitmentPoint: PublicKey = parentCommitment.remoteCommit.remotePerCommitmentPoint
override val commitTxFeerate: FeeratePerKw = parentCommitment.localCommit.spec.commitTxFeerate
override val fundingTxIndex: Long = parentCommitment.fundingTxIndex + 1
Expand All @@ -199,6 +207,8 @@ object InteractiveTxBuilder {
override val previousFundingAmount: Satoshi = (previousLocalBalance + previousRemoteBalance).truncateToSatoshi
override val localCommitIndex: Long = replacedCommitment.localCommit.index
override val remoteCommitIndex: Long = replacedCommitment.remoteCommit.index
override val localNextHtlcId: Long = 0
override val remoteNextHtlcId: Long = 0
override val remotePerCommitmentPoint: PublicKey = replacedCommitment.remoteCommit.remotePerCommitmentPoint
override val commitTxFeerate: FeeratePerKw = replacedCommitment.localCommit.spec.commitTxFeerate
override val fundingTxIndex: Long = replacedCommitment.fundingTxIndex
Expand Down Expand Up @@ -792,6 +802,29 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon
Behaviors.receiveMessagePartial {
case SignTransactionResult(signedTx) =>
log.info(s"interactive-tx txid=${signedTx.txId} partially signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", signedTx.tx.localInputs.length, signedTx.tx.remoteInputs.length, signedTx.tx.localOutputs.length, signedTx.tx.remoteOutputs.length)
// At this point, we're not completely sure that the transaction will succeed: if our peer doesn't send their
// commit_sig, the transaction will be aborted. But it's a best effort, because after sending our commit_sig,
// we won't store details about the liquidity purchase so we'll be unable to emit that event later. Even after
// fully signing the transaction, it may be double-spent by a force-close, which would invalidate it as well.
// The right solution is to check confirmations on the funding transaction before considering that a liquidity
// purchase is completed, which is what we do in our AuditDb.
liquidityPurchase_opt.foreach { p =>
val purchase = LiquidityPurchase(
fundingTxId = signedTx.txId,
fundingTxIndex = purpose.fundingTxIndex,
isBuyer = fundingParams.isInitiator,
amount = p.amount,
fees = p.fees,
capacity = fundingParams.fundingAmount,
localContribution = fundingParams.localContribution,
remoteContribution = fundingParams.remoteContribution,
localBalance = localCommit.spec.toLocal,
remoteBalance = localCommit.spec.toRemote,
outgoingHtlcCount = purpose.localNextHtlcId,
incomingHtlcCount = purpose.remoteNextHtlcId,
)
context.system.eventStream ! EventStream.Publish(ChannelLiquidityPurchased(replyTo.toClassic, channelParams.channelId, remoteNodeId, purchase))
}
replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig)
Behaviors.stopped
case WalletFailure(t) =>
Expand Down
10 changes: 7 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ trait Databases {
def peers: PeersDb
def payments: PaymentsDb
def pendingCommands: PendingCommandsDb
def liquidity: LiquidityDb
//@formatter:on
}

Expand All @@ -60,6 +61,7 @@ object Databases extends Logging {
}

case class SqliteDatabases private(network: SqliteNetworkDb,
liquidity: SqliteLiquidityDb,
audit: SqliteAuditDb,
channels: SqliteChannelsDb,
peers: SqlitePeersDb,
Expand All @@ -78,6 +80,7 @@ object Databases extends Logging {
jdbcUrlFile_opt.foreach(checkIfDatabaseUrlIsUnchanged("sqlite", _))
SqliteDatabases(
network = new SqliteNetworkDb(networkJdbc),
liquidity = new SqliteLiquidityDb(eclairJdbc),
audit = new SqliteAuditDb(auditJdbc),
channels = new SqliteChannelsDb(eclairJdbc),
peers = new SqlitePeersDb(eclairJdbc),
Expand All @@ -89,6 +92,7 @@ object Databases extends Logging {
}

case class PostgresDatabases private(network: PgNetworkDb,
liquidity: PgLiquidityDb,
audit: PgAuditDb,
channels: PgChannelsDb,
peers: PgPeersDb,
Expand All @@ -106,8 +110,7 @@ object Databases extends Logging {
auditRelayedMaxAge: FiniteDuration,
localChannelsMinCount: Int,
networkNodesMinCount: Int,
networkChannelsMinCount: Int
)
networkChannelsMinCount: Int)

def apply(hikariConfig: HikariConfig,
instanceId: UUID,
Expand Down Expand Up @@ -149,6 +152,7 @@ object Databases extends Logging {

val databases = PostgresDatabases(
network = new PgNetworkDb,
liquidity = new PgLiquidityDb,
audit = new PgAuditDb,
channels = new PgChannelsDb,
peers = new PgPeersDb,
Expand All @@ -160,7 +164,7 @@ object Databases extends Logging {
readOnlyUser_opt.foreach { readOnlyUser =>
PgUtils.inTransaction { connection =>
using(connection.createStatement()) { statement =>
val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: Nil
val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: "liquidity" :: Nil
schemas.foreach { schema =>
logger.info(s"granting read-only access to user=$readOnlyUser schema=$schema")
statement.executeUpdate(s"GRANT USAGE ON SCHEMA $schema TO $readOnlyUser")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL

private val auditDb: AuditDb = nodeParams.db.audit
private val channelsDb: ChannelsDb = nodeParams.db.channels
private val liquidityDb: LiquidityDb = nodeParams.db.liquidity

context.spawn(Behaviors.supervise(RevokedHtlcInfoCleaner(channelsDb, nodeParams.revokedHtlcInfoCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "revoked-htlc-info-cleaner")

context.system.eventStream.subscribe(self, classOf[PaymentSent])
context.system.eventStream.subscribe(self, classOf[PaymentFailed])
context.system.eventStream.subscribe(self, classOf[PaymentReceived])
context.system.eventStream.subscribe(self, classOf[PaymentRelayed])
context.system.eventStream.subscribe(self, classOf[ChannelLiquidityPurchased])
context.system.eventStream.subscribe(self, classOf[TransactionPublished])
context.system.eventStream.subscribe(self, classOf[TransactionConfirmed])
context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred])
Expand Down Expand Up @@ -92,11 +94,15 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
}
auditDb.add(e)

case e: ChannelLiquidityPurchased => liquidityDb.addPurchase(e)

case e: TransactionPublished =>
log.info(s"paying mining fee=${e.miningFee} for txid=${e.tx.txid} desc=${e.desc}")
auditDb.add(e)

case e: TransactionConfirmed => auditDb.add(e)
case e: TransactionConfirmed =>
liquidityDb.setConfirmed(e.remoteNodeId, e.tx.txid)
auditDb.add(e)

case e: ChannelErrorOccurred =>
// first pattern matching level is to ignore some errors, second level is to separate between different kind of errors
Expand Down
27 changes: 22 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,12 @@ import scala.util.{Failure, Success, Try}
case class DualDatabases(primary: Databases, secondary: Databases) extends Databases with FileBackup {

override val network: NetworkDb = DualNetworkDb(primary.network, secondary.network)

override val audit: AuditDb = DualAuditDb(primary.audit, secondary.audit)

override val channels: ChannelsDb = DualChannelsDb(primary.channels, secondary.channels)

override val peers: PeersDb = DualPeersDb(primary.peers, secondary.peers)

override val payments: PaymentsDb = DualPaymentsDb(primary.payments, secondary.payments)

override val pendingCommands: PendingCommandsDb = DualPendingCommandsDb(primary.pendingCommands, secondary.pendingCommands)
override val liquidity: LiquidityDb = DualLiquidityDb(primary.liquidity, secondary.liquidity)

/** if one of the database supports file backup, we use it */
override def backup(backupFile: File): Unit = (primary, secondary) match {
Expand Down Expand Up @@ -411,3 +407,24 @@ case class DualPendingCommandsDb(primary: PendingCommandsDb, secondary: PendingC
primary.listSettlementCommands()
}
}

case class DualLiquidityDb(primary: LiquidityDb, secondary: LiquidityDb) extends LiquidityDb {

private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-liquidity").build()))

override def addPurchase(liquidityPurchase: ChannelLiquidityPurchased): Unit = {
runAsync(secondary.addPurchase(liquidityPurchase))
primary.addPurchase(liquidityPurchase)
}

override def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit = {
runAsync(secondary.setConfirmed(remoteNodeId, txId))
primary.setConfirmed(remoteNodeId, txId)
}

override def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = {
runAsync(secondary.listPurchases(remoteNodeId))
primary.listPurchases(remoteNodeId)
}

}
35 changes: 35 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/LiquidityDb.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2024 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.TxId
import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, LiquidityPurchase}

/**
* Created by t-bast on 13/09/2024.
*/

trait LiquidityDb {

def addPurchase(liquidityPurchase: ChannelLiquidityPurchased): Unit

def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit

def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase]

}
Loading

0 comments on commit c6a1199

Please sign in to comment.