Skip to content

Commit

Permalink
migrate nodejs transport to new API
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Mar 3, 2024
1 parent 80641ef commit d15aa0f
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.nodejs.tcp

public class NodejsTcpAddress(
public val hostname: String,
public val port: Int,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.nodejs.tcp

import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

public sealed interface NodejsTcpClientTarget : RSocketClientTarget {
public val address: NodejsTcpAddress
}

public sealed interface NodejsTcpClientTransport : RSocketTransport<
NodejsTcpAddress,
NodejsTcpClientTarget> {

public fun target(hostname: String, port: Int): NodejsTcpClientTarget = target(NodejsTcpAddress(hostname, port))

public companion object Factory : RSocketTransportFactory<
NodejsTcpAddress,
NodejsTcpClientTarget,
NodejsTcpClientTransport,
NodejsTcpClientTransportBuilder>({ NodejsTcpClientTransportBuilderImpl })
}

public sealed interface NodejsTcpClientTransportBuilder : RSocketTransportBuilder<
NodejsTcpAddress,
NodejsTcpClientTarget,
NodejsTcpClientTransport>

private object NodejsTcpClientTransportBuilderImpl : NodejsTcpClientTransportBuilder {
@RSocketTransportApi
override fun buildTransport(context: CoroutineContext): NodejsTcpClientTransport = NodejsTcpClientTransportImpl(
coroutineContext = context.supervisorContext(),
)
}

private class NodejsTcpClientTransportImpl(
override val coroutineContext: CoroutineContext,
) : NodejsTcpClientTransport {

override fun target(address: NodejsTcpAddress): NodejsTcpClientTarget = NodejsTcpClientTargetImpl(
coroutineContext = coroutineContext.supervisorContext(),
address = address
)
}

private class NodejsTcpClientTargetImpl(
override val coroutineContext: CoroutineContext,
override val address: NodejsTcpAddress,
) : NodejsTcpClientTarget {

@RSocketTransportApi
override suspend fun createSession(): RSocketTransportSession {
ensureActive()

return NodejsTcpSession(coroutineContext.childContext(), connect(address.port, address.hostname))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.nodejs.tcp

import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

public sealed interface NodejsTcpServerInstance : RSocketServerInstance {
public val address: NodejsTcpAddress
}

public sealed interface NodejsTcpServerTarget : RSocketServerTarget<NodejsTcpServerInstance> {
public val address: NodejsTcpAddress
}

public sealed interface NodejsTcpServerTransport : RSocketTransport<
NodejsTcpAddress,
NodejsTcpServerTarget> {

public fun target(hostname: String, port: Int): NodejsTcpServerTarget = target(NodejsTcpAddress(hostname, port))

public companion object Factory : RSocketTransportFactory<
NodejsTcpAddress,
NodejsTcpServerTarget,
NodejsTcpServerTransport,
NodejsTcpServerTransportBuilder>({ NodejsTcpServerTransportBuilderImpl })
}

public sealed interface NodejsTcpServerTransportBuilder : RSocketTransportBuilder<
NodejsTcpAddress,
NodejsTcpServerTarget,
NodejsTcpServerTransport>

private object NodejsTcpServerTransportBuilderImpl : NodejsTcpServerTransportBuilder {
@RSocketTransportApi
override fun buildTransport(context: CoroutineContext): NodejsTcpServerTransport = NodejsTcpServerTransportImpl(
coroutineContext = context.supervisorContext(),
)
}

private class NodejsTcpServerTransportImpl(
override val coroutineContext: CoroutineContext,
) : NodejsTcpServerTransport {
override fun target(address: NodejsTcpAddress): NodejsTcpServerTarget = NodejsTcpServerTargetImpl(
coroutineContext = coroutineContext.supervisorContext(),
address = address
)
}

private class NodejsTcpServerTargetImpl(
override val coroutineContext: CoroutineContext,
override val address: NodejsTcpAddress,
) : NodejsTcpServerTarget {

@RSocketTransportApi
override suspend fun startServer(acceptor: RSocketServerAcceptor): NodejsTcpServerInstance {
ensureActive()

return NodejsTcpServerInstanceImpl(
coroutineContext = coroutineContext.supervisorContext(),
address = address,
acceptor = acceptor,
)
}
}

@RSocketTransportApi
private class NodejsTcpServerInstanceImpl(
override val coroutineContext: CoroutineContext,
override val address: NodejsTcpAddress,
private val acceptor: RSocketServerAcceptor,
) : NodejsTcpServerInstance {
init {
val server = createServer(address.port, address.hostname, {
coroutineContext.job.cancel("Server closed")
}) {
launch {
acceptor.acceptSession(NodejsTcpSession(coroutineContext.childContext(), it))
}
}
launch {
try {
awaitCancellation()
} catch (cause: Throwable) {
suspendCoroutine { cont -> server.close { cont.resume(Unit) } }
throw cause
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.nodejs.tcp

import io.ktor.utils.io.core.*
import io.ktor.utils.io.js.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.khronos.webgl.*
import kotlin.coroutines.*

@RSocketTransportApi
internal class NodejsTcpSession(
override val coroutineContext: CoroutineContext,
private val socket: Socket,
) : RSocketTransportSession.Sequential {

private val sendChannel = channelForCloseable<ByteReadPacket>(8)
private val receiveChannel = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)

init {
launch {
sendChannel.consumeEach { packet ->
socket.write(Uint8Array(packet.withLength().readArrayBuffer()))
}
}

coroutineContext.job.invokeOnCompletion {
when (it) {
null -> socket.destroy()
else -> socket.destroy(Error(it.message, it.cause))
}
}

val frameAssembler = FrameWithLengthAssembler { receiveChannel.trySend(it) } //TODO
socket.on(
onData = { frameAssembler.write { writeFully(it.buffer) } },
onError = { coroutineContext.job.cancel("Socket error", it) },
onClose = { if (!it) coroutineContext.job.cancel("Socket closed") }
)
}

override suspend fun sendFrame(frame: ByteReadPacket) {
sendChannel.send(frame)
}

override suspend fun receiveFrame(): ByteReadPacket {
return receiveChannel.receive()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,11 @@ class TcpTransportTest : TransportTest() {
server.close()
}
}

class NodejsTcpTransportTest : TransportTest() {
override suspend fun before() {
val port = PortProvider.next()
startServer(NodejsTcpServerTransport(testContext).target("127.0.0.1", port))
client = connectClient(NodejsTcpClientTransport(testContext).target("127.0.0.1", port))
}
}

0 comments on commit d15aa0f

Please sign in to comment.