Skip to content

Commit

Permalink
Track pending PeerReadyNotifier instances
Browse files Browse the repository at this point in the history
It can be useful to track pending `PeerReadyNotifier` instances to avoid
performing duplicate actions when multiple `PeerReadyNotifier` spawn for
the same peer (e.g. sending a mobile notification).

When a `PeerReadyNotifier` actor is started, it registers itself into a
singleton `PeerReadyManager`, which tells it whether there are other
pending attempts for the same peer.
  • Loading branch information
t-bast committed Aug 26, 2024
1 parent aad3226 commit 5613204
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 28 deletions.
1 change: 1 addition & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ class Setup(val datadir: File,
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
peerReadyManager = system.spawn(Behaviors.supervise(PeerReadyManager()).onFailure(typed.SupervisorStrategy.restart), name = "peer-ready-manager")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
_ = relayer ! PostRestartHtlcCleaner.Init(channels)
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
Expand Down
142 changes: 116 additions & 26 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/PeerReadyNotifier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package fr.acinq.eclair.io

import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, TypedActorRefOps}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
Expand All @@ -27,9 +27,73 @@ import fr.acinq.eclair.{BlockHeight, Logs, channel}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

/**
* This actor tracks the set of pending [[PeerReadyNotifier]].
* It can be used to ensure that notifications are only sent once, even if there are multiple parallel operations
* waiting for that peer to come online.
*/
object PeerReadyManager {

val PeerReadyManagerServiceKey: ServiceKey[Register] = ServiceKey[Register]("peer-ready-manager")

// @formatter:off
sealed trait Command
case class Register(replyTo: ActorRef[Registered], remoteNodeId: PublicKey) extends Command
case class List(replyTo: ActorRef[Set[PublicKey]]) extends Command
private case class Completed(remoteNodeId: PublicKey, actor: ActorRef[Registered]) extends Command
// @formatter:on

/**
* @param otherAttempts number of already pending [[PeerReadyNotifier]] instances for that peer.
*/
case class Registered(remoteNodeId: PublicKey, otherAttempts: Int)

def apply(): Behavior[Command] = {
Behaviors.setup { context =>
context.system.receptionist ! Receptionist.Register(PeerReadyManagerServiceKey, context.self)
watch(Map.empty, context)
}
}

private def watch(pending: Map[PublicKey, Set[ActorRef[Registered]]], context: ActorContext[Command]): Behavior[Command] = {
Behaviors.receiveMessage {
case Register(replyTo, remoteNodeId) =>
context.watchWith(replyTo, Completed(remoteNodeId, replyTo))
pending.get(remoteNodeId) match {
case Some(attempts) =>
replyTo ! Registered(remoteNodeId, otherAttempts = attempts.size)
val attempts1 = attempts + replyTo
watch(pending + (remoteNodeId -> attempts1), context)
case None =>
replyTo ! Registered(remoteNodeId, otherAttempts = 0)
watch(pending + (remoteNodeId -> Set(replyTo)), context)
}
case Completed(remoteNodeId, actor) =>
pending.get(remoteNodeId) match {
case Some(attempts) =>
val attempts1 = attempts - actor
if (attempts1.isEmpty) {
watch(pending - remoteNodeId, context)
} else {
watch(pending + (remoteNodeId -> attempts1), context)
}
case None =>
Behaviors.same
}
case List(replyTo) =>
replyTo ! pending.keySet
Behaviors.same
}
}

}

/**
* This actor waits for a given peer to be online and ready to process payments.
* It automatically stops after the timeout provided.
* It automatically stops after the timeout provided if the peer doesn't connect.
* There may be multiple instances of this actor running in parallel for the same peer, which is fine because they
* may use different timeouts.
* Having separate actor instances for each caller guarantees that the caller will always receive a response.
*/
object PeerReadyNotifier {

Expand All @@ -39,6 +103,7 @@ object PeerReadyNotifier {
sealed trait Command
case class NotifyWhenPeerReady(replyTo: ActorRef[Result]) extends Command
private final case class WrappedListing(wrapped: Receptionist.Listing) extends Command
private final case class WrappedRegistered(registered: PeerReadyManager.Registered) extends Command
private case object PeerNotConnected extends Command
private case object PeerConnected extends Command
private case object PeerDisconnected extends Command
Expand All @@ -60,24 +125,24 @@ object PeerReadyNotifier {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
Behaviors.withMdc(Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))) {
Behaviors.receiveMessagePartial {
case NotifyWhenPeerReady(replyTo) =>
timeout_opt.foreach {
case Left(d) => timers.startSingleTimer(Timeout, d)
case Right(h) => context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight] {
case cbc if h <= cbc.blockHeight => Timeout
case cbc => NewBlockNotTimedOut(cbc.blockHeight)
})
}
// In case the peer is not currently connected, we will wait for them to connect instead of regularly
// polling the switchboard. This makes more sense for long timeouts such as the ones used for async payments.
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[PeerConnected](e => if (e.nodeId == remoteNodeId) PeerConnected else ToBeIgnored))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[PeerDisconnected](e => if (e.nodeId == remoteNodeId) PeerDisconnected else ToBeIgnored))
// The actor should never throw, but for extra safety we wrap it with a supervisor.
Behaviors.supervise {
new PeerReadyNotifier(replyTo, remoteNodeId, context, timers).findSwitchboard()
}.onFailure(SupervisorStrategy.stop)
}
Behaviors.receiveMessagePartial {
case NotifyWhenPeerReady(replyTo) =>
timeout_opt.foreach {
case Left(d) => timers.startSingleTimer(Timeout, d)
case Right(h) => context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight] {
case cbc if h <= cbc.blockHeight => Timeout
case cbc => NewBlockNotTimedOut(cbc.blockHeight)
})
}
// In case the peer is not currently connected, we will wait for them to connect instead of regularly
// polling the switchboard. This makes more sense for long timeouts such as the ones used for async payments.
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[PeerConnected](e => if (e.nodeId == remoteNodeId) PeerConnected else ToBeIgnored))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[PeerDisconnected](e => if (e.nodeId == remoteNodeId) PeerDisconnected else ToBeIgnored))
// The actor should never throw, but for extra safety we wrap it with a supervisor.
Behaviors.supervise {
new PeerReadyNotifier(replyTo, remoteNodeId, context, timers).register()
}.onFailure(SupervisorStrategy.stop)
}
}
}
}
Expand Down Expand Up @@ -125,13 +190,38 @@ private class PeerReadyNotifier(replyTo: ActorRef[PeerReadyNotifier.Result],

private val log = context.log

private def findSwitchboard(): Behavior[Command] = {
private def register(): Behavior[Command] = {
context.system.receptionist ! Receptionist.Find(PeerReadyManager.PeerReadyManagerServiceKey, context.messageAdapter[Receptionist.Listing](WrappedListing))
Behaviors.receiveMessagePartial {
case WrappedListing(PeerReadyManager.PeerReadyManagerServiceKey.Listing(listings)) =>
listings.headOption match {
case Some(peerReadyManager) =>
peerReadyManager ! PeerReadyManager.Register(context.messageAdapter[PeerReadyManager.Registered](WrappedRegistered), remoteNodeId)
Behaviors.same
case None =>
log.error("no peer-ready-manager found")
replyTo ! PeerUnavailable(remoteNodeId)
Behaviors.stopped
}
case WrappedRegistered(registered) =>
log.info("checking if peer is available ({} other attempts)", registered.otherAttempts)
findSwitchboard(isFirstAttempt = registered.otherAttempts == 0)
case Timeout =>
log.info("timed out finding peer-ready-manager actor")
replyTo ! PeerUnavailable(remoteNodeId)
Behaviors.stopped
case ToBeIgnored =>
Behaviors.same
}
}

private def findSwitchboard(isFirstAttempt: Boolean): Behavior[Command] = {
context.system.receptionist ! Receptionist.Find(Switchboard.SwitchboardServiceKey, context.messageAdapter[Receptionist.Listing](WrappedListing))
Behaviors.receiveMessagePartial {
case WrappedListing(Switchboard.SwitchboardServiceKey.Listing(listings)) =>
listings.headOption match {
case Some(switchboard) =>
waitForPeerConnected(switchboard)
waitForPeerConnected(switchboard, isFirstAttempt)
case None =>
log.error("no switchboard found")
replyTo ! PeerUnavailable(remoteNodeId)
Expand All @@ -146,7 +236,7 @@ private class PeerReadyNotifier(replyTo: ActorRef[PeerReadyNotifier.Result],
}
}

private def waitForPeerConnected(switchboard: ActorRef[Switchboard.GetPeerInfo]): Behavior[Command] = {
private def waitForPeerConnected(switchboard: ActorRef[Switchboard.GetPeerInfo], isFirstAttempt: Boolean): Behavior[Command] = {
val peerInfoAdapter = context.messageAdapter[Peer.PeerInfoResponse] {
// We receive this when we don't have any channel to the given peer and are not currently connected to them.
// In that case we still want to wait for a connection, because we may want to open a channel to them.
Expand All @@ -172,7 +262,7 @@ private class PeerReadyNotifier(replyTo: ActorRef[PeerReadyNotifier.Result],
Behaviors.stopped
} else {
log.debug("peer is connected with {} channels", channelCount)
waitForChannelsReady(peer, switchboard)
waitForChannelsReady(peer, switchboard, isFirstAttempt)
}
case NewBlockNotTimedOut(currentBlockHeight) =>
log.debug("waiting for peer to connect at block {}", currentBlockHeight)
Expand All @@ -186,7 +276,7 @@ private class PeerReadyNotifier(replyTo: ActorRef[PeerReadyNotifier.Result],
}
}

private def waitForChannelsReady(peer: ActorRef[Peer.GetPeerChannels], switchboard: ActorRef[Switchboard.GetPeerInfo]): Behavior[Command] = {
private def waitForChannelsReady(peer: ActorRef[Peer.GetPeerChannels], switchboard: ActorRef[Switchboard.GetPeerInfo], isFirstAttempt: Boolean): Behavior[Command] = {
timers.startTimerWithFixedDelay(ChannelsReadyTimerKey, CheckChannelsReady, initialDelay = 50 millis, delay = 1 second)
Behaviors.receiveMessagePartial {
case CheckChannelsReady =>
Expand All @@ -209,7 +299,7 @@ private class PeerReadyNotifier(replyTo: ActorRef[PeerReadyNotifier.Result],
case PeerDisconnected =>
log.debug("peer disconnected, waiting for them to reconnect")
timers.cancel(ChannelsReadyTimerKey)
waitForPeerConnected(switchboard)
waitForPeerConnected(switchboard, isFirstAttempt)
case Timeout =>
log.info("timed out waiting for channels to be ready")
replyTo ! PeerUnavailable(remoteNodeId)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.io

import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.randomKey
import org.scalatest.funsuite.AnyFunSuiteLike

class PeerReadyManagerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike {

test("watch pending notifiers") {
val manager = testKit.spawn(PeerReadyManager())
val remoteNodeId1 = randomKey().publicKey
val notifier1a = TestProbe[PeerReadyManager.Registered]()
val notifier1b = TestProbe[PeerReadyManager.Registered]()

manager ! PeerReadyManager.Register(notifier1a.ref, remoteNodeId1)
assert(notifier1a.expectMessageType[PeerReadyManager.Registered].otherAttempts == 0)
manager ! PeerReadyManager.Register(notifier1b.ref, remoteNodeId1)
assert(notifier1b.expectMessageType[PeerReadyManager.Registered].otherAttempts == 1)

val remoteNodeId2 = randomKey().publicKey
val notifier2a = TestProbe[PeerReadyManager.Registered]()
val notifier2b = TestProbe[PeerReadyManager.Registered]()

// Later attempts aren't affected by previously completed attempts.
manager ! PeerReadyManager.Register(notifier2a.ref, remoteNodeId2)
assert(notifier2a.expectMessageType[PeerReadyManager.Registered].otherAttempts == 0)
notifier2a.stop()
val probe = TestProbe[Set[PublicKey]]()
probe.awaitAssert({
manager ! PeerReadyManager.List(probe.ref)
assert(probe.expectMessageType[Set[PublicKey]] == Set(remoteNodeId1))
})
manager ! PeerReadyManager.Register(notifier2b.ref, remoteNodeId2)
assert(notifier2b.expectMessageType[PeerReadyManager.Registered].otherAttempts == 0)
}

}
Loading

0 comments on commit 5613204

Please sign in to comment.