diff --git a/docs/release-notes/eclair-vnext.md b/docs/release-notes/eclair-vnext.md index 61bd991621..bb101b59d5 100644 --- a/docs/release-notes/eclair-vnext.md +++ b/docs/release-notes/eclair-vnext.md @@ -26,20 +26,28 @@ Eclair will not allow remote peers to open new obsolete channels that do not sup ### Local reputation and HTLC endorsement -To protect against jamming attacks, eclair gives a reputation to its neighbors and uses to decide if a HTLC should be relayed given how congested is the outgoing channel. -The reputation is basically how much this node paid us in fees divided by how much they should have paid us for the liquidity and slots that they blocked. +To protect against jamming attacks, eclair gives a reputation to its neighbors and uses it to decide if a HTLC should be relayed given how congested the outgoing channel is. +The reputation is basically how much this node paid us in fees divided by how much they should have paid us for the liquidity and slots that they blocked. The reputation is per incoming node and endorsement level. The confidence that the HTLC will be fulfilled is transmitted to the next node using the endorsement TLV of the `update_add_htlc` message. +Note that HTLCs that are considered dangerous are still relayed: this is the first phase of a network-wide experimentation aimed at collecting data. To configure, edit `eclair.conf`: + ```eclair.conf -eclair.local-reputation { - # Reputation decays with the following half life to emphasize recent behavior. +// We assign reputations to our peers to prioritize payments during congestion. +// The reputation is computed as fees paid divided by what should have been paid if all payments were successful. +eclair.peer-reputation { + // Set this parameter to false to disable the reputation algorithm and simply relay the incoming endorsement + // value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md, + enabled = true + // Reputation decays with the following half life to emphasize recent behavior. half-life = 7 days - # HTLCs that stay pending for longer than this get penalized - good-htlc-duration = 12 seconds - # How much to penalize pending HLTCs. A pending HTLC is considered equivalent to this many fast-failing HTLCs. - pending-multiplier = 1000 + // Payments that stay pending for longer than this get penalized + max-relay-duration = 12 seconds + // Pending payments are counted as failed, and because they could potentially stay pending for a very long time, + // the following multiplier is applied. + pending-multiplier = 1000 // A pending payment counts as a thousand failed ones. } ``` diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 323da88202..c604a896e6 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -238,16 +238,19 @@ eclair { cancel-safety-before-timeout-blocks = 144 } - // We assign reputations to our peers to prioritize HTLCs during congestion. - // The reputation is computed as fees paid divided by what should have been paid if all HTLCs were successful. + // We assign reputation to our peers to prioritize payments during congestion. + // The reputation is computed as fees paid divided by what should have been paid if all payments were successful. peer-reputation { + // Set this parameter to false to disable the reputation algorithm and simply relay the incoming endorsement + // value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md, + enabled = true // Reputation decays with the following half life to emphasize recent behavior. half-life = 7 days - // HTLCs that stay pending for longer than this get penalized - max-htlc-relay-duration = 12 seconds - // Pending HTLCs are counted as failed, and because they could potentially stay pending for a very long time, the - // following multiplier is applied. - pending-multiplier = 1000 // A pending HTLCs counts as a thousand failed ones. + // Payments that stay pending for longer than this get penalized. + max-relay-duration = 12 seconds + // Pending payments are counted as failed, and because they could potentially stay pending for a very long time, + // the following multiplier is applied. + pending-multiplier = 1000 // A pending payment counts as a thousand failed ones. } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 92d6cb54fe..400995784b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -31,7 +31,7 @@ import fr.acinq.eclair.io.MessageRelay.{RelayAll, RelayChannelsOnly, RelayPolicy import fr.acinq.eclair.io.PeerConnection import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} -import fr.acinq.eclair.reputation.Reputation.ReputationConfig +import fr.acinq.eclair.reputation.Reputation import fr.acinq.eclair.router.Announcements.AddressException import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios} import fr.acinq.eclair.router.Router._ @@ -563,10 +563,11 @@ object NodeParams extends Logging { minTrampolineFees = getRelayFees(config.getConfig("relay.fees.min-trampoline")), enforcementDelay = FiniteDuration(config.getDuration("relay.fees.enforcement-delay").getSeconds, TimeUnit.SECONDS), asyncPaymentsParams = AsyncPaymentsParams(asyncPaymentHoldTimeoutBlocks, asyncPaymentCancelSafetyBeforeTimeoutBlocks), - peerReputationConfig = ReputationConfig( - FiniteDuration(config.getDuration("relay.peer-reputation.half-life").getSeconds, TimeUnit.SECONDS), - FiniteDuration(config.getDuration("relay.peer-reputation.max-htlc-relay-duration").getSeconds, TimeUnit.SECONDS), - config.getDouble("relay.peer-reputation.pending-multiplier"), + peerReputationConfig = Reputation.Config( + enabled = config.getBoolean("relay.peer-reputation.enabled"), + halfLife = FiniteDuration(config.getDuration("relay.peer-reputation.half-life").getSeconds, TimeUnit.SECONDS), + maxRelayDuration = FiniteDuration(config.getDuration("relay.peer-reputation.max-relay-duration").getSeconds, TimeUnit.SECONDS), + pendingMultiplier = config.getDouble("relay.peer-reputation.pending-multiplier"), ), ), db = database, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 2c0453554b..e2dfcf810f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -361,8 +361,12 @@ class Setup(val datadir: File, offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager") paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume)) triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer") - reputationRecorder = system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig, Map.empty)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder") - relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, reputationRecorder, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume)) + reputationRecorder_opt = if (nodeParams.relayParams.peerReputationConfig.enabled) { + Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig, Map.empty)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder")) + } else { + None + } + relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, reputationRecorder_opt, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume)) _ = relayer ! PostRestartHtlcCleaner.Init(channels) // Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system, // we want to make sure the handler for post-restart broken HTLCs has finished initializing. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala index 414755065e..e53deeddab 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala @@ -45,6 +45,7 @@ object ChannelRelay { // @formatter:off sealed trait Command + private case object DoRelay extends Command private case class WrappedConfidence(confidence: Double) extends Command private case class WrappedForwardFailure(failure: Register.ForwardFailure[CMD_ADD_HTLC]) extends Command private case class WrappedAddResponse(res: CommandResponse[CMD_ADD_HTLC]) extends Command @@ -58,9 +59,9 @@ object ChannelRelay { def apply(nodeParams: NodeParams, register: ActorRef, - reputationRecorder: typed.ActorRef[ReputationRecorder.StandardCommand], + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], channels: Map[ByteVector32, Relayer.OutgoingChannel], - originNode:PublicKey, + originNode: PublicKey, relayId: UUID, r: IncomingPaymentPacket.ChannelRelayPacket): Behavior[Command] = Behaviors.setup { context => @@ -70,10 +71,17 @@ object ChannelRelay { paymentHash_opt = Some(r.add.paymentHash), nodeAlias_opt = Some(nodeParams.alias))) { val upstream = Upstream.Hot.Channel(r.add.removeUnknownTlvs(), TimestampMilli.now(), originNode) - reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), originNode, r.add.endorsement, relayId, r.relayFeeMsat) + reputationRecorder_opt match { + case Some(reputationRecorder) => + reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), originNode, r.add.endorsement, relayId, r.relayFeeMsat) + case None => + val confidence = (r.add.endorsement + 0.5) / 8 + context.self ! WrappedConfidence(confidence) + } Behaviors.receiveMessagePartial { case WrappedConfidence(confidence) => - new ChannelRelay(nodeParams, register, reputationRecorder, channels, r, upstream, confidence, context, relayId).relay(Seq.empty) + context.self ! DoRelay + new ChannelRelay(nodeParams, register, reputationRecorder_opt, channels, r, upstream, confidence, context, relayId).relay(Seq.empty) } } } @@ -115,7 +123,7 @@ object ChannelRelay { */ class ChannelRelay private(nodeParams: NodeParams, register: ActorRef, - reputationRecorder: typed.ActorRef[ReputationRecorder.StandardCommand], + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], channels: Map[ByteVector32, Relayer.OutgoingChannel], r: IncomingPaymentPacket.ChannelRelayPacket, upstream: Upstream.Hot.Channel, @@ -131,6 +139,8 @@ class ChannelRelay private(nodeParams: NodeParams, private case class PreviouslyTried(channelId: ByteVector32, failure: RES_ADD_FAILED[ChannelException]) def relay(previousFailures: Seq[PreviouslyTried]): Behavior[Command] = { + Behaviors.receiveMessagePartial { + case DoRelay => if (previousFailures.isEmpty) { context.log.info("relaying htlc #{} from channelId={} to requestedShortChannelId={} nextNode={}", r.add.id, r.add.channelId, r.payload.outgoingChannelId, nextNodeId_opt.getOrElse("")) } @@ -139,13 +149,14 @@ class ChannelRelay private(nodeParams: NodeParams, case RelayFailure(cmdFail) => Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) context.log.info("rejecting htlc reason={}", cmdFail.reason) - reputationRecorder ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId) + reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId)) safeSendAndStop(r.add.channelId, cmdFail) case RelaySuccess(selectedChannelId, cmdAdd) => context.log.info("forwarding htlc #{} from channelId={} to channelId={}", r.add.id, r.add.channelId, selectedChannelId) register ! Register.Forward(forwardFailureAdapter, selectedChannelId, cmdAdd) waitForAddResponse(selectedChannelId, previousFailures) } + } } def waitForAddResponse(selectedChannelId: ByteVector32, previousFailures: Seq[PreviouslyTried]): Behavior[Command] = @@ -154,11 +165,12 @@ class ChannelRelay private(nodeParams: NodeParams, context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${upstream.add.id}") val cmdFail = CMD_FAIL_HTLC(upstream.add.id, Right(UnknownNextPeer()), commit = true) Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) - reputationRecorder ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId) + reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId)) safeSendAndStop(upstream.add.channelId, cmdFail) case WrappedAddResponse(addFailed: RES_ADD_FAILED[_]) => context.log.info("attempt failed with reason={}", addFailed.t.getClass.getSimpleName) + context.self ! DoRelay relay(previousFailures :+ PreviouslyTried(selectedChannelId, addFailed)) case WrappedAddResponse(r: RES_SUCCESS[_]) => @@ -331,7 +343,7 @@ class ChannelRelay private(nodeParams: NodeParams, } private def recordRelayDuration(isSuccess: Boolean): Unit = { - reputationRecorder ! RecordResult(upstream.receivedFrom, r.add.endorsement, relayId, isSuccess) + reputationRecorder_opt.foreach(_ ! RecordResult(upstream.receivedFrom, r.add.endorsement, relayId, isSuccess)) Metrics.RelayedPaymentDuration .withTag(Tags.Relay, Tags.RelayType.Channel) .withTag(Tags.Success, isSuccess) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala index cb1e51610f..d5d4d11b7f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala @@ -59,7 +59,7 @@ object ChannelRelayer { def apply(nodeParams: NodeParams, register: ActorRef, - reputationRecorder: typed.ActorRef[ReputationRecorder.StandardCommand], + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], channels: Map[ByteVector32, Relayer.OutgoingChannel] = Map.empty, scid2channels: Map[ShortChannelId, ByteVector32] = Map.empty, node2channels: mutable.MultiDict[PublicKey, ByteVector32] = mutable.MultiDict.empty): Behavior[Command] = @@ -81,7 +81,7 @@ object ChannelRelayer { case None => Map.empty } context.log.debug(s"spawning a new handler with relayId=$relayId to nextNodeId={} with channels={}", nextNodeId_opt.getOrElse(""), nextChannels.keys.mkString(",")) - context.spawn(ChannelRelay.apply(nodeParams, register, reputationRecorder, nextChannels, originNode, relayId, channelRelayPacket), name = relayId.toString) + context.spawn(ChannelRelay.apply(nodeParams, register, reputationRecorder_opt, nextChannels, originNode, relayId, channelRelayPacket), name = relayId.toString) Behaviors.same case GetOutgoingChannels(replyTo, Relayer.GetOutgoingChannels(enabledOnly)) => @@ -102,14 +102,14 @@ object ChannelRelayer { context.log.debug("adding mappings={} to channelId={}", mappings.keys.mkString(","), channelId) val scid2channels1 = scid2channels ++ mappings val node2channels1 = node2channels.addOne(remoteNodeId, channelId) - apply(nodeParams, register, reputationRecorder, channels1, scid2channels1, node2channels1) + apply(nodeParams, register, reputationRecorder_opt, channels1, scid2channels1, node2channels1) case WrappedLocalChannelDown(LocalChannelDown(_, channelId, shortIds, remoteNodeId)) => context.log.debug(s"removed local channel info for channelId=$channelId localAlias=${shortIds.localAlias}") val channels1 = channels - channelId val scid2Channels1 = scid2channels - shortIds.localAlias -- shortIds.real.toOption val node2channels1 = node2channels.subtractOne(remoteNodeId, channelId) - apply(nodeParams, register, reputationRecorder, channels1, scid2Channels1, node2channels1) + apply(nodeParams, register, reputationRecorder_opt, channels1, scid2Channels1, node2channels1) case WrappedAvailableBalanceChanged(AvailableBalanceChanged(_, channelId, shortIds, commitments)) => val channels1 = channels.get(channelId) match { @@ -118,7 +118,7 @@ object ChannelRelayer { channels + (channelId -> c.copy(commitments = commitments)) case None => channels // we only consider the balance if we have the channel_update } - apply(nodeParams, register, reputationRecorder, channels1, scid2channels, node2channels) + apply(nodeParams, register, reputationRecorder_opt, channels1, scid2channels, node2channels) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala index 08c767a67e..8710f9fb8f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala @@ -37,7 +37,6 @@ import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode import fr.acinq.eclair.payment.send._ import fr.acinq.eclair.reputation.ReputationRecorder -import fr.acinq.eclair.reputation.ReputationRecorder.{GetTrampolineConfidence, RecordTrampolineFailure, RecordTrampolineSuccess} import fr.acinq.eclair.router.Router.RouteParams import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound} import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload @@ -88,7 +87,7 @@ object NodeRelay { def apply(nodeParams: NodeParams, parent: typed.ActorRef[NodeRelayer.Command], register: ActorRef, - reputationRecorder: typed.ActorRef[ReputationRecorder.TrampolineCommand], + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], relayId: UUID, nodeRelayPacket: NodeRelayPacket, outgoingPaymentFactory: OutgoingPaymentFactory, @@ -112,7 +111,7 @@ object NodeRelay { case IncomingPaymentPacket.RelayToTrampolinePacket(_, _, _, nextPacket) => Some(nextPacket) case _: IncomingPaymentPacket.RelayToBlindedPathsPacket => None } - new NodeRelay(nodeParams, parent, register, reputationRecorder, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, triggerer, router) + new NodeRelay(nodeParams, parent, register, reputationRecorder_opt, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, triggerer, router) .receiving(Queue.empty, nodeRelayPacket.innerPayload, nextPacket_opt, incomingPaymentHandler) } } @@ -187,7 +186,7 @@ object NodeRelay { class NodeRelay private(nodeParams: NodeParams, parent: akka.actor.typed.ActorRef[NodeRelayer.Command], register: ActorRef, - reputationRecorder: typed.ActorRef[ReputationRecorder.TrampolineCommand], + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], relayId: UUID, paymentHash: ByteVector32, paymentSecret: ByteVector32, @@ -265,10 +264,13 @@ class NodeRelay private(nodeParams: NodeParams, private def doSend(upstream: Upstream.Hot.Trampoline, nextPayload: IntermediatePayload.NodeRelay, nextPacket_opt: Option[OnionRoutingPacket]): Behavior[Command] = { context.log.debug(s"relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv})") val totalFee = upstream.amountIn - nextPayload.amountToForward - val fees = upstream.received.foldLeft(Map.empty[(PublicKey, Int), MilliSatoshi])((fees, r) => - fees.updatedWith((r.receivedFrom, r.add.endorsement))(fee => + val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) => + fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee => Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong))) - reputationRecorder ! GetTrampolineConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), fees, relayId) + reputationRecorder_opt match { + case Some(reputationRecorder) => reputationRecorder ! ReputationRecorder.GetTrampolineConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), fees, relayId) + case None => context.self ! WrappedConfidence((upstream.received.map(_.add.endorsement).min + 0.5) / 8) + } Behaviors.receiveMessagePartial { rejectExtraHtlcPartialFunction orElse { case WrappedConfidence(confidence) => @@ -302,10 +304,10 @@ class NodeRelay private(nodeParams: NodeParams, context.log.debug("trampoline payment fully resolved downstream") success(upstream, fulfilledUpstream, paymentSent) val totalFee = upstream.amountIn - paymentSent.amountWithFees - val fees = upstream.received.foldLeft(Map.empty[(PublicKey, Int), MilliSatoshi])((fees, r) => - fees.updatedWith((r.receivedFrom, r.add.endorsement))(fee => + val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) => + fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee => Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong))) - reputationRecorder ! RecordTrampolineSuccess(fees, relayId) + reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineSuccess(fees, relayId)) recordRelayDuration(startedAt, isSuccess = true) stopping() case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) => @@ -313,7 +315,7 @@ class NodeRelay private(nodeParams: NodeParams, if (!fulfilledUpstream) { rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload)) } - reputationRecorder ! RecordTrampolineFailure(upstream.received.map(r => (r.receivedFrom, r.add.endorsement)).toSet, relayId) + reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineFailure(upstream.received.map(r => ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement)).toSet, relayId)) recordRelayDuration(startedAt, isSuccess = fulfilledUpstream) stopping() } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala index 9d413124d7..8db24382b4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala @@ -61,7 +61,7 @@ object NodeRelayer { */ def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, - reputationRecorder: typed.ActorRef[ReputationRecorder.TrampolineCommand], + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], router: akka.actor.ActorRef, @@ -80,15 +80,15 @@ object NodeRelayer { case None => val relayId = UUID.randomUUID() context.log.debug(s"spawning a new handler with relayId=$relayId") - val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, reputationRecorder, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer, router), relayId.toString) + val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, reputationRecorder_opt, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer, router), relayId.toString) context.log.debug("forwarding incoming htlc #{} from channel {} to new handler", htlcIn.id, htlcIn.channelId) handler ! NodeRelay.Relay(nodeRelayPacket, originNode) - apply(nodeParams, register, reputationRecorder, outgoingPaymentFactory, triggerer, router, children + (childKey -> handler)) + apply(nodeParams, register, reputationRecorder_opt, outgoingPaymentFactory, triggerer, router, children + (childKey -> handler)) } case RelayComplete(childHandler, paymentHash, paymentSecret) => // we do a back-and-forth between parent and child before stopping the child to prevent a race condition childHandler ! NodeRelay.Stop - apply(nodeParams, register, reputationRecorder, outgoingPaymentFactory, triggerer, router, children - PaymentKey(paymentHash, paymentSecret)) + apply(nodeParams, register, reputationRecorder_opt, outgoingPaymentFactory, triggerer, router, children - PaymentKey(paymentHash, paymentSecret)) case GetPendingPayments(replyTo) => replyTo ! children Behaviors.same diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala index 51a53016bf..b0ac3e2cb5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala @@ -28,8 +28,7 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.PendingCommandsDb import fr.acinq.eclair.payment._ -import fr.acinq.eclair.reputation.Reputation.ReputationConfig -import fr.acinq.eclair.reputation.ReputationRecorder +import fr.acinq.eclair.reputation.{Reputation, ReputationRecorder} import fr.acinq.eclair.wire.protocol._ import fr.acinq.eclair.{CltvExpiryDelta, Logs, MilliSatoshi, NodeParams} import grizzled.slf4j.Logging @@ -51,7 +50,7 @@ import scala.util.Random * It also receives channel HTLC events (fulfill / failed) and relays those to the appropriate handlers. * It also maintains an up-to-date view of local channel balances. */ -class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], reputationRecorder: typed.ActorRef[ReputationRecorder.Command], initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging { +class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.Command]], initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging { import Relayer._ @@ -59,8 +58,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym implicit def implicitLog: LoggingAdapter = log private val postRestartCleaner = context.actorOf(PostRestartHtlcCleaner.props(nodeParams, register, initialized), "post-restart-htlc-cleaner") - private val channelRelayer = context.spawn(Behaviors.supervise(ChannelRelayer(nodeParams, register, reputationRecorder)).onFailure(SupervisorStrategy.resume), "channel-relayer") - private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, reputationRecorder, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), triggerer, router)).onFailure(SupervisorStrategy.resume), name = "node-relayer") + private val channelRelayer = context.spawn(Behaviors.supervise(ChannelRelayer(nodeParams, register, reputationRecorder_opt)).onFailure(SupervisorStrategy.resume), "channel-relayer") + private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, reputationRecorder_opt, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), triggerer, router)).onFailure(SupervisorStrategy.resume), name = "node-relayer") def receive: Receive = { case init: PostRestartHtlcCleaner.Init => postRestartCleaner forward init @@ -122,8 +121,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym object Relayer extends Logging { - def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], reputationRecorder: typed.ActorRef[ReputationRecorder.Command], initialized: Option[Promise[Done]] = None): Props = - Props(new Relayer(nodeParams, router, register, paymentHandler, triggerer, reputationRecorder, initialized)) + def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.Command]], initialized: Option[Promise[Done]] = None): Props = + Props(new Relayer(nodeParams, router, register, paymentHandler, triggerer, reputationRecorder_opt, initialized)) // @formatter:off case class RelayFees(feeBase: MilliSatoshi, feeProportionalMillionths: Long) { @@ -138,7 +137,7 @@ object Relayer extends Logging { minTrampolineFees: RelayFees, enforcementDelay: FiniteDuration, asyncPaymentsParams: AsyncPaymentsParams, - peerReputationConfig: ReputationConfig) { + peerReputationConfig: Reputation.Config) { def defaultFees(announceChannel: Boolean): RelayFees = { if (announceChannel) { publicChannelFees diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala index b2b7e52fbd..f74c0aca67 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala @@ -16,67 +16,80 @@ package fr.acinq.eclair.reputation -import fr.acinq.eclair.reputation.Reputation.Pending import fr.acinq.eclair.{MilliSatoshi, TimestampMilli} import java.util.UUID import scala.concurrent.duration.FiniteDuration -/** Local reputation per incoming node and endorsement level +/** + * Created by thomash on 21/07/2023. + */ + +/** + * Local reputation for a given incoming node, that should be track for each incoming endorsement level. * - * @param pastWeight How much fees we would have collected in the past if all HTLCs had succeeded (exponential moving average). + * @param pastWeight How much fees we would have collected in the past if all payments had succeeded (exponential moving average). * @param pastScore How much fees we have collected in the past (exponential moving average). - * @param lastSettlementAt Timestamp of the last recorded HTLC settlement. - * @param pending Set of pending HTLCs. + * @param lastSettlementAt Timestamp of the last recorded payment settlement. + * @param pending Set of pending payments (payments may contain multiple HTLCs when using trampoline). * @param halfLife Half life for the exponential moving average. - * @param goodDuration Duration after which HTLCs are penalized for staying pending too long. - * @param pendingMultiplier How much to penalize pending HTLCs. + * @param maxRelayDuration Duration after which payments are penalized for staying pending too long. + * @param pendingMultiplier How much to penalize pending payments. */ -case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[UUID, Pending], halfLife: FiniteDuration, goodDuration: FiniteDuration, pendingMultiplier: Double) { +case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[UUID, Reputation.PendingPayment], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) { private def decay(now: TimestampMilli): Double = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife) - private def pendingWeight(now: TimestampMilli): Double = pending.values.map(_.weight(now, goodDuration, pendingMultiplier)).sum + private def pendingWeight(now: TimestampMilli): Double = pending.values.map(_.weight(now, maxRelayDuration, pendingMultiplier)).sum - /** Register a HTLC to relay and estimate the confidence that it will succeed. + /** + * Register a payment to relay and estimate the confidence that it will succeed. + * * @return (updated reputation, confidence) */ def attempt(relayId: UUID, fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): (Reputation, Double) = { val d = decay(now) - val newReputation = copy(pending = pending + (relayId -> Pending(fee, now))) + val newReputation = copy(pending = pending + (relayId -> Reputation.PendingPayment(fee, now))) val confidence = d * pastScore / (d * pastWeight + newReputation.pendingWeight(now)) (newReputation, confidence) } - /** Mark a previously registered HTLC as failed without trying to relay it (usually because its confidence was too low). + /** + * Mark a previously registered payment as failed without trying to relay it (usually because its confidence was too low). + * * @return updated reputation */ def cancel(relayId: UUID): Reputation = copy(pending = pending - relayId) - /** When a HTLC is settled, we record whether it succeeded and how long it took. + /** + * When a payment is settled, we record whether it succeeded and how long it took. * * @param feeOverride When relaying trampoline payments, the actual fee is only known when the payment succeeds. This * is used instead of the fee upper bound that was known when first attempting the relay. * @return updated reputation */ def record(relayId: UUID, isSuccess: Boolean, feeOverride: Option[MilliSatoshi] = None, now: TimestampMilli = TimestampMilli.now()): Reputation = { - val d = decay(now) - var p = pending.getOrElse(relayId, Pending(MilliSatoshi(0), now)) - feeOverride.foreach(fee => p = p.copy(fee = fee)) - val newWeight = d * pastWeight + p.weight(now, goodDuration, 1.0) - val newScore = d * pastScore + (if (isSuccess) p.fee.toLong.toDouble else 0) - Reputation(newWeight, newScore, now, pending - relayId, halfLife, goodDuration, pendingMultiplier) + pending.get(relayId) match { + case Some(p) => + val d = decay(now) + val p1 = p.copy(fee = feeOverride.getOrElse(p.fee)) + val newWeight = d * pastWeight + p1.weight(now, maxRelayDuration, 1.0) + val newScore = d * pastScore + (if (isSuccess) p1.fee.toLong.toDouble else 0) + Reputation(newWeight, newScore, now, pending - relayId, halfLife, maxRelayDuration, pendingMultiplier) + case None => this + } } } object Reputation { - case class Pending(fee: MilliSatoshi, startedAt: TimestampMilli) { + /** We're relaying that payment and are waiting for it to settle. */ + case class PendingPayment(fee: MilliSatoshi, startedAt: TimestampMilli) { def weight(now: TimestampMilli, minDuration: FiniteDuration, multiplier: Double): Double = { val duration = now - startedAt fee.toLong.toDouble * (duration / minDuration).max(multiplier) } } - case class ReputationConfig(halfLife: FiniteDuration, maxHtlcRelayDuration: FiniteDuration, pendingMultiplier: Double) + case class Config(enabled: Boolean, halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) - def init(config: ReputationConfig): Reputation = Reputation(0.0, 0.0, TimestampMilli.min, Map.empty, config.halfLife, config.maxHtlcRelayDuration, config.pendingMultiplier) + def init(config: Config): Reputation = Reputation(0.0, 0.0, TimestampMilli.min, Map.empty, config.halfLife, config.maxRelayDuration, config.pendingMultiplier) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala index b10fd218f7..77982ebfbe 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala @@ -19,55 +19,70 @@ package fr.acinq.eclair.reputation import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.reputation.Reputation.ReputationConfig import fr.acinq.eclair.MilliSatoshi import java.util.UUID +/** + * Created by thomash on 21/07/2023. + */ + object ReputationRecorder { + // @formatter:off sealed trait Command - sealed trait StandardCommand extends Command - case class GetConfidence(replyTo: ActorRef[Confidence], originNode: PublicKey, endorsement: Int, relayId: UUID, fee: MilliSatoshi) extends StandardCommand - case class CancelRelay(originNode: PublicKey, endorsement: Int, relayId: UUID) extends StandardCommand - case class RecordResult(originNode: PublicKey, endorsement: Int, relayId: UUID, isSuccess: Boolean) extends StandardCommand + sealed trait ChannelRelayCommand extends Command + case class GetConfidence(replyTo: ActorRef[Confidence], originNode: PublicKey, endorsement: Int, relayId: UUID, fee: MilliSatoshi) extends ChannelRelayCommand + case class CancelRelay(originNode: PublicKey, endorsement: Int, relayId: UUID) extends ChannelRelayCommand + case class RecordResult(originNode: PublicKey, endorsement: Int, relayId: UUID, isSuccess: Boolean) extends ChannelRelayCommand + + sealed trait TrampolineRelayCommand extends Command + case class GetTrampolineConfidence(replyTo: ActorRef[Confidence], fees: Map[PeerEndorsement, MilliSatoshi], relayId: UUID) extends TrampolineRelayCommand + case class RecordTrampolineFailure(upstream: Set[PeerEndorsement], relayId: UUID) extends TrampolineRelayCommand + case class RecordTrampolineSuccess(fees: Map[PeerEndorsement, MilliSatoshi], relayId: UUID) extends TrampolineRelayCommand + // @formatter:on - sealed trait TrampolineCommand extends Command - case class GetTrampolineConfidence(replyTo: ActorRef[Confidence], fees: Map[(PublicKey, Int), MilliSatoshi], relayId: UUID) extends TrampolineCommand - case class RecordTrampolineFailure(keys: Set[(PublicKey, Int)], relayId: UUID) extends TrampolineCommand - case class RecordTrampolineSuccess(fees: Map[(PublicKey, Int), MilliSatoshi], relayId: UUID) extends TrampolineCommand + /** + * @param nodeId nodeId of the upstream peer. + * @param endorsement endorsement value set by the upstream peer in the HTLC we received. + */ + case class PeerEndorsement(nodeId: PublicKey, endorsement: Int) + /** Confidence that the outgoing HTLC will succeed. */ case class Confidence(value: Double) - def apply(reputationConfig: ReputationConfig, reputations: Map[(PublicKey, Int), Reputation]): Behavior[Command] = { + def apply(reputationConfig: Reputation.Config, reputations: Map[PeerEndorsement, Reputation]): Behavior[Command] = { Behaviors.receiveMessage { case GetConfidence(replyTo, originNode, endorsement, relayId, fee) => - val (updatedReputation, confidence) = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).attempt(relayId, fee) + val (updatedReputation, confidence) = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).attempt(relayId, fee) replyTo ! Confidence(confidence) - ReputationRecorder(reputationConfig, reputations.updated((originNode, endorsement), updatedReputation)) + ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) case CancelRelay(originNode, endorsement, relayId) => - val updatedReputation = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).cancel(relayId) - ReputationRecorder(reputationConfig, reputations.updated((originNode, endorsement), updatedReputation)) + val updatedReputation = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).cancel(relayId) + ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) case RecordResult(originNode, endorsement, relayId, isSuccess) => - val updatedReputation = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).record(relayId, isSuccess) - ReputationRecorder(reputationConfig, reputations.updated((originNode, endorsement), updatedReputation)) + val updatedReputation = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).record(relayId, isSuccess) + ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) case GetTrampolineConfidence(replyTo, fees, relayId) => - val (confidence, updatedReputations) = fees.foldLeft((1.0, reputations)){case ((c, r), ((originNode, endorsement), fee)) => - val (updatedReputation, confidence) = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).attempt(relayId, fee) - (c.min(confidence), r.updated((originNode, endorsement), updatedReputation)) + val (confidence, updatedReputations) = fees.foldLeft((1.0, reputations)) { + case ((c, r), (peerEndorsement, fee)) => + val (updatedReputation, confidence) = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).attempt(relayId, fee) + (c.min(confidence), r.updated(peerEndorsement, updatedReputation)) } replyTo ! Confidence(confidence) ReputationRecorder(reputationConfig, updatedReputations) case RecordTrampolineFailure(keys, relayId) => - val updatedReputations = keys.foldLeft(reputations) { case (r, (originNode, endorsement)) => - val updatedReputation = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).record(relayId, isSuccess = false) - r.updated((originNode, endorsement), updatedReputation) + val updatedReputations = keys.foldLeft(reputations) { + case (r, peerEndorsement) => + val updatedReputation = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).record(relayId, isSuccess = false) + r.updated(peerEndorsement, updatedReputation) } ReputationRecorder(reputationConfig, updatedReputations) case RecordTrampolineSuccess(fees, relayId) => - val updatedReputations = fees.foldLeft(reputations) { case (r, ((originNode, endorsement), fee)) => - val updatedReputation = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).record(relayId, isSuccess = true, Some(fee)) - r.updated((originNode, endorsement), updatedReputation) + val updatedReputations = fees.foldLeft(reputations) { + case (r, (peerEndorsement, fee)) => + val updatedReputation = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).record(relayId, isSuccess = true, Some(fee)) + r.updated(peerEndorsement, updatedReputation) } ReputationRecorder(reputationConfig, updatedReputations) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 470f2164df..2ea188fb76 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -29,7 +29,7 @@ import fr.acinq.eclair.io.MessageRelay.RelayAll import fr.acinq.eclair.io.{OpenChannelInterceptor, PeerConnection} import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} -import fr.acinq.eclair.reputation.Reputation.ReputationConfig +import fr.acinq.eclair.reputation.Reputation import fr.acinq.eclair.router.Graph.{MessagePath, WeightRatios} import fr.acinq.eclair.router.PathFindingExperimentConf import fr.acinq.eclair.router.Router._ @@ -167,7 +167,7 @@ object TestConstants { feeProportionalMillionths = 30), enforcementDelay = 10 minutes, asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144)), - peerReputationConfig = ReputationConfig(1 day, 10 seconds, 100), + peerReputationConfig = Reputation.Config(enabled = false, 1 day, 10 seconds, 100), ), db = TestDatabases.inMemoryDb(), autoReconnect = false, @@ -339,7 +339,7 @@ object TestConstants { feeProportionalMillionths = 30), enforcementDelay = 10 minutes, asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144)), - peerReputationConfig = ReputationConfig(2 day, 20 seconds, 200), + peerReputationConfig = Reputation.Config(enabled = false, 2 day, 20 seconds, 200), ), db = TestDatabases.inMemoryDb(), autoReconnect = false, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala index 7ea80b8fa2..51c6f1b193 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala @@ -16,7 +16,7 @@ package fr.acinq.eclair.channel -import akka.actor.typed.scaladsl.adapter.{ClassicActorSystemOps, actorRefAdapter} +import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.actor.{Actor, ActorLogging, ActorRef, Props} import akka.testkit.{TestFSMRef, TestProbe} import fr.acinq.bitcoin.scalacompat.ByteVector32 @@ -33,7 +33,6 @@ import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceiveStandardPayment import fr.acinq.eclair.payment.receive.PaymentHandler import fr.acinq.eclair.payment.relay.Relayer import fr.acinq.eclair.payment.send.ClearRecipient -import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.wire.protocol._ import grizzled.slf4j.Logging import org.scalatest.funsuite.FixtureAnyFunSuiteLike @@ -67,10 +66,8 @@ class FuzzySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Channe val bobRegister = system.actorOf(Props(new TestRegister())) val alicePaymentHandler = system.actorOf(Props(new PaymentHandler(aliceParams, aliceRegister, TestProbe().ref))) val bobPaymentHandler = system.actorOf(Props(new PaymentHandler(bobParams, bobRegister, TestProbe().ref))) - val aliceReputationRecorder = system.spawnAnonymous(ReputationRecorder(aliceParams.relayParams.peerReputationConfig, Map.empty)) - val bobReputationRecorder = system.spawnAnonymous(ReputationRecorder(bobParams.relayParams.peerReputationConfig, Map.empty)) - val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler, TestProbe().ref, aliceReputationRecorder)) - val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler, TestProbe().ref, bobReputationRecorder)) + val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler, TestProbe().ref, None)) + val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler, TestProbe().ref, None)) val wallet = new DummyOnChainWallet() val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(aliceParams, wallet, bobParams.nodeId, alice2blockchain.ref, aliceRelayer, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref) val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(bobParams, wallet, aliceParams.nodeId, bob2blockchain.ref, bobRelayer, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala index cb8ae9a386..f9f0821b67 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala @@ -97,8 +97,7 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat val router = system.actorOf(Router.props(nodeParams, watcherTyped), "router") val offerManager = system.spawn(OfferManager(nodeParams, router, 1 minute), "offer-manager") val paymentHandler = system.actorOf(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler") - val reputationRecorder = system.spawn(ReputationRecorder(nodeParams.relayParams.peerReputationConfig, Map.empty), "reputation-recorder") - val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler, triggerer.ref.toTyped, reputationRecorder), "relayer") + val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler, triggerer.ref.toTyped, None), "relayer") val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcherTyped, bitcoinClient) val channelFactory = Peer.SimpleChannelFactory(nodeParams, watcherTyped, relayer, wallet, txPublisherFactory) val pendingChannelsRateLimiter = system.spawnAnonymous(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, Seq())).onFailure(typed.SupervisorStrategy.resume)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala index 5e0cd855b8..7e7333a4a7 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala @@ -57,7 +57,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit case class FixtureParam(nodeParams: NodeParams, register: TestProbe, sender: TestProbe, eventListener: TestProbe) { def createRelayer(nodeParams1: NodeParams): (ActorRef, ActorRef) = { - val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref, TestProbe().ref.toTyped, TestProbe().ref.toTyped)) + val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref, TestProbe().ref.toTyped, None)) // we need ensure the post-htlc-restart child actor is initialized sender.send(relayer, Relayer.GetChildActors(sender.ref)) (relayer, sender.expectMsgType[Relayer.ChildActors].postRestartCleaner) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala index 71bde1f19e..2ee7d8979f 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala @@ -55,7 +55,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val nodeParams = TestConstants.Bob.nodeParams val register = TestProbe[Any]("register") val reputationRecorder = TestProbe[ReputationRecorder.Command]("reputation-recorder") - val channelRelayer = testKit.spawn(ChannelRelayer.apply(nodeParams, register.ref.toClassic, reputationRecorder.ref)) + val channelRelayer = testKit.spawn(ChannelRelayer.apply(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref))) try { withFixture(test.toNoArgTest(FixtureParam(nodeParams, channelRelayer, register, reputationRecorder))) } finally { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala index e22e618ac3..8ee310ab6d 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala @@ -71,7 +71,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl def createNodeRelay(packetIn: IncomingPaymentPacket.NodeRelayPacket, useRealPaymentFactory: Boolean = false): (ActorRef[NodeRelay.Command], TestProbe[NodeRelayer.Command]) = { val parent = TestProbe[NodeRelayer.Command]("parent-relayer") val outgoingPaymentFactory = if (useRealPaymentFactory) RealOutgoingPaymentFactory(this) else FakeOutgoingPaymentFactory(this) - val nodeRelay = testKit.spawn(NodeRelay(nodeParams, parent.ref, register.ref.toClassic, reputationRecorder.ref, relayId, packetIn, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) + val nodeRelay = testKit.spawn(NodeRelay(nodeParams, parent.ref, register.ref.toClassic, Some(reputationRecorder.ref), relayId, packetIn, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) (nodeRelay, parent) } } @@ -109,7 +109,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl test("create child handlers for new payments") { f => import f._ val probe = TestProbe[Any]() - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, reputationRecorder.ref, FakeOutgoingPaymentFactory(f), triggerer.ref, router.ref.toClassic)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), FakeOutgoingPaymentFactory(f), triggerer.ref, router.ref.toClassic)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(Map.empty) @@ -148,7 +148,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val outgoingPaymentFactory = FakeOutgoingPaymentFactory(f) { - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, reputationRecorder.ref, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(Map.empty) } @@ -156,7 +156,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (paymentHash1, paymentSecret1, child1) = (randomBytes32(), randomBytes32(), TestProbe[NodeRelay.Command]()) val (paymentHash2, paymentSecret2, child2) = (randomBytes32(), randomBytes32(), TestProbe[NodeRelay.Command]()) val children = Map(PaymentKey(paymentHash1, paymentSecret1) -> child1.ref, PaymentKey(paymentHash2, paymentSecret2) -> child2.ref) - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, reputationRecorder.ref, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(children) @@ -172,7 +172,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (paymentSecret1, child1) = (randomBytes32(), TestProbe[NodeRelay.Command]()) val (paymentSecret2, child2) = (randomBytes32(), TestProbe[NodeRelay.Command]()) val children = Map(PaymentKey(paymentHash, paymentSecret1) -> child1.ref, PaymentKey(paymentHash, paymentSecret2) -> child2.ref) - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, reputationRecorder.ref, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(children) @@ -182,7 +182,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl probe.expectMessage(Map(PaymentKey(paymentHash, paymentSecret2) -> child2.ref)) } { - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, reputationRecorder.ref, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) parentRelayer ! NodeRelayer.Relay(incomingMultiPart.head, randomKey().publicKey) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) val pending1 = probe.expectMessageType[Map[PaymentKey, ActorRef[NodeRelay.Command]]] diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala index 32a4398ce1..2d5c2aadce 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala @@ -61,7 +61,7 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat val probe = TestProbe[Any]() // we can't spawn top-level actors with akka typed testKit.spawn(Behaviors.setup[Any] { context => - val relayer = context.toClassic.actorOf(Relayer.props(nodeParams, router.ref.toClassic, register.ref.toClassic, paymentHandler.ref.toClassic, triggerer.ref, reputationRecorder.ref)) + val relayer = context.toClassic.actorOf(Relayer.props(nodeParams, router.ref.toClassic, register.ref.toClassic, paymentHandler.ref.toClassic, triggerer.ref, Some(reputationRecorder.ref))) probe.ref ! relayer Behaviors.empty[Any] }) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala index 310ee4fbab..7228b3372d 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala @@ -20,7 +20,6 @@ import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} import akka.actor.typed.ActorRef import com.typesafe.config.ConfigFactory import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.reputation.Reputation.ReputationConfig import fr.acinq.eclair.reputation.ReputationRecorder._ import fr.acinq.eclair.{MilliSatoshiLong, randomKey} import org.scalatest.Outcome @@ -33,16 +32,16 @@ class ReputationRecorderSpec extends ScalaTestWithActorTestKit(ConfigFactory.loa val (uuid1, uuid2, uuid3, uuid4, uuid5, uuid6, uuid7, uuid8) = (UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID()) val originNode: PublicKey = randomKey().publicKey - case class FixtureParam(config: ReputationConfig, reputationRecorder: ActorRef[Command], replyTo: TestProbe[Confidence]) + case class FixtureParam(config: Reputation.Config, reputationRecorder: ActorRef[Command], replyTo: TestProbe[Confidence]) override def withFixture(test: OneArgTest): Outcome = { - val config = ReputationConfig(1 day, 10 seconds, 2) + val config = Reputation.Config(enabled = true, 1 day, 10 seconds, 2) val replyTo = TestProbe[Confidence]("confidence") val reputationRecorder = testKit.spawn(ReputationRecorder(config, Map.empty)) withFixture(test.toNoArgTest(FixtureParam(config, reputationRecorder.ref, replyTo))) } - test("standard") { f => + test("channel relay") { f => import f._ reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid1, 2000 msat) @@ -68,20 +67,20 @@ class ReputationRecorderSpec extends ScalaTestWithActorTestKit(ConfigFactory.loa assert(replyTo.expectMessageType[Confidence].value === 0.0 +- 0.001) } - test("trampoline") { f => + test("trampoline relay") { f => import f._ val (a, b, c) = (randomKey().publicKey, randomKey().publicKey, randomKey().publicKey) - reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map((a, 7) -> 2000.msat, (b, 7) -> 4000.msat, (c, 0) -> 6000.msat), uuid1) + reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map(PeerEndorsement(a, 7) -> 2000.msat, PeerEndorsement(b, 7) -> 4000.msat, PeerEndorsement(c, 0) -> 6000.msat), uuid1) assert(replyTo.expectMessageType[Confidence].value == 0) - reputationRecorder ! RecordTrampolineSuccess(Map((a, 7) -> 1000.msat, (b, 7) -> 2000.msat, (c, 0) -> 3000.msat), uuid1) - reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map((a, 7) -> 1000.msat, (c, 0) -> 1000.msat), uuid2) + reputationRecorder ! RecordTrampolineSuccess(Map(PeerEndorsement(a, 7) -> 1000.msat, PeerEndorsement(b, 7) -> 2000.msat, PeerEndorsement(c, 0) -> 3000.msat), uuid1) + reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map(PeerEndorsement(a, 7) -> 1000.msat, PeerEndorsement(c, 0) -> 1000.msat), uuid2) assert(replyTo.expectMessageType[Confidence].value === (1.0 / 3) +- 0.001) - reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map((a, 0) -> 1000.msat, (b, 7) -> 2000.msat), uuid3) + reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map(PeerEndorsement(a, 0) -> 1000.msat, PeerEndorsement(b, 7) -> 2000.msat), uuid3) assert(replyTo.expectMessageType[Confidence].value == 0) - reputationRecorder ! RecordTrampolineFailure(Set((a, 7), (c, 0)), uuid2) - reputationRecorder ! RecordTrampolineSuccess(Map((a, 0) -> 1000.msat, (b, 7) -> 2000.msat), uuid3) + reputationRecorder ! RecordTrampolineFailure(Set(PeerEndorsement(a, 7), PeerEndorsement(c, 0)), uuid2) + reputationRecorder ! RecordTrampolineSuccess(Map(PeerEndorsement(a, 0) -> 1000.msat, PeerEndorsement(b, 7) -> 2000.msat), uuid3) reputationRecorder ! GetConfidence(replyTo.ref, a, 7, uuid4, 1000 msat) assert(replyTo.expectMessageType[Confidence].value === (1.0 / 4) +- 0.001) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationSpec.scala index 9d369aacab..4ee8042ad6 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationSpec.scala @@ -16,10 +16,10 @@ package fr.acinq.eclair.reputation +import fr.acinq.eclair.reputation.Reputation._ import fr.acinq.eclair.{MilliSatoshiLong, TimestampMilli} -import fr.acinq.eclair.reputation.Reputation.ReputationConfig -import org.scalatest.funsuite.AnyFunSuite import org.scalactic.Tolerance.convertNumericToPlusOrMinusWrapper +import org.scalatest.funsuite.AnyFunSuite import java.util.UUID import scala.concurrent.duration.DurationInt @@ -28,7 +28,7 @@ class ReputationSpec extends AnyFunSuite { val (uuid1, uuid2, uuid3, uuid4, uuid5, uuid6, uuid7) = (UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID()) test("basic") { - val r0 = Reputation.init(ReputationConfig(1 day, 1 second, 2)) + val r0 = Reputation.init(Config(enabled = true, 1 day, 1 second, 2)) val (r1, c1) = r0.attempt(uuid1, 10000 msat) assert(c1 == 0) val r2 = r1.record(uuid1, isSuccess = true) @@ -51,7 +51,7 @@ class ReputationSpec extends AnyFunSuite { } test("long HTLC") { - val r0 = Reputation.init(ReputationConfig(1000 day, 1 second, 10)) + val r0 = Reputation.init(Config(enabled = true, 1000 day, 1 second, 10)) val (r1, c1) = r0.attempt(uuid1, 100000 msat, TimestampMilli(0)) assert(c1 == 0) val r2 = r1.record(uuid1, isSuccess = true, now = TimestampMilli(0)) @@ -63,7 +63,7 @@ class ReputationSpec extends AnyFunSuite { } test("exponential decay") { - val r0 = Reputation.init(ReputationConfig(100 seconds, 1 second, 1)) + val r0 = Reputation.init(Config(enabled = true, 100 seconds, 1 second, 1)) val (r1, _) = r0.attempt(uuid1, 1000 msat, TimestampMilli(0)) val r2 = r1.record(uuid1, isSuccess = true, now = TimestampMilli(0)) val (r3, c3) = r2.attempt(uuid2, 1000 msat, TimestampMilli(0))