From 9973d1ecaf8736ea9f989bbb58b3b4e7bc24b42a Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Thu, 8 Aug 2024 10:42:58 +0200 Subject: [PATCH] Reputation is recorded from channel events --- .../main/scala/fr/acinq/eclair/Setup.scala | 2 +- .../fr/acinq/eclair/channel/ChannelData.scala | 6 +- .../fr/acinq/eclair/channel/fsm/Channel.scala | 4 + .../eclair/payment/relay/ChannelRelay.scala | 18 +-- .../eclair/payment/relay/ChannelRelayer.scala | 2 +- .../eclair/payment/relay/NodeRelay.scala | 17 +-- .../eclair/payment/relay/NodeRelayer.scala | 2 +- .../acinq/eclair/reputation/Reputation.scala | 37 +++-- .../reputation/ReputationRecorder.scala | 133 ++++++++++++------ .../payment/relay/ChannelRelayerSpec.scala | 9 +- .../payment/relay/NodeRelayerSpec.scala | 11 +- .../eclair/payment/relay/RelayerSpec.scala | 2 +- .../reputation/ReputationRecorderSpec.scala | 4 +- 13 files changed, 136 insertions(+), 111 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index b39ca56a9d..3d5ec1b325 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -364,7 +364,7 @@ class Setup(val datadir: File, triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer") peerReadyManager = system.spawn(Behaviors.supervise(PeerReadyManager()).onFailure(typed.SupervisorStrategy.restart), name = "peer-ready-manager") reputationRecorder_opt = if (nodeParams.relayParams.peerReputationConfig.enabled) { - Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig, Map.empty)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder")) + Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder")) } else { None } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala index 982403d640..3d4428192a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala @@ -26,7 +26,7 @@ import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningS import fr.acinq.eclair.io.Peer import fr.acinq.eclair.transactions.CommitmentSpec import fr.acinq.eclair.transactions.Transactions._ -import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureMessage, FundingCreated, FundingSigned, Init, LiquidityAds, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureMessage, FundingCreated, FundingSigned, HtlcFailureMessage, Init, LiquidityAds, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc} import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, CltvExpiryDelta, Features, InitFeature, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, TimestampMilli, UInt64} import scodec.bits.ByteVector @@ -244,6 +244,10 @@ final case class CMD_GET_CHANNEL_STATE(replyTo: ActorRef) extends HasReplyToComm final case class CMD_GET_CHANNEL_DATA(replyTo: ActorRef) extends HasReplyToCommand final case class CMD_GET_CHANNEL_INFO(replyTo: akka.actor.typed.ActorRef[RES_GET_CHANNEL_INFO]) extends Command +case class OutgoingHtlcAdded(add: UpdateAddHtlc, upstream: Upstream.Hot, fee: MilliSatoshi) +case class OutgoingHtlcFailed(fail: HtlcFailureMessage) +case class OutgoingHtlcFulfilled(fulfill: UpdateFulfillHtlc) + /* 88888888b. 8888888888 .d8888b. 88888888b. ,ad8888ba, 888b 88 .d8888b. 8888888888 .d8888b. 88 "8b 88 d88P Y88b 88 "8b d8"' `"8b 8888b 88 d88P Y88b 88 d88P Y88b diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index 056cbbed46..d47614abcb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -440,6 +440,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with case Right((commitments1, add)) => if (c.commit) self ! CMD_SIGN() context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortIds, commitments1)) + context.system.eventStream.publish(OutgoingHtlcAdded(add, c.origin.upstream, nodeFee(d.channelUpdate.relayFees, add.amountMsat))) handleCommandSuccess(c, d.copy(commitments = commitments1)) sending add case Left(cause) => handleAddHtlcCommandError(c, cause, Some(d.channelUpdate)) } @@ -466,6 +467,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with case Right((commitments1, origin, htlc)) => // we forward preimages as soon as possible to the upstream channel because it allows us to pull funds relayer ! RES_ADD_SETTLED(origin, htlc, HtlcResult.RemoteFulfill(fulfill)) + context.system.eventStream.publish(OutgoingHtlcFulfilled(fulfill)) stay() using d.copy(commitments = commitments1) case Left(cause) => handleLocalError(cause, d, Some(fulfill)) } @@ -499,12 +501,14 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with } case Event(fail: UpdateFailHtlc, d: DATA_NORMAL) => + context.system.eventStream.publish(OutgoingHtlcFailed(fail)) d.commitments.receiveFail(fail) match { case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1) case Left(cause) => handleLocalError(cause, d, Some(fail)) } case Event(fail: UpdateFailMalformedHtlc, d: DATA_NORMAL) => + context.system.eventStream.publish(OutgoingHtlcFailed(fail)) d.commitments.receiveFailMalformed(fail) match { case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1) case Left(cause) => handleLocalError(cause, d, Some(fail)) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala index cb3a9b7a94..bfd686ecf3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala @@ -33,7 +33,7 @@ import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags} import fr.acinq.eclair.payment.relay.Relayer.{OutgoingChannel, OutgoingChannelParams} import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket} import fr.acinq.eclair.reputation.ReputationRecorder -import fr.acinq.eclair.reputation.ReputationRecorder.{CancelRelay, GetConfidence, RecordResult} +import fr.acinq.eclair.reputation.ReputationRecorder.GetConfidence import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload import fr.acinq.eclair.wire.protocol._ @@ -65,7 +65,7 @@ object ChannelRelay { def apply(nodeParams: NodeParams, register: ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], + reputationRecorder_opt: Option[typed.ActorRef[GetConfidence]], channels: Map[ByteVector32, Relayer.OutgoingChannel], originNode: PublicKey, relayId: UUID, @@ -79,14 +79,14 @@ object ChannelRelay { val upstream = Upstream.Hot.Channel(r.add.removeUnknownTlvs(), TimestampMilli.now(), originNode) reputationRecorder_opt match { case Some(reputationRecorder) => - reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), originNode, r.add.endorsement, relayId, r.relayFeeMsat) + reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), upstream, r.relayFeeMsat) case None => val confidence = (r.add.endorsement + 0.5) / 8 context.self ! WrappedConfidence(confidence) } Behaviors.receiveMessagePartial { case WrappedConfidence(confidence) => - new ChannelRelay(nodeParams, register, reputationRecorder_opt, channels, r, upstream, confidence, context, relayId).start() + new ChannelRelay(nodeParams, register, channels, r, upstream, confidence, context).start() } } } @@ -128,13 +128,11 @@ object ChannelRelay { */ class ChannelRelay private(nodeParams: NodeParams, register: ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], channels: Map[ByteVector32, Relayer.OutgoingChannel], r: IncomingPaymentPacket.ChannelRelayPacket, upstream: Upstream.Hot.Channel, confidence: Double, - context: ActorContext[ChannelRelay.Command], - relayId: UUID) { + context: ActorContext[ChannelRelay.Command]) { import ChannelRelay._ @@ -200,7 +198,6 @@ class ChannelRelay private(nodeParams: NodeParams, case RelayFailure(cmdFail) => Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) context.log.info("rejecting htlc reason={}", cmdFail.reason) - reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId)) safeSendAndStop(r.add.channelId, cmdFail) case RelayNeedsFunding(nextNodeId, cmdFail) => val cmd = Peer.ProposeOnTheFlyFunding(onTheFlyFundingResponseAdapter, r.amountToForward, r.add.paymentHash, r.outgoingCltv, r.nextPacket, nextBlindingKey_opt, upstream) @@ -220,7 +217,6 @@ class ChannelRelay private(nodeParams: NodeParams, context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${upstream.add.id}") val cmdFail = CMD_FAIL_HTLC(upstream.add.id, Right(UnknownNextPeer()), commit = true) Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) - reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId)) safeSendAndStop(upstream.add.channelId, cmdFail) case WrappedAddResponse(addFailed: RES_ADD_FAILED[_]) => @@ -431,11 +427,9 @@ class ChannelRelay private(nodeParams: NodeParams, featureOk && liquidityIssue && relayParamsOk } - private def recordRelayDuration(isSuccess: Boolean): Unit = { - reputationRecorder_opt.foreach(_ ! RecordResult(upstream.receivedFrom, r.add.endorsement, relayId, isSuccess)) + private def recordRelayDuration(isSuccess: Boolean): Unit = Metrics.RelayedPaymentDuration .withTag(Tags.Relay, Tags.RelayType.Channel) .withTag(Tags.Success, isSuccess) .record((TimestampMilli.now() - upstream.receivedAt).toMillis, TimeUnit.MILLISECONDS) - } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala index 590ca16e37..7cf29cc977 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala @@ -59,7 +59,7 @@ object ChannelRelayer { def apply(nodeParams: NodeParams, register: ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.GetConfidence]], channels: Map[ByteVector32, Relayer.OutgoingChannel] = Map.empty, scid2channels: Map[ShortChannelId, ByteVector32] = Map.empty, node2channels: mutable.MultiDict[PublicKey, ByteVector32] = mutable.MultiDict.empty): Behavior[Command] = diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala index 472ad705dc..a0d26f78d1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala @@ -40,6 +40,7 @@ import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode import fr.acinq.eclair.payment.send._ import fr.acinq.eclair.router.Router.{ChannelHop, HopRelayParams, Route, RouteParams} import fr.acinq.eclair.reputation.ReputationRecorder +import fr.acinq.eclair.reputation.ReputationRecorder.GetTrampolineConfidence import fr.acinq.eclair.router.Router.RouteParams import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound} import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload @@ -91,7 +92,7 @@ object NodeRelay { def apply(nodeParams: NodeParams, parent: typed.ActorRef[NodeRelayer.Command], register: ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], + reputationRecorder_opt: Option[typed.ActorRef[GetTrampolineConfidence]], relayId: UUID, nodeRelayPacket: NodeRelayPacket, outgoingPaymentFactory: OutgoingPaymentFactory, @@ -215,7 +216,7 @@ object NodeRelay { class NodeRelay private(nodeParams: NodeParams, parent: akka.actor.typed.ActorRef[NodeRelayer.Command], register: ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], + reputationRecorder_opt: Option[typed.ActorRef[GetTrampolineConfidence]], relayId: UUID, paymentHash: ByteVector32, paymentSecret: ByteVector32, @@ -336,11 +337,8 @@ class NodeRelay private(nodeParams: NodeParams, // We only make one try when it's a direct payment to a wallet. val maxPaymentAttempts = if (walletNodeId_opt.isDefined) 1 else nodeParams.maxPaymentAttempts val totalFee = upstream.amountIn - payloadOut.amountToForward - val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) => - fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee => - Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong))) reputationRecorder_opt match { - case Some(reputationRecorder) => reputationRecorder ! ReputationRecorder.GetTrampolineConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), fees, relayId) + case Some(reputationRecorder) => reputationRecorder ! GetTrampolineConfidence(context.messageAdapter(confidence => WrappedConfidence(confidence.value)), upstream, totalFee) case None => context.self ! WrappedConfidence((upstream.received.map(_.add.endorsement).min + 0.5) / 8) } Behaviors.receiveMessagePartial { @@ -396,16 +394,10 @@ class NodeRelay private(nodeParams: NodeParams, case WrappedPaymentSent(paymentSent) => context.log.debug("trampoline payment fully resolved downstream") success(upstream, fulfilledUpstream, paymentSent) - val totalFee = upstream.amountIn - paymentSent.amountWithFees - val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) => - fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee => - Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong))) - reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineSuccess(fees, relayId)) recordRelayDuration(startedAt, isSuccess = true) stopping() case _: WrappedPaymentFailed if fulfilledUpstream => context.log.warn("trampoline payment failed downstream but was fulfilled upstream") - reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineFailure(upstream.received.map(r => ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement)).toSet, relayId)) recordRelayDuration(startedAt, isSuccess = true) stopping() case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) => @@ -415,7 +407,6 @@ class NodeRelay private(nodeParams: NodeParams, attemptOnTheFlyFunding(upstream, walletNodeId, recipient, nextPayload, failures, startedAt) case _ => rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload)) - reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineFailure(upstream.received.map(r => ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement)).toSet, relayId)) recordRelayDuration(startedAt, isSuccess = false) stopping() } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala index 4bff50e476..17cadc734d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala @@ -60,7 +60,7 @@ object NodeRelayer { */ def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, - reputationRecorder_opt: Option[ActorRef[ReputationRecorder.TrampolineRelayCommand]], + reputationRecorder_opt: Option[ActorRef[ReputationRecorder.GetTrampolineConfidence]], outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, router: akka.actor.ActorRef, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] = diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala index f74c0aca67..fda3ea5d54 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala @@ -16,9 +16,10 @@ package fr.acinq.eclair.reputation +import fr.acinq.bitcoin.scalacompat.ByteVector32 +import fr.acinq.eclair.reputation.Reputation.HtlcId import fr.acinq.eclair.{MilliSatoshi, TimestampMilli} -import java.util.UUID import scala.concurrent.duration.FiniteDuration /** @@ -26,7 +27,7 @@ import scala.concurrent.duration.FiniteDuration */ /** - * Local reputation for a given incoming node, that should be track for each incoming endorsement level. + * Local reputation for a given incoming node, that should be tracked for each incoming endorsement level. * * @param pastWeight How much fees we would have collected in the past if all payments had succeeded (exponential moving average). * @param pastScore How much fees we have collected in the past (exponential moving average). @@ -36,51 +37,47 @@ import scala.concurrent.duration.FiniteDuration * @param maxRelayDuration Duration after which payments are penalized for staying pending too long. * @param pendingMultiplier How much to penalize pending payments. */ -case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[UUID, Reputation.PendingPayment], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) { +case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[HtlcId, Reputation.PendingPayment], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) { private def decay(now: TimestampMilli): Double = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife) private def pendingWeight(now: TimestampMilli): Double = pending.values.map(_.weight(now, maxRelayDuration, pendingMultiplier)).sum /** - * Register a payment to relay and estimate the confidence that it will succeed. - * - * @return (updated reputation, confidence) + * Estimate the confidence that a payment will succeed. */ - def attempt(relayId: UUID, fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): (Reputation, Double) = { + def getConfidence(fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): Double = { val d = decay(now) - val newReputation = copy(pending = pending + (relayId -> Reputation.PendingPayment(fee, now))) - val confidence = d * pastScore / (d * pastWeight + newReputation.pendingWeight(now)) - (newReputation, confidence) + d * pastScore / (d * pastWeight + pendingWeight(now) + fee.toLong.toDouble * pendingMultiplier) } /** - * Mark a previously registered payment as failed without trying to relay it (usually because its confidence was too low). + * Register a pending relay. * * @return updated reputation */ - def cancel(relayId: UUID): Reputation = copy(pending = pending - relayId) + def attempt(htlcId: HtlcId, fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): Reputation = + copy(pending = pending + (htlcId -> Reputation.PendingPayment(fee, now))) /** * When a payment is settled, we record whether it succeeded and how long it took. * - * @param feeOverride When relaying trampoline payments, the actual fee is only known when the payment succeeds. This - * is used instead of the fee upper bound that was known when first attempting the relay. * @return updated reputation */ - def record(relayId: UUID, isSuccess: Boolean, feeOverride: Option[MilliSatoshi] = None, now: TimestampMilli = TimestampMilli.now()): Reputation = { - pending.get(relayId) match { + def record(htlcId: HtlcId, isSuccess: Boolean, now: TimestampMilli = TimestampMilli.now()): Reputation = { + pending.get(htlcId) match { case Some(p) => val d = decay(now) - val p1 = p.copy(fee = feeOverride.getOrElse(p.fee)) - val newWeight = d * pastWeight + p1.weight(now, maxRelayDuration, 1.0) - val newScore = d * pastScore + (if (isSuccess) p1.fee.toLong.toDouble else 0) - Reputation(newWeight, newScore, now, pending - relayId, halfLife, maxRelayDuration, pendingMultiplier) + val newWeight = d * pastWeight + p.weight(now, maxRelayDuration, if (isSuccess) 1.0 else 0.0) + val newScore = d * pastScore + (if (isSuccess) p.fee.toLong.toDouble else 0) + Reputation(newWeight, newScore, now, pending - htlcId, halfLife, maxRelayDuration, pendingMultiplier) case None => this } } } object Reputation { + case class HtlcId(channelId: ByteVector32, id: Long) + /** We're relaying that payment and are waiting for it to settle. */ case class PendingPayment(fee: MilliSatoshi, startedAt: TimestampMilli) { def weight(now: TimestampMilli, minDuration: FiniteDuration, multiplier: Double): Double = { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala index 77982ebfbe..3cb7509e28 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala @@ -16,12 +16,19 @@ package fr.acinq.eclair.reputation -import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.{ActorContext, Behaviors} import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.MilliSatoshi +import fr.acinq.eclair.channel.Upstream.Hot +import fr.acinq.eclair.channel.{OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, Upstream} +import fr.acinq.eclair.reputation.Reputation.HtlcId +import fr.acinq.eclair.wire.protocol.{UpdateFailHtlc, UpdateFailMalformedHtlc} +import ReputationRecorder._ + +import scala.collection.mutable -import java.util.UUID /** * Created by thomash on 21/07/2023. @@ -30,16 +37,11 @@ import java.util.UUID object ReputationRecorder { // @formatter:off sealed trait Command - - sealed trait ChannelRelayCommand extends Command - case class GetConfidence(replyTo: ActorRef[Confidence], originNode: PublicKey, endorsement: Int, relayId: UUID, fee: MilliSatoshi) extends ChannelRelayCommand - case class CancelRelay(originNode: PublicKey, endorsement: Int, relayId: UUID) extends ChannelRelayCommand - case class RecordResult(originNode: PublicKey, endorsement: Int, relayId: UUID, isSuccess: Boolean) extends ChannelRelayCommand - - sealed trait TrampolineRelayCommand extends Command - case class GetTrampolineConfidence(replyTo: ActorRef[Confidence], fees: Map[PeerEndorsement, MilliSatoshi], relayId: UUID) extends TrampolineRelayCommand - case class RecordTrampolineFailure(upstream: Set[PeerEndorsement], relayId: UUID) extends TrampolineRelayCommand - case class RecordTrampolineSuccess(fees: Map[PeerEndorsement, MilliSatoshi], relayId: UUID) extends TrampolineRelayCommand + case class GetConfidence(replyTo: ActorRef[Confidence], upstream: Upstream.Hot.Channel, fee: MilliSatoshi) extends Command + case class GetTrampolineConfidence(replyTo: ActorRef[Confidence], upstream: Upstream.Hot.Trampoline, fee: MilliSatoshi) extends Command + private case class WrappedOutgoingHtlcAdded(added: OutgoingHtlcAdded) extends Command + private case class WrappedOutgoingHtlcFailed(failed: OutgoingHtlcFailed) extends Command + private case class WrappedOutgoingHtlcFulfilled(fulfilled: OutgoingHtlcFulfilled) extends Command // @formatter:on /** @@ -48,43 +50,90 @@ object ReputationRecorder { */ case class PeerEndorsement(nodeId: PublicKey, endorsement: Int) + object PeerEndorsement { + def apply(channel: Upstream.Hot.Channel): PeerEndorsement = PeerEndorsement(channel.receivedFrom, channel.add.endorsement) + } + /** Confidence that the outgoing HTLC will succeed. */ case class Confidence(value: Double) - def apply(reputationConfig: Reputation.Config, reputations: Map[PeerEndorsement, Reputation]): Behavior[Command] = { + def apply(config: Reputation.Config): Behavior[Command] = + Behaviors.setup(context => { + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcFailed)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcFulfilled)) + new ReputationRecorder(config, context).run() + }) +} + +class ReputationRecorder(config: Reputation.Config, context: ActorContext[ReputationRecorder.Command]) { + private val reputations: mutable.Map[PeerEndorsement, Reputation] = mutable.HashMap.empty.withDefaultValue(Reputation.init(config)) + private val pending: mutable.Map[HtlcId, Upstream.Hot] = mutable.HashMap.empty + + def run(): Behavior[Command] = Behaviors.receiveMessage { - case GetConfidence(replyTo, originNode, endorsement, relayId, fee) => - val (updatedReputation, confidence) = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).attempt(relayId, fee) + case GetConfidence(replyTo, upstream, fee) => + val confidence = reputations(PeerEndorsement(upstream)).getConfidence(fee) replyTo ! Confidence(confidence) - ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) - case CancelRelay(originNode, endorsement, relayId) => - val updatedReputation = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).cancel(relayId) - ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) - case RecordResult(originNode, endorsement, relayId, isSuccess) => - val updatedReputation = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).record(relayId, isSuccess) - ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) - case GetTrampolineConfidence(replyTo, fees, relayId) => - val (confidence, updatedReputations) = fees.foldLeft((1.0, reputations)) { - case ((c, r), (peerEndorsement, fee)) => - val (updatedReputation, confidence) = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).attempt(relayId, fee) - (c.min(confidence), r.updated(peerEndorsement, updatedReputation)) - } + Behaviors.same + + case GetTrampolineConfidence(replyTo, upstream, totalFee) => + val confidence = + upstream.received + .groupMapReduce(r => PeerEndorsement(r.receivedFrom, r.add.endorsement))(_.add.amountMsat)(_ + _) + .map { + case (peerEndorsement, amount) => + val fee = amount * totalFee.toLong / upstream.amountIn.toLong + reputations(peerEndorsement).getConfidence(fee) + } + .min replyTo ! Confidence(confidence) - ReputationRecorder(reputationConfig, updatedReputations) - case RecordTrampolineFailure(keys, relayId) => - val updatedReputations = keys.foldLeft(reputations) { - case (r, peerEndorsement) => - val updatedReputation = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).record(relayId, isSuccess = false) - r.updated(peerEndorsement, updatedReputation) + Behaviors.same + + case WrappedOutgoingHtlcAdded(OutgoingHtlcAdded(add, upstream, fee)) => + val htlcId = HtlcId(add.channelId, add.id) + upstream match { + case channel: Hot.Channel => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).attempt(htlcId, fee) + pending += (htlcId -> upstream) + case trampoline: Hot.Trampoline => + trampoline.received.foreach(channel => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).attempt(htlcId, fee * channel.amountIn.toLong / trampoline.amountIn.toLong) + ) + pending += (htlcId -> upstream) + case _: Upstream.Local => () + } + Behaviors.same + + case WrappedOutgoingHtlcFailed(OutgoingHtlcFailed(fail)) => + val htlcId = fail match { + case UpdateFailHtlc(channelId, id, _, _) => HtlcId(channelId, id) + case UpdateFailMalformedHtlc(channelId, id, _, _, _) => HtlcId(channelId, id) + } + pending.get(htlcId) match { + case Some(channel: Hot.Channel) => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).record(htlcId, isSuccess = false) + case Some(trampoline: Hot.Trampoline) => + trampoline.received.foreach(channel => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).record(htlcId, isSuccess = false) + ) + case _ => () } - ReputationRecorder(reputationConfig, updatedReputations) - case RecordTrampolineSuccess(fees, relayId) => - val updatedReputations = fees.foldLeft(reputations) { - case (r, (peerEndorsement, fee)) => - val updatedReputation = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).record(relayId, isSuccess = true, Some(fee)) - r.updated(peerEndorsement, updatedReputation) + pending -= htlcId + Behaviors.same + + case WrappedOutgoingHtlcFulfilled(OutgoingHtlcFulfilled(fulfill)) => + val htlcId = HtlcId(fulfill.channelId, fulfill.id) + pending.get(htlcId) match { + case Some(channel: Hot.Channel) => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).record(htlcId, isSuccess = true) + case Some(trampoline: Hot.Trampoline) => + trampoline.received.foreach(channel => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).record(htlcId, isSuccess = true) + ) + case _ => () } - ReputationRecorder(reputationConfig, updatedReputations) + pending -= htlcId + Behaviors.same } - } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala index 34cbe20f55..47a32e6b2b 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala @@ -92,7 +92,6 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val fwd = register.expectMessageType[Register.Forward[channel.Command]] assert(fwd.message == cmd) assert(fwd.channelId == channelId) - reputationRecorder.expectMessageType[ReputationRecorder.CancelRelay] fwd } @@ -111,7 +110,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a import f._ val getConfidence = reputationRecorder.expectMessageType[ReputationRecorder.GetConfidence] - assert(getConfidence.originNode == TestConstants.Alice.nodeParams.nodeId) + assert(getConfidence.upstream.receivedFrom == TestConstants.Alice.nodeParams.nodeId) getConfidence.replyTo ! ReputationRecorder.Confidence(value) } @@ -469,8 +468,6 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a assert(fail.onionHash == Sphinx.hash(r.add.onionRoutingPacket)) assert(fail.failureCode == InvalidOnionBlinding(Sphinx.hash(r.add.onionRoutingPacket)).code) } - - reputationRecorder.expectMessageType[ReputationRecorder.CancelRelay] } } @@ -719,7 +716,6 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val fwdFail = register.expectMessageType[Register.Forward[channel.Command]] assert(fwdFail.message == testCase.cmd) assert(fwdFail.channelId == r.add.channelId) - assert(!reputationRecorder.expectMessageType[ReputationRecorder.RecordResult].isSuccess) } } @@ -761,7 +757,6 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a assert(fail.onionHash == Sphinx.hash(r.add.onionRoutingPacket)) assert(fail.failureCode == InvalidOnionBlinding(Sphinx.hash(r.add.onionRoutingPacket)).code) } - assert(!reputationRecorder.expectMessageType[ReputationRecorder.RecordResult].isSuccess) } } } @@ -800,8 +795,6 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val paymentRelayed = eventListener.expectMessageType[ChannelPaymentRelayed] assert(paymentRelayed.copy(startedAt = 0 unixms, settledAt = 0 unixms) == ChannelPaymentRelayed(r.add.amountMsat, r.amountToForward, r.add.paymentHash, r.add.channelId, channelId1, startedAt = 0 unixms, settledAt = 0 unixms)) - - assert(reputationRecorder.expectMessageType[ReputationRecorder.RecordResult].isSuccess) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala index 978c87ddb0..8f4cdc1d78 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala @@ -46,7 +46,7 @@ import fr.acinq.eclair.payment.send.{BlindedRecipient, ClearRecipient} import fr.acinq.eclair.router.Router.{ChannelHop, HopRelayParams, RouteRequest} import fr.acinq.eclair.router.{BalanceTooLow, BlindedRouteCreation, RouteNotFound, Router} import fr.acinq.eclair.reputation.ReputationRecorder -import fr.acinq.eclair.reputation.ReputationRecorder.{Confidence, GetTrampolineConfidence, RecordTrampolineFailure, RecordTrampolineSuccess} +import fr.acinq.eclair.reputation.ReputationRecorder.{Confidence, GetTrampolineConfidence} import fr.acinq.eclair.wire.protocol.OfferTypes._ import fr.acinq.eclair.wire.protocol.PaymentOnion.{FinalPayload, IntermediatePayload} import fr.acinq.eclair.wire.protocol.RouteBlindingEncryptedDataCodecs.blindedRouteDataCodec @@ -395,7 +395,6 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingAsyncPayment.map(i => (i.add.amountMsat, i.add.channelId)).toSet) assert(relayEvent.outgoing.nonEmpty) parent.expectMessageType[NodeRelayer.RelayComplete] - reputationRecorder.expectMessageType[RecordTrampolineSuccess] register.expectNoMessage(100 millis) } @@ -486,7 +485,6 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient()), commit = true)) } - reputationRecorder.expectMessageType[RecordTrampolineFailure] register.expectNoMessage(100 millis) eventListener.expectNoMessage(100 millis) } @@ -514,7 +512,6 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(TemporaryNodeFailure()), commit = true)) } - reputationRecorder.expectMessageType[RecordTrampolineFailure] register.expectNoMessage(100 millis) } @@ -540,7 +537,6 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient()), commit = true)) } - reputationRecorder.expectMessageType[RecordTrampolineFailure] register.expectNoMessage(100 millis) } @@ -565,7 +561,6 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(FinalIncorrectHtlcAmount(42 msat)), commit = true)) } - reputationRecorder.expectMessageType[RecordTrampolineFailure] register.expectNoMessage(100 millis) } @@ -623,7 +618,6 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl validateRelayEvent(relayEvent) assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId)).toSet) assert(relayEvent.outgoing.nonEmpty) - reputationRecorder.expectMessageType[RecordTrampolineSuccess] parent.expectMessageType[NodeRelayer.RelayComplete] register.expectNoMessage(100 millis) } @@ -655,7 +649,6 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl validateRelayEvent(relayEvent) assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == Seq((incomingSinglePart.add.amountMsat, incomingSinglePart.add.channelId))) assert(relayEvent.outgoing.nonEmpty) - reputationRecorder.expectMessageType[RecordTrampolineSuccess] parent.expectMessageType[NodeRelayer.RelayComplete] register.expectNoMessage(100 millis) } @@ -779,7 +772,6 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl validateRelayEvent(relayEvent) assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId))) assert(relayEvent.outgoing.nonEmpty) - reputationRecorder.expectMessageType[RecordTrampolineSuccess] parent.expectMessageType[NodeRelayer.RelayComplete] register.expectNoMessage(100 millis) } @@ -826,7 +818,6 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl validateRelayEvent(relayEvent) assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId))) assert(relayEvent.outgoing.length == 1) - reputationRecorder.expectMessageType[RecordTrampolineSuccess] parent.expectMessageType[NodeRelayer.RelayComplete] register.expectNoMessage(100 millis) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala index 4b374fa59a..406b6bd566 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala @@ -75,7 +75,7 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat import f._ val getConfidence = reputationRecorder.expectMessageType[ReputationRecorder.GetConfidence] - assert(getConfidence.originNode == TestConstants.Alice.nodeParams.nodeId) + assert(getConfidence.upstream.receivedFrom == TestConstants.Alice.nodeParams.nodeId) getConfidence.replyTo ! ReputationRecorder.Confidence(value) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala index 7228b3372d..ec47d15444 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala @@ -20,7 +20,9 @@ import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} import akka.actor.typed.ActorRef import com.typesafe.config.ConfigFactory import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.eclair.channel.Upstream import fr.acinq.eclair.reputation.ReputationRecorder._ +import fr.acinq.eclair.wire.protocol.UpdateAddHtlc import fr.acinq.eclair.{MilliSatoshiLong, randomKey} import org.scalatest.Outcome import org.scalatest.funsuite.FixtureAnyFunSuiteLike @@ -37,7 +39,7 @@ class ReputationRecorderSpec extends ScalaTestWithActorTestKit(ConfigFactory.loa override def withFixture(test: OneArgTest): Outcome = { val config = Reputation.Config(enabled = true, 1 day, 10 seconds, 2) val replyTo = TestProbe[Confidence]("confidence") - val reputationRecorder = testKit.spawn(ReputationRecorder(config, Map.empty)) + val reputationRecorder = testKit.spawn(ReputationRecorder(config)) withFixture(test.toNoArgTest(FixtureParam(config, reputationRecorder.ref, replyTo))) }