From 3e4890976aa43dc3b2b8d37dbb47118a344d60d9 Mon Sep 17 00:00:00 2001 From: Oleg Yukhnevich Date: Mon, 13 Nov 2023 20:12:42 +0200 Subject: [PATCH] hide ktor deprecation * this API will be removed in Ktor 4.0 * possible replacement will be ready by Ktor 3.0 + kotlinx.io * most of the public declarations will be replaced in coming changes --- rsocket-core/api/rsocket-core.api | 4 ++++ .../kotlin/io/rsocket/kotlin/Connection.kt | 16 ++++++++++------ .../kotlin/io/rsocket/kotlin/buffers.kt | 19 +++++++++++++++++++ .../rsocket/kotlin/core/RSocketConnector.kt | 10 +++++++--- .../kotlin/core/RSocketConnectorBuilder.kt | 12 ++++++++++-- .../io/rsocket/kotlin/core/RSocketServer.kt | 12 ++++++++---- .../kotlin/core/RSocketServerBuilder.kt | 11 +++++++++-- .../io/rsocket/kotlin/frame/ExtensionFrame.kt | 8 ++++++-- .../kotlin/io/rsocket/kotlin/frame/Frame.kt | 6 +++--- .../io/rsocket/kotlin/frame/KeepAliveFrame.kt | 4 ++-- .../io/rsocket/kotlin/frame/LeaseFrame.kt | 4 ++-- .../rsocket/kotlin/frame/MetadataPushFrame.kt | 4 ++-- .../io/rsocket/kotlin/frame/RequestFrame.kt | 6 +++--- .../io/rsocket/kotlin/frame/ResumeFrame.kt | 4 ++-- .../io/rsocket/kotlin/frame/SetupFrame.kt | 4 ++-- .../io/rsocket/kotlin/frame/io/packet.kt | 7 +++++-- .../io/rsocket/kotlin/frame/io/payload.kt | 4 ++-- .../kotlin/io/rsocket/kotlin/frame/io/util.kt | 8 ++++---- .../io/rsocket/kotlin/internal/Connect.kt | 17 ++++++++++------- .../io/rsocket/kotlin/internal/FrameSender.kt | 6 +++--- .../kotlin/internal/RSocketRequester.kt | 2 +- .../rsocket/kotlin/internal/StreamsStorage.kt | 4 ++-- .../kotlin/internal/handler/FrameHandler.kt | 10 ++++++---- .../RequesterRequestChannelFrameHandler.kt | 2 +- .../RequesterRequestResponseFrameHandler.kt | 4 ++-- .../RequesterRequestStreamFrameHandler.kt | 2 +- .../ResponderFireAndForgetFrameHandler.kt | 4 ++-- .../ResponderRequestChannelFrameHandler.kt | 2 +- .../ResponderRequestResponseFrameHandler.kt | 4 ++-- .../ResponderRequestStreamFrameHandler.kt | 4 ++-- .../kotlin/metadata/CompositeMetadata.kt | 2 +- .../metadata/CompositeMetadataExtensions.kt | 6 +++--- .../io/rsocket/kotlin/metadata/Metadata.kt | 8 ++++---- ...erStreamAcceptableDataMimeTypesMetadata.kt | 4 ++-- .../metadata/PerStreamDataMimeTypeMetadata.kt | 4 ++-- .../io/rsocket/kotlin/metadata/RawMetadata.kt | 4 ++-- .../kotlin/metadata/RoutingMetadata.kt | 4 ++-- .../kotlin/metadata/ZipkinTracingMetadata.kt | 4 ++-- .../kotlin/metadata/security/AuthMetadata.kt | 6 +++--- .../metadata/security/BearerAuthMetadata.kt | 7 +++++-- .../metadata/security/RawAuthMetadata.kt | 8 ++++---- .../metadata/security/SimpleAuthMetadata.kt | 7 +++++-- .../io/rsocket/kotlin/TestConnection.kt | 3 --- .../io/rsocket/kotlin/core/RSocketTest.kt | 4 ++-- .../api/rsocket-ktor-client.api | 2 +- .../io/rsocket/kotlin/ktor/client/Builders.kt | 7 ++----- .../kotlin/ktor/client/RSocketSupport.kt | 5 +++-- .../api/rsocket-ktor-server.api | 2 +- .../kotlin/ktor/server/RSocketSupport.kt | 5 +++-- .../io/rsocket/kotlin/ktor/server/Routing.kt | 7 ++----- .../rsocket/kotlin/test/InUseTrackingPool.kt | 4 +++- .../kotlin/io/rsocket/kotlin/test/Packets.kt | 5 ++++- .../io/rsocket/kotlin/test/TestConfig.kt | 10 ++++++---- .../transport/ktor/tcp/TcpClientTransport.kt | 10 +++++----- .../transport/ktor/tcp/TcpConnection.kt | 3 --- .../transport/ktor/tcp/TcpServerTransport.kt | 6 +++--- .../transport/ktor/tcp/TcpServerTest.kt | 4 ++-- .../transport/ktor/tcp/TcpTransportTest.kt | 5 ++--- .../client/WebSocketClientTransport.kt | 14 +++++++------- .../server/WebSocketServerTransport.kt | 6 +++--- .../server/WebSocketTransportTest.kt | 5 ++--- ...socket-transport-ktor-websocket-shared.api | 2 +- .../ktor/websocket/WebSocketConnection.kt | 3 --- .../kotlin/transport/local/LocalConnection.kt | 5 +---- .../kotlin/transport/local/LocalServer.kt | 7 ++----- .../transport/local/LocalTransportTest.kt | 5 ++--- .../nodejs/tcp/TcpClientTransport.kt | 8 ++++---- .../transport/nodejs/tcp/TcpConnection.kt | 3 --- .../nodejs/tcp/TcpServerTransport.kt | 10 ++++++---- .../transport/nodejs/tcp/TcpTransportTest.kt | 7 +++---- 70 files changed, 238 insertions(+), 187 deletions(-) create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/buffers.kt diff --git a/rsocket-core/api/rsocket-core.api b/rsocket-core/api/rsocket-core.api index f5b92b68..bdce403b 100644 --- a/rsocket-core/api/rsocket-core.api +++ b/rsocket-core/api/rsocket-core.api @@ -212,12 +212,14 @@ public final class io/rsocket/kotlin/core/RSocketConnector { public final class io/rsocket/kotlin/core/RSocketConnectorBuilder { public final fun acceptor (Lio/rsocket/kotlin/ConnectionAcceptor;)V public final fun connectionConfig (Lkotlin/jvm/functions/Function1;)V + public final fun getBufferPool ()Lio/ktor/utils/io/pool/ObjectPool; public final fun getLoggerFactory ()Lio/rsocket/kotlin/logging/LoggerFactory; public final fun getMaxFragmentSize ()I public final fun interceptors (Lkotlin/jvm/functions/Function1;)V public final fun reconnectable (JLkotlin/jvm/functions/Function2;)V public final fun reconnectable (Lkotlin/jvm/functions/Function3;)V public static synthetic fun reconnectable$default (Lio/rsocket/kotlin/core/RSocketConnectorBuilder;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V + public final fun setBufferPool (Lio/ktor/utils/io/pool/ObjectPool;)V public final fun setLoggerFactory (Lio/rsocket/kotlin/logging/LoggerFactory;)V public final fun setMaxFragmentSize (I)V } @@ -242,9 +244,11 @@ public final class io/rsocket/kotlin/core/RSocketServer { } public final class io/rsocket/kotlin/core/RSocketServerBuilder { + public final fun getBufferPool ()Lio/ktor/utils/io/pool/ObjectPool; public final fun getLoggerFactory ()Lio/rsocket/kotlin/logging/LoggerFactory; public final fun getMaxFragmentSize ()I public final fun interceptors (Lkotlin/jvm/functions/Function1;)V + public final fun setBufferPool (Lio/ktor/utils/io/pool/ObjectPool;)V public final fun setLoggerFactory (Lio/rsocket/kotlin/logging/LoggerFactory;)V public final fun setMaxFragmentSize (I)V } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt index 71e2aa08..8a80a9d6 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,17 +28,21 @@ import kotlinx.coroutines.* */ @TransportApi public interface Connection : CoroutineScope { - public val pool: ObjectPool get() = ChunkBuffer.Pool + @Suppress("DEPRECATION") + @Deprecated(DEPRECATED_IN_KTOR, level = DeprecationLevel.ERROR) + public val pool: ObjectPool get() = TODO("SHOULD NOT BE CALLED ANY MORE") public suspend fun send(packet: ByteReadPacket) public suspend fun receive(): ByteReadPacket } +@Suppress("DEPRECATION") @OptIn(TransportApi::class) -internal suspend inline fun Connection.receiveFrame(block: (frame: Frame) -> T): T = - receive().readFrame(pool).closeOnError(block) +internal suspend inline fun Connection.receiveFrame(bufferPool: ObjectPool, block: (frame: Frame) -> T): T = + receive().readFrame(bufferPool).closeOnError(block) +@Suppress("DEPRECATION") @OptIn(TransportApi::class) -internal suspend fun Connection.sendFrame(frame: Frame) { - frame.toPacket(pool).closeOnError { send(it) } +internal suspend fun Connection.sendFrame(bufferPool: ObjectPool, frame: Frame) { + frame.toPacket(bufferPool).closeOnError { send(it) } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/buffers.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/buffers.kt new file mode 100644 index 00000000..6725c167 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/buffers.kt @@ -0,0 +1,19 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin + +internal const val DEPRECATED_IN_KTOR = "Deprecated in Ktor" diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt index e76ab2a9..952c268c 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package io.rsocket.kotlin.core +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.frame.io.* @@ -32,6 +34,7 @@ public class RSocketConnector internal constructor( private val connectionConfigProvider: () -> ConnectionConfig, private val acceptor: ConnectionAcceptor, private val reconnectPredicate: ReconnectPredicate?, + @Suppress("DEPRECATION") private val bufferPool: ObjectPool, ) { public suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) { @@ -68,9 +71,10 @@ public class RSocketConnector internal constructor( maxFragmentSize = maxFragmentSize, interceptors = interceptors, connectionConfig = connectionConfig, - acceptor = acceptor + acceptor = acceptor, + bufferPool = bufferPool ) - connection.sendFrame(setupFrame) + connection.sendFrame(bufferPool, setupFrame) return requester } catch (cause: Throwable) { connectionConfig.setupPayload.close() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt index 6e2cc19d..c2753120 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package io.rsocket.kotlin.core +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.keepalive.* @@ -35,6 +37,10 @@ public class RSocketConnectorBuilder internal constructor() { field = value } + @Suppress("DEPRECATION") + @Deprecated("Only for tests in rsocket", level = DeprecationLevel.ERROR) + public var bufferPool: ObjectPool = ChunkBuffer.Pool + private val connectionConfig: ConnectionConfigBuilder = ConnectionConfigBuilder() private val interceptors: InterceptorsBuilder = InterceptorsBuilder() private var acceptor: ConnectionAcceptor? = null @@ -101,6 +107,7 @@ public class RSocketConnectorBuilder internal constructor() { } } + @Suppress("DEPRECATION_ERROR") @OptIn(RSocketLoggingApi::class) internal fun build(): RSocketConnector = RSocketConnector( loggerFactory, @@ -108,7 +115,8 @@ public class RSocketConnectorBuilder internal constructor() { interceptors.build(), connectionConfig.producer(), acceptor ?: defaultAcceptor, - reconnectPredicate + reconnectPredicate, + bufferPool ) private companion object { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt index f1391fce..24de28be 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package io.rsocket.kotlin.core +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.frame.io.* @@ -29,6 +31,7 @@ public class RSocketServer internal constructor( private val loggerFactory: LoggerFactory, private val maxFragmentSize: Int, private val interceptors: Interceptors, + @Suppress("DEPRECATION") private val bufferPool: ObjectPool, ) { @DelicateCoroutinesApi @@ -47,7 +50,7 @@ public class RSocketServer internal constructor( } } - private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame { setupFrame -> + private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame(bufferPool) { setupFrame -> when { setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}")) setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}")) @@ -64,7 +67,8 @@ public class RSocketServer internal constructor( payloadMimeType = setupFrame.payloadMimeType, setupPayload = setupFrame.payload ), - acceptor = acceptor + acceptor = acceptor, + bufferPool = bufferPool ) coroutineContext.job } catch (e: Throwable) { @@ -75,7 +79,7 @@ public class RSocketServer internal constructor( @Suppress("SuspendFunctionOnCoroutineScope") private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing { - sendFrame(ErrorFrame(0, error)) + sendFrame(bufferPool, ErrorFrame(0, error)) cancel("Connection establishment failed", error) throw error } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt index 8b15c5ef..cedff364 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package io.rsocket.kotlin.core +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.logging.* @@ -30,14 +32,19 @@ public class RSocketServerBuilder internal constructor() { field = value } + @Suppress("DEPRECATION") + @Deprecated("Only for tests in rsocket", level = DeprecationLevel.ERROR) + public var bufferPool: ObjectPool = ChunkBuffer.Pool + private val interceptors: InterceptorsBuilder = InterceptorsBuilder() public fun interceptors(configure: InterceptorsBuilder.() -> Unit) { interceptors.configure() } + @Suppress("DEPRECATION_ERROR") @OptIn(RSocketLoggingApi::class) - internal fun build(): RSocketServer = RSocketServer(loggerFactory, maxFragmentSize, interceptors.build()) + internal fun build(): RSocketServer = RSocketServer(loggerFactory, maxFragmentSize, interceptors.build(), bufferPool) } public fun RSocketServer(configure: RSocketServerBuilder.() -> Unit = {}): RSocketServer { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt index 11350031..91922e65 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,7 +49,11 @@ internal class ExtensionFrame( } } -internal fun ByteReadPacket.readExtension(pool: ObjectPool, streamId: Int, flags: Int): ExtensionFrame { +internal fun ByteReadPacket.readExtension( + @Suppress("DEPRECATION") pool: ObjectPool, + streamId: Int, + flags: Int, +): ExtensionFrame { val extendedType = readInt() val payload = readPayload(pool, flags) return ExtensionFrame(streamId, extendedType, payload) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt index 6b15cb65..440d0781 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ internal sealed class Frame : Closeable { protected abstract fun StringBuilder.appendFlags() protected abstract fun StringBuilder.appendSelf() - internal fun toPacket(pool: ObjectPool): ByteReadPacket { + internal fun toPacket(@Suppress("DEPRECATION") pool: ObjectPool): ByteReadPacket { check(type.canHaveMetadata || !(flags check Flags.Metadata)) { "bad value for metadata flag" } return buildPacket(pool) { writeInt(streamId) @@ -54,7 +54,7 @@ internal sealed class Frame : Closeable { } } -internal fun ByteReadPacket.readFrame(pool: ObjectPool): Frame = use { +internal fun ByteReadPacket.readFrame(@Suppress("DEPRECATION") pool: ObjectPool): Frame = use { val streamId = readInt() val typeAndFlags = readShort().toInt() and 0xFFFF val flags = typeAndFlags and FlagsMask diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt index 64c1b436..6d1d9d1e 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,7 +51,7 @@ internal class KeepAliveFrame( } } -internal fun ByteReadPacket.readKeepAlive(pool: ObjectPool, flags: Int): KeepAliveFrame { +internal fun ByteReadPacket.readKeepAlive(@Suppress("DEPRECATION") pool: ObjectPool, flags: Int): KeepAliveFrame { val respond = flags check RespondFlag val lastPosition = readLong() val data = readPacket(pool) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt index cd705d81..30c18d30 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,7 +50,7 @@ internal class LeaseFrame( } } -internal fun ByteReadPacket.readLease(pool: ObjectPool, flags: Int): LeaseFrame { +internal fun ByteReadPacket.readLease(@Suppress("DEPRECATION") pool: ObjectPool, flags: Int): LeaseFrame { val ttl = readInt() val numberOfRequests = readInt() val metadata = if (flags check Flags.Metadata) readMetadata(pool) else null diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt index 91e6d536..ee99997c 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,5 +45,5 @@ internal class MetadataPushFrame( } } -internal fun ByteReadPacket.readMetadataPush(pool: ObjectPool): MetadataPushFrame = +internal fun ByteReadPacket.readMetadataPush(@Suppress("DEPRECATION") pool: ObjectPool): MetadataPushFrame = MetadataPushFrame(readPacket(pool)) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt index b78885ec..16efc70d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,11 +66,11 @@ internal class RequestFrame( } internal fun ByteReadPacket.readRequest( - pool: ObjectPool, + @Suppress("DEPRECATION") pool: ObjectPool, type: FrameType, streamId: Int, flags: Int, - withInitial: Boolean + withInitial: Boolean, ): RequestFrame { val fragmentFollows = flags check Flags.Follows val complete = flags check Flags.Complete diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt index 34ee468c..d72e0841 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,7 +50,7 @@ internal class ResumeFrame( } } -internal fun ByteReadPacket.readResume(pool: ObjectPool): ResumeFrame { +internal fun ByteReadPacket.readResume(@Suppress("DEPRECATION") pool: ObjectPool): ResumeFrame { val version = readVersion() val resumeToken = readResumeToken(pool) val lastReceivedServerPosition = readLong() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt index c2f37402..92e81e87 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,7 +76,7 @@ internal class SetupFrame( } } -internal fun ByteReadPacket.readSetup(pool: ObjectPool, flags: Int): SetupFrame { +internal fun ByteReadPacket.readSetup(@Suppress("DEPRECATION") pool: ObjectPool, flags: Int): SetupFrame { val version = readVersion() val keepAlive = run { val interval = readInt() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt index 517b2fdc..03bd1cfb 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,10 @@ import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* -internal inline fun buildPacket(pool: ObjectPool, block: BytePacketBuilder.() -> Unit): ByteReadPacket { +internal inline fun buildPacket( + @Suppress("DEPRECATION") pool: ObjectPool, + block: BytePacketBuilder.() -> Unit, +): ByteReadPacket { val builder = BytePacketBuilder(pool) try { block(builder) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt index b1144874..96ef6aa6 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt @@ -22,7 +22,7 @@ import io.ktor.utils.io.pool.* import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.payload.* -internal fun ByteReadPacket.readMetadata(pool: ObjectPool): ByteReadPacket { +internal fun ByteReadPacket.readMetadata(@Suppress("DEPRECATION") pool: ObjectPool): ByteReadPacket { val length = readInt24() return readPacket(pool, length) } @@ -34,7 +34,7 @@ internal fun BytePacketBuilder.writeMetadata(metadata: ByteReadPacket?) { } } -internal fun ByteReadPacket.readPayload(pool: ObjectPool, flags: Int): Payload { +internal fun ByteReadPacket.readPayload(@Suppress("DEPRECATION") pool: ObjectPool, flags: Int): Payload { val metadata = if (flags check Flags.Metadata) readMetadata(pool) else null val data = readPacket(pool) return Payload(data = data, metadata = metadata) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/util.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/util.kt index ca679034..f2e96ba8 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/util.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/util.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* -internal fun ByteReadPacket.readResumeToken(pool: ObjectPool): ByteReadPacket { +internal fun ByteReadPacket.readResumeToken(@Suppress("DEPRECATION") pool: ObjectPool): ByteReadPacket { val length = readShort().toInt() and 0xFFFF return readPacket(pool, length) } @@ -33,14 +33,14 @@ internal fun BytePacketBuilder.writeResumeToken(resumeToken: ByteReadPacket?) { } } -internal fun ByteReadPacket.readPacket(pool: ObjectPool): ByteReadPacket { +internal fun ByteReadPacket.readPacket(@Suppress("DEPRECATION") pool: ObjectPool): ByteReadPacket { if (isEmpty) return ByteReadPacket.Empty return buildPacket(pool) { writePacket(this@readPacket) } } -internal fun ByteReadPacket.readPacket(pool: ObjectPool, length: Int): ByteReadPacket { +internal fun ByteReadPacket.readPacket(@Suppress("DEPRECATION") pool: ObjectPool, length: Int): ByteReadPacket { if (length == 0) return ByteReadPacket.Empty return buildPacket(pool) { writePacket(this@readPacket, length) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt index 71493a64..0284e111 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package io.rsocket.kotlin.internal +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* import io.rsocket.kotlin.frame.* @@ -28,11 +30,12 @@ internal suspend inline fun connect( maxFragmentSize: Int, interceptors: Interceptors, connectionConfig: ConnectionConfig, - acceptor: ConnectionAcceptor + acceptor: ConnectionAcceptor, + @Suppress("DEPRECATION") bufferPool: ObjectPool, ): RSocket { val prioritizer = Prioritizer() - val frameSender = FrameSender(prioritizer, connection.pool, maxFragmentSize) - val streamsStorage = StreamsStorage(isServer, connection.pool) + val frameSender = FrameSender(prioritizer, bufferPool, maxFragmentSize) + val streamsStorage = StreamsStorage(isServer, bufferPool) val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive, frameSender) val requestJob = SupervisorJob(connection.coroutineContext[Job]) @@ -49,7 +52,7 @@ internal suspend inline fun connect( requestContext + CoroutineName("rSocket-requester"), frameSender, streamsStorage, - connection.pool + bufferPool ) ) val requestHandler = interceptors.wrapResponder( @@ -82,12 +85,12 @@ internal suspend inline fun connect( // start sending frames to connection (connection + CoroutineName("rSocket-connection-send")).launch { - while (isActive) connection.sendFrame(prioritizer.receive()) + while (isActive) connection.sendFrame(bufferPool, prioritizer.receive()) } // start frame handling (connection + CoroutineName("rSocket-connection-receive")).launch { - while (isActive) connection.receiveFrame { frame -> + while (isActive) connection.receiveFrame(bufferPool) { frame -> when (frame.streamId) { 0 -> when (frame) { is MetadataPushFrame -> responder.handleMetadataPush(frame.metadata) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt index bddb76b3..6c70202b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,8 +32,8 @@ private const val fragmentOffsetWithMetadata = fragmentOffset + lengthSize internal class FrameSender( private val prioritizer: Prioritizer, - private val pool: ObjectPool, - private val maxFragmentSize: Int + @Suppress("DEPRECATION") private val pool: ObjectPool, + private val maxFragmentSize: Int, ) { suspend fun sendKeepAlive(respond: Boolean, lastPosition: Long, data: ByteReadPacket): Unit = diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt index edf2fbca..13097fd4 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt @@ -35,7 +35,7 @@ internal class RSocketRequester( override val coroutineContext: CoroutineContext, private val sender: FrameSender, private val streamsStorage: StreamsStorage, - private val pool: ObjectPool + @Suppress("DEPRECATION") private val pool: ObjectPool, ) : RSocket { override suspend fun metadataPush(metadata: ByteReadPacket) { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt index 65aa2eb6..b33128a4 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import kotlinx.atomicfu.locks.* internal class StreamsStorage( private val isServer: Boolean, - private val pool: ObjectPool + @Suppress("DEPRECATION") private val pool: ObjectPool, ) : SynchronizedObject() { private val streamId: StreamId = StreamId(isServer) private val handlers: IntMap = IntMap() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt index 2c531e65..da652c7d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* -internal abstract class FrameHandler(pool: ObjectPool) : Closeable { +internal abstract class FrameHandler(@Suppress("DEPRECATION") pool: ObjectPool) : Closeable { private val data = BytePacketBuilder(pool) private val metadata = BytePacketBuilder(pool) private var hasMetadata: Boolean = false @@ -73,7 +73,8 @@ internal interface SendFrameHandler { fun onSendFailed(cause: Throwable): Boolean // if true, then request is failed } -internal abstract class RequesterFrameHandler(pool: ObjectPool) : FrameHandler(pool), ReceiveFrameHandler { +internal abstract class RequesterFrameHandler(@Suppress("DEPRECATION") pool: ObjectPool) : FrameHandler(pool), + ReceiveFrameHandler { override fun handleCancel() { //should be called only for RC } @@ -83,7 +84,8 @@ internal abstract class RequesterFrameHandler(pool: ObjectPool) : F } } -internal abstract class ResponderFrameHandler(pool: ObjectPool) : FrameHandler(pool), SendFrameHandler { +internal abstract class ResponderFrameHandler(@Suppress("DEPRECATION") pool: ObjectPool) : FrameHandler(pool), + SendFrameHandler { protected var job: Job? = null protected abstract fun start(payload: Payload): Job diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt index 43f0592d..8376de69 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt @@ -30,7 +30,7 @@ internal class RequesterRequestChannelFrameHandler( private val limiter: Limiter, private val sender: Job, private val channel: Channel, - pool: ObjectPool + @Suppress("DEPRECATION") pool: ObjectPool, ) : RequesterFrameHandler(pool), SendFrameHandler { override fun handleNext(payload: Payload) { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandler.kt index 85146fbb..28d68d2d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ internal class RequesterRequestResponseFrameHandler( private val id: Int, private val streamsStorage: StreamsStorage, private val deferred: CompletableDeferred, - pool: ObjectPool + @Suppress("DEPRECATION") pool: ObjectPool, ) : RequesterFrameHandler(pool) { override fun handleNext(payload: Payload) { deferred.complete(payload) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt index fbc1bc32..6f3563e8 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt @@ -27,7 +27,7 @@ internal class RequesterRequestStreamFrameHandler( private val id: Int, private val streamsStorage: StreamsStorage, private val channel: Channel, - pool: ObjectPool + @Suppress("DEPRECATION") pool: ObjectPool, ) : RequesterFrameHandler(pool) { override fun handleNext(payload: Payload) { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderFireAndForgetFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderFireAndForgetFrameHandler.kt index f56cbdd1..6af21b9a 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderFireAndForgetFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderFireAndForgetFrameHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ internal class ResponderFireAndForgetFrameHandler( private val id: Int, private val streamsStorage: StreamsStorage, private val responder: RSocketResponder, - pool: ObjectPool + @Suppress("DEPRECATION") pool: ObjectPool, ) : ResponderFrameHandler(pool) { override fun start(payload: Payload): Job = responder.handleFireAndForget(payload, this) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt index 888ad786..945f0965 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt @@ -30,7 +30,7 @@ internal class ResponderRequestChannelFrameHandler( private val streamsStorage: StreamsStorage, private val responder: RSocketResponder, initialRequest: Int, - pool: ObjectPool + @Suppress("DEPRECATION") pool: ObjectPool, ) : ResponderFrameHandler(pool), ReceiveFrameHandler { val limiter = Limiter(initialRequest) val channel = channelForCloseable(Channel.UNLIMITED) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestResponseFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestResponseFrameHandler.kt index b24a0c75..7ef3f00d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestResponseFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestResponseFrameHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ internal class ResponderRequestResponseFrameHandler( private val id: Int, private val streamsStorage: StreamsStorage, private val responder: RSocketResponder, - pool: ObjectPool + @Suppress("DEPRECATION") pool: ObjectPool, ) : ResponderFrameHandler(pool) { override fun start(payload: Payload): Job = responder.handleRequestResponse(payload, id, this) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestStreamFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestStreamFrameHandler.kt index 6cfa95a6..2441a164 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestStreamFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestStreamFrameHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ internal class ResponderRequestStreamFrameHandler( private val streamsStorage: StreamsStorage, private val responder: RSocketResponder, initialRequest: Int, - pool: ObjectPool + @Suppress("DEPRECATION") pool: ObjectPool, ) : ResponderFrameHandler(pool) { val limiter = Limiter(initialRequest) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt index 3b40dd82..32d443fc 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt @@ -55,7 +55,7 @@ public sealed interface CompositeMetadata : Metadata { public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketCompositeMetadata - override fun ByteReadPacket.read(pool: ObjectPool): CompositeMetadata { + override fun ByteReadPacket.read(@Suppress("DEPRECATION") pool: ObjectPool): CompositeMetadata { val list = mutableListOf() while (isNotEmpty) { val type = readMimeType() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt index 573742df..7f78b928 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,7 @@ public fun CompositeMetadata.Entry.hasMimeTypeOf(reader: MetadataReader<*>): Boo @ExperimentalMetadataApi public fun CompositeMetadata.Entry.read( reader: MetadataReader, - pool: ObjectPool = ChunkBuffer.Pool + @Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool, ): M { if (mimeType == reader.mimeType) return content.read(reader, pool) @@ -39,7 +39,7 @@ public fun CompositeMetadata.Entry.read( @ExperimentalMetadataApi public fun CompositeMetadata.Entry.readOrNull( reader: MetadataReader, - pool: ObjectPool = ChunkBuffer.Pool + @Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool, ): M? { return if (mimeType == reader.mimeType) content.read(reader, pool) else null } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt index 5ab9b987..bfd3459b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ public interface Metadata : Closeable { @ExperimentalMetadataApi public interface MetadataReader { public val mimeType: MimeType - public fun ByteReadPacket.read(pool: ObjectPool): M + public fun ByteReadPacket.read(@Suppress("DEPRECATION") pool: ObjectPool): M } @@ -43,11 +43,11 @@ public fun PayloadBuilder.metadata(metadata: Metadata): Unit = metadata(metadata @ExperimentalMetadataApi public fun ByteReadPacket.read( reader: MetadataReader, - pool: ObjectPool = ChunkBuffer.Pool + @Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool, ): M = use { with(reader) { read(pool) } } @ExperimentalMetadataApi -public fun Metadata.toPacket(pool: ObjectPool = ChunkBuffer.Pool): ByteReadPacket = +public fun Metadata.toPacket(@Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool): ByteReadPacket = buildPacket(pool) { writeSelf() } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt index c0fc8a79..df8d9f99 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ public class PerStreamAcceptableDataMimeTypesMetadata(public val types: List { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketAcceptMimeTypes - override fun ByteReadPacket.read(pool: ObjectPool): PerStreamAcceptableDataMimeTypesMetadata { + override fun ByteReadPacket.read(@Suppress("DEPRECATION") pool: ObjectPool): PerStreamAcceptableDataMimeTypesMetadata { val list = mutableListOf() while (isNotEmpty) list.add(readMimeType()) return PerStreamAcceptableDataMimeTypesMetadata(list.toList()) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt index 90efad3e..45e1d3a5 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +35,7 @@ public class PerStreamDataMimeTypeMetadata(public val type: MimeType) : Metadata public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketMimeType - override fun ByteReadPacket.read(pool: ObjectPool): PerStreamDataMimeTypeMetadata = + override fun ByteReadPacket.read(@Suppress("DEPRECATION") pool: ObjectPool): PerStreamDataMimeTypeMetadata = PerStreamDataMimeTypeMetadata(readMimeType()) } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt index c24727f9..1c6dbfe9 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ public class RawMetadata( } private class Reader(override val mimeType: MimeType) : MetadataReader { - override fun ByteReadPacket.read(pool: ObjectPool): RawMetadata = + override fun ByteReadPacket.read(@Suppress("DEPRECATION") pool: ObjectPool): RawMetadata = RawMetadata(mimeType, readPacket(pool)) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt index 2a726b28..eb961d06 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +47,7 @@ public class RoutingMetadata(public val tags: List) : Metadata { public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketRouting - override fun ByteReadPacket.read(pool: ObjectPool): RoutingMetadata { + override fun ByteReadPacket.read(@Suppress("DEPRECATION") pool: ObjectPool): RoutingMetadata { val list = mutableListOf() while (isNotEmpty) { val length = readByte().toInt() and 0xFF diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt index 059fade2..f75f16ee 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,7 +71,7 @@ public class ZipkinTracingMetadata internal constructor( public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketTracingZipkin - override fun ByteReadPacket.read(pool: ObjectPool): ZipkinTracingMetadata { + override fun ByteReadPacket.read(@Suppress("DEPRECATION") pool: ObjectPool): ZipkinTracingMetadata { val flags = readByte() val kind = when { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt index 9ddf5566..dfc8a5b6 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,10 +39,10 @@ public sealed interface AuthMetadata : Metadata { @ExperimentalMetadataApi public sealed interface AuthMetadataReader : MetadataReader { - public fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): AM + public fun ByteReadPacket.readContent(type: AuthType, @Suppress("DEPRECATION") pool: ObjectPool): AM override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketAuthentication - override fun ByteReadPacket.read(pool: ObjectPool): AM { + override fun ByteReadPacket.read(@Suppress("DEPRECATION") pool: ObjectPool): AM { val type = readAuthType() return readContent(type, pool) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt index 99af4fd0..3d239bd2 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,10 @@ public class BearerAuthMetadata( override fun close(): Unit = Unit public companion object Reader : AuthMetadataReader { - override fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): BearerAuthMetadata { + override fun ByteReadPacket.readContent( + type: AuthType, + @Suppress("DEPRECATION") pool: ObjectPool, + ): BearerAuthMetadata { require(type == WellKnowAuthType.Bearer) { "Metadata auth type should be 'bearer'" } val token = readText() return BearerAuthMetadata(token) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt index b6cd43e1..95ffaef9 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ public class RawAuthMetadata( } public companion object Reader : AuthMetadataReader { - override fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): RawAuthMetadata { + override fun ByteReadPacket.readContent(type: AuthType, @Suppress("DEPRECATION") pool: ObjectPool): RawAuthMetadata { val content = readPacket(pool) return RawAuthMetadata(type, content) } @@ -50,7 +50,7 @@ public fun RawAuthMetadata.hasAuthTypeOf(reader: AuthMetadataReader<*>): Boolean @ExperimentalMetadataApi public fun RawAuthMetadata.read( reader: AuthMetadataReader, - pool: ObjectPool = ChunkBuffer.Pool + @Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool, ): AM { return readOrNull(reader, pool) ?: run { content.close() @@ -62,7 +62,7 @@ public fun RawAuthMetadata.read( public fun RawAuthMetadata.readOrNull( reader: AuthMetadataReader, - pool: ObjectPool = ChunkBuffer.Pool + @Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool, ): AM? { if (type != reader.mimeType) return null diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt index 2e622fe6..9535ed0e 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,7 +43,10 @@ public class SimpleAuthMetadata( override fun close(): Unit = Unit public companion object Reader : AuthMetadataReader { - override fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): SimpleAuthMetadata { + override fun ByteReadPacket.readContent( + type: AuthType, + @Suppress("DEPRECATION") pool: ObjectPool, + ): SimpleAuthMetadata { require(type == WellKnowAuthType.Simple) { "Metadata auth type should be 'simple'" } val length = readShort().toInt() val username = readTextExactBytes(length) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt index 31ebe822..faedfb73 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt @@ -18,8 +18,6 @@ package io.rsocket.kotlin import app.cash.turbine.* import io.ktor.utils.io.core.* -import io.ktor.utils.io.core.internal.* -import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.test.* @@ -33,7 +31,6 @@ import kotlin.time.* import kotlin.time.Duration.Companion.seconds class TestConnection : Connection, ClientTransport { - override val pool: ObjectPool = InUseTrackingPool override val coroutineContext: CoroutineContext = Job() + Dispatchers.Unconfined + TestExceptionHandler diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt index 92229147..370933fa 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { private suspend fun start(handler: RSocket? = null): RSocket { val localServer = TestServer().bindIn( CoroutineScope(Dispatchers.Unconfined + testJob + TestExceptionHandler), - LocalServerTransport(InUseTrackingPool) + LocalServerTransport() ) { handler ?: RSocketRequestHandler { requestResponse { it } diff --git a/rsocket-ktor-client/api/rsocket-ktor-client.api b/rsocket-ktor-client/api/rsocket-ktor-client.api index ab614712..aa6779b3 100644 --- a/rsocket-ktor-client/api/rsocket-ktor-client.api +++ b/rsocket-ktor-client/api/rsocket-ktor-client.api @@ -8,7 +8,7 @@ public final class io/rsocket/kotlin/ktor/client/BuildersKt { public final class io/rsocket/kotlin/ktor/client/RSocketSupport { public static final field Plugin Lio/rsocket/kotlin/ktor/client/RSocketSupport$Plugin; - public synthetic fun (Lio/rsocket/kotlin/core/RSocketConnector;Lio/ktor/utils/io/pool/ObjectPool;Lkotlin/jvm/internal/DefaultConstructorMarker;)V + public synthetic fun (Lio/rsocket/kotlin/core/RSocketConnector;Lkotlin/jvm/internal/DefaultConstructorMarker;)V } public final class io/rsocket/kotlin/ktor/client/RSocketSupport$Config { diff --git a/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt b/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt index 5d1e6248..64741917 100644 --- a/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt +++ b/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt @@ -21,8 +21,6 @@ import io.ktor.client.plugins.* import io.ktor.client.plugins.websocket.* import io.ktor.client.request.* import io.ktor.http.* -import io.ktor.utils.io.core.internal.* -import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.transport.* import io.rsocket.kotlin.transport.ktor.websocket.* @@ -31,7 +29,7 @@ import kotlin.coroutines.* public suspend fun HttpClient.rSocket( request: HttpRequestBuilder.() -> Unit, ): RSocket = plugin(RSocketSupport).run { - connector.connect(KtorClientTransport(this@rSocket, request, bufferPool)) + connector.connect(KtorClientTransport(this@rSocket, request)) } public suspend fun HttpClient.rSocket( @@ -65,10 +63,9 @@ public suspend fun HttpClient.rSocket( private class KtorClientTransport( private val client: HttpClient, private val request: HttpRequestBuilder.() -> Unit, - private val pool: ObjectPool ) : ClientTransport { override val coroutineContext: CoroutineContext get() = client.coroutineContext @TransportApi - override suspend fun connect(): Connection = WebSocketConnection(client.webSocketSession(request), pool) + override suspend fun connect(): Connection = WebSocketConnection(client.webSocketSession(request)) } diff --git a/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/RSocketSupport.kt b/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/RSocketSupport.kt index 4ae70b04..1208bd78 100644 --- a/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/RSocketSupport.kt +++ b/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/RSocketSupport.kt @@ -26,10 +26,11 @@ import io.rsocket.kotlin.core.* public class RSocketSupport private constructor( internal val connector: RSocketConnector, - internal val bufferPool: ObjectPool ) { public class Config internal constructor() { + @Suppress("DEPRECATION") + @Deprecated("Not used anymore", level = DeprecationLevel.ERROR) public var bufferPool: ObjectPool = ChunkBuffer.Pool public var connector: RSocketConnector = RSocketConnector() public fun connector(block: RSocketConnectorBuilder.() -> Unit) { @@ -41,7 +42,7 @@ public class RSocketSupport private constructor( override val key: AttributeKey = AttributeKey("RSocket") override fun prepare(block: Config.() -> Unit): RSocketSupport = Config().run { block() - RSocketSupport(connector, bufferPool) + RSocketSupport(connector) } override fun install(plugin: RSocketSupport, scope: HttpClient) { diff --git a/rsocket-ktor-server/api/rsocket-ktor-server.api b/rsocket-ktor-server/api/rsocket-ktor-server.api index 08f142c4..118f542e 100644 --- a/rsocket-ktor-server/api/rsocket-ktor-server.api +++ b/rsocket-ktor-server/api/rsocket-ktor-server.api @@ -1,6 +1,6 @@ public final class io/rsocket/kotlin/ktor/server/RSocketSupport { public static final field Feature Lio/rsocket/kotlin/ktor/server/RSocketSupport$Feature; - public synthetic fun (Lio/rsocket/kotlin/core/RSocketServer;Lio/ktor/utils/io/pool/ObjectPool;Lkotlin/jvm/internal/DefaultConstructorMarker;)V + public synthetic fun (Lio/rsocket/kotlin/core/RSocketServer;Lkotlin/jvm/internal/DefaultConstructorMarker;)V } public final class io/rsocket/kotlin/ktor/server/RSocketSupport$Config { diff --git a/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt b/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt index 8f179318..06c79490 100644 --- a/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt +++ b/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt @@ -25,9 +25,10 @@ import io.rsocket.kotlin.core.* public class RSocketSupport private constructor( internal val server: RSocketServer, - internal val bufferPool: ObjectPool ) { public class Config internal constructor() { + @Suppress("DEPRECATION") + @Deprecated("Not used anymore", level = DeprecationLevel.ERROR) public var bufferPool: ObjectPool = ChunkBuffer.Pool public var server: RSocketServer = RSocketServer() public fun server(block: RSocketServerBuilder.() -> Unit) { @@ -43,7 +44,7 @@ public class RSocketSupport private constructor( return Config().run { configure() - RSocketSupport(server, bufferPool) + RSocketSupport(server) } } } diff --git a/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt b/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt index 7cd102fa..1de2a2b6 100644 --- a/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt +++ b/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt @@ -19,8 +19,6 @@ package io.rsocket.kotlin.ktor.server import io.ktor.server.application.* import io.ktor.server.routing.* import io.ktor.server.websocket.* -import io.ktor.utils.io.core.internal.* -import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.transport.* import io.rsocket.kotlin.transport.ktor.websocket.* @@ -31,19 +29,18 @@ public fun Route.rSocket( protocol: String? = null, acceptor: ConnectionAcceptor, ): Unit = application.plugin(RSocketSupport).run { - server.bindIn(application, KtorServerTransport(this@rSocket, path, protocol, bufferPool), acceptor) + server.bindIn(application, KtorServerTransport(this@rSocket, path, protocol), acceptor) } private class KtorServerTransport( private val route: Route, private val path: String?, private val protocol: String?, - private val pool: ObjectPool ) : ServerTransport { @TransportApi override fun CoroutineScope.start(accept: suspend CoroutineScope.(Connection) -> Unit) { val handler: suspend DefaultWebSocketServerSession.() -> Unit = { - val connection = WebSocketConnection(this, pool) + val connection = WebSocketConnection(this) accept(connection) } when (path) { diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/InUseTrackingPool.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/InUseTrackingPool.kt index 493fd51e..60f24da7 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/InUseTrackingPool.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/InUseTrackingPool.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import io.ktor.utils.io.pool.* import kotlinx.atomicfu.locks.* import kotlin.test.* +@Suppress("DEPRECATION") object InUseTrackingPool : ObjectPool, SynchronizedObject() { override val capacity: Int get() = BufferPool.capacity private val inUse = mutableSetOf() @@ -121,6 +122,7 @@ object InUseTrackingPool : ObjectPool, SynchronizedObject() { } } +@Suppress("DEPRECATION") private class IdentityWrapper( private val instance: ChunkBuffer, val throwable: Throwable? diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Packets.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Packets.kt index 2f6df274..d5fad559 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Packets.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Packets.kt @@ -36,7 +36,10 @@ fun assertBytesEquals(expected: ByteArray?, actual: ByteArray?) { assertEquals(expected?.toHexString(), actual?.toHexString()) } -private inline fun buildPacket(pool: ObjectPool, block: BytePacketBuilder.() -> Unit): ByteReadPacket { +private inline fun buildPacket( + @Suppress("DEPRECATION") pool: ObjectPool, + block: BytePacketBuilder.() -> Unit, +): ByteReadPacket { val builder = BytePacketBuilder(pool) try { block(builder) diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConfig.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConfig.kt index 97c538d6..96c77d9c 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConfig.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConfig.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -@file:Suppress("FunctionName") +@file:Suppress("FunctionName", "DEPRECATION_ERROR") package io.rsocket.kotlin.test @@ -23,8 +23,9 @@ import io.rsocket.kotlin.logging.* fun TestServer( logging: Boolean = true, - block: RSocketServerBuilder.() -> Unit = {} + block: RSocketServerBuilder.() -> Unit = {}, ): RSocketServer = RSocketServer { + bufferPool = InUseTrackingPool loggerFactory = if (logging) { LoggerFactory { PrintLogger.withLevel(LoggingLevel.DEBUG).logger("SERVER |$it") } } else { @@ -35,8 +36,9 @@ fun TestServer( fun TestConnector( logging: Boolean = true, - block: RSocketConnectorBuilder.() -> Unit = {} + block: RSocketConnectorBuilder.() -> Unit = {}, ): RSocketConnector = RSocketConnector { + bufferPool = InUseTrackingPool loggerFactory = if (logging) { LoggerFactory { PrintLogger.withLevel(LoggingLevel.DEBUG).logger("CLIENT |$it") } } else { diff --git a/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpClientTransport.kt b/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpClientTransport.kt index c5275c2e..154d5cd0 100644 --- a/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpClientTransport.kt +++ b/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpClientTransport.kt @@ -36,17 +36,17 @@ internal expect val defaultDispatcher: CoroutineDispatcher public fun TcpClientTransport( hostname: String, port: Int, context: CoroutineContext = EmptyCoroutineContext, - pool: ObjectPool = ChunkBuffer.Pool, + @Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool, intercept: (Socket) -> Socket = { it }, //f.e. for tls, which is currently supported by ktor only on JVM - configure: SocketOptions.TCPClientSocketOptions.() -> Unit = {} + configure: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, ): ClientTransport = TcpClientTransport(InetSocketAddress(hostname, port), context, pool, intercept, configure) public fun TcpClientTransport( remoteAddress: InetSocketAddress, context: CoroutineContext = EmptyCoroutineContext, - pool: ObjectPool = ChunkBuffer.Pool, + @Suppress("DEPRECATION", "UNUSED_PARAMETER") pool: ObjectPool = ChunkBuffer.Pool, intercept: (Socket) -> Socket = { it }, //f.e. for tls, which is currently supported by ktor only on JVM - configure: SocketOptions.TCPClientSocketOptions.() -> Unit = {} + configure: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, ): ClientTransport { val transportJob = SupervisorJob(context[Job]) val transportContext = defaultDispatcher + context + transportJob + CoroutineName("rSocket-tcp-client") @@ -54,6 +54,6 @@ public fun TcpClientTransport( Job(transportJob).invokeOnCompletion { selector.close() } return ClientTransport(transportContext) { val socket = aSocket(selector).tcp().connect(remoteAddress, configure) - TcpConnection(intercept(socket), transportContext + Job(transportJob), pool) + TcpConnection(intercept(socket), transportContext + Job(transportJob)) } } diff --git a/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpConnection.kt b/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpConnection.kt index 1fb18421..e325fb07 100644 --- a/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpConnection.kt +++ b/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpConnection.kt @@ -20,8 +20,6 @@ import io.ktor.network.sockets.* import io.ktor.util.cio.* import io.ktor.utils.io.* import io.ktor.utils.io.core.* -import io.ktor.utils.io.core.internal.* -import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.Connection import io.rsocket.kotlin.internal.io.* @@ -32,7 +30,6 @@ import kotlin.coroutines.* internal class TcpConnection( socket: Socket, override val coroutineContext: CoroutineContext, - override val pool: ObjectPool ) : Connection { private val socketConnection = socket.connection() diff --git a/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTransport.kt b/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTransport.kt index 4552778d..e21f4e8d 100644 --- a/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTransport.kt +++ b/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTransport.kt @@ -35,13 +35,13 @@ public class TcpServer internal constructor( public fun TcpServerTransport( hostname: String = "0.0.0.0", port: Int = 0, - pool: ObjectPool = ChunkBuffer.Pool, + @Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool, configure: SocketOptions.AcceptorOptions.() -> Unit = {}, ): ServerTransport = TcpServerTransport(InetSocketAddress(hostname, port), pool, configure) public fun TcpServerTransport( localAddress: InetSocketAddress? = null, - pool: ObjectPool = ChunkBuffer.Pool, + @Suppress("DEPRECATION", "UNUSED_PARAMETER") pool: ObjectPool = ChunkBuffer.Pool, configure: SocketOptions.AcceptorOptions.() -> Unit = {}, ): ServerTransport = ServerTransport { accept -> val serverSocketDeferred = CompletableDeferred() @@ -54,7 +54,7 @@ public fun TcpServerTransport( while (isActive) { val clientSocket = serverSocket.accept() connectionScope.launch { - accept(TcpConnection(clientSocket, coroutineContext, pool)) + accept(TcpConnection(clientSocket, coroutineContext)) }.invokeOnCompletion { clientSocket.close() } diff --git a/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTest.kt b/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTest.kt index 08a469e3..67692766 100644 --- a/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTest.kt +++ b/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTest.kt @@ -27,8 +27,8 @@ class TcpServerTest : SuspendTest, TestWithLeakCheck { private val testJob = Job() private val testContext = testJob + TestExceptionHandler private val address = InetSocketAddress("0.0.0.0", PortProvider.next()) - private val serverTransport = TcpServerTransport(address, InUseTrackingPool) - private val clientTransport = TcpClientTransport(address, testContext, InUseTrackingPool) + private val serverTransport = TcpServerTransport(address) + private val clientTransport = TcpClientTransport(address, testContext) override suspend fun after() { testJob.cancelAndJoin() diff --git a/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpTransportTest.kt b/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpTransportTest.kt index eaa7ba30..1d4af5ec 100644 --- a/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpTransportTest.kt +++ b/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpTransportTest.kt @@ -17,13 +17,12 @@ package io.rsocket.kotlin.transport.ktor.tcp import io.ktor.network.sockets.* -import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.tests.* class TcpTransportTest : TransportTest() { override suspend fun before() { val address = InetSocketAddress("0.0.0.0", PortProvider.next()) - startServer(TcpServerTransport(address, InUseTrackingPool)).serverSocket.await() - client = connectClient(TcpClientTransport(address, testContext, InUseTrackingPool)) + startServer(TcpServerTransport(address)).serverSocket.await() + client = connectClient(TcpClientTransport(address, testContext)) } } diff --git a/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransport.kt b/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransport.kt index 5e5c03f1..2982801c 100644 --- a/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransport.kt +++ b/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransport.kt @@ -37,10 +37,10 @@ import kotlin.coroutines.* public fun WebSocketClientTransport( engineFactory: HttpClientEngineFactory, context: CoroutineContext = EmptyCoroutineContext, - pool: ObjectPool = ChunkBuffer.Pool, + @Suppress("DEPRECATION", "UNUSED_PARAMETER") pool: ObjectPool = ChunkBuffer.Pool, engine: T.() -> Unit = {}, webSockets: WebSockets.Config.() -> Unit = {}, - request: HttpRequestBuilder.() -> Unit + request: HttpRequestBuilder.() -> Unit, ): ClientTransport { val clientEngine = engineFactory.create(engine) @@ -60,7 +60,7 @@ public fun WebSocketClientTransport( return ClientTransport(transportContext) { val session = httpClient.webSocketSession(request) - WebSocketConnection(session, pool) + WebSocketConnection(session) } } @@ -68,10 +68,10 @@ public fun WebSocketClientTransport( engineFactory: HttpClientEngineFactory, urlString: String, secure: Boolean = false, context: CoroutineContext = EmptyCoroutineContext, - pool: ObjectPool = ChunkBuffer.Pool, + @Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool, engine: HttpClientEngineConfig.() -> Unit = {}, webSockets: WebSockets.Config.() -> Unit = {}, - request: HttpRequestBuilder.() -> Unit = {} + request: HttpRequestBuilder.() -> Unit = {}, ): ClientTransport = WebSocketClientTransport(engineFactory, context, pool, engine, webSockets) { url { this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS @@ -88,10 +88,10 @@ public fun WebSocketClientTransport( path: String? = null, secure: Boolean = false, context: CoroutineContext = EmptyCoroutineContext, - pool: ObjectPool = ChunkBuffer.Pool, + @Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool, engine: HttpClientEngineConfig.() -> Unit = {}, webSockets: WebSockets.Config.() -> Unit = {}, - request: HttpRequestBuilder.() -> Unit = {} + request: HttpRequestBuilder.() -> Unit = {}, ): ClientTransport = WebSocketClientTransport(engineFactory, context, pool, engine, webSockets) { url { this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS diff --git a/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransport.kt b/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransport.kt index 58fd6125..7cc9ba29 100644 --- a/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransport.kt +++ b/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransport.kt @@ -33,7 +33,7 @@ public fun WebSocke engineFactory: ApplicationEngineFactory, port: Int = 80, host: String = "0.0.0.0", path: String? = null, protocol: String? = null, - pool: ObjectPool = ChunkBuffer.Pool, + @Suppress("DEPRECATION") pool: ObjectPool = ChunkBuffer.Pool, engine: T.() -> Unit = {}, webSockets: WebSockets.WebSocketOptions.() -> Unit = {}, ): ServerTransport = WebSocketServerTransport( @@ -55,12 +55,12 @@ public fun WebSocke engineFactory: ApplicationEngineFactory, vararg connectors: EngineConnectorConfig, path: String? = null, protocol: String? = null, - pool: ObjectPool = ChunkBuffer.Pool, + @Suppress("DEPRECATION", "UNUSED_PARAMETER") pool: ObjectPool = ChunkBuffer.Pool, engine: T.() -> Unit = {}, webSockets: WebSockets.WebSocketOptions.() -> Unit = {}, ): ServerTransport = ServerTransport { acceptor -> val handler: suspend DefaultWebSocketServerSession.() -> Unit = { - val connection = WebSocketConnection(this, pool) + val connection = WebSocketConnection(this) acceptor(connection) } embeddedServer(engineFactory, *connectors, configure = engine) { diff --git a/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketTransportTest.kt b/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketTransportTest.kt index 3e2eb454..733c3718 100644 --- a/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketTransportTest.kt +++ b/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketTransportTest.kt @@ -18,7 +18,6 @@ package io.rsocket.kotlin.transport.ktor.websocket.server import io.ktor.client.engine.* import io.ktor.server.engine.* -import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.ktor.websocket.client.* import io.rsocket.kotlin.transport.tests.* @@ -29,10 +28,10 @@ abstract class WebSocketTransportTest( override suspend fun before() { val port = PortProvider.next() startServer( - WebSocketServerTransport(serverEngine, port = port, pool = InUseTrackingPool) + WebSocketServerTransport(serverEngine, port = port) ) client = connectClient( - WebSocketClientTransport(clientEngine, port = port, context = testContext, pool = InUseTrackingPool) + WebSocketClientTransport(clientEngine, port = port, context = testContext) ) } } diff --git a/rsocket-transport-ktor-websocket-shared/api/rsocket-transport-ktor-websocket-shared.api b/rsocket-transport-ktor-websocket-shared/api/rsocket-transport-ktor-websocket-shared.api index a8992de9..7925b62e 100644 --- a/rsocket-transport-ktor-websocket-shared/api/rsocket-transport-ktor-websocket-shared.api +++ b/rsocket-transport-ktor-websocket-shared/api/rsocket-transport-ktor-websocket-shared.api @@ -1,5 +1,5 @@ public final class io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnection : io/rsocket/kotlin/Connection, kotlinx/coroutines/CoroutineScope { - public fun (Lio/ktor/websocket/WebSocketSession;Lio/ktor/utils/io/pool/ObjectPool;)V + public fun (Lio/ktor/websocket/WebSocketSession;)V public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext; public fun getPool ()Lio/ktor/utils/io/pool/ObjectPool; public fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/rsocket-transport-ktor-websocket-shared/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnection.kt b/rsocket-transport-ktor-websocket-shared/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnection.kt index 62d4cdb2..9c80e6b5 100644 --- a/rsocket-transport-ktor-websocket-shared/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnection.kt +++ b/rsocket-transport-ktor-websocket-shared/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnection.kt @@ -17,8 +17,6 @@ package io.rsocket.kotlin.transport.ktor.websocket import io.ktor.utils.io.core.* -import io.ktor.utils.io.core.internal.* -import io.ktor.utils.io.pool.* import io.ktor.websocket.* import io.rsocket.kotlin.* import kotlinx.coroutines.* @@ -26,7 +24,6 @@ import kotlinx.coroutines.* @TransportApi public class WebSocketConnection( private val session: WebSocketSession, - override val pool: ObjectPool ) : Connection, CoroutineScope by session { override suspend fun send(packet: ByteReadPacket) { session.send(packet.readBytes()) diff --git a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt index 120acf7c..b7104e67 100644 --- a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt +++ b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,6 @@ package io.rsocket.kotlin.transport.local import io.ktor.utils.io.core.* -import io.ktor.utils.io.core.internal.* -import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import kotlinx.coroutines.channels.* import kotlin.coroutines.* @@ -27,7 +25,6 @@ import kotlin.coroutines.* internal class LocalConnection( private val sender: SendChannel, private val receiver: ReceiveChannel, - override val pool: ObjectPool, override val coroutineContext: CoroutineContext ) : Connection { diff --git a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt index e0a34f93..6b758294 100644 --- a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt +++ b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt @@ -30,7 +30,7 @@ import kotlinx.coroutines.channels.* import kotlin.coroutines.* public fun LocalServerTransport( - pool: ObjectPool = ChunkBuffer.Pool, + @Suppress("DEPRECATION", "UNUSED_PARAMETER") pool: ObjectPool = ChunkBuffer.Pool, ): ServerTransport = ServerTransport { accept -> val connections = Channel() val handlerJob = launch { @@ -40,11 +40,10 @@ public fun LocalServerTransport( } } } - LocalServer(pool, connections, coroutineContext + SupervisorJob(handlerJob)) + LocalServer(connections, coroutineContext + SupervisorJob(handlerJob)) } public class LocalServer internal constructor( - private val pool: ObjectPool, private val connections: Channel, override val coroutineContext: CoroutineContext, ) : ClientTransport { @@ -60,13 +59,11 @@ public class LocalServer internal constructor( val clientConnection = LocalConnection( sender = serverChannel, receiver = clientChannel, - pool = pool, coroutineContext = connectionContext + CoroutineName("rSocket-local-client") ) val serverConnection = LocalConnection( sender = clientChannel, receiver = serverChannel, - pool = pool, coroutineContext = connectionContext + CoroutineName("rSocket-local-server") ) connections.send(serverConnection) diff --git a/rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/transport/local/LocalTransportTest.kt b/rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/transport/local/LocalTransportTest.kt index a51eba76..7b7d60b2 100644 --- a/rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/transport/local/LocalTransportTest.kt +++ b/rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/transport/local/LocalTransportTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,11 @@ package io.rsocket.kotlin.transport.local -import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.tests.* class LocalTransportTest : TransportTest() { override suspend fun before() { - val server = startServer(LocalServerTransport(InUseTrackingPool)) + val server = startServer(LocalServerTransport()) client = connectClient(server) } } diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt index f0c536d9..2f90030e 100644 --- a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,8 +27,8 @@ import kotlin.coroutines.* public class TcpClientTransport( private val port: Int, private val hostname: String, - private val pool: ObjectPool = ChunkBuffer.Pool, - coroutineContext: CoroutineContext = EmptyCoroutineContext + @Suppress("DEPRECATION", "UNUSED_PARAMETER") pool: ObjectPool = ChunkBuffer.Pool, + coroutineContext: CoroutineContext = EmptyCoroutineContext, ) : ClientTransport { override val coroutineContext: CoroutineContext = coroutineContext + SupervisorJob(coroutineContext[Job]) @@ -36,6 +36,6 @@ public class TcpClientTransport( @TransportApi override suspend fun connect(): Connection { val socket = connect(port, hostname) - return TcpConnection(coroutineContext, pool, socket) + return TcpConnection(coroutineContext, socket) } } diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt index 4d49201b..da5ce9a9 100644 --- a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt @@ -17,9 +17,7 @@ package io.rsocket.kotlin.transport.nodejs.tcp import io.ktor.utils.io.core.* -import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.js.* -import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.transport.nodejs.tcp.internal.* @@ -31,7 +29,6 @@ import kotlin.coroutines.* @TransportApi internal class TcpConnection( override val coroutineContext: CoroutineContext, - override val pool: ObjectPool, private val socket: Socket, ) : Connection { diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt index 77fcea61..f010cbe3 100644 --- a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import kotlinx.coroutines.* import kotlin.coroutines.* public class TcpServer internal constructor( - public val job: Job, private val server: Server + public val job: Job, private val server: Server, ) { public suspend fun close(): Unit = suspendCancellableCoroutine { cont -> server.close { cont.resume(Unit) } @@ -33,14 +33,16 @@ public class TcpServer internal constructor( } public class TcpServerTransport( - private val port: Int, private val hostname: String, private val pool: ObjectPool = ChunkBuffer.Pool + private val port: Int, + private val hostname: String, + @Suppress("DEPRECATION", "UNUSED_PARAMETER") pool: ObjectPool = ChunkBuffer.Pool, ) : ServerTransport { @TransportApi override fun CoroutineScope.start(accept: suspend CoroutineScope.(Connection) -> Unit): TcpServer { val supervisorJob = SupervisorJob(coroutineContext[Job]) val server = createServer(port, hostname, { supervisorJob.cancel() }) { launch(supervisorJob) { - accept(TcpConnection(coroutineContext, pool, it)) + accept(TcpConnection(coroutineContext, it)) } } return TcpServer(supervisorJob, server) diff --git a/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt b/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt index 6699f3db..0addb489 100644 --- a/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt +++ b/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package io.rsocket.kotlin.transport.nodejs.tcp -import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.tests.* import kotlinx.coroutines.* @@ -25,8 +24,8 @@ class TcpTransportTest : TransportTest() { override suspend fun before() { val port = PortProvider.next() - server = startServer(TcpServerTransport(port, "127.0.0.1", InUseTrackingPool)) - client = connectClient(TcpClientTransport(port, "127.0.0.1", InUseTrackingPool, testContext)) + server = startServer(TcpServerTransport(port, "127.0.0.1")) + client = connectClient(TcpClientTransport(port, "127.0.0.1", coroutineContext = testContext)) } override suspend fun after() {