Skip to content

Commit

Permalink
Wait for connection before sending a message
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheikah45 committed Oct 13, 2023
1 parent 2736117 commit eb9cbfc
Showing 1 changed file with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class FafLobbyClient(
private val rawEvents = eventSink.asFlux().publish().autoConnect()
private val connectionStatusSink: Sinks.Many<ConnectionStatus> = Sinks.many().unicast().onBackpressureBuffer()
override val connectionStatus = connectionStatusSink.asFlux().publish().autoConnect()
private val connectionAcquiredSink: Sinks.Many<Any> = Sinks.many().unicast().onBackpressureBuffer()
private val connectionAcquired: Flux<Any> = connectionAcquiredSink.asFlux().publish().autoConnect()

override val events = rawEvents.filter {
it !is ServerPingMessage &&
Expand All @@ -84,15 +86,16 @@ class FafLobbyClient(
.then(Mono.delay(Duration.ofSeconds(10)))
.timeout(Duration.ofMinutes(1), Mono.empty()))
.doOnError(LoginException::class.java) { kicked = true }
.doOnSubscribe {
.doFirst {
prepareAuthenticateOnNextSession()
send(SessionRequest(config.version, config.userAgent))
}

private val loginMono = Mono.defer {
connectionDisposable?.dispose()
connectionDisposable = openConnection()
loginResponseMono.retryWhen(createRetrySpec(config))
connectionAcquired.next().then(loginResponseMono.retryWhen(createRetrySpec(config))).doFirst {
connectionDisposable?.dispose()
connectionDisposable = openConnection()
}
}
.doOnError { LOG.error("Error during connection", it); connection?.dispose() }
.doOnCancel { LOG.debug("Login cancelled"); disconnect() }
Expand Down Expand Up @@ -122,6 +125,7 @@ class FafLobbyClient(
val address = it.channel().remoteAddress() as InetSocketAddress
LOG.info("Connected to {} on port {}", address.hostName, address.port)
connection = it
connectionAcquiredSink.emitNext(true, retrySerialFailure)
}.doOnDisconnected {
LOG.info("Disconnected from server")
it.dispose()
Expand Down Expand Up @@ -285,6 +289,7 @@ class FafLobbyClient(
}

private fun send(message: ClientMessage) {
LOG.trace("Sending {}", message)
outboundSink.emitNext(message, retrySerialFailure)
}

Expand Down

0 comments on commit eb9cbfc

Please sign in to comment.