Skip to content

Commit

Permalink
migrate ktor integration
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Nov 16, 2023
1 parent 6cf8765 commit 039cb0c
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 52 deletions.
8 changes: 4 additions & 4 deletions rsocket-ktor-client/api/rsocket-ktor-client.api
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
public final class io/rsocket/kotlin/ktor/client/BuildersKt {
public static final fun rSocket (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun rSocket (Lio/ktor/client/HttpClient;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun rSocket (Lio/ktor/client/HttpClient;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun rSocket (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun rSocket (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class io/rsocket/kotlin/ktor/client/RSocketSupport {
Expand Down
2 changes: 2 additions & 0 deletions rsocket-ktor-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ kotlin {
sourceSets {
commonMain {
dependencies {
implementation(projects.rsocketInternalIo)

api(projects.rsocketCore)
api(projects.rsocketTransportKtorWebsocketShared)
api(libs.ktor.client.websockets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,50 +22,48 @@ import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

public suspend fun HttpClient.rSocket(
request: HttpRequestBuilder.() -> Unit,
): RSocket = plugin(RSocketSupport).run {
connector.connect(KtorClientTransport(this@rSocket, request))
}
): RSocket = plugin(RSocketSupport).connector.connect(KtorWebSocketClientTransport(this, request))

public suspend fun HttpClient.rSocket(
urlString: String,
secure: Boolean = false,
request: HttpRequestBuilder.() -> Unit = {},
): RSocket = rSocket {
url {
this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS
this.port = protocol.defaultPort
takeFrom(urlString)
}
): RSocket = rSocket(method = HttpMethod.Get, host = null, port = null, path = null) {
url.protocol = URLProtocol.WS
url.port = url.protocol.defaultPort
url.takeFrom(urlString)
request()
}

public suspend fun HttpClient.rSocket(
method: HttpMethod = HttpMethod.Get,
host: String? = null,
port: Int? = null,
path: String? = null,
secure: Boolean = false,
request: HttpRequestBuilder.() -> Unit = {},
): RSocket = rSocket {
url {
this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS
this.port = protocol.defaultPort
set(host = host, port = port, path = path)
}
this.method = method
url("ws", host, port, path)
request()
}

private class KtorClientTransport(
private class KtorWebSocketClientTransport(
private val client: HttpClient,
private val request: HttpRequestBuilder.() -> Unit,
) : ClientTransport {
override val coroutineContext: CoroutineContext get() = client.coroutineContext
) : RSocketClientTransport {
override val coroutineContext: CoroutineContext = client.coroutineContext.supervisorContext()

@RSocketTransportApi
override suspend fun createSession(): RSocketTransportSession {
ensureActive()

@TransportApi
override suspend fun connect(): Connection = WebSocketConnection(client.webSocketSession(request))
return KtorWebSocketSession(client.webSocketSession(request))
}
}
2 changes: 2 additions & 0 deletions rsocket-ktor-server/api/rsocket-ktor-server.api
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ public final class io/rsocket/kotlin/ktor/server/RSocketSupport$Feature : io/kto
}

public final class io/rsocket/kotlin/ktor/server/RoutingKt {
public static final fun rSocket (Lio/ktor/server/routing/Route;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;)V
public static final fun rSocket (Lio/ktor/server/routing/Route;Ljava/lang/String;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;)V
public static synthetic fun rSocket$default (Lio/ktor/server/routing/Route;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;ILjava/lang/Object;)V
public static synthetic fun rSocket$default (Lio/ktor/server/routing/Route;Ljava/lang/String;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;ILjava/lang/Object;)V
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import io.ktor.server.websocket.*
import io.ktor.util.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.*
import kotlinx.coroutines.*

public class RSocketSupport private constructor(
internal val server: RSocketServer,
private val server: RSocketServer,
) {
public class Config internal constructor() {
@Suppress("DEPRECATION")
Expand All @@ -48,4 +52,13 @@ public class RSocketSupport private constructor(
}
}
}

@RSocketTransportApi
internal fun handler(acceptor: ConnectionAcceptor): suspend DefaultWebSocketServerSession.() -> Unit {
val serverAcceptor = server.createAcceptor(acceptor)
return {
serverAcceptor.acceptSession(KtorWebSocketSession(this))
coroutineContext.job.join()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,11 @@ import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.*
import kotlinx.coroutines.*

public fun Route.rSocket(
path: String? = null,
protocol: String? = null,
acceptor: ConnectionAcceptor,
): Unit = application.plugin(RSocketSupport).run {
server.bindIn(application, KtorServerTransport(this@rSocket, path, protocol), acceptor)
}
@OptIn(RSocketTransportApi::class)
public fun Route.rSocket(protocol: String? = null, acceptor: ConnectionAcceptor): Unit =
webSocket(protocol, application.plugin(RSocketSupport).handler(acceptor))

private class KtorServerTransport(
private val route: Route,
private val path: String?,
private val protocol: String?,
) : ServerTransport<Unit> {
@TransportApi
override fun CoroutineScope.start(accept: suspend CoroutineScope.(Connection) -> Unit) {
val handler: suspend DefaultWebSocketServerSession.() -> Unit = {
val connection = WebSocketConnection(this)
accept(connection)
}
when (path) {
null -> route.webSocket(protocol, handler)
else -> route.webSocket(path, protocol, handler)
}
}
}
@OptIn(RSocketTransportApi::class)
public fun Route.rSocket(path: String, protocol: String? = null, acceptor: ConnectionAcceptor): Unit =
webSocket(path, protocol, application.plugin(RSocketSupport).handler(acceptor))

0 comments on commit 039cb0c

Please sign in to comment.