Skip to content

Commit

Permalink
Netty TCP and WebSocket transports
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Nov 17, 2023
1 parent ca4659a commit 5102e8e
Show file tree
Hide file tree
Showing 20 changed files with 1,855 additions and 0 deletions.
9 changes: 9 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ kotlinx-bcv = "0.13.2"

ktor = "2.3.6"

netty = "4.1.101.Final"

bouncycastle = "1.77"

turbine = "1.0.0"

rsocket-java = "1.1.3"
Expand Down Expand Up @@ -44,6 +48,11 @@ ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor" }
ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" }
ktor-server-jetty = { module = "io.ktor:ktor-server-jetty", version.ref = "ktor" }

netty-handler = { module = "io.netty:netty-handler", version.ref = "netty" }
netty-codec-http = { module = "io.netty:netty-codec-http", version.ref = "netty" }

bouncycastle = { module = "org.bouncycastle:bcpkix-jdk18on", version.ref = "bouncycastle" }

turbine = { module = "app.cash.turbine:turbine", version.ref = "turbine" }

rsocket-java-core = { module = 'io.rsocket:rsocket-core', version.ref = "rsocket-java" }
Expand Down
2 changes: 2 additions & 0 deletions rsocket-internal-io/api/rsocket-internal-io.api
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ public final class io/rsocket/kotlin/internal/io/ChannelsKt {

public final class io/rsocket/kotlin/internal/io/ContextKt {
public static final fun childContext (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
public static final fun invokeOnCancellation (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
public static synthetic fun invokeOnCancellation$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
public static final fun supervisorContext (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,23 @@ import kotlin.coroutines.*

public fun CoroutineContext.supervisorContext(): CoroutineContext = plus(SupervisorJob(get(Job)))
public fun CoroutineContext.childContext(): CoroutineContext = plus(Job(get(Job)))

public inline fun CoroutineScope.invokeOnCancellation(
context: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend () -> Unit,
) {
launch(context) {
try {
awaitCancellation()
} catch (cause: Throwable) {
withContext(NonCancellable) {
try {
block()
} catch (suppressed: Throwable) {
cause.addSuppressed(suppressed)
}
}
throw cause
}
}
}
69 changes: 69 additions & 0 deletions rsocket-transport-netty-tcp/api/rsocket-transport-netty-tcp.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport : io/rsocket/kotlin/transport/RSocketClientTransport {
public static final field Factory Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport$Factory;
public abstract fun getRemoteAddress ()Ljava/net/SocketAddress;
}

public final class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Ljava/lang/String;ILkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport;
}

public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder, io/rsocket/kotlin/transport/RSocketTransportEngineBuilder {
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
public abstract fun channel (Lkotlin/reflect/KClass;)V
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
}

public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransportEngine : io/rsocket/kotlin/transport/RSocketTransportEngine {
public static final field Factory Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransportEngine$Factory;
public abstract fun createTransport (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport;
}

public final class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransportEngine$DefaultImpls {
public static fun createTransport (Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransportEngine;Ljava/lang/String;I)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport;
}

public final class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransportEngine$Factory : io/rsocket/kotlin/transport/RSocketTransportEngineFactory {
}

public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerInstance : io/rsocket/kotlin/transport/RSocketServerInstance {
public abstract fun getLocalAddress ()Ljava/net/SocketAddress;
}

public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport : io/rsocket/kotlin/transport/RSocketServerTransport {
public static final field Factory Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport$Factory;
public abstract fun getLocalAddress ()Ljava/net/SocketAddress;
}

public final class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Ljava/lang/String;ILkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport$Factory;Lkotlin/coroutines/CoroutineContext;Ljava/lang/String;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport$Factory;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;
}

public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder, io/rsocket/kotlin/transport/RSocketTransportEngineBuilder {
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
public abstract fun channel (Lkotlin/reflect/KClass;)V
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Lio/netty/channel/EventLoopGroup;Z)V
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
}

public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransportEngine : io/rsocket/kotlin/transport/RSocketTransportEngine {
public static final field Factory Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransportEngine$Factory;
public abstract fun createTransport ()Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;
public abstract fun createTransport (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;
}

public final class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransportEngine$DefaultImpls {
public static fun createTransport (Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransportEngine;)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;
public static fun createTransport (Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransportEngine;Ljava/lang/String;I)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;
public static synthetic fun createTransport$default (Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransportEngine;Ljava/lang/String;IILjava/lang/Object;)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;
}

public final class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransportEngine$Factory : io/rsocket/kotlin/transport/RSocketTransportEngineFactory {
}

44 changes: 44 additions & 0 deletions rsocket-transport-netty-tcp/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import rsocketbuild.*

plugins {
id("rsocketbuild.template.library")
}

kotlin {
jvmTarget()

sourceSets {
jvmMain {
dependencies {
implementation(projects.rsocketInternalIo)

api(projects.rsocketCore)
api(libs.netty.handler)
}
}
jvmTest {
dependencies {
implementation(projects.rsocketTransportTests)
implementation(libs.bouncycastle)
}
}
}
}

description = "rsocket-kotlin Netty TCP transport implementation"
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.netty.tcp

import io.ktor.utils.io.core.*
import io.netty.buffer.*
import io.netty.channel.*
import io.netty.channel.socket.*
import io.netty.handler.codec.*
import io.netty.handler.ssl.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel
import java.net.*
import kotlin.coroutines.*

internal class NettyTcpChannelHandler(
private val sslContext: SslContext?,
private val remoteAddress: SocketAddress?,
) : ChannelInitializer<DuplexChannel>() {
private val frames = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)

@RSocketTransportApi
fun connect(
context: CoroutineContext,
channel: DuplexChannel,
): NettyTcpSession = NettyTcpSession(
coroutineContext = context,
channel = channel,
frames = frames
)

override fun initChannel(ch: DuplexChannel): Unit = with(ch.pipeline()) {
if (sslContext != null) {
val sslHandler = if (
remoteAddress is InetSocketAddress &&
ch.parent() == null // not server
) {
sslContext.newHandler(ch.alloc(), remoteAddress.hostName, remoteAddress.port)
} else {
sslContext.newHandler(ch.alloc())
}
addLast("ssl", sslHandler)
}
addLast(
"rsocket-length-encoder",
LengthFieldPrepender(
/* lengthFieldLength = */ 3
)
)
addLast(
"rsocket-length-decoder",
LengthFieldBasedFrameDecoder(
/* maxFrameLength = */ Int.MAX_VALUE,
/* lengthFieldOffset = */ 0,
/* lengthFieldLength = */ 3,
/* lengthAdjustment = */ 0,
/* initialBytesToStrip = */ 3
)
)
addLast(
"rsocket-frame-receiver",
IncomingFramesChannelHandler(frames)
)
}

private class IncomingFramesChannelHandler(
private val channel: SendChannel<ByteReadPacket>,
) : SimpleChannelInboundHandler<ByteBuf>() {
override fun channelInactive(ctx: ChannelHandlerContext) {
channel.close() //TODO?
super.channelInactive(ctx)
}

override fun channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf) {
channel.trySend(buildPacket {
writeFully(msg.nioBuffer())
}).getOrThrow()
}
}
}
Loading

0 comments on commit 5102e8e

Please sign in to comment.