Skip to content

Commit

Permalink
WIP transport API
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Nov 13, 2023
1 parent 3e48909 commit f285c5f
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport

@SubclassOptInRequired(RSocketTransportApi::class)
public interface RSocketClientTransport : RSocketTransport {
@RSocketTransportApi
public suspend fun connect(): RSocketTransportSession
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport

import kotlinx.coroutines.*

@SubclassOptInRequired(RSocketTransportApi::class)
public interface RSocketServerTransport<Instance : RSocketServerInstance> : RSocketTransport {
@RSocketTransportApi
public suspend fun start(acceptor: RSocketServerAcceptor): Instance
}

@RSocketTransportApi
public interface RSocketServerAcceptor {
public suspend fun accept(connection: RSocketTransportSession)
}

// cancelling it will cancel server
// coroutineContext of transport should contain SupervisorJob
@SubclassOptInRequired(RSocketTransportApi::class)
public interface RSocketServerInstance : CoroutineScope
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport

import kotlinx.coroutines.*
import kotlin.coroutines.*

// coroutineContext of transport should contain SupervisorJob
@SubclassOptInRequired(RSocketTransportApi::class)
public interface RSocketTransport : CoroutineScope

// coroutineContext of transport should contain SupervisorJob
@SubclassOptInRequired(RSocketTransportApi::class)
public interface RSocketTransportEngine<Target, Transport : RSocketTransport> : CoroutineScope {
public fun createTransport(target: Target): Transport
}

// TODO: split into transport factory and RSocketTransportEngineFactory
@SubclassOptInRequired(RSocketTransportApi::class)
public interface RSocketTransportFactory<Target, Transport : RSocketTransport, Engine : RSocketTransportEngine<Target, Transport>, Builder> {
public operator fun invoke(context: CoroutineContext, target: Target, block: Builder.() -> Unit = {}): Transport
public fun Engine(context: CoroutineContext, block: Builder.() -> Unit = {}): Engine
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport

@RequiresOptIn(
level = RequiresOptIn.Level.ERROR,
message = """
This is an API which is used to implement transport for RSocket, such as WS or TCP.
This API should not be used from general code
"""
)
public annotation class RSocketTransportApi
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport

import io.ktor.utils.io.core.*
import kotlinx.coroutines.*

// coroutineContext of session should NOT contain SupervisorJob
@RSocketTransportApi
public sealed interface RSocketTransportSession : CoroutineScope {
public interface Sequential : RSocketTransportSession {
public suspend fun sendFrame(frame: ByteReadPacket)
public suspend fun receiveFrame(): ByteReadPacket
}

public interface Multiplexed : RSocketTransportSession {
public interface Stream : CoroutineScope {
public fun prioritize()
public suspend fun sendFrame(frame: ByteReadPacket)
public suspend fun receiveFrame(): ByteReadPacket
}

public suspend fun createStream(): Stream
public suspend fun awaitStream(): Stream
}
}

0 comments on commit f285c5f

Please sign in to comment.