Skip to content

Commit

Permalink
Aggregate frames
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheikah45 committed Oct 12, 2023
1 parent d234f86 commit 4063006
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ class FafLobbyClient(
}

private val loginResponseMono = rawEvents.ofType(LoginResponse::class.java).next().flatMap {
when (it) {
is LoginSuccessResponse -> Mono.just(it.me)
is LoginFailedResponse -> Mono.error(LoginException(it.text))
}
}.timeout(Duration.ofMinutes(1))
when (it) {
is LoginSuccessResponse -> Mono.just(it.me)
is LoginFailedResponse -> Mono.error(LoginException(it.text))
}
}.timeout(Duration.ofMinutes(1))
.doOnError(LoginException::class.java) { kicked = true }
.doOnSubscribe {
prepareAuthenticateOnNextSession()
Expand All @@ -104,7 +104,8 @@ class FafLobbyClient(
(emitResult == EmitResult.FAIL_NON_SERIALIZED)
}

private val webSocketClient = HttpClient.newConnection().resolver(DefaultAddressResolverGroup.INSTANCE)
private val webSocketClient = HttpClient.newConnection()
.resolver(DefaultAddressResolverGroup.INSTANCE)
.doOnResolveError { connection, throwable ->
LOG.error("Could not find server", throwable)
connection.dispose()
Expand Down Expand Up @@ -149,7 +150,8 @@ class FafLobbyClient(
return webSocketClient
.uri(config.url)
.handle { inbound, outbound ->
val inboundMono = inbound.receive()
val inboundMono = inbound.aggregateFrames(100 * 1024 * 1024)
.receive()
.asString(Charsets.UTF_8)
.doOnError { LOG.error("Inbound channel closed with error", it) }
.doOnComplete { LOG.info("Inbound channel closed") }
Expand Down

0 comments on commit 4063006

Please sign in to comment.