diff --git a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt index 2f0bc597..660b6a3a 100644 --- a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt +++ b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt @@ -15,6 +15,7 @@ import reactor.core.publisher.Sinks import reactor.core.publisher.Sinks.EmitFailureHandler import reactor.core.publisher.Sinks.EmitResult import reactor.core.scheduler.Schedulers +import reactor.netty.ByteBufFlux import reactor.netty.Connection import reactor.netty.http.client.HttpClient import reactor.netty.http.client.WebsocketClientSpec @@ -148,9 +149,9 @@ class FafLobbyClient( return webSocketClient .uri(config.url) .handle { inbound, outbound -> - val inboundMono = inbound.aggregateFrames(config.bufferSize) - .receive() - .asString(Charsets.UTF_8) + val inboundMono = inbound.receiveFrames() + .windowUntil { it.isFinalFragment } + .flatMap { ByteBufFlux.fromInbound(it.map { frame -> frame.content() }).asString(Charsets.UTF_8) } .doOnError { LOG.error("Inbound channel closed with error", it) } .doOnComplete { LOG.info("Inbound channel closed") } .doOnCancel { LOG.info("Inbound channel cancelled") }