Skip to content

Commit

Permalink
Reimplement the accept APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
MOZGIII committed Jul 22, 2023
1 parent b256ad5 commit 7dd009d
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 44 deletions.
6 changes: 3 additions & 3 deletions crates/xwebtransport-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ pub use traits::*;

pub mod prelude {
pub use crate::traits::{
AcceptBiStream as _, AcceptUniStream as _, Connecting as _, Connection as _,
EndpointAccept as _, EndpointConnect as _, OpenBiStream as _, OpenUniStream as _,
OpeningBiStream as _, OpeningUniStream as _,
AcceptBiStream as _, AcceptUniStream as _, Accepting as _, Connecting as _,
Connection as _, EndpointAccept as _, EndpointConnect as _, OpenBiStream as _,
OpenUniStream as _, OpeningBiStream as _, OpeningUniStream as _, Request as _,
};

pub use crate::trait_utils::*;
Expand Down
10 changes: 8 additions & 2 deletions crates/xwebtransport-core/src/trait_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@ use crate::traits;
pub type EndpointConnectConnectionFor<T> =
<<T as traits::EndpointConnect>::Connecting as traits::Connecting>::Connection;

pub type EndpointAcceptConnectionFor<T> =
<<T as traits::EndpointAccept>::Connecting as traits::Connecting>::Connection;
pub type EndpointAcceptConnectionFor<T> = RequestConnectionFor<EndpointAcceptRequestFor<T>>;

pub type EndpointAcceptRequestFor<T> =
<<T as traits::EndpointAccept>::Accepting as traits::Accepting>::Request;

pub type RequestConnectionFor<T> = <T as traits::Request>::Connection;

pub type AcceptingConnectionFor<T> = RequestConnectionFor<<T as traits::Accepting>::Request>;

pub type BiStreamOpeningErrorFor<T> =
<<T as traits::OpenBiStream>::Opening as traits::OpeningBiStream>::Error;
Expand Down
42 changes: 31 additions & 11 deletions crates/xwebtransport-core/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,6 @@ pub trait AcceptUniStream: Streams {
async fn accept_uni(&self) -> Result<Self::RecvStream, Self::Error>;
}

#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
pub trait Connecting: maybe::Send {
type Connection: maybe::Send;
type Error: std::error::Error + maybe::Send + maybe::Sync + 'static;

async fn wait(self) -> Result<Self::Connection, Self::Error>;
}

#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
pub trait EndpointConnect: Sized + maybe::Send {
Expand All @@ -84,13 +75,42 @@ pub trait EndpointConnect: Sized + maybe::Send {
) -> Result<Self::Connecting, Self::Error>;
}

#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
pub trait Connecting: maybe::Send {
type Connection: maybe::Send;
type Error: std::error::Error + maybe::Send + maybe::Sync + 'static;

async fn wait_connect(self) -> Result<Self::Connection, Self::Error>;
}

#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
pub trait EndpointAccept: Sized + maybe::Send {
type Connecting: Connecting;
type Accepting: Accepting;
type Error: std::error::Error + maybe::Send + maybe::Sync + 'static;

async fn accept(&self) -> Result<Option<Self::Accepting>, Self::Error>;
}

#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
pub trait Accepting: maybe::Send {
type Request: Request;
type Error: std::error::Error + maybe::Send + maybe::Sync + 'static;

async fn accept(&self) -> Result<Self::Connecting, Self::Error>;
async fn wait_accept(self) -> Result<Self::Request, Self::Error>;
}

#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
pub trait Request: maybe::Send {
type Connection: maybe::Send;
type OkError: std::error::Error + maybe::Send + maybe::Sync + 'static;
type CloseError: std::error::Error + maybe::Send + maybe::Sync + 'static;

async fn ok(self) -> Result<Self::Connection, Self::OkError>;
async fn close(self, status: u16) -> Result<(), Self::CloseError>;
}

pub trait Connection:
Expand Down
2 changes: 1 addition & 1 deletion crates/xwebtransport-core/src/utils/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ where
type Connection = T;
type Error = std::convert::Infallible;

async fn wait(self) -> Result<Self::Connection, Self::Error> {
async fn wait_connect(self) -> Result<Self::Connection, Self::Error> {
Ok(self.0)
}
}
Expand Down
16 changes: 15 additions & 1 deletion crates/xwebtransport-error/src/impls/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,21 @@ where
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Accept::Accept(inner) => f.debug_tuple("Accept::Accept").field(inner).finish(),
Accept::Connecting(inner) => f.debug_tuple("Accept::Connecting").field(inner).finish(),
}
}
}

impl<TAccepting> std::fmt::Debug for Accepting<TAccepting>
where
TAccepting: xwebtransport_core::Accepting,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Accepting::Accepting(inner) => f.debug_tuple("Accept::Accepting").field(inner).finish(),
Accepting::RequestOk(inner) => f.debug_tuple("Accept::RequestOk").field(inner).finish(),
Accepting::RequestClose(inner) => {
f.debug_tuple("Accept::RequestClose").field(inner).finish()
}
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion crates/xwebtransport-error/src/impls/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,19 @@ where
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Accept::Accept(inner) => write!(f, "accept: {inner}"),
Accept::Connecting(inner) => write!(f, "connecting: {inner}"),
}
}
}

impl<TAccepting> std::fmt::Display for Accepting<TAccepting>
where
TAccepting: xwebtransport_core::Accepting,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Accepting::Accepting(inner) => write!(f, "accepting: {inner}"),
Accepting::RequestOk(inner) => write!(f, "oking request: {inner}"),
Accepting::RequestClose(inner) => write!(f, "closing request: {inner}"),
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/xwebtransport-error/src/impls/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ impl<Endpoint> std::error::Error for Accept<Endpoint> where
{
}

impl<TAccepting> std::error::Error for Accepting<TAccepting> where
TAccepting: xwebtransport_core::Accepting
{
}

impl<Connect> std::error::Error for OpenBi<Connect> where Connect: xwebtransport_core::OpenBiStream {}

impl<Connect> std::error::Error for OpenUni<Connect> where Connect: xwebtransport_core::OpenUniStream
Expand Down
10 changes: 9 additions & 1 deletion crates/xwebtransport-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ where
Endpoint: traits::EndpointAccept,
{
Accept(Endpoint::Error),
Connecting(<Endpoint::Connecting as traits::Connecting>::Error),
}

pub enum Accepting<TAccepting>
where
TAccepting: traits::Accepting,
{
Accepting(TAccepting::Error),
RequestOk(<TAccepting::Request as traits::Request>::OkError),
RequestClose(<TAccepting::Request as traits::Request>::CloseError),
}

pub enum OpenBi<Connection>
Expand Down
25 changes: 13 additions & 12 deletions crates/xwebtransport-tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,29 @@ where
.map_err(xwebtransport_error::Connect::Connect)?;

let connection = connecting
.wait()
.wait_connect()
.await
.map_err(xwebtransport_error::Connect::Connecting)?;

Ok(connection)
}

pub async fn accept<Endpoint>(
endpoint: Endpoint,
) -> Result<EndpointAcceptConnectionFor<Endpoint>, xwebtransport_error::Accept<Endpoint>>
pub async fn ok_accepting<Accepting>(
accepting: Accepting,
) -> Result<AcceptingConnectionFor<Accepting>, xwebtransport_error::Accepting<Accepting>>
where
Endpoint: xwebtransport_core::EndpointAccept,
EndpointAcceptConnectionFor<Endpoint>: xwebtransport_core::Connection,
Accepting: xwebtransport_core::Accepting,
AcceptingConnectionFor<Accepting>: xwebtransport_core::Connection,
{
let connecting = endpoint
.accept()
let request = accepting
.wait_accept()
.await
.map_err(xwebtransport_error::Accept::Accept)?;
let connection = connecting
.wait()
.map_err(xwebtransport_error::Accepting::Accepting)?;

let connection = request
.ok()
.await
.map_err(xwebtransport_error::Accept::Connecting)?;
.map_err(xwebtransport_error::Accepting::RequestOk)?;

Ok(connection)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/xwebtransport-wtransport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ wtransport = { version = "0.1.3" }
xwebtransport-tests = { version = "0.1", path = "../xwebtransport-tests" }

anyhow = "1"
tokio = "1"
tokio = { version = "1", features = ["macros"] }
43 changes: 32 additions & 11 deletions crates/xwebtransport-wtransport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,49 @@ impl xwebtransport_core::traits::EndpointConnect for Endpoint<wtransport::endpoi

#[async_trait]
impl xwebtransport_core::traits::EndpointAccept for Endpoint<wtransport::endpoint::Server> {
type Connecting = SessionRequest;
type Error = wtransport::error::ConnectionError;
type Accepting = IncomingSession;
type Error = std::convert::Infallible;

async fn accept(&self) -> Result<Self::Connecting, Self::Error> {
let incoming = self.0.accept().await;
incoming.await.map(SessionRequest)
async fn accept(&self) -> Result<Option<Self::Accepting>, Self::Error> {
let incoming_session = self.0.accept().await;
let incoming_session = IncomingSession(incoming_session);
Ok(Some(incoming_session))
}
}

impl xwebtransport_core::traits::Streams for Connection {
type SendStream = wtransport::SendStream;
type RecvStream = wtransport::RecvStream;
#[async_trait]
impl xwebtransport_core::traits::Accepting for IncomingSession {
type Request = SessionRequest;
type Error = wtransport::error::ConnectionError;

async fn wait_accept(self) -> Result<Self::Request, Self::Error> {
self.0.await.map(SessionRequest)
}
}

#[async_trait]
impl xwebtransport_core::traits::Connecting for SessionRequest {
impl xwebtransport_core::traits::Request for SessionRequest {
type Connection = Connection;
type Error = wtransport::error::ConnectionError;
type OkError = wtransport::error::ConnectionError;
type CloseError = std::convert::Infallible;

async fn wait(self) -> Result<Self::Connection, Self::Error> {
async fn ok(self) -> Result<Self::Connection, Self::OkError> {
self.0.accept().await.map(Connection)
}

async fn close(self, status: u16) -> Result<(), Self::CloseError> {
debug_assert!(
status == 404,
"wtransport driver only supports closing requests with 404 status code"
);
self.0.not_found().await;
Ok(())
}
}

impl xwebtransport_core::traits::Streams for Connection {
type SendStream = wtransport::SendStream;
type RecvStream = wtransport::RecvStream;
}

#[async_trait]
Expand Down
1 change: 1 addition & 0 deletions crates/xwebtransport-wtransport/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ macro_rules! newtype {

newtype!(Endpoint<Side> => wtransport::Endpoint<Side>);
newtype!(Connection => wtransport::Connection);
newtype!(IncomingSession => wtransport::endpoint::IncomingSession);
newtype!(SessionRequest => wtransport::endpoint::SessionRequest);
newtype!(OpeningBiStream => wtransport::stream::OpeningBiStream);
newtype!(OpeningUniStream => wtransport::stream::OpeningUniStream);

0 comments on commit 7dd009d

Please sign in to comment.