Skip to content

Commit

Permalink
Allow disabling wake-up
Browse files Browse the repository at this point in the history
This makes testing easier. We also do small refactorings in the relay
actors without any behavior changes.
  • Loading branch information
t-bast committed Aug 27, 2024
1 parent bfb9090 commit 3e23b85
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 33 deletions.
1 change: 1 addition & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ eclair {
// When relaying payments or messages to mobile peers who are disconnected, we may try to wake them up using a mobile
// notification system, or we attempt connecting to the last known address.
peer-wake-up {
enabled = false
timeout = 60 seconds
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ object NodeParams extends Logging {
interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS)
),
peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(
enabled = config.getBoolean("peer-wake-up.enabled"),
timeout = FiniteDuration(config.getDuration("peer-wake-up.timeout").getSeconds, TimeUnit.SECONDS)
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ object PeerReadyManager {
*/
object PeerReadyNotifier {

case class WakeUpConfig(timeout: FiniteDuration)
case class WakeUpConfig(enabled: Boolean, timeout: FiniteDuration)

// @formatter:off
sealed trait Command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ class ChannelRelay private(nodeParams: NodeParams,

def start(): Behavior[Command] = {
walletNodeId_opt match {
case Some(walletNodeId) => wakeUp(walletNodeId)
case None =>
case Some(walletNodeId) if nodeParams.peerWakeUpConfig.enabled => wakeUp(walletNodeId)
case _ =>
context.self ! DoRelay
relay(Seq.empty)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ class NodeRelay private(nodeParams: NodeParams,
*/
private def ensureRecipientReady(upstream: Upstream.Hot.Trampoline, recipient: Recipient, nextPayload: IntermediatePayload.NodeRelay, nextPacket_opt: Option[OnionRoutingPacket]): Behavior[Command] = {
nextWalletNodeId(nodeParams, recipient) match {
case Some(walletNodeId) => waitForPeerReady(upstream, walletNodeId, recipient, nextPayload, nextPacket_opt)
case None => relay(upstream, recipient, nextPayload, nextPacket_opt)
case Some(walletNodeId) if nodeParams.peerWakeUpConfig.enabled => waitForPeerReady(upstream, walletNodeId, recipient, nextPayload, nextPacket_opt)
case _ => relay(upstream, recipient, nextPayload, nextPacket_opt)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ object TestConstants {
),
purgeInvoicesInterval = None,
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis),
peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(30 seconds),
peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds),
)

def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
Expand Down Expand Up @@ -403,7 +403,7 @@ object TestConstants {
),
purgeInvoicesInterval = None,
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis),
peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(30 seconds),
peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds),
)

def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, TypedActorRefOps}
import akka.testkit.TestProbe
import com.softwaremill.quicklens.ModifyPimp
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.channel.Register
import fr.acinq.eclair.io.MessageRelay._
import fr.acinq.eclair.io.Peer.{PeerInfo, PeerNotFound}
import fr.acinq.eclair.io.PeerReadyNotifier.WakeUpConfig
import fr.acinq.eclair.io.Switchboard.GetPeerInfo
import fr.acinq.eclair.message.OnionMessages
import fr.acinq.eclair.message.OnionMessages.{IntermediateNode, Recipient}
Expand All @@ -45,6 +45,7 @@ class MessageRelaySpec extends ScalaTestWithActorTestKit(ConfigFactory.load("app
val aliceId: PublicKey = Alice.nodeParams.nodeId
val bobId: PublicKey = Bob.nodeParams.nodeId

val wakeUpEnabled = "wake_up_enabled"
val wakeUpTimeout = "wake_up_timeout"

case class FixtureParam(relay: ActorRef[Command], switchboard: TestProbe, register: TestProbe, router: TypedProbe[Router.GetNodeId], peerConnection: TypedProbe[Nothing], peer: TypedProbe[Peer.RelayOnionMessage], peerReadyManager: TestProbe, probe: TypedProbe[Status])
Expand All @@ -59,7 +60,9 @@ class MessageRelaySpec extends ScalaTestWithActorTestKit(ConfigFactory.load("app
val peerConnection = TypedProbe[Nothing]("peerConnection")
val peer = TypedProbe[Peer.RelayOnionMessage]("peer")
val probe = TypedProbe[Status]("probe")
val nodeParams = if (test.tags.contains(wakeUpTimeout)) Alice.nodeParams.copy(peerWakeUpConfig = WakeUpConfig(100 millis)) else Alice.nodeParams
val nodeParams = Alice.nodeParams
.modify(_.peerWakeUpConfig.enabled).setToIf(test.tags.contains(wakeUpEnabled))(true)
.modify(_.peerWakeUpConfig.timeout).setToIf(test.tags.contains(wakeUpTimeout))(100 millis)
val relay = testKit.spawn(MessageRelay(nodeParams, switchboard.ref, register.ref, router.ref))
try {
withFixture(test.toNoArgTest(FixtureParam(relay, switchboard, register, router, peerConnection, peer, peerReadyManager, probe)))
Expand Down Expand Up @@ -96,7 +99,7 @@ class MessageRelaySpec extends ScalaTestWithActorTestKit(ConfigFactory.load("app
assert(peer.expectMessageType[Peer.RelayOnionMessage].msg == message)
}

test("relay after waking up next node") { f =>
test("relay after waking up next node", Tag(wakeUpEnabled)) { f =>
import f._

val Right(message) = OnionMessages.buildMessage(randomKey(), randomKey(), Seq(), Recipient(bobId, None), TlvStream.empty)
Expand Down Expand Up @@ -126,7 +129,7 @@ class MessageRelaySpec extends ScalaTestWithActorTestKit(ConfigFactory.load("app
probe.expectMessage(ConnectionFailure(messageId, PeerConnection.ConnectionResult.NoAddressFound))
}

test("can't wake up next node", Tag(wakeUpTimeout)) { f =>
test("can't wake up next node", Tag(wakeUpEnabled), Tag(wakeUpTimeout)) { f =>
import f._

val Right(message) = OnionMessages.buildMessage(randomKey(), randomKey(), Seq(), Recipient(bobId, None), TlvStream.empty)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package fr.acinq.eclair.payment.relay

import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.ActorRef
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import akka.actor.typed.{ActorRef, Behavior}
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
Expand All @@ -21,21 +22,31 @@ import scala.concurrent.duration.DurationInt

class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike {

case class FixtureParam(remoteNodeId: PublicKey, peerReadyManager: TestProbe[PeerReadyManager.Register], switchboard: TestProbe[Switchboard.GetPeerInfo], peer: TestProbe[Peer.GetPeerChannels], probe: TestProbe[Result], triggerer: ActorRef[Command])
case class FixtureParam(remoteNodeId: PublicKey, switchboard: TestProbe[Switchboard.GetPeerInfo], peer: TestProbe[Peer.GetPeerChannels], probe: TestProbe[Result], triggerer: ActorRef[Command])

object DummyPeerReadyManager {
def apply(): Behavior[PeerReadyManager.Command] = {
Behaviors.receiveMessagePartial {
case PeerReadyManager.Register(replyTo, remoteNodeId) =>
replyTo ! PeerReadyManager.Registered(remoteNodeId, otherAttempts = 0)
Behaviors.same
}
}
}

override def withFixture(test: OneArgTest): Outcome = {
val remoteNodeId = TestConstants.Alice.nodeParams.nodeId
val peerReadyManager = TestProbe[PeerReadyManager.Register]("peer-ready-manager")
system.receptionist ! Receptionist.Register(PeerReadyManager.PeerReadyManagerServiceKey, peerReadyManager.ref)
val peerReadyManager = testKit.spawn(DummyPeerReadyManager())
system.receptionist ! Receptionist.Register(PeerReadyManager.PeerReadyManagerServiceKey, peerReadyManager)
val switchboard = TestProbe[Switchboard.GetPeerInfo]("switchboard")
system.receptionist ! Receptionist.Register(Switchboard.SwitchboardServiceKey, switchboard.ref)
val peer = TestProbe[Peer.GetPeerChannels]("peer")
val probe = TestProbe[Result]()
val triggerer = testKit.spawn(AsyncPaymentTriggerer())
try {
withFixture(test.toNoArgTest(FixtureParam(remoteNodeId, peerReadyManager, switchboard, peer, probe, triggerer)))
withFixture(test.toNoArgTest(FixtureParam(remoteNodeId, switchboard, peer, probe, triggerer)))
} finally {
system.receptionist ! Receptionist.Deregister(PeerReadyManager.PeerReadyManagerServiceKey, peerReadyManager.ref)
system.receptionist ! Receptionist.Deregister(PeerReadyManager.PeerReadyManagerServiceKey, peerReadyManager)
system.receptionist ! Receptionist.Deregister(Switchboard.SwitchboardServiceKey, switchboard.ref)
}
}
Expand All @@ -44,7 +55,6 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.
import f._

triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(remoteNodeId, otherAttempts = 0)
assert(switchboard.expectMessageType[GetPeerInfo].remoteNodeId == remoteNodeId)

// We haven't reached the timeout yet.
Expand All @@ -64,7 +74,6 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.
import f._

triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(remoteNodeId, otherAttempts = 0)
assert(switchboard.expectMessageType[GetPeerInfo].remoteNodeId == remoteNodeId)

// cancel of an unwatched payment does nothing
Expand All @@ -80,7 +89,6 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.

// create two identical watches
triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(remoteNodeId, otherAttempts = 0)
assert(switchboard.expectMessageType[GetPeerInfo].remoteNodeId == remoteNodeId)
triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))

Expand All @@ -92,7 +100,6 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.
// create two different watches
val probe2 = TestProbe[Result]()
triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(remoteNodeId, otherAttempts = 1)
assert(switchboard.expectMessageType[GetPeerInfo].remoteNodeId == remoteNodeId)
triggerer ! Watch(probe2.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))

Expand All @@ -108,7 +115,6 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.
// create watches for two payments with the same payment hash
val probe2 = TestProbe[Result]()
triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(remoteNodeId, otherAttempts = 0)
assert(switchboard.expectMessageType[GetPeerInfo].remoteNodeId == remoteNodeId)
triggerer ! Watch(probe2.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))

Expand All @@ -122,7 +128,6 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.
import f._

triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(remoteNodeId, otherAttempts = 0)
val request1 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request1.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, Set(TestProbe().ref.toClassic))

Expand All @@ -146,7 +151,6 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.
import f._

triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(remoteNodeId, otherAttempts = 0)
val request1 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request1.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, Set(TestProbe().ref.toClassic))

Expand All @@ -172,15 +176,13 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.

// watch remote node
triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(remoteNodeId, otherAttempts = 0)
val request1 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request1.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId, Peer.DISCONNECTED, None, Set(TestProbe().ref.toClassic))

// watch another remote node
val remoteNodeId2 = TestConstants.Bob.nodeParams.nodeId
val probe2 = TestProbe[Result]()
triggerer ! Watch(probe2.ref, remoteNodeId2, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(101))
peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(remoteNodeId2, otherAttempts = 0)
val request2 = switchboard.expectMessageType[Switchboard.GetPeerInfo]
request2.replyTo ! Peer.PeerInfo(peer.ref.toClassic, remoteNodeId2, Peer.DISCONNECTED, None, Set(TestProbe().ref.toClassic))

Expand All @@ -204,8 +206,8 @@ class AsyncPaymentTriggererSpec extends ScalaTestWithActorTestKit(ConfigFactory.

test("triggerer treats an unexpected stop of the notifier as a cancel") { f =>
import f._

triggerer ! Watch(probe.ref, remoteNodeId, paymentHash = ByteVector32.Zeroes, timeout = BlockHeight(100))
peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(remoteNodeId, otherAttempts = 0)
assert(switchboard.expectMessageType[GetPeerInfo].remoteNodeId == remoteNodeId)

triggerer ! NotifierStopped(remoteNodeId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import fr.acinq.eclair.TestConstants.emptyOnionPacket
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.io.PeerReadyNotifier.WakeUpConfig
import fr.acinq.eclair.io.{Peer, PeerReadyManager, Switchboard}
import fr.acinq.eclair.payment.IncomingPaymentPacket.ChannelRelayPacket
import fr.acinq.eclair.payment.relay.ChannelRelayer._
Expand All @@ -52,13 +51,16 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a

import ChannelRelayerSpec._

val wakeUpEnabled = "wake_up_enabled"
val wakeUpTimeout = "wake_up_timeout"

case class FixtureParam(nodeParams: NodeParams, channelRelayer: typed.ActorRef[ChannelRelayer.Command], register: TestProbe[Any])

override def withFixture(test: OneArgTest): Outcome = {
// we are node B in the route A -> B -> C -> ....
val nodeParams = if (test.tags.contains(wakeUpTimeout)) TestConstants.Bob.nodeParams.copy(peerWakeUpConfig = WakeUpConfig(100 millis)) else TestConstants.Bob.nodeParams
val nodeParams = TestConstants.Bob.nodeParams
.modify(_.peerWakeUpConfig.enabled).setToIf(test.tags.contains(wakeUpEnabled))(true)
.modify(_.peerWakeUpConfig.timeout).setToIf(test.tags.contains(wakeUpTimeout))(100 millis)
val register = TestProbe[Any]("register")
val channelRelayer = testKit.spawn(ChannelRelayer.apply(nodeParams, register.ref.toClassic))
try {
Expand Down Expand Up @@ -173,7 +175,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7)
}

test("relay blinded payment (wake up wallet node)") { f =>
test("relay blinded payment (wake up wallet node)", Tag(wakeUpEnabled)) { f =>
import f._

val peerReadyManager = TestProbe[PeerReadyManager.Register]()
Expand Down Expand Up @@ -328,7 +330,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
}
}

test("fail to relay blinded payment (cannot wake up remote node)", Tag(wakeUpTimeout)) { f =>
test("fail to relay blinded payment (cannot wake up remote node)", Tag(wakeUpEnabled), Tag(wakeUpTimeout)) { f =>
import f._

val peerReadyManager = TestProbe[PeerReadyManager.Register]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl

import NodeRelayerSpec._

val wakeUpEnabled = "wake_up_enabled"
val wakeUpTimeout = "wake_up_timeout"

case class FixtureParam(nodeParams: NodeParams, router: TestProbe[Any], register: TestProbe[Any], mockPayFSM: TestProbe[Any], eventListener: TestProbe[PaymentEvent]) {
Expand Down Expand Up @@ -97,6 +98,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val nodeParams = TestConstants.Bob.nodeParams
.modify(_.multiPartPaymentExpiry).setTo(5 seconds)
.modify(_.relayParams.asyncPaymentsParams.holdTimeoutBlocks).setToIf(test.tags.contains("long_hold_timeout"))(200000) // timeout after payment expires
.modify(_.peerWakeUpConfig.enabled).setToIf(test.tags.contains(wakeUpEnabled))(true)
.modify(_.peerWakeUpConfig.timeout).setToIf(test.tags.contains(wakeUpTimeout))(100 millis)
val router = TestProbe[Any]("router")
val register = TestProbe[Any]("register")
Expand Down Expand Up @@ -787,7 +789,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
register.expectNoMessage(100 millis)
}

test("relay to blinded path with wake-up") { f =>
test("relay to blinded path with wake-up", Tag(wakeUpEnabled)) { f =>
import f._

val peerReadyManager = TestProbe[PeerReadyManager.Register]()
Expand Down Expand Up @@ -833,7 +835,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
register.expectNoMessage(100 millis)
}

test("fail to relay to blinded path when wake-up fails", Tag(wakeUpTimeout)) { f =>
test("fail to relay to blinded path when wake-up fails", Tag(wakeUpEnabled), Tag(wakeUpTimeout)) { f =>
import f._

val peerReadyManager = TestProbe[PeerReadyManager.Register]()
Expand Down

0 comments on commit 3e23b85

Please sign in to comment.