Skip to content

Commit

Permalink
Introduce new transport API and migrate internals to use it under the…
Browse files Browse the repository at this point in the history
… hood

* minor Gradle flags
  • Loading branch information
whyoleg committed Mar 3, 2024
1 parent 4dafb9a commit 384d191
Show file tree
Hide file tree
Showing 14 changed files with 366 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ kotlin {

// rsocket related
optIn(OptIns.TransportApi)
optIn(OptIns.RSocketTransportApi)
optIn(OptIns.ExperimentalMetadataApi)
optIn(OptIns.ExperimentalStreamsApi)
optIn(OptIns.RSocketLoggingApi)
Expand Down
1 change: 1 addition & 0 deletions build-logic/src/main/kotlin/rsocketbuild/OptIns.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ object OptIns {
const val DelicateCoroutinesApi = "kotlinx.coroutines.DelicateCoroutinesApi"

const val TransportApi = "io.rsocket.kotlin.TransportApi"
const val RSocketTransportApi = "io.rsocket.kotlin.transport.RSocketTransportApi"
const val ExperimentalMetadataApi = "io.rsocket.kotlin.ExperimentalMetadataApi"
const val ExperimentalStreamsApi = "io.rsocket.kotlin.ExperimentalStreamsApi"
const val RSocketLoggingApi = "io.rsocket.kotlin.RSocketLoggingApi"
Expand Down
4 changes: 3 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
group=io.rsocket.kotlin
version=0.16.0-SNAPSHOT
#Kotlin
kotlin.mpp.import.enableKgpDependencyResolution=true
kotlin.native.ignoreDisabledTargets=true
kotlin.native.ignoreIncorrectDependencies=true
kotlinx.atomicfu.enableJvmIrTransformation=true
kotlinx.atomicfu.enableJsIrTransformation=true
kotlinx.atomicfu.enableNativeIrTransformations=true
kotlinx.atomicfu.enableNativeIrTransformation=true
#Gradle
org.gradle.jvmargs=-Xmx2g
org.gradle.parallel=true
Expand Down
58 changes: 58 additions & 0 deletions rsocket-core/api/rsocket-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public abstract interface class io/rsocket/kotlin/core/MimeTypeWithName : io/rso

public final class io/rsocket/kotlin/core/RSocketConnector {
public final fun connect (Lio/rsocket/kotlin/transport/ClientTransport;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun connect (Lio/rsocket/kotlin/transport/RSocketClientTarget;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/rsocket/kotlin/core/RSocketConnectorBuilder {
Expand Down Expand Up @@ -228,6 +229,8 @@ public final class io/rsocket/kotlin/core/RSocketConnectorBuilderKt {
public final class io/rsocket/kotlin/core/RSocketServer {
public final fun bind (Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
public final fun bindIn (Lkotlinx/coroutines/CoroutineScope;Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
public final fun createAcceptor (Lio/rsocket/kotlin/ConnectionAcceptor;)Lio/rsocket/kotlin/transport/RSocketServerAcceptor;
public final fun start (Lio/rsocket/kotlin/transport/RSocketServerTarget;Lio/rsocket/kotlin/ConnectionAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/rsocket/kotlin/core/RSocketServerBuilder {
Expand Down Expand Up @@ -760,6 +763,61 @@ public final class io/rsocket/kotlin/transport/ClientTransportKt {
public static final fun ClientTransport (Lkotlin/coroutines/CoroutineContext;Lio/rsocket/kotlin/transport/ClientTransport;)Lio/rsocket/kotlin/transport/ClientTransport;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketClientTarget : io/rsocket/kotlin/transport/RSocketTransportTarget {
public abstract fun createSession (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketServerAcceptor {
public abstract fun acceptSession (Lio/rsocket/kotlin/transport/RSocketTransportSession;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketServerInstance : kotlinx/coroutines/CoroutineScope {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketServerTarget : io/rsocket/kotlin/transport/RSocketTransportTarget {
public abstract fun startServer (Lio/rsocket/kotlin/transport/RSocketServerAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransport : kotlinx/coroutines/CoroutineScope {
public abstract fun target (Ljava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransportTarget;
}

public abstract interface annotation class io/rsocket/kotlin/transport/RSocketTransportApi : java/lang/annotation/Annotation {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportBuilder {
public abstract fun buildTransport (Lkotlin/coroutines/CoroutineContext;)Lio/rsocket/kotlin/transport/RSocketTransport;
}

public abstract class io/rsocket/kotlin/transport/RSocketTransportFactory {
public fun <init> (Lkotlin/jvm/functions/Function0;)V
public final fun getCreateBuilder ()Lkotlin/jvm/functions/Function0;
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketTransport;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/RSocketTransportFactory;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransport;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportSession : kotlinx/coroutines/CoroutineScope {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportSession$Multiplexed : io/rsocket/kotlin/transport/RSocketTransportSession {
public abstract fun awaitStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun createStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportSession$Multiplexed$Stream : kotlinx/coroutines/CoroutineScope {
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun sendFrame (Lio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun updatePriority (I)V
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportSession$Sequential : io/rsocket/kotlin/transport/RSocketTransportSession {
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun sendFrame (Lio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportTarget : kotlinx/coroutines/CoroutineScope {
}

public abstract interface class io/rsocket/kotlin/transport/ServerTransport {
public abstract fun start (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function3;)Ljava/lang/Object;
}
Expand Down
12 changes: 0 additions & 12 deletions rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
package io.rsocket.kotlin

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import kotlinx.coroutines.*

/**
Expand All @@ -30,12 +27,3 @@ public interface Connection : CoroutineScope {
public suspend fun send(packet: ByteReadPacket)
public suspend fun receive(): ByteReadPacket
}

@OptIn(TransportApi::class)
internal suspend inline fun <T> Connection.receiveFrame(pool: BufferPool, block: (frame: Frame) -> T): T =
receive().readFrame(pool).closeOnError(block)

@OptIn(TransportApi::class)
internal suspend fun Connection.sendFrame(pool: BufferPool, frame: Frame) {
frame.toPacket(pool).closeOnError { send(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

@OptIn(TransportApi::class, RSocketLoggingApi::class)
@OptIn(TransportApi::class, RSocketTransportApi::class, RSocketLoggingApi::class)
public class RSocketConnector internal constructor(
private val loggerFactory: LoggerFactory,
loggerFactory: LoggerFactory,
private val maxFragmentSize: Int,
private val interceptors: Interceptors,
private val connectionConfigProvider: () -> ConnectionConfig,
Expand All @@ -36,19 +37,32 @@ public class RSocketConnector internal constructor(
private val bufferPool: BufferPool,
) {

public suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) {
//TODO current coroutineContext job is overriden by transport coroutineContext jov
null -> withContext(transport.coroutineContext) { connectOnce(transport) }
else -> connectWithReconnect(
transport.coroutineContext,
loggerFactory.logger("io.rsocket.kotlin.connection"),
{ connectOnce(transport) },
reconnectPredicate,
)
private val connectionLogger = loggerFactory.logger("io.rsocket.kotlin.connection")
private val frameLogger = loggerFactory.logger("io.rsocket.kotlin.frame")

public suspend fun connect(transport: ClientTransport): RSocket = connect(object : RSocketClientTarget {
override val coroutineContext: CoroutineContext get() = transport.coroutineContext
override suspend fun createSession(): RSocketTransportSession {
return interceptors.wrapConnection(transport.connect()).convert()
}
})

public suspend fun connect(transport: RSocketClientTarget): RSocket {
return when (reconnectPredicate) {
//TODO current coroutineContext job is overriden by transport coroutineContext jov
null -> withContext(transport.coroutineContext) { connectOnce(transport) }
else -> connectWithReconnect(
transport.coroutineContext,
connectionLogger,
{ connectOnce(transport) },
reconnectPredicate,
)
}
}

private suspend fun connectOnce(transport: ClientTransport): RSocket {
val connection = transport.connect().wrapConnection()
private suspend fun connectOnce(transport: RSocketClientTarget): RSocket {
val connection = transport.createSession().logging(frameLogger, bufferPool)
check(connection is RSocketTransportSession.Sequential) { "multiplexed is not yet supported" }
val connectionConfig = try {
connectionConfigProvider()
} catch (cause: Throwable) {
Expand Down Expand Up @@ -82,8 +96,4 @@ public class RSocketConnector internal constructor(
throw cause
}
}

private fun Connection.wrapConnection(): Connection =
interceptors.wrapConnection(this)
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"), bufferPool)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*

@OptIn(TransportApi::class, RSocketLoggingApi::class)
@OptIn(TransportApi::class, RSocketTransportApi::class, RSocketLoggingApi::class)
public class RSocketServer internal constructor(
private val loggerFactory: LoggerFactory,
loggerFactory: LoggerFactory,
private val maxFragmentSize: Int,
private val interceptors: Interceptors,
private val bufferPool: BufferPool,
) {

private val frameLogger = loggerFactory.logger("io.rsocket.kotlin.frame")

@DelicateCoroutinesApi
public fun <T> bind(
transport: ServerTransport<T>,
Expand All @@ -45,46 +47,56 @@ public class RSocketServer internal constructor(
acceptor: ConnectionAcceptor,
): T = with(transport) {
scope.start {
it.wrapConnection().bind(acceptor).join()
interceptors.wrapConnection(it).convert().bind(acceptor).join()
}
}

private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame(bufferPool) { setupFrame ->
when {
setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}"))
setupFrame.honorLease -> failSetup(RSocketError.Setup.Unsupported("Lease is not supported"))
setupFrame.resumeToken != null -> failSetup(RSocketError.Setup.Unsupported("Resume is not supported"))
else -> try {
connect(
connection = this,
isServer = true,
maxFragmentSize = maxFragmentSize,
interceptors = interceptors,
connectionConfig = ConnectionConfig(
keepAlive = setupFrame.keepAlive,
payloadMimeType = setupFrame.payloadMimeType,
setupPayload = setupFrame.payload
),
acceptor = acceptor,
bufferPool = bufferPool
)
coroutineContext.job
} catch (e: Throwable) {
failSetup(RSocketError.Setup.Rejected(e.message ?: "Rejected by server acceptor"))
public suspend fun <T : RSocketServerInstance> start(
transport: RSocketServerTarget<T>,
acceptor: ConnectionAcceptor,
): T = transport.startServer(createAcceptor(acceptor))

@RSocketTransportApi
public fun createAcceptor(acceptor: ConnectionAcceptor): RSocketServerAcceptor = object : RSocketServerAcceptor {
override suspend fun acceptSession(session: RSocketTransportSession) {
session.logging(frameLogger, bufferPool).bind(acceptor)
}
}

private suspend fun RSocketTransportSession.bind(acceptor: ConnectionAcceptor): Job {
check(this is RSocketTransportSession.Sequential) { "multiplexed is not yet supported" }

return receiveFrame(bufferPool) { setupFrame ->
when {
setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}"))
setupFrame.honorLease -> failSetup(RSocketError.Setup.Unsupported("Lease is not supported"))
setupFrame.resumeToken != null -> failSetup(RSocketError.Setup.Unsupported("Resume is not supported"))
else -> try {
connect(
connection = this,
isServer = true,
maxFragmentSize = maxFragmentSize,
interceptors = interceptors,
connectionConfig = ConnectionConfig(
keepAlive = setupFrame.keepAlive,
payloadMimeType = setupFrame.payloadMimeType,
setupPayload = setupFrame.payload
),
acceptor = acceptor,
bufferPool = bufferPool
)
coroutineContext.job
} catch (e: Throwable) {
failSetup(RSocketError.Setup.Rejected(e.message ?: "Rejected by server acceptor"))
}
}
}
}

@Suppress("SuspendFunctionOnCoroutineScope")
private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
private suspend fun RSocketTransportSession.Sequential.failSetup(error: RSocketError.Setup): Nothing {
sendFrame(bufferPool, ErrorFrame(0, error))
cancel("Connection establishment failed", error)
throw error
}

private fun Connection.wrapConnection(): Connection =
interceptors.wrapConnection(this)
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"), bufferPool)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.core

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.transport.*
import kotlin.coroutines.*

@TransportApi
@RSocketTransportApi
internal fun Connection.convert(): RSocketTransportSession.Sequential = object : RSocketTransportSession.Sequential {
override val coroutineContext: CoroutineContext get() = this@convert.coroutineContext
override suspend fun sendFrame(frame: ByteReadPacket) = send(frame)
override suspend fun receiveFrame(): ByteReadPacket = receive()
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*

@OptIn(TransportApi::class)
@RSocketTransportApi
internal suspend inline fun connect(
connection: Connection,
connection: RSocketTransportSession.Sequential,
isServer: Boolean,
maxFragmentSize: Int,
interceptors: Interceptors,
Expand Down Expand Up @@ -97,10 +98,21 @@ internal suspend inline fun connect(
is LeaseFrame -> frame.close().also { error("lease isn't implemented") }
else -> frame.close()
}

else -> streamsStorage.handleFrame(frame, responder)
}
}
}

return requester
}

@RSocketTransportApi
internal suspend inline fun <T> RSocketTransportSession.Sequential.receiveFrame(bufferPool: BufferPool, block: (frame: Frame) -> T): T {
return receiveFrame().readFrame(bufferPool).closeOnError(block)
}

@RSocketTransportApi
internal suspend fun RSocketTransportSession.Sequential.sendFrame(bufferPool: BufferPool, frame: Frame) {
frame.toPacket(bufferPool).closeOnError { sendFrame(it) }
}
Loading

0 comments on commit 384d191

Please sign in to comment.