Skip to content

Commit

Permalink
Rework node relay FSM flow
Browse files Browse the repository at this point in the history
We refactor `NodeRelay.scala` to re-order some steps, without making
meaningful functional changes. The steps are:

1. Fully receive the incoming payment
2. Resolve the next node (unwrap blinded paths if needed)
3. Wake-up the next node if necessary (mobile wallet)
4. Relay outgoing payment

Note that we introduce a wake-up step, that will be enriched in future
commits and can be extended to include mobile notifications. The file
is now also easier to follow, as steps are done linearly by simply
scrolling down.
  • Loading branch information
t-bast committed Jun 18, 2024
1 parent b53d116 commit 66078aa
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 227 deletions.
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ class Setup(val datadir: File,
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
_ = relayer ! PostRestartHtlcCleaner.Init(channels)
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ object PeerReadyNotifier {
context.log.error("no switchboard found")
replyTo ! PeerUnavailable(remoteNodeId)
Behaviors.stopped
}
}
case Timeout =>
context.log.info("timed out finding switchboard actor")
replyTo ! PeerUnavailable(remoteNodeId)
Behaviors.stopped
}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package fr.acinq.eclair.payment.relay

import akka.actor.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.ByteVector32
Expand Down Expand Up @@ -57,7 +56,7 @@ object NodeRelayer {
* NB: the payment secret used here is different from the invoice's payment secret and ensures we can
* group together HTLCs that the previous trampoline node sent in the same MPP.
*/
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], router: akka.actor.ActorRef, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, router: akka.actor.ActorRef, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(category_opt = Some(Logs.LogCategory.PAYMENT)), mdc) {
Behaviors.receiveMessage {
Expand All @@ -72,15 +71,15 @@ object NodeRelayer {
case None =>
val relayId = UUID.randomUUID()
context.log.debug(s"spawning a new handler with relayId=$relayId")
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer, router), relayId.toString)
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory, router), relayId.toString)
context.log.debug("forwarding incoming htlc #{} from channel {} to new handler", htlcIn.id, htlcIn.channelId)
handler ! NodeRelay.Relay(nodeRelayPacket)
apply(nodeParams, register, outgoingPaymentFactory, triggerer, router, children + (childKey -> handler))
apply(nodeParams, register, outgoingPaymentFactory, router, children + (childKey -> handler))
}
case RelayComplete(childHandler, paymentHash, paymentSecret) =>
// we do a back-and-forth between parent and child before stopping the child to prevent a race condition
childHandler ! NodeRelay.Stop
apply(nodeParams, register, outgoingPaymentFactory, triggerer, router, children - PaymentKey(paymentHash, paymentSecret))
apply(nodeParams, register, outgoingPaymentFactory, router, children - PaymentKey(paymentHash, paymentSecret))
case GetPendingPayments(replyTo) =>
replyTo ! children
Behaviors.same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import scala.util.Random
* It also receives channel HTLC events (fulfill / failed) and relays those to the appropriate handlers.
* It also maintains an up-to-date view of local channel balances.
*/
class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging {
class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging {

import Relayer._

Expand All @@ -58,7 +58,7 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym

private val postRestartCleaner = context.actorOf(PostRestartHtlcCleaner.props(nodeParams, register, initialized), "post-restart-htlc-cleaner")
private val channelRelayer = context.spawn(Behaviors.supervise(ChannelRelayer(nodeParams, register)).onFailure(SupervisorStrategy.resume), "channel-relayer")
private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), triggerer, router)).onFailure(SupervisorStrategy.resume), name = "node-relayer")
private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), router)).onFailure(SupervisorStrategy.resume), name = "node-relayer")

def receive: Receive = {
case init: PostRestartHtlcCleaner.Init => postRestartCleaner forward init
Expand Down Expand Up @@ -120,8 +120,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym

object Relayer extends Logging {

def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], initialized: Option[Promise[Done]] = None): Props =
Props(new Relayer(nodeParams, router, register, paymentHandler, triggerer, initialized))
def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None): Props =
Props(new Relayer(nodeParams, router, register, paymentHandler, initialized))

// @formatter:off
case class RelayFees(feeBase: MilliSatoshi, feeProportionalMillionths: Long) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class FuzzySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Channe
val bobRegister = system.actorOf(Props(new TestRegister()))
val alicePaymentHandler = system.actorOf(Props(new PaymentHandler(aliceParams, aliceRegister, TestProbe().ref)))
val bobPaymentHandler = system.actorOf(Props(new PaymentHandler(bobParams, bobRegister, TestProbe().ref)))
val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler, TestProbe().ref))
val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler, TestProbe().ref))
val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler))
val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler))
val wallet = new DummyOnChainWallet()
val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(aliceParams, wallet, bobParams.nodeId, alice2blockchain.ref, aliceRelayer, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(bobParams, wallet, aliceParams.nodeId, bob2blockchain.ref, bobRelayer, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,12 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat
val bitcoinClient = new TestBitcoinCoreClient()
val wallet = new SingleKeyOnChainWallet()
val watcher = TestProbe("watcher")
val triggerer = TestProbe("payment-triggerer")
val watcherTyped = watcher.ref.toTyped[ZmqWatcher.Command]
val register = system.actorOf(Register.props(), "register")
val router = system.actorOf(Router.props(nodeParams, watcherTyped), "router")
val offerManager = system.spawn(OfferManager(nodeParams, router, 1 minute), "offer-manager")
val paymentHandler = system.actorOf(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler")
val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler, triggerer.ref.toTyped), "relayer")
val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler), "relayer")
val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcherTyped, bitcoinClient)
val channelFactory = Peer.SimpleChannelFactory(nodeParams, watcherTyped, relayer, wallet, txPublisherFactory)
val pendingChannelsRateLimiter = system.spawnAnonymous(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, Seq())).onFailure(typed.SupervisorStrategy.resume))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit

case class FixtureParam(nodeParams: NodeParams, register: TestProbe, sender: TestProbe, eventListener: TestProbe) {
def createRelayer(nodeParams1: NodeParams): (ActorRef, ActorRef) = {
val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref, TestProbe().ref.toTyped))
val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref))
// we need ensure the post-htlc-restart child actor is initialized
sender.send(relayer, Relayer.GetChildActors(sender.ref))
(relayer, sender.expectMsgType[Relayer.ChildActors].postRestartCleaner)
Expand Down
Loading

0 comments on commit 66078aa

Please sign in to comment.