From 472275573cd4b9576d54a6e06373223877653554 Mon Sep 17 00:00:00 2001 From: Thomas HUET <81159533+thomash-acinq@users.noreply.github.com> Date: Wed, 8 Jun 2022 13:37:19 +0200 Subject: [PATCH] Estimate balances of remote channels (#2272) We use past payments attempts to estimate the balances of all channels. This can later be used in path-finding to improve the reliability of payments. --- eclair-core/src/main/resources/reference.conf | 1 + .../scala/fr/acinq/eclair/NodeParams.scala | 3 +- .../payment/send/PaymentLifecycle.scala | 21 +- .../remote/EclairInternalsSerializer.scala | 3 +- .../acinq/eclair/router/BalanceEstimate.scala | 331 ++++++++++++++++++ .../fr/acinq/eclair/router/Monitoring.scala | 6 + .../eclair/router/RouteCalculation.scala | 12 +- .../scala/fr/acinq/eclair/router/Router.scala | 44 ++- .../acinq/eclair/router/StaleChannels.scala | 4 +- .../fr/acinq/eclair/router/Validation.scala | 59 ++-- .../scala/fr/acinq/eclair/TestConstants.scala | 6 +- .../eclair/payment/PaymentLifecycleSpec.scala | 29 ++ .../eclair/router/BalanceEstimateSpec.scala | 292 +++++++++++++++ .../fr/acinq/eclair/router/RouterSpec.scala | 8 +- 14 files changed, 767 insertions(+), 52 deletions(-) create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index ae606c4774..8b6739d551 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -230,6 +230,7 @@ eclair { channel-exclude-duration = 60 seconds // when a temporary channel failure is returned, we exclude the channel from our payment routes for this duration broadcast-interval = 60 seconds // see BOLT #7 init-timeout = 5 minutes + balance-estimate-half-life = 1 day // time after which the confidence of the balance estimate is halved sync { request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 7fedf8d21d..155a56c1e1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -493,7 +493,8 @@ object NodeParams extends Logging { encodingType = EncodingType.UNCOMPRESSED, channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"), channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"), - pathFindingExperimentConf = getPathFindingExperimentConf(config.getConfig("router.path-finding.experiments")) + pathFindingExperimentConf = getPathFindingExperimentConf(config.getConfig("router.path-finding.experiments")), + balanceEstimateHalfLife = FiniteDuration(config.getDuration("router.balance-estimate-half-life").getSeconds, TimeUnit.SECONDS), ), socksProxy_opt = socksProxy_opt, maxPaymentAttempts = config.getInt("max-payment-attempts"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala index 0735f48353..4a7e80e688 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala @@ -102,6 +102,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A handleLocalFail(d, DisconnectedException, isFatal = false) case Event(RES_ADD_SETTLED(_, htlc, fulfill: HtlcResult.Fulfill), d: WaitingForComplete) => + router ! Router.RouteDidRelay(d.route) Metrics.PaymentAttempt.withTag(Tags.MultiPart, value = false).record(d.failures.size + 1) val p = PartialPayment(id, d.c.finalPayload.amount, d.cmd.amount - d.c.finalPayload.amount, htlc.channelId, Some(cfg.fullRoute(d.route))) myStop(d.c, Right(cfg.createPaymentSent(fulfill.paymentPreimage, p :: Nil))) @@ -166,13 +167,31 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A private def handleRemoteFail(d: WaitingForComplete, fail: UpdateFailHtlc) = { import d._ - (Sphinx.FailurePacket.decrypt(fail.reason, sharedSecrets) match { + ((Sphinx.FailurePacket.decrypt(fail.reason, sharedSecrets) match { case success@Success(e) => Metrics.PaymentError.withTag(Tags.Failure, Tags.FailureType(RemoteFailure(d.c.finalPayload.amount, Nil, e))).increment() success case failure@Failure(_) => Metrics.PaymentError.withTag(Tags.Failure, Tags.FailureType(UnreadableRemoteFailure(d.c.finalPayload.amount, Nil))).increment() failure + }) match { + case res@Success(Sphinx.DecryptedFailurePacket(nodeId, failureMessage)) => + // We have discovered some liquidity information with this payment: we update the router accordingly. + val stoppedRoute = d.route.stopAt(nodeId) + if (stoppedRoute.hops.length > 1) { + router ! Router.RouteCouldRelay(stoppedRoute) + } + failureMessage match { + case TemporaryChannelFailure(update) => + d.route.hops.find(_.nodeId == nodeId) match { + case Some(failingHop) if ChannelRelayParams.areSame(failingHop.params, ChannelRelayParams.FromAnnouncement(update), true) => + router ! Router.ChannelCouldNotRelay(stoppedRoute.amount, failingHop) + case _ => // otherwise the relay parameters may have changed, so it's not necessarily a liquidity issue + } + case _ => // other errors should not be used for liquidity issues + } + res + case res => res }) match { case Success(e@Sphinx.DecryptedFailurePacket(nodeId, failureMessage)) if nodeId == c.targetNodeId => // if destination node returns an error, we fail the payment immediately diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala index 5d0c6a3a8b..d0917fdf4b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala @@ -96,7 +96,8 @@ object EclairInternalsSerializer { .typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) :: ("channelRangeChunkSize" | int32) :: ("channelQueryChunkSize" | int32) :: - ("pathFindingExperimentConf" | pathFindingExperimentConfCodec)).as[RouterConf] + ("pathFindingExperimentConf" | pathFindingExperimentConfCodec) :: + ("balanceEstimateHalfLife" | finiteDurationCodec)).as[RouterConf] val overrideFeaturesListCodec: Codec[List[(PublicKey, Features[Feature])]] = listOfN(uint16, publicKey ~ variableSizeBytes(uint16, featuresCodec)) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala new file mode 100644 index 0000000000..18ea2a810d --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala @@ -0,0 +1,331 @@ +/* + * Copyright 2021 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.router + +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong} +import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge} +import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Route} +import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion} + +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +/** + * Estimates the balance between a pair of nodes + * + * @param low lower bound on the balance + * @param lowTimestamp time at which the lower bound was known to be correct + * @param high upper bound on the balance + * @param highTimestamp time at which the upper bound was known to be correct + * @param capacities capacities of the channels between these two nodes + * @param halfLife time after which the certainty of the lower/upper bounds is halved + */ +case class BalanceEstimate private(low: MilliSatoshi, + lowTimestamp: TimestampSecond, + high: MilliSatoshi, highTimestamp: TimestampSecond, + capacities: Map[ShortChannelId, Satoshi], + halfLife: FiniteDuration) { + val maxCapacity: Satoshi = capacities.values.maxOption.getOrElse(0 sat) + + /* The goal of this class is to estimate the probability that a given edge can relay the amount that we plan to send + * through it. We model this probability with 3 pieces of linear functions. + * + * Without any information we use the following baseline (x is the amount we're sending and y the probability it can be relayed): + * + * 1 |**** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * 0 +------------------------------------------------**** + * 0 capacity + * + * If we get the information that the edge can (or can't) relay a given amount (because we tried), then we get a lower + * bound (or upper bound) that we can use and our model becomes: + * + * 1 |*************** + * | |* + * | | * + * | | * + * | | * + * | | * + * | | * + * | | * + * | | * + * | | * + * | | * + * | | * + * 0 +--------------|-----------|************************* + * 0 low high capacity + * + * However this lower bound (or upper bound) is only valid at the moment we got that information. If we wait, the + * information decays and we slowly go back towards our baseline: + * + * 1 |***** + * | ***** + * | ***** + * | |** + * | | * + * | | ** + * | | ** + * | | * + * | | ** + * | | * + * | | ******** + * | | | ********* + * 0 +--------------|-----------|----------------********* + * 0 low high capacity + */ + + /** + * We model the decay with a half-life H: every H units of time, our confidence decreases by half and our estimated + * probability distribution gets closer to the baseline uniform distribution of balances between 0 and totalCapacity. + * + * @param amount the amount that we knew we could send or not send at time t + * @param successProbabilityAtT probability that we could relay amount at time t (usually 0 or 1) + * @param t time at which we knew if we could or couldn't send amount + * @return the probability that we can send amount now + */ + private def decay(amount: MilliSatoshi, successProbabilityAtT: Double, t: TimestampSecond): Double = { + val decayRatio = 1 / math.pow(2, (TimestampSecond.now() - t) / halfLife) + val baseline = 1 - amount.toLong.toDouble / maxCapacity.toMilliSatoshi.toLong + baseline * (1 - decayRatio) + successProbabilityAtT * decayRatio + } + + private def otherSide: BalanceEstimate = + BalanceEstimate(maxCapacity - high, highTimestamp, maxCapacity - low, lowTimestamp, capacities, halfLife) + + /** + * We tried to send the given amount and received a temporary channel failure. We assume that this failure was caused + * by a lack of liquidity: it could also be caused by a violation of max_accepted_htlcs, max_htlc_value_in_flight_msat + * or a spamming protection heuristic by the relaying node, but since we have no way of detecting that, our best + * strategy is to ignore these cases. + */ + def couldNotSend(amount: MilliSatoshi, timestamp: TimestampSecond): BalanceEstimate = { + if (amount <= low) { + // the balance is actually below `low`, we discard our previous lower bound + copy(low = 0 msat, lowTimestamp = timestamp, high = amount, highTimestamp = timestamp) + } else if (amount < high) { + // the balance is between `low` and `high` as we expected, we discard our previous upper bound + copy(high = amount, highTimestamp = timestamp) + } else { + // We already expected not to be able to relay that amount as it is above our upper bound. However if the upper bound + // was old enough that replacing it with the current amount decreases the success probability for `high`, then we + // replace it. + val updated = copy(high = amount, highTimestamp = timestamp) + if (updated.canSend(high) < this.canSend(high)) { + updated + } else { + this + } + } + } + + /** + * We tried to send the given amount, it was correctly relayed but failed afterwards, so we know we should be able to + * send at least this amount again. + */ + def couldSend(amount: MilliSatoshi, timestamp: TimestampSecond): BalanceEstimate = + otherSide.couldNotSend(maxCapacity - amount, timestamp).otherSide + + /** + * We successfully sent the given amount, so we know that some of the liquidity has shifted. + */ + def didSend(amount: MilliSatoshi, timestamp: TimestampSecond): BalanceEstimate = { + val newLow = (low - amount).max(0 msat) + if (capacities.size == 1) { + // Special case for single channel as we expect this case to be quite common and we can easily get more precise bounds. + val newHigh = (high - amount).max(0 msat) + // We could shift everything left by amount without changing the timestamps but we may get more information by + // ignoring the old high if it has decayed too much. We try both and choose the one that gives the lowest + // probability for the new high. + val a = copy(low = newLow, high = newHigh) + val b = copy(low = newLow, high = (maxCapacity - amount).max(0 msat), highTimestamp = timestamp) + if (a.canSend(newHigh) < b.canSend(newHigh)) { + a + } else { + b + } + } else { + copy(low = newLow) + } + } + + /** + * We successfully received the given amount, so we know that some of the liquidity has shifted. + */ + def didReceive(amount: MilliSatoshi, timestamp: TimestampSecond): BalanceEstimate = + otherSide.didSend(amount, timestamp).otherSide + + def addEdge(edge: GraphEdge): BalanceEstimate = copy( + high = high.max(edge.capacity.toMilliSatoshi), + capacities = capacities.updated(edge.desc.shortChannelId, edge.capacity) + ) + + def removeEdge(desc: ChannelDesc): BalanceEstimate = { + val edgeCapacity = capacities.getOrElse(desc.shortChannelId, 0 sat) + val newCapacities = capacities.removed(desc.shortChannelId) + copy( + low = (low - edgeCapacity.toMilliSatoshi).max(0 msat), + high = high.min(newCapacities.values.maxOption.getOrElse(0 sat).toMilliSatoshi), + capacities = newCapacities + ) + } + + /** + * Estimate the probability that we can successfully send `amount` through the channel + * + * We estimate this probability with a piecewise linear function: + * - probability that it can relay a payment of 0 is 1 + * - probability that it can relay a payment of low is decay(low, 1, lowTimestamp) which is close to 1 if lowTimestamp is recent + * - probability that it can relay a payment of high is decay(high, 0, highTimestamp) which is close to 0 if highTimestamp is recent + * - probability that it can relay a payment of maxCapacity is 0 + */ + def canSend(amount: MilliSatoshi): Double = { + val a = amount.toLong.toDouble + val l = low.toLong.toDouble + val h = high.toLong.toDouble + val c = maxCapacity.toMilliSatoshi.toLong.toDouble + + // Success probability at the low and high points + val pLow = decay(low, 1, lowTimestamp) + val pHigh = decay(high, 0, highTimestamp) + + if (amount < low) { + (l - a * (1.0 - pLow)) / l + } else if (amount < high) { + ((h - a) * pLow + (a - l) * pHigh) / (h - l) + } else if (h < c) { + ((c - a) * pHigh) / (c - h) + } else { + 0 + } + } +} + +object BalanceEstimate { + def empty(halfLife: FiniteDuration): BalanceEstimate = BalanceEstimate(0 msat, 0 unixsec, 0 msat, 0 unixsec, Map.empty, halfLife) +} + +/** + * Balance estimates for the whole routing graph. + */ +case class BalancesEstimates(balances: Map[(PublicKey, PublicKey), BalanceEstimate], defaultHalfLife: FiniteDuration) { + private def get(a: PublicKey, b: PublicKey): Option[BalanceEstimate] = balances.get((a, b)) + + def addEdge(edge: GraphEdge): BalancesEstimates = BalancesEstimates( + balances.updatedWith((edge.desc.a, edge.desc.b))(balance => + Some(balance.getOrElse(BalanceEstimate.empty(defaultHalfLife)).addEdge(edge)) + ), + defaultHalfLife + ) + + def removeEdge(desc: ChannelDesc): BalancesEstimates = BalancesEstimates( + balances.updatedWith((desc.a, desc.b)) { + case None => None + case Some(balance) => + val newBalance = balance.removeEdge(desc) + if (newBalance.capacities.nonEmpty) { + Some(newBalance) + } else { + None + } + }, + defaultHalfLife + ) + + def channelCouldSend(hop: ChannelHop, amount: MilliSatoshi): BalancesEstimates = { + get(hop.nodeId, hop.nextNodeId).foreach { balance => + val estimatedProbability = balance.canSend(amount) + Monitoring.Metrics.remoteEdgeRelaySuccess(estimatedProbability) + } + BalancesEstimates(balances.updatedWith((hop.nodeId, hop.nextNodeId))(_.map(_.couldSend(amount, TimestampSecond.now()))), defaultHalfLife) + } + + def channelCouldNotSend(hop: ChannelHop, amount: MilliSatoshi): BalancesEstimates = { + get(hop.nodeId, hop.nextNodeId).foreach { balance => + val estimatedProbability = balance.canSend(amount) + Monitoring.Metrics.remoteEdgeRelayFailure(estimatedProbability) + } + BalancesEstimates(balances.updatedWith((hop.nodeId, hop.nextNodeId))(_.map(_.couldNotSend(amount, TimestampSecond.now()))), defaultHalfLife) + } + + def channelDidSend(hop: ChannelHop, amount: MilliSatoshi): BalancesEstimates = { + get(hop.nodeId, hop.nextNodeId).foreach { balance => + val estimatedProbability = balance.canSend(amount) + Monitoring.Metrics.remoteEdgeRelaySuccess(estimatedProbability) + } + val balances1 = balances.updatedWith((hop.nodeId, hop.nextNodeId))(_.map(_.didSend(amount, TimestampSecond.now()))) + val balances2 = balances1.updatedWith((hop.nextNodeId, hop.nodeId))(_.map(_.didReceive(amount, TimestampSecond.now()))) + BalancesEstimates(balances2, defaultHalfLife) + } + +} + +case class GraphWithBalanceEstimates(graph: DirectedGraph, private val balances: BalancesEstimates) { + def addEdge(edge: GraphEdge): GraphWithBalanceEstimates = GraphWithBalanceEstimates(graph.addEdge(edge), balances.addEdge(edge)) + + def removeEdge(desc: ChannelDesc): GraphWithBalanceEstimates = GraphWithBalanceEstimates(graph.removeEdge(desc), balances.removeEdge(desc)) + + def removeEdges(descList: Iterable[ChannelDesc]): GraphWithBalanceEstimates = GraphWithBalanceEstimates( + graph.removeEdges(descList), + descList.foldLeft(balances)((acc, edge) => acc.removeEdge(edge)), + ) + + def routeCouldRelay(route: Route): GraphWithBalanceEstimates = { + val (balances1, _) = route.hops.foldRight((balances, route.amount)) { + case (hop, (balances, amount)) => + (balances.channelCouldSend(hop, amount), amount + hop.fee(amount)) + } + GraphWithBalanceEstimates(graph, balances1) + } + + def routeDidRelay(route: Route): GraphWithBalanceEstimates = { + val (balances1, _) = route.hops.foldRight((balances, route.amount)) { + case (hop, (balances, amount)) => + (balances.channelDidSend(hop, amount), amount + hop.fee(amount)) + } + GraphWithBalanceEstimates(graph, balances1) + } + + def channelCouldNotSend(hop: ChannelHop, amount: MilliSatoshi): GraphWithBalanceEstimates = { + GraphWithBalanceEstimates(graph, balances.channelCouldNotSend(hop, amount)) + } + + def canSend(amount: MilliSatoshi, edge: GraphEdge): Double = { + balances.balances.get((edge.desc.a, edge.desc.b)) match { + case Some(estimate) => estimate.canSend(amount) + case None => BalanceEstimate.empty(1 hour).addEdge(edge).canSend(amount) + } + } +} + +object GraphWithBalanceEstimates { + def apply(graph: DirectedGraph, defaultHalfLife: FiniteDuration): GraphWithBalanceEstimates = { + val balances = graph.edgeSet().foldLeft(Map.empty[(PublicKey, PublicKey), BalanceEstimate]) { + case (m, edge) => m.updatedWith((edge.desc.a, edge.desc.b))(balance => Some(balance.getOrElse(BalanceEstimate.empty(defaultHalfLife)).addEdge(edge))) + } + GraphWithBalanceEstimates(graph, BalancesEstimates(balances, defaultHalfLife)) + } +} \ No newline at end of file diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Monitoring.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Monitoring.scala index bd0e76b068..5daf5383ee 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Monitoring.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Monitoring.scala @@ -66,6 +66,12 @@ object Monitoring { case _: GossipDecision.Accepted => GossipResult.withTag("result", "accepted") case rejected: GossipDecision.Rejected => GossipResult.withTag("result", "rejected").withTag("reason", getSimpleClassName(rejected)) } + + private val RelayProbabilityEstimate = Kamon.histogram("router.balance-estimates.remote-edge-relay", "Estimated probability (in percent) that the relay will be successful") + + def remoteEdgeRelaySuccess(estimatedProbability: Double) = RelayProbabilityEstimate.withTag("success", (estimatedProbability * 100).toLong) + + def remoteEdgeRelayFailure(estimatedProbability: Double) = RelayProbabilityEstimate.withTag("failure", (estimatedProbability * 100).toLong) } object Tags { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/RouteCalculation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/RouteCalculation.scala index 58d75a6c4c..46cd566252 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/RouteCalculation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/RouteCalculation.scala @@ -20,7 +20,6 @@ import akka.actor.{ActorContext, ActorRef, Status} import akka.event.DiagnosticLoggingAdapter import com.softwaremill.quicklens.ModifyPimp import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Satoshi, SatoshiLong} import fr.acinq.eclair.Logs.LogCategory import fr.acinq.eclair._ import fr.acinq.eclair.payment.Bolt11Invoice.ExtraHop @@ -29,7 +28,6 @@ import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge} import fr.acinq.eclair.router.Graph.{InfiniteLoop, NegativeProbability, RichWeight, RoutingHeuristics} import fr.acinq.eclair.router.Monitoring.{Metrics, Tags} import fr.acinq.eclair.router.Router._ -import fr.acinq.eclair.wire.protocol.ChannelUpdate import kamon.tag.TagSet import scala.annotation.tailrec @@ -48,7 +46,7 @@ object RouteCalculation { val assistedChannels: Map[ShortChannelId, AssistedChannel] = fr.assistedRoutes.flatMap(toAssistedChannels(_, fr.route.targetNodeId, fr.amount)).toMap val extraEdges = assistedChannels.values.map(ac => GraphEdge(ac)).toSet - val g = extraEdges.foldLeft(d.graph) { case (g: DirectedGraph, e: GraphEdge) => g.addEdge(e) } + val g = extraEdges.foldLeft(d.graphWithBalances.graph) { case (g: DirectedGraph, e: GraphEdge) => g.addEdge(e) } fr.route match { case PredefinedNodeRoute(hops) => @@ -118,13 +116,13 @@ object RouteCalculation { log.info(s"finding routes ${r.source}->${r.target} with assistedChannels={} ignoreNodes={} ignoreChannels={} excludedChannels={}", assistedChannels.keys.mkString(","), r.ignore.nodes.map(_.value).mkString(","), r.ignore.channels.mkString(","), d.excludedChannels.mkString(",")) log.info("finding routes with params={}, multiPart={}", params, r.allowMultiPart) - log.info("local channels to recipient: {}", d.graph.getEdgesBetween(r.source, r.target).map(e => s"${e.desc.shortChannelId} (${e.balance_opt}/${e.capacity})").mkString(", ")) + log.info("local channels to recipient: {}", d.graphWithBalances.graph.getEdgesBetween(r.source, r.target).map(e => s"${e.desc.shortChannelId} (${e.balance_opt}/${e.capacity})").mkString(", ")) val tags = TagSet.Empty.withTag(Tags.MultiPart, r.allowMultiPart).withTag(Tags.Amount, Tags.amountBucket(r.amount)) KamonExt.time(Metrics.FindRouteDuration.withTags(tags.withTag(Tags.NumberOfRoutes, routesToFind.toLong))) { val result = if (r.allowMultiPart) { - findMultiPartRoute(d.graph, r.source, r.target, r.amount, r.maxFee, extraEdges, ignoredEdges, r.ignore.nodes, r.pendingPayments, params, currentBlockHeight) + findMultiPartRoute(d.graphWithBalances.graph, r.source, r.target, r.amount, r.maxFee, extraEdges, ignoredEdges, r.ignore.nodes, r.pendingPayments, params, currentBlockHeight) } else { - findRoute(d.graph, r.source, r.target, r.amount, r.maxFee, routesToFind, extraEdges, ignoredEdges, r.ignore.nodes, params, currentBlockHeight) + findRoute(d.graphWithBalances.graph, r.source, r.target, r.amount, r.maxFee, routesToFind, extraEdges, ignoredEdges, r.ignore.nodes, params, currentBlockHeight) } result match { case Success(routes) => @@ -140,7 +138,7 @@ object RouteCalculation { Metrics.FindRouteErrors.withTags(tags.withTag(Tags.Error, "NegativeProbability")).increment() ctx.sender() ! Status.Failure(failure) case Failure(t) => - val failure = if (isNeighborBalanceTooLow(d.graph, r)) BalanceTooLow else t + val failure = if (isNeighborBalanceTooLow(d.graphWithBalances.graph, r)) BalanceTooLow else t Metrics.FindRouteErrors.withTags(tags.withTag(Tags.Error, failure.getClass.getSimpleName)).increment() ctx.sender() ! Status.Failure(failure) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 42caea67d0..6580c17831 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -103,7 +103,17 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm log.info(s"initialization completed, ready to process messages") Try(initialized.map(_.success(Done))) - startWith(NORMAL, Data(initNodes, initChannels, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, scid2PrivateChannels = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty)) + val data = Data( + initNodes, initChannels, + Stash(Map.empty, Map.empty), + rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), + awaiting = Map.empty, + privateChannels = Map.empty, + scid2PrivateChannels = Map.empty, + excludedChannels = Set.empty, + graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife), + sync = Map.empty) + startWith(NORMAL, data) } when(NORMAL) { @@ -251,6 +261,14 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm case Event(PeerRoutingMessage(peerConnection, remoteNodeId, r: ReplyShortChannelIdsEnd), d) => stay() using Sync.handleReplyShortChannelIdsEnd(d, RemoteGossip(peerConnection, remoteNodeId), r) + case Event(RouteCouldRelay(route), d) => + stay() using d.copy(graphWithBalances = d.graphWithBalances.routeCouldRelay(route)) + + case Event(RouteDidRelay(route), d) => + stay() using d.copy(graphWithBalances = d.graphWithBalances.routeDidRelay(route)) + + case Event(ChannelCouldNotRelay(amount, hop), d) => + stay() using d.copy(graphWithBalances = d.graphWithBalances.channelCouldNotSend(hop, amount)) } initialize() @@ -302,7 +320,8 @@ object Router { encodingType: EncodingType, channelRangeChunkSize: Int, channelQueryChunkSize: Int, - pathFindingExperimentConf: PathFindingExperimentConf) { + pathFindingExperimentConf: PathFindingExperimentConf, + balanceEstimateHalfLife: FiniteDuration) { require(channelRangeChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel range chunk size exceeds the size of a lightning message") require(channelQueryChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel query chunk size exceeds the size of a lightning message") } @@ -428,6 +447,11 @@ object Router { override def htlcMinimum: MilliSatoshi = 0 msat override def htlcMaximum_opt: Option[MilliSatoshi] = Some(htlcMaximum) } + + def areSame(a: ChannelRelayParams, b: ChannelRelayParams, ignoreHtlcSize: Boolean = false): Boolean = + a.cltvExpiryDelta == b.cltvExpiryDelta && + a.relayFees == b.relayFees && + (ignoreHtlcSize || (a.htlcMinimum == b.htlcMinimum && a.htlcMaximum_opt == b.htlcMaximum_opt)) } // @formatter:on @@ -527,6 +551,10 @@ object Router { def printChannels(): String = hops.map(_.shortChannelId).mkString("->") + def stopAt(nodeId: PublicKey): Route = { + val amountAtStop = hops.reverse.takeWhile(_.nextNodeId != nodeId).foldLeft(amount) { case (amount1, hop) => amount1 + hop.fee(amount1) } + Route(amountAtStop, hops.takeWhile(_.nodeId != nodeId)) + } } case class RouteResponse(routes: Seq[Route]) { @@ -616,10 +644,9 @@ object Router { privateChannels: Map[ByteVector32, PrivateChannel], // indexed by channel id scid2PrivateChannels: Map[ShortChannelId, ByteVector32], // scid to channel_id, only to be used for private channels excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure - graph: DirectedGraph, + graphWithBalances: GraphWithBalanceEstimates, sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message ) { - def resolve(scid: ShortChannelId): Option[KnownChannel] = { // let's assume this is a real scid channels.get(scid) match { @@ -645,4 +672,13 @@ object Router { def isRelatedTo(c: ChannelAnnouncement, nodeId: PublicKey) = nodeId == c.nodeId1 || nodeId == c.nodeId2 def hasChannels(nodeId: PublicKey, channels: Iterable[PublicChannel]): Boolean = channels.exists(c => isRelatedTo(c.ann, nodeId)) + + /** We know that this route could relay because we have tried it but the payment was eventually cancelled */ + case class RouteCouldRelay(route: Route) + + /** We have relayed using this route. */ + case class RouteDidRelay(route: Route) + + /** We have tried to relay this amount from this channel and it failed. */ + case class ChannelCouldNotRelay(amount: MilliSatoshi, hop: ChannelHop) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala index 1eff789583..91ce0d13f2 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala @@ -53,14 +53,14 @@ object StaleChannels { staleChannelsToRemove += ChannelDesc(ca.ann.shortChannelId, ca.ann.nodeId2, ca.ann.nodeId1) }) - val graph1 = d.graph.removeEdges(staleChannelsToRemove) + val graphWithBalances1 = d.graphWithBalances.removeEdges(staleChannelsToRemove) staleNodes.foreach { nodeId => log.info("pruning nodeId={} (stale)", nodeId) db.removeNode(nodeId) ctx.system.eventStream.publish(NodeLost(nodeId)) } - d.copy(nodes = d.nodes -- staleNodes, channels = channels1, graph = graph1) + d.copy(nodes = d.nodes -- staleNodes, channels = channels1, graphWithBalances = graphWithBalances1) } def isStale(u: ChannelUpdate): Boolean = isStale(u.timestamp) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index a1ca814820..a218504a1e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -19,7 +19,6 @@ package fr.acinq.eclair.router import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.actor.{ActorContext, ActorRef, typed} import akka.event.{DiagnosticLoggingAdapter, LoggingAdapter} -import com.softwaremill.quicklens.ModifyPimp import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write} import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher @@ -195,7 +194,7 @@ object Validation { log.info("pruning shortChannelId={} (spent)", shortChannelId) db.removeChannel(shortChannelId) // NB: this also removes channel updates // we also need to remove updates from the graph - val graph1 = d.graph + val graphWithBalances1 = d.graphWithBalances .removeEdge(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId1, lostChannel.nodeId2)) .removeEdge(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId2, lostChannel.nodeId1)) @@ -206,7 +205,7 @@ object Validation { db.removeNode(nodeId) ctx.system.eventStream.publish(NodeLost(nodeId)) } - d.copy(nodes = d.nodes -- lostNodes, channels = d.channels - shortChannelId, graph = graph1) + d.copy(nodes = d.nodes -- lostNodes, channels = d.channels - shortChannelId, graphWithBalances = graphWithBalances1) } def handleNodeAnnouncement(d: Data, db: NetworkDb, origins: Set[GossipOrigin], n: NodeAnnouncement, wasStashed: Boolean = false)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { @@ -285,8 +284,8 @@ object Validation { val origins1 = d.rebroadcast.updates(u) ++ origins // NB: we update the channels because the balances may have changed even if the channel_update is the same. val pc1 = pc.applyChannelUpdate(update) - val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) - d.copy(rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins1)), channels = d.channels + (pc.shortChannelId -> pc1), graph = graph1) + val graphWithBalances1 = d.graphWithBalances.addEdge(GraphEdge(u, pc1)) + d.copy(rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins1)), channels = d.channels + (pc.shortChannelId -> pc1), graphWithBalances = graphWithBalances1) } else if (StaleChannels.isStale(u)) { log.debug("ignoring {} (stale)", u) sendDecision(origins, GossipDecision.Stale(u)) @@ -298,8 +297,8 @@ object Validation { case Left(_) => // NB: we update the graph because the balances may have changed even if the channel_update is the same. val pc1 = pc.applyChannelUpdate(update) - val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) - d.copy(channels = d.channels + (pc.shortChannelId -> pc1), graph = graph1) + val graphWithBalances1 = d.graphWithBalances.addEdge(GraphEdge(u, pc1)) + d.copy(channels = d.channels + (pc.shortChannelId -> pc1), graphWithBalances = graphWithBalances1) case Right(_) => d } } else if (!Announcements.checkSig(u, pc.getNodeIdSameSideAs(u))) { @@ -314,14 +313,14 @@ object Validation { db.updateChannel(u) // update the graph val pc1 = pc.applyChannelUpdate(update) - val graph1 = if (u.channelFlags.isEnabled) { + val graphWithBalances1 = if (u.channelFlags.isEnabled) { update.left.foreach(_ => log.info("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel)) - d.graph.addEdge(GraphEdge(u, pc1)) + d.graphWithBalances.addEdge(GraphEdge(u, pc1)) } else { update.left.foreach(_ => log.info("removed local shortChannelId={} public={} from the network graph", u.shortChannelId, publicChannel)) - d.graph.removeEdge(ChannelDesc(u, pc1.ann)) + d.graphWithBalances.removeEdge(ChannelDesc(u, pc1.ann)) } - d.copy(channels = d.channels + (pc.shortChannelId -> pc1), rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)), graph = graph1) + d.copy(channels = d.channels + (pc.shortChannelId -> pc1), rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)), graphWithBalances = graphWithBalances1) } else { log.debug("added channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u) sendDecision(origins, GossipDecision.Accepted(u)) @@ -329,9 +328,9 @@ object Validation { db.updateChannel(u) // we also need to update the graph val pc1 = pc.applyChannelUpdate(update) - val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) + val graphWithBalances1 = d.graphWithBalances.addEdge(GraphEdge(u, pc1)) update.left.foreach(_ => log.info("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel)) - d.copy(channels = d.channels + (pc.shortChannelId -> pc1), privateChannels = d.privateChannels - pc1.channelId, rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)), graph = graph1) + d.copy(channels = d.channels + (pc.shortChannelId -> pc1), privateChannels = d.privateChannels - pc1.channelId, rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)), graphWithBalances = graphWithBalances1) } case Some(pc: PrivateChannel) => val publicChannel = false @@ -354,23 +353,23 @@ object Validation { ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil)) // we also need to update the graph val pc1 = pc.applyChannelUpdate(update) - val graph1 = if (u.channelFlags.isEnabled) { + val graphWithBalances1 = if (u.channelFlags.isEnabled) { update.left.foreach(_ => log.info("added local channelId={} public={} to the network graph", pc.channelId, publicChannel)) - d.graph.addEdge(GraphEdge(u, pc1)) + d.graphWithBalances.addEdge(GraphEdge(u, pc1)) } else { update.left.foreach(_ => log.info("removed local channelId={} public={} from the network graph", pc.channelId, publicChannel)) - d.graph.removeEdge(ChannelDesc(u, pc1)) + d.graphWithBalances.removeEdge(ChannelDesc(u, pc1)) } - d.copy(privateChannels = d.privateChannels + (pc.channelId -> pc1), graph = graph1) + d.copy(privateChannels = d.privateChannels + (pc.channelId -> pc1), graphWithBalances = graphWithBalances1) } else { log.debug("added channel_update for channelId={} public={} flags={} {}", pc.channelId, publicChannel, u.channelFlags, u) sendDecision(origins, GossipDecision.Accepted(u)) ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil)) // we also need to update the graph val pc1 = pc.applyChannelUpdate(update) - val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) + val graphWithBalances1 = d.graphWithBalances.addEdge(GraphEdge(u, pc1)) update.left.foreach(_ => log.info("added local channelId={} public={} to the network graph", pc.channelId, publicChannel)) - d.copy(privateChannels = d.privateChannels + (pc.channelId -> pc1), graph = graph1) + d.copy(privateChannels = d.privateChannels + (pc.channelId -> pc1), graphWithBalances = graphWithBalances1) } case None if d.awaiting.keys.exists(c => c.shortChannelId == u.shortChannelId) => // channel is currently being validated @@ -467,38 +466,38 @@ object Validation { val desc1 = ChannelDesc(shortChannelId, localNodeId, remoteNodeId) val desc2 = ChannelDesc(shortChannelId, remoteNodeId, localNodeId) // we remove the corresponding updates from the graph - val graph1 = d.graph + val graphWithBalances1 = d.graphWithBalances .removeEdge(desc1) .removeEdge(desc2) // and we remove the channel and channel_update from our state - d.copy(privateChannels = d.privateChannels - channelId, graph = graph1) + d.copy(privateChannels = d.privateChannels - channelId, graphWithBalances = graphWithBalances1) } else { d } } def handleAvailableBalanceChanged(d: Data, e: AvailableBalanceChanged)(implicit log: LoggingAdapter): Data = { - val (publicChannels1, graph1) = d.channels.get(e.shortChannelId) match { + val (publicChannels1, graphWithBalances1) = d.channels.get(e.shortChannelId) match { case Some(pc) => val pc1 = pc.updateBalances(e.commitments) log.debug("public channel balance updated: {}", pc1) val update_opt = if (e.commitments.localNodeId == pc1.ann.nodeId1) pc1.update_1_opt else pc1.update_2_opt - val graph1 = update_opt.map(u => d.graph.addEdge(GraphEdge(u, pc1))).getOrElse(d.graph) - (d.channels + (pc.ann.shortChannelId -> pc1), graph1) + val graphWithBalances1 = update_opt.map(u => d.graphWithBalances.addEdge(GraphEdge(u, pc1))).getOrElse(d.graphWithBalances) + (d.channels + (pc.ann.shortChannelId -> pc1), graphWithBalances1) case None => - (d.channels, d.graph) + (d.channels, d.graphWithBalances) } - val (privateChannels1, graph2) = d.privateChannels.get(e.channelId) match { + val (privateChannels1, graphWithBalances2) = d.privateChannels.get(e.channelId) match { case Some(pc) => val pc1 = pc.updateBalances(e.commitments) log.debug("private channel balance updated: {}", pc1) val update_opt = if (e.commitments.localNodeId == pc1.nodeId1) pc1.update_1_opt else pc1.update_2_opt - val graph2 = update_opt.map(u => graph1.addEdge(GraphEdge(u, pc1))).getOrElse(graph1) - (d.privateChannels + (e.channelId -> pc1), graph2) + val graphWithBalances2 = update_opt.map(u => graphWithBalances1.addEdge(GraphEdge(u, pc1))).getOrElse(graphWithBalances1) + (d.privateChannels + (e.channelId -> pc1), graphWithBalances2) case None => - (d.privateChannels, graph1) + (d.privateChannels, graphWithBalances1) } - d.copy(channels = publicChannels1, privateChannels = privateChannels1, graph = graph2) + d.copy(channels = publicChannels1, privateChannels = privateChannels1, graphWithBalances = graphWithBalances2) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 2bc65f917a..aedfb336f3 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -186,7 +186,8 @@ object TestConstants { maxParts = 10, ), experimentName = "alice-test-experiment", - experimentPercentage = 100))) + experimentPercentage = 100))), + balanceEstimateHalfLife = 1 day, ), socksProxy_opt = None, maxPaymentAttempts = 5, @@ -325,7 +326,8 @@ object TestConstants { maxParts = 10, ), experimentName = "bob-test-experiment", - experimentPercentage = 100))) + experimentPercentage = 100))), + balanceEstimateHalfLife = 1 day, ), socksProxy_opt = None, maxPaymentAttempts = 5, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala index e1882d59d5..c7c010a55c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala @@ -121,6 +121,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { awaitCond(nodeParams.db.payments.getOutgoingPayment(id).exists(_.status.isInstanceOf[OutgoingPaymentStatus.Succeeded])) metricsListener.expectNoMessage() + + assert(routerForwarder.expectMsgType[RouteDidRelay].route === route) } test("send to route (node_id only)") { routerFixture => @@ -148,6 +150,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { awaitCond(nodeParams.db.payments.getOutgoingPayment(id).exists(_.status.isInstanceOf[OutgoingPaymentStatus.Succeeded])) metricsListener.expectNoMessage() + + assert(routerForwarder.expectMsgType[RouteDidRelay].route.hops.map(_.nodeId) === Seq(a, b, c)) } test("send to route (nodes not found in the graph)") { routerFixture => @@ -163,6 +167,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { assert(failureMessage == "Not all the nodes in the supplied route are connected with public channels") metricsListener.expectNoMessage() + + routerForwarder.expectNoMessage(100 millis) } test("send to route (channels not found in the graph)") { routerFixture => @@ -178,6 +184,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { assert(failureMessage == "The sequence of channels provided cannot be used to build a route to the target node") metricsListener.expectNoMessage() + + routerForwarder.expectNoMessage(100 millis) } test("send to route (routing hints)") { routerFixture => @@ -205,6 +213,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { awaitCond(nodeParams.db.payments.getOutgoingPayment(id).exists(_.status.isInstanceOf[OutgoingPaymentStatus.Succeeded])) metricsListener.expectNoMessage() + + assert(routerForwarder.expectMsgType[RouteDidRelay].route.hops.map(_.nodeId) === Seq(a, b, c)) } test("payment failed (route not found)") { routerFixture => @@ -228,6 +238,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { assert(metrics.amount == defaultAmountMsat) assert(metrics.fees == 4260000.msat) metricsListener.expectNoMessage() + + routerForwarder.expectNoMessage(100 millis) } test("payment failed (route too expensive)") { routerFixture => @@ -258,6 +270,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { assert(metrics.amount == defaultAmountMsat) assert(metrics.fees == 100.msat) metricsListener.expectNoMessage() + + routerForwarder.expectNoMessage(100 millis) } test("payment failed (cannot build onion)") { routerFixture => @@ -276,6 +290,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { assert(pf.failures.length == 1) assert(pf.failures.head.isInstanceOf[LocalFailure]) awaitCond(nodeParams.db.payments.getOutgoingPayment(id).exists(_.status.isInstanceOf[OutgoingPaymentStatus.Failed])) + + routerForwarder.expectNoMessage(100 millis) } test("payment failed (unparsable failure)") { routerFixture => @@ -319,6 +335,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { assert(metrics.amount == defaultAmountMsat) assert(metrics.fees == 4260000.msat) metricsListener.expectNoMessage() + + routerForwarder.expectNoMessage(100 millis) } test("payment failed (local error)") { routerFixture => @@ -450,6 +468,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec { sender.send(paymentFSM, addCompleted(HtlcResult.RemoteFail(UpdateFailHtlc(ByteVector32.Zeroes, 0, Sphinx.FailurePacket.create(sharedSecrets1.head._1, failure))))) // payment lifecycle will ask the router to temporarily exclude this channel from its route calculations + assert(routerForwarder.expectMsgType[ChannelCouldNotRelay].hop.shortChannelId == update_bc.shortChannelId) routerForwarder.expectMsg(ExcludeChannel(ChannelDesc(update_bc.shortChannelId, b, c))) routerForwarder.forward(routerFixture.router) // payment lifecycle forwards the embedded channelUpdate to the router @@ -512,6 +531,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { // this time the router can't find a route: game over assert(sender.expectMsgType[PaymentFailed].failures == RemoteFailure(route1.amount, route1.hops, Sphinx.DecryptedFailurePacket(b, failure)) :: RemoteFailure(route2.amount, route2.hops, Sphinx.DecryptedFailurePacket(b, failure2)) :: LocalFailure(defaultAmountMsat, Nil, RouteNotFound) :: Nil) awaitCond(nodeParams.db.payments.getOutgoingPayment(id).exists(_.status.isInstanceOf[OutgoingPaymentStatus.Failed])) + + routerForwarder.expectNoMessage(100 millis) } test("payment failed (Update in last attempt)") { routerFixture => @@ -530,6 +551,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec { val failure = TemporaryChannelFailure(update_bc) sender.send(paymentFSM, addCompleted(HtlcResult.RemoteFail(UpdateFailHtlc(ByteVector32.Zeroes, 0, Sphinx.FailurePacket.create(sharedSecrets1.head._1, failure))))) // we should temporarily exclude that channel + assert(routerForwarder.expectMsgType[ChannelCouldNotRelay].hop.shortChannelId == update_bc.shortChannelId) routerForwarder.expectMsg(ExcludeChannel(ChannelDesc(update_bc.shortChannelId, b, c))) routerForwarder.expectMsg(update_bc) @@ -605,6 +627,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec { val failureOnion = Sphinx.FailurePacket.wrap(Sphinx.FailurePacket.create(sharedSecrets1(1)._1, failure), sharedSecrets1.head._1) sender.send(paymentFSM, addCompleted(HtlcResult.RemoteFail(UpdateFailHtlc(ByteVector32.Zeroes, 0, failureOnion)))) + assert(routerForwarder.expectMsgType[RouteCouldRelay].route.hops.map(_.shortChannelId) == Seq(update_ab, update_bc).map(_.shortChannelId)) routerForwarder.expectMsg(channelUpdate_cd_disabled) routerForwarder.expectMsg(ExcludeChannel(ChannelDesc(update_cd.shortChannelId, c, d))) } @@ -678,6 +701,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { assert(metrics.amount == defaultAmountMsat) assert(metrics.fees == 730.msat) metricsListener.expectNoMessage() + + assert(routerForwarder.expectMsgType[RouteDidRelay].route.hops.map(_.shortChannelId) == Seq(update_ab, update_bc, update_cd).map(_.shortChannelId)) } test("payment succeeded to a channel with fees=0") { routerFixture => @@ -731,6 +756,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { assert(metrics.amount == defaultAmountMsat) assert(metrics.fees == 0.msat) metricsListener.expectNoMessage() + + assert(routerForwarder.expectMsgType[RouteDidRelay].route.hops.map(_.shortChannelId) == Seq(update_ab, channelUpdate_bh).map(_.shortChannelId)) } test("filter errors properly") { () => @@ -802,6 +829,8 @@ class PaymentLifecycleSpec extends BaseRouterSpec { sender.expectMsgType[PaymentSent] assert(nodeParams.db.payments.getOutgoingPayment(id) == None) eventListener.expectNoMessage(100 millis) + + assert(routerForwarder.expectMsgType[RouteDidRelay].route.hops.map(_.nextNodeId) == Seq(b, c, d)) } test("send to route (no retry on error") { () => diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala new file mode 100644 index 0000000000..4e14e981b6 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala @@ -0,0 +1,292 @@ +/* + * Copyright 2021 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.router + +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong} +import fr.acinq.eclair.payment.Bolt11Invoice.ExtraHop +import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge} +import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelRelayParams} +import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, ShortChannelId, TimestampSecond, randomKey} +import org.scalactic.Tolerance.convertNumericToPlusOrMinusWrapper +import org.scalatest.funsuite.AnyFunSuite + +import scala.concurrent.duration.DurationInt + +class BalanceEstimateSpec extends AnyFunSuite { + + def isValid(balance: BalanceEstimate): Boolean = { + balance.low >= 0.msat && + balance.low <= balance.high && + balance.high <= balance.maxCapacity + } + + def makeEdge(nodeId1: PublicKey, nodeId2: PublicKey, channelId: Long, capacity: Satoshi): GraphEdge = + GraphEdge( + ChannelDesc(ShortChannelId(channelId), nodeId1, nodeId2), + ChannelRelayParams.FromHint(ExtraHop(nodeId1, ShortChannelId(channelId), 0 msat, 0, CltvExpiryDelta(0)), 0 msat), + capacity, None) + + def makeEdge(channelId: Long, capacity: Satoshi): GraphEdge = + makeEdge(randomKey().publicKey, randomKey().publicKey, channelId, capacity) + + test("no balance information") { + val balance = BalanceEstimate.empty(1 day).addEdge(makeEdge(0, 100 sat)) + assert(isValid(balance)) + assert(balance.canSend(0 msat) === 1.0 +- 0.001) + assert(balance.canSend(1 msat) === 1.0 +- 0.001) + assert(balance.canSend(25000 msat) === 0.75 +- 0.001) + assert(balance.canSend(50000 msat) === 0.5 +- 0.001) + assert(balance.canSend(75000 msat) === 0.25 +- 0.001) + assert(balance.canSend(99999 msat) === 0.0 +- 0.001) + assert(balance.canSend(100000 msat) === 0.0 +- 0.001) + } + + test("add and remove channels") { + val a = makeEdge(0, 200 sat) + val b = makeEdge(1, 100 sat) + val c = makeEdge(2, 800 sat) + val d = makeEdge(3, 120 sat) + val e = makeEdge(4, 190 sat) + val balance = BalanceEstimate.empty(1 day) + .addEdge(a) + .addEdge(b) + .removeEdge(a.desc) + assert(isValid(balance)) + assert(balance.maxCapacity == 100.sat) + val balance1 = balance + .addEdge(c) + .removeEdge(c.desc) + .removeEdge(b.desc) + .addEdge(d) + .addEdge(e) + assert(isValid(balance1)) + assert(balance1.maxCapacity == 190.sat) + val balance2 = balance1 + .removeEdge(d.desc) + .removeEdge(e.desc) + assert(isValid(balance2)) + assert(balance2.maxCapacity == 0.sat) + assert(balance2.capacities.isEmpty) + } + + test("update bounds based on what could then could not be sent (increasing amounts)") { + val now = TimestampSecond.now() + val balance = BalanceEstimate.empty(1 day) + // NB: the number of channels has no impact here + .addEdge(makeEdge(0, 100 sat)) + .addEdge(makeEdge(1, 100 sat)) + .couldSend(24000 msat, now) + .couldNotSend(30000 msat, now) + assert(balance.canSend(0 msat) === 1.0 +- 0.001) + assert(balance.canSend(23999 msat) === 1.0 +- 0.001) + assert(balance.canSend(24000 msat) === 1.0 +- 0.001) + assert(balance.canSend(24001 msat) === 1.0 +- 0.001) + assert(balance.canSend(27000 msat) === 0.5 +- 0.001) + assert(balance.canSend(29999 msat) === 0.0 +- 0.001) + assert(balance.canSend(30000 msat) === 0.0 +- 0.001) + assert(balance.canSend(30001 msat) === 0.0 +- 0.001) + assert(balance.canSend(100000 msat) === 0.0 +- 0.001) + } + + test("update bounds based on what could then could not be sent (decreasing amounts)") { + val now = TimestampSecond.now() + val balance = BalanceEstimate.empty(1 day) + // NB: the number of channels has no impact here + .addEdge(makeEdge(0, 75 sat)) + .addEdge(makeEdge(1, 100 sat)) + .couldSend(26000 msat, now) + .couldNotSend(14000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(0 msat) === 1.0 +- 0.001) + assert(balance.canSend(1 msat) === 1.0 +- 0.001) + assert(balance.canSend(7000 msat) === 0.5 +- 0.001) + assert(balance.canSend(14000 msat) === 0.0 +- 0.001) + assert(balance.canSend(26000 msat) === 0.0 +- 0.001) + assert(balance.canSend(99999 msat) === 0.0 +- 0.001) + assert(balance.canSend(100000 msat) === 0.0 +- 0.001) + } + + test("update bounds based on what could not then could be sent (increasing amounts)") { + val now = TimestampSecond.now() + val balance = BalanceEstimate.empty(1 day) + // NB: the number of channels has no impact here + .addEdge(makeEdge(0, 100 sat)) + .addEdge(makeEdge(1, 50 sat)) + .couldNotSend(26000 msat, now) + .couldSend(30000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(0 msat) === 1.0 +- 0.001) + assert(balance.canSend(1 msat) === 1.0 +- 0.001) + assert(balance.canSend(30000 msat) === 1.0 +- 0.001) + assert(balance.canSend(65000 msat) === 0.5 +- 0.001) + assert(balance.canSend(99999 msat) === 0.0 +- 0.001) + assert(balance.canSend(100000 msat) === 0.0 +- 0.001) + } + + test("update bounds based on what could not then could be sent (decreasing amounts)") { + val now = TimestampSecond.now() + val balance = BalanceEstimate.empty(1 day) + // NB: the number of channels has no impact here + .addEdge(makeEdge(0, 100 sat)) + .addEdge(makeEdge(1, 50 sat)) + .couldNotSend(30000 msat, now) + .couldSend(20000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(0 msat) === 1.0 +- 0.001) + assert(balance.canSend(1 msat) === 1.0 +- 0.001) + assert(balance.canSend(20000 msat) === 1.0 +- 0.001) + assert(balance.canSend(25000 msat) === 0.5 +- 0.001) + assert(balance.canSend(30000 msat) === 0.0 +- 0.001) + assert(balance.canSend(99999 msat) === 0.0 +- 0.001) + assert(balance.canSend(100000 msat) === 0.0 +- 0.001) + } + + test("decay restores baseline bounds") { + val longAgo = TimestampSecond.now() - 30.seconds + val balance = BalanceEstimate.empty(1 second) + .addEdge(makeEdge(0, 100 sat)) + .couldNotSend(32000 msat, longAgo) + .couldSend(28000 msat, longAgo) + assert(isValid(balance)) + assert(balance.canSend(1 msat) === 1.0 +- 0.01) + assert(balance.canSend(33333 msat) === 0.666 +- 0.01) + assert(balance.canSend(66666 msat) === 0.333 +- 0.01) + assert(balance.canSend(99999 msat) === 0.0 +- 0.01) + } + + test("sending on single channel shifts amounts") { + val now = TimestampSecond.now() + val balance = BalanceEstimate.empty(1 day) + .addEdge(makeEdge(0, 100 sat)) + .couldNotSend(80000 msat, now) + .couldSend(50000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(50000 msat) === 1.0 +- 0.001) + assert(balance.canSend(80000 msat) === 0.0 +- 0.001) + val balanceAfterSend = balance.didSend(20000 msat, now) + assert(isValid(balanceAfterSend)) + assert(balanceAfterSend.canSend(30000 msat) === 1.0 +- 0.001) + assert(balanceAfterSend.canSend(45000 msat) === 0.5 +- 0.001) + assert(balanceAfterSend.canSend(60000 msat) === 0.0 +- 0.001) + } + + test("sending on single channel after decay") { + val longAgo = TimestampSecond.now() - 60.seconds + val now = TimestampSecond.now() + val balance = BalanceEstimate.empty(1 second) + .addEdge(makeEdge(0, 100 sat)) + .couldNotSend(80000 msat, longAgo) + .couldSend(50000 msat, longAgo) + .didSend(40000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(0 msat) === 1.0 +- 0.01) + assert(balance.canSend(10000 msat) <= 0.9) + assert(balance.canSend(50000 msat) >= 0.1) + assert(balance.canSend(60000 msat) === 0.0 +- 0.01) + } + + test("sending on parallel channels shifts low only") { + val now = TimestampSecond.now() + val balance = BalanceEstimate.empty(1 day) + .addEdge(makeEdge(0, 100 sat)) + .addEdge(makeEdge(1, 80 sat)) + .couldNotSend(80000 msat, now) + .couldSend(50000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(50000 msat) === 1.0 +- 0.001) + assert(balance.canSend(80000 msat) === 0.0 +- 0.001) + val balanceAfterSend = balance.didSend(20000 msat, now) + assert(isValid(balanceAfterSend)) + assert(balanceAfterSend.canSend(30000 msat) === 1.0 +- 0.001) + assert(balanceAfterSend.canSend(70000 msat) > 0.1) + } + + test("receiving on single channel shifts amounts") { + val now = TimestampSecond.now() + val balance = BalanceEstimate.empty(1 day) + .addEdge(makeEdge(0, 100 sat)) + .couldNotSend(80000 msat, now) + .couldSend(50000 msat, now) + .didReceive(10000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(60000 msat) === 1.0 +- 0.001) + assert(balance.canSend(75000 msat) === 0.5 +- 0.001) + assert(balance.canSend(90000 msat) === 0.0 +- 0.001) + } + + test("receiving on single channel after decay") { + val longAgo = TimestampSecond.now() - 60.seconds + val now = TimestampSecond.now() + val balance = BalanceEstimate.empty(1 second) + .addEdge(makeEdge(0, 100 sat)) + .couldNotSend(80000 msat, longAgo) + .couldSend(50000 msat, longAgo) + .didReceive(10000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(10000 msat) >= 0.9) + assert(balance.canSend(20000 msat) <= 0.9) + assert(balance.canSend(80000 msat) >= 0.1) + assert(balance.canSend(90000 msat) <= 0.1) + } + + test("receiving on parallel channels shifts high only") { + val now = TimestampSecond.now() + val balance = BalanceEstimate.empty(1 day) + .addEdge(makeEdge(0, 100 sat)) + .addEdge(makeEdge(1, 80 sat)) + .couldNotSend(70000 msat, now) + .couldSend(50000 msat, now) + .didReceive(20000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(50000 msat) === 1.0 +- 0.001) + assert(balance.canSend(70000 msat) === 0.5 +- 0.001) + assert(balance.canSend(90000 msat) === 0.0 +- 0.001) + } + + test("baseline from graph") { + val (a, b, c) = (randomKey().publicKey, randomKey().publicKey, randomKey().publicKey) + val g = DirectedGraph(Seq( + makeEdge(a, b, 1, 100 sat), + makeEdge(b, a, 1, 100 sat), + makeEdge(a, b, 2, 110 sat), + makeEdge(b, a, 3, 120 sat), + makeEdge(a, c, 4, 130 sat), + makeEdge(c, a, 4, 130 sat), + makeEdge(a, c, 5, 140 sat), + makeEdge(c, a, 5, 140 sat), + makeEdge(b, c, 6, 150 sat), + )) + + val graphWithBalances = GraphWithBalanceEstimates(g, 1 day) + // NB: it doesn't matter which edge is selected, the balance estimation takes all existing edges into account. + val edge_ab = makeEdge(a, b, 1, 10 sat) + val edge_ba = makeEdge(b, a, 1, 10 sat) + val edge_bc = makeEdge(b, c, 6, 10 sat) + assert(graphWithBalances.canSend(27500 msat, edge_ab) === 0.75 +- 0.01) + assert(graphWithBalances.canSend(55000 msat, edge_ab) === 0.5 +- 0.01) + assert(graphWithBalances.canSend(30000 msat, edge_ba) === 0.75 +- 0.01) + assert(graphWithBalances.canSend(60000 msat, edge_ba) === 0.5 +- 0.01) + assert(graphWithBalances.canSend(75000 msat, edge_bc) === 0.5 +- 0.01) + assert(graphWithBalances.canSend(100000 msat, edge_bc) === 0.33 +- 0.01) + val unknownEdge = makeEdge(42, 40 sat) + assert(graphWithBalances.canSend(10000 msat, unknownEdge) === 0.75 +- 0.01) + assert(graphWithBalances.canSend(20000 msat, unknownEdge) === 0.5 +- 0.01) + assert(graphWithBalances.canSend(30000 msat, unknownEdge) === 0.25 +- 0.01) + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index d5dfcd1850..4c5b9d0eb8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -617,7 +617,7 @@ class RouterSpec extends BaseRouterSpec { assert(Set(channel_ab.meta_opt.map(_.balance1), channel_ab.meta_opt.map(_.balance2)) == balances) // And the graph should be updated too. sender.send(router, Router.GetRouterData) - val g = sender.expectMsgType[Data].graph + val g = sender.expectMsgType[Data].graphWithBalances.graph val edge_ab = g.getEdge(ChannelDesc(scid_ab, a, b)).get val edge_ba = g.getEdge(ChannelDesc(scid_ab, b, a)).get assert(edge_ab.capacity == channel_ab.capacity && edge_ba.capacity == channel_ab.capacity) @@ -640,7 +640,7 @@ class RouterSpec extends BaseRouterSpec { assert(Set(channel_ab.meta_opt.map(_.balance1), channel_ab.meta_opt.map(_.balance2)) == balances) // And the graph should be updated too. sender.send(router, Router.GetRouterData) - val g = sender.expectMsgType[Data].graph + val g = sender.expectMsgType[Data].graphWithBalances.graph val edge_ab = g.getEdge(ChannelDesc(scid_ab, a, b)).get val edge_ba = g.getEdge(ChannelDesc(scid_ab, b, a)).get assert(edge_ab.capacity == channel_ab.capacity && edge_ba.capacity == channel_ab.capacity) @@ -658,7 +658,7 @@ class RouterSpec extends BaseRouterSpec { assert(Set(channel_ab.meta_opt.map(_.balance1), channel_ab.meta_opt.map(_.balance2)) == balances) // And the graph should be updated too. sender.send(router, Router.GetRouterData) - val g = sender.expectMsgType[Data].graph + val g = sender.expectMsgType[Data].graphWithBalances.graph val edge_ab = g.getEdge(ChannelDesc(scid_ab, a, b)).get val edge_ba = g.getEdge(ChannelDesc(scid_ab, b, a)).get assert(edge_ab.capacity == channel_ab.capacity && edge_ba.capacity == channel_ab.capacity) @@ -676,7 +676,7 @@ class RouterSpec extends BaseRouterSpec { val channel_ag = data.privateChannels(channelId_ag_private) assert(Set(channel_ag.meta.balance1, channel_ag.meta.balance2) == balances) // And the graph should be updated too. - val edge_ag = data.graph.getEdge(ChannelDesc(scid_ag_private, a, g)).get + val edge_ag = data.graphWithBalances.graph.getEdge(ChannelDesc(scid_ag_private, a, g)).get assert(edge_ag.capacity == channel_ag.capacity) assert(edge_ag.balance_opt == Some(33000000 msat)) }