Skip to content

Commit

Permalink
Websocket lobby client (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheikah45 authored Oct 14, 2023
1 parent 4a3facc commit 472039a
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 269 deletions.
2 changes: 2 additions & 0 deletions faf-commons-lobby/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ tasks.withType<Test> {
showCauses = true
showStackTraces = true
}

systemProperties["junit.jupiter.execution.parallel.enabled"] = true
}

tasks.withType<KotlinCompile> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import reactor.core.publisher.Sinks.EmitFailureHandler
import reactor.core.publisher.Sinks.EmitResult
import reactor.core.scheduler.Schedulers
import reactor.netty.Connection
import reactor.netty.tcp.TcpClient
import reactor.netty.http.client.HttpClient
import reactor.netty.http.client.WebsocketClientSpec
import reactor.util.retry.Retry
import reactor.util.retry.Retry.RetrySignal
import java.net.InetSocketAddress
Expand All @@ -35,8 +36,7 @@ class FafLobbyClient(
val tokenMono: Mono<String>,
val version: String,
val userAgent: String,
val host: String,
val port: Int,
val url: String,
val generateUid: Function<Long, String>,
val bufferSize: Int,
val wiretap: Boolean = false,
Expand All @@ -46,6 +46,7 @@ class FafLobbyClient(

private lateinit var config: Config

private var connectionDisposable: Disposable? = null
private var connection: Connection? = null
private var pingDisposable: Disposable? = null
private var connecting: Boolean = false
Expand All @@ -59,9 +60,11 @@ class FafLobbyClient(
private val eventSink: Sinks.Many<ServerMessage> = Sinks.many().unicast().onBackpressureBuffer()
private val rawEvents = eventSink.asFlux().publish().autoConnect()
private val connectionStatusSink: Sinks.Many<ConnectionStatus> = Sinks.many().unicast().onBackpressureBuffer()
override val connectionStatus = connectionStatusSink.asFlux().publish().autoConnect()
override val connectionStatus: Flux<ConnectionStatus> = connectionStatusSink.asFlux().publish().autoConnect()
private val connectionAcquiredSink: Sinks.Many<Any> = Sinks.many().unicast().onBackpressureBuffer()
private val connectionAcquired: Flux<Any> = connectionAcquiredSink.asFlux().publish().autoConnect()

override val events = rawEvents.filter {
override val events: Flux<ServerMessage> = rawEvents.filter {
it !is ServerPingMessage &&
it !is ServerPongMessage &&
it !is SessionResponse &&
Expand All @@ -70,27 +73,28 @@ class FafLobbyClient(
}

private val loginResponseMono = rawEvents.ofType(LoginResponse::class.java).next().flatMap {
when (it) {
is LoginSuccessResponse -> Mono.just(it.me)
is LoginFailedResponse -> Mono.error(LoginException(it.text))
}
}.timeout(Duration.ofMinutes(1))
when (it) {
is LoginSuccessResponse -> Mono.just(it.me)
is LoginFailedResponse -> Mono.error(LoginException(it.text))
}
}.timeout(Duration.ofSeconds(30))
.doOnError(LoginException::class.java) { kicked = true }
.doOnSubscribe {
.doFirst {
prepareAuthenticateOnNextSession()
send(SessionRequest(config.version, config.userAgent))
}

private val loginMono = Mono.defer {
openConnection()
.then(loginResponseMono)
.retryWhen(createRetrySpec(config))
connectionAcquired.next().then(loginResponseMono).doFirst {
openConnection()
}.retryWhen(createRetrySpec(config))
}
.doOnError { LOG.error("Error during connection", it); connection?.dispose() }
.doOnCancel { LOG.debug("Login cancelled"); disconnect() }
.doOnSuccess {
connectionStatusSink.emitNext(ConnectionStatus.CONNECTED, retrySerialFailure)
}
.doOnSubscribe { LOG.debug("Starting login process") }
.materialize()
.cacheInvalidateIf { it.isOnError || (!connecting && (connection == null || connection?.isDisposed == true)) }
.dematerialize<Player>()
Expand All @@ -101,7 +105,7 @@ class FafLobbyClient(
(emitResult == EmitResult.FAIL_NON_SERIALIZED)
}

private val client = TcpClient.newConnection()
private val httpClient = HttpClient.newConnection()
.resolver(DefaultAddressResolverGroup.INSTANCE)
.doOnResolveError { connection, throwable ->
LOG.error("Could not find server", throwable)
Expand All @@ -115,6 +119,7 @@ class FafLobbyClient(
it.addHandlerFirst(LineEncoder(LineSeparator.UNIX)) // TODO: This is not working. Raise a bug ticket! Workaround below
.addHandlerLast(LineBasedFrameDecoder(config.bufferSize))
connection = it
connectionAcquiredSink.emitNext(true, retrySerialFailure)
}.doOnDisconnected {
LOG.info("Disconnected from server")
it.dispose()
Expand Down Expand Up @@ -143,14 +148,19 @@ class FafLobbyClient(
}.subscribe()
}

private fun openConnection(): Mono<out Connection> {
return client
private fun openConnection() {
LOG.debug("Opening connection")
connectionDisposable?.dispose()
connectionDisposable = httpClient
.wiretap(config.wiretap)
.host(config.host)
.port(config.port)
.websocket(WebsocketClientSpec.builder().maxFramePayloadLength(config.bufferSize).build())
.uri(config.url)
.handle { inbound, outbound ->
val inboundMono = inbound.receive()
.asString(Charsets.UTF_8)
.flatMapIterable { it.toCharArray().asIterable() }
.windowUntil { '\n' == it }
.flatMap { it.collectList().map { chars -> chars.toCharArray() }.map { charArray -> String(charArray) } }
.doOnError { LOG.error("Inbound channel closed with error", it) }
.doOnComplete { LOG.info("Inbound channel closed") }
.doOnCancel { LOG.info("Inbound channel cancelled") }
Expand Down Expand Up @@ -211,15 +221,38 @@ class FafLobbyClient(
Mono.empty()
}
}
).then()
).neverComplete()

/* The lobby protocol requires two-way communication. If either the outbound or inbound connections complete/close
then we are better off closing the connection to the server. This is why we return a mono that completes when one
of the connections finishes */
Mono.firstWithSignal(inboundMono, outboundMono)
}
.connect()
.doOnSubscribe { connectionStatusSink.emitNext(ConnectionStatus.CONNECTING, retrySerialFailure) }
.doOnCancel { LOG.info("Connection cancelled") }
.doOnSubscribe {
LOG.debug("Beginning connection process")
connectionStatusSink.emitNext(ConnectionStatus.CONNECTING, retrySerialFailure)
}
.retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(config.retryWaitSeconds / 5))
.doBeforeRetry { retry: RetrySignal ->
LOG.warn(
"Could not connect to server retrying: Attempt #{} of 5",
retry.totalRetries(),
retry.failure()
)
}.onRetryExhaustedThrow { spec, retrySignal ->
LoginException(
"Could not connect to server after ${spec.maxAttempts} attempts",
retrySignal.failure()
)
})
.subscribe(null, {
LOG.warn("Error in connection", it)
connectionStatusSink.emitNext(ConnectionStatus.DISCONNECTED, retrySerialFailure)
}, {
LOG.info("Connection closed")
connectionStatusSink.emitNext(ConnectionStatus.DISCONNECTED, retrySerialFailure)
})
}

override fun connectAndLogin(config: Config): Mono<Player> {
Expand Down Expand Up @@ -267,6 +300,7 @@ class FafLobbyClient(
}

private fun send(message: ClientMessage) {
LOG.trace("Sending message of type {}", message.javaClass)
outboundSink.emitNext(message, retrySerialFailure)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode
import org.skyscreamer.jsonassert.JSONAssert

@Execution(ExecutionMode.CONCURRENT)
class ClientMessageTest {

companion object {
Expand Down
Loading

0 comments on commit 472039a

Please sign in to comment.