Skip to content

Commit

Permalink
Stream closing and finishing APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
MOZGIII committed Sep 14, 2024
1 parent 62a1665 commit 2e29bb3
Show file tree
Hide file tree
Showing 25 changed files with 974 additions and 17 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions crates/xwt-anchor/src/impls/xwt_core/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,41 @@ where
}
}

impl<T> xwt_core::stream::WriteAbort for SendStream<T>
where
T: xwt_core::stream::WriteAbort,
{
type ErrorCode = T::ErrorCode;
type Error = T::Error;

async fn abort(self, error_code: Self::ErrorCode) -> Result<(), Self::Error> {
T::abort(self.0, error_code).await
}
}

impl<T> xwt_core::stream::WriteAborted for SendStream<T>
where
T: xwt_core::stream::WriteAborted,
{
type ErrorCode = T::ErrorCode;
type Error = T::Error;

async fn aborted(self) -> Result<Option<Self::ErrorCode>, Self::Error> {
T::aborted(self.0).await
}
}

impl<T> xwt_core::stream::Finish for SendStream<T>
where
T: xwt_core::stream::Finish,
{
type Error = T::Error;

async fn finish(self) -> Result<(), Self::Error> {
T::finish(self.0).await
}
}

impl<T> xwt_core::stream::Read for RecvStream<T>
where
T: xwt_core::stream::Read,
Expand All @@ -23,3 +58,15 @@ where
T::read(&mut self.0, buf).await
}
}

impl<T> xwt_core::stream::ReadAbort for RecvStream<T>
where
T: xwt_core::stream::ReadAbort,
{
type ErrorCode = T::ErrorCode;
type Error = T::Error;

async fn abort(self, error_code: Self::ErrorCode) -> Result<(), Self::Error> {
T::abort(self.0, error_code).await
}
}
5 changes: 4 additions & 1 deletion crates/xwt-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ pub mod prelude {
pub use crate::session::stream::{
AcceptBi as _, AcceptUni as _, OpenBi as _, OpenUni as _, OpeningBi as _, OpeningUni as _,
};
pub use crate::stream::{Read as _, ReadChunk as _, Write as _, WriteChunk as _};
pub use crate::stream::{
AbortableRead as _, AsErrorCode as _, Finish as _, Read as _, ReadAbort as _,
ReadChunk as _, Write as _, WriteAbort as _, WriteAborted as _, WriteChunk as _,
};

pub use crate::endpoint::accept_utils::*;
pub use crate::endpoint::connect_utils::*;
Expand Down
7 changes: 5 additions & 2 deletions crates/xwt-core/src/session/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ use core::future::Future;
use crate::utils::{maybe, Error};

pub trait SendSpec: maybe::Send {
type SendStream: crate::stream::Write;
type SendStream: crate::stream::Write
+ crate::stream::WriteAbort
+ crate::stream::WriteAborted
+ crate::stream::Finish;
}

pub trait RecvSpec: maybe::Send {
type RecvStream: crate::stream::Read;
type RecvStream: crate::stream::Read + crate::stream::ReadAbort;
}

pub trait PairSpec: maybe::Send + SendSpec + RecvSpec {}
Expand Down
92 changes: 92 additions & 0 deletions crates/xwt-core/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,98 @@ pub trait Write: maybe::Send {
) -> impl Future<Output = Result<usize, Self::Error>> + maybe::Send;
}

/// Abort the read stream.
///
/// Sends a signal to the peer that the read side of the stream has been
/// aborted.
/// Discards the receive buffer; the peer is typically expected to abort
/// the corresponding send side in response.
///
/// An unsigned 8-bit error code can be supplied as a part of the signal
/// to the peer.
pub trait ReadAbort: maybe::Send {
/// An error code to abort the stream with.
///
/// Pass `0` for default.
type ErrorCode: From<u32> + maybe::Send + maybe::Sync;

/// An error that can occur while stopping the stream.
type Error: Error + maybe::Send + maybe::Sync + 'static;

/// Abort the stream.
fn abort(
self,
error_code: Self::ErrorCode,
) -> impl Future<Output = Result<(), Self::Error>> + maybe::Send;
}

/// Abort the write stream.
///
/// Sends a signal to the peer that the write side of the stream has been
/// aborted.
/// Discards the send buffer; if possible, no currently outstanding data
/// is transmitted or retransmitted.
///
/// An unsigned 8-bit error code can be supplied as a part of the signal to
/// the peer; if omitted, the error code is presumed to be 0.
pub trait WriteAbort: maybe::Send {
/// An error code to abort the stream with.
///
/// Pass `0` for default.
type ErrorCode: From<u32> + maybe::Send + maybe::Sync;

/// An error that can occur while stopping the stream.
type Error: Error + maybe::Send + maybe::Sync + 'static;

/// Abort the stream.
fn abort(
self,
error_code: Self::ErrorCode,
) -> impl Future<Output = Result<(), Self::Error>> + maybe::Send;
}

/// Wait for the write stream to abort.
///
/// This can happen when the "read" part aborts the stream.
pub trait WriteAborted: maybe::Send {
/// An error code the stream is aborted with.
type ErrorCode: TryInto<u32> + maybe::Send + maybe::Sync;

/// An error that can occur while waiting for a stream to be aborted.
type Error: Error + maybe::Send + maybe::Sync + 'static;

/// Wait for a stream to abort.
fn aborted(
self,
) -> impl Future<Output = Result<Option<Self::ErrorCode>, Self::Error>> + maybe::Send;
}

/// Finish the write stream.
///
/// Call when all data has been submitted and no further data will be written.
pub trait Finish: maybe::Send {
/// An error that can occur while finishing the stream.
type Error: Error + maybe::Send + maybe::Sync + 'static;

/// Finish the stream.
fn finish(self) -> impl Future<Output = Result<(), Self::Error>> + maybe::Send;
}

/// A way to represent a stream operation error as an error code.
pub trait AsErrorCode: maybe::Send {
/// An error code the stream is aborted with.
type ErrorCode: TryInto<u32> + maybe::Send + maybe::Sync;

/// Represent the error as an error code.
fn as_error_code(&self) -> Option<Self::ErrorCode>;
}

/// An extension to the [`Read`] trait that enables reading the error code
/// from a stream reading error.
pub trait AbortableRead: Read<Error: AsErrorCode> {}

impl<T> AbortableRead for T where T: Read<Error: AsErrorCode> {}

/// An chunk of data with an explicit offset in the stream.
#[derive(Debug)]
pub struct Chunk<Data> {
Expand Down
21 changes: 21 additions & 0 deletions crates/xwt-core/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,24 @@ pub type ReadChunkErrorFor<T, Data> = <T as stream::ReadChunk<Data>>::Error;

/// A shortcut for the error type for a given [`stream::WriteChunk] type.
pub type WriteChunkErrorFor<T, Data> = <T as stream::WriteChunk<Data>>::Error;

/// A shortcut for the error type for a given [`stream::ReadAbort`] type.
pub type ReadAbortErrorFor<T> = <T as stream::ReadAbort>::Error;

/// A shortcut for the error code type for a given [`stream::ReadAbort`] type.
pub type ReadAbortErrorCodeFor<T> = <T as stream::ReadAbort>::ErrorCode;

/// A shortcut for the error type for a given [`stream::WriteAbort`] type.
pub type WriteAbortErrorFor<T> = <T as stream::WriteAbort>::Error;

/// A shortcut for the error code type for a given [`stream::WriteAbort`] type.
pub type WriteAbortErrorCodeFor<T> = <T as stream::WriteAbort>::ErrorCode;

/// A shortcut for the error type for a given [`stream::WriteAborted`] type.
pub type WriteAbortedErrorFor<T> = <T as stream::WriteAborted>::Error;

/// A shortcut for the error code type for a given [`stream::WriteAborted`] type.
pub type WriteAbortedErrorCodeFor<T> = <T as stream::WriteAborted>::ErrorCode;

/// A shortcut for the error type for a given [`stream::Finish`] type.
pub type FinishErrorFor<T> = <T as stream::Finish>::Error;
1 change: 1 addition & 0 deletions crates/xwt-core/src/utils/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use std::error::Error;
pub use core::error::Error;

#[cfg(all(not(feature = "std"), not(feature = "error-in-core")))]
/// An xwt error.
pub trait Error: core::fmt::Debug + core::fmt::Display {}

#[cfg(all(not(feature = "std"), not(feature = "error-in-core")))]
Expand Down
6 changes: 3 additions & 3 deletions crates/xwt-test-server/src/handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where
}
}

#[typle(Tuple for 1..=5)]
#[typle(Tuple for 1..=32)]
impl<T> SpawnHandleSession for T
where
T: Tuple,
Expand Down Expand Up @@ -146,7 +146,7 @@ pub trait RouteSession: Send {
fn handler() -> impl HandleSessionRequest;
}

#[typle(Tuple for 1..=5)]
#[typle(Tuple for 1..=32)]
impl<T> StaticHandleSessionRequest for T
where
T: Tuple,
Expand All @@ -165,7 +165,7 @@ where
}
}

tracing::info!(message = "rejecting incoming session due to path mismatch");
tracing::info!(message = "rejecting incoming session due to path mismatch", %path);
session_request.not_found().await;
Ok(())
}
Expand Down
Loading

0 comments on commit 2e29bb3

Please sign in to comment.