Skip to content

Commit

Permalink
Group frames manually
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheikah45 committed Oct 12, 2023
1 parent 0a4753e commit a3cb800
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import reactor.core.publisher.Sinks
import reactor.core.publisher.Sinks.EmitFailureHandler
import reactor.core.publisher.Sinks.EmitResult
import reactor.core.scheduler.Schedulers
import reactor.netty.ByteBufFlux
import reactor.netty.Connection
import reactor.netty.http.client.HttpClient
import reactor.netty.http.client.WebsocketClientSpec
Expand Down Expand Up @@ -148,9 +149,9 @@ class FafLobbyClient(
return webSocketClient
.uri(config.url)
.handle { inbound, outbound ->
val inboundMono = inbound.aggregateFrames(config.bufferSize)
.receive()
.asString(Charsets.UTF_8)
val inboundMono = inbound.receiveFrames()
.windowUntil { it.isFinalFragment }
.flatMap { ByteBufFlux.fromInbound(it.map { frame -> frame.content() }).asString(Charsets.UTF_8) }
.doOnError { LOG.error("Inbound channel closed with error", it) }
.doOnComplete { LOG.info("Inbound channel closed") }
.doOnCancel { LOG.info("Inbound channel cancelled") }
Expand Down

0 comments on commit a3cb800

Please sign in to comment.