Skip to content

Commit

Permalink
framed
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Jul 19, 2024
1 parent 8e02743 commit 866f59b
Show file tree
Hide file tree
Showing 20 changed files with 489 additions and 349 deletions.
7 changes: 1 addition & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ pin-project-lite = "0.2"
rand = "0.8"
thiserror = "1"
tokio = { version = "1", features = ["io-util", "macros"], optional = true }
tokio-util = { version = "0.7", features = [
"codec",
"net",
"io-util",
], optional = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["async_futures", "async_tokio"] }
Expand All @@ -46,7 +41,7 @@ tokio-macros = { git = "https://github.com/tokio-rs/tokio.git", rev = "833ee027d

[features]
default = ["tokio-udp"]
tokio-udp = ["dep:tokio-util", "dep:tokio"]
tokio-udp = ["dep:tokio"]
micro-bench = []

# TODO: move bench marks to a separate crate cz it maybe slow to compile with minitrace enabled at [dev-dependencies]
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ Yet another project rewritten in Rust.
- Support `ACK`/`NACK` mechanism.
- Full tracing powered by [minitrace-rust](https://github.com/tikv/minitrace-rust).
- You can track a packet's span during deduplication, fragmentation, ...
- User space zero copy

## Roadmap

Expand Down
24 changes: 11 additions & 13 deletions src/client/conn/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@ use std::io;
use std::net::ToSocketAddrs;
use std::sync::Arc;

use log::debug;
use minitrace::Span;
use tokio::net::UdpSocket as TokioUdpSocket;
use tokio_util::udp::UdpFramed;

use super::ConnectTo;
use crate::client::handler::offline::HandleOffline;
use crate::client::handler::offline::OfflineHandler;
use crate::client::handler::online::HandleOnline;
use crate::codec::tokio::Codec;
use crate::codec::frame::Framed;
use crate::codec::{Decoded, Encoded};
use crate::errors::Error;
use crate::guard::HandleOutgoing;
use crate::io::{SeparatedIO, IO};
use crate::link::TransferLink;
use crate::state::{IncomingStateManage, OutgoingStateManage};
use crate::utils::{Logged, TraceStreamExt};
use crate::utils::TraceStreamExt;
use crate::PeerContext;

impl ConnectTo for TokioUdpSocket {
Expand All @@ -38,9 +36,16 @@ impl ConnectTo for TokioUdpSocket {
return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "invalid address").into());
};

let incoming = OfflineHandler::new(
Framed::new(Arc::clone(&socket), config.mtu as usize), // TODO: discover MTU
addr,
config.offline_config(),
)
.await?;

let ack = TransferLink::new_arc(config.client_role());

let dst = UdpFramed::new(Arc::clone(&socket), Codec)
let dst = Framed::new(Arc::clone(&socket), config.mtu as usize)
.handle_outgoing(
Arc::clone(&ack),
config.send_buf_cap,
Expand All @@ -53,13 +58,6 @@ impl ConnectTo for TokioUdpSocket {
.frame_encoded(config.mtu, config.codec_config(), Arc::clone(&ack))
.manage_outgoing_state(None);

let incoming = UdpFramed::new(socket, Codec)
.logged_err(|err| {
debug!("codec error: {err} when decode offline frames");
})
.handle_offline(addr, config.offline_config())
.await?;

let src = ack
.filter_incoming_ack(incoming)
.frame_decoded(
Expand Down
83 changes: 44 additions & 39 deletions src/client/handler/offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use log::debug;
use pin_project_lite::pin_project;

use crate::errors::{CodecError, Error};
use crate::packet::connected::{self, Frames, FramesMut};
use crate::packet::connected::{self, FramesMut};
use crate::packet::{unconnected, Packet};
use crate::RoleContext;

Expand All @@ -18,25 +18,30 @@ pub(crate) struct Config {
pub(crate) protocol_version: u8,
}

pub(crate) trait HandleOffline: Sized {
fn handle_offline(self, server_addr: SocketAddr, config: Config) -> OfflineHandler<Self>;
pin_project! {
pub(crate) struct OfflineHandler<F> {
frame: Option<F>,
state: State,
server_addr: SocketAddr,
config: Config,
role: RoleContext,
}
}

impl<F> HandleOffline for F
impl<F> OfflineHandler<F>
where
F: Stream<Item = (Packet<FramesMut>, SocketAddr)>
+ Sink<(Packet<Frames>, SocketAddr), Error = CodecError>,
+ Sink<(unconnected::Packet, SocketAddr), Error = CodecError>
+ Unpin,
{
fn handle_offline(self, server_addr: SocketAddr, config: Config) -> OfflineHandler<Self> {
OfflineHandler {
frame: Some(self),
state: State::SendOpenConnectionRequest1(Packet::Unconnected(
unconnected::Packet::OpenConnectionRequest1 {
magic: (),
protocol_version: config.protocol_version,
mtu: config.mtu,
},
)),
pub(crate) fn new(frame: F, server_addr: SocketAddr, config: Config) -> Self {
Self {
frame: Some(frame),
state: State::SendOpenConnReq1(unconnected::Packet::OpenConnectionRequest1 {
magic: (),
protocol_version: config.protocol_version,
mtu: config.mtu,
}),
server_addr,
role: RoleContext::Client {
guid: config.client_guid,
Expand All @@ -46,27 +51,19 @@ where
}
}

pin_project! {
pub(crate) struct OfflineHandler<F> {
frame: Option<F>,
state: State,
server_addr: SocketAddr,
config: Config,
role: RoleContext,
}
}

enum State {
SendOpenConnectionRequest1(Packet<Frames>),
WaitOpenConnectionReply1,
SendOpenConnectionRequest2(Packet<Frames>),
WaitOpenConnectionReply2,
SendOpenConnReq1(unconnected::Packet),
SendOpenConnReq1Flush,
WaitOpenConnReply1,
SendOpenConnReq2(unconnected::Packet),
SendOpenConnReq2Flush,
WaitOpenConnReply2,
}

impl<F> Future for OfflineHandler<F>
where
F: Stream<Item = (Packet<FramesMut>, SocketAddr)>
+ Sink<(Packet<Frames>, SocketAddr), Error = CodecError>
+ Sink<(unconnected::Packet, SocketAddr), Error = CodecError>
+ Unpin,
{
type Output = Result<impl Stream<Item = connected::Packet<FramesMut>>, Error>;
Expand All @@ -76,7 +73,7 @@ where
let frame = this.frame.as_mut().unwrap();
loop {
match this.state {
State::SendOpenConnectionRequest1(pack) => {
State::SendOpenConnReq1(pack) => {
if let Err(err) = ready!(frame.poll_ready_unpin(cx)) {
debug!(
"[{}] SendingOpenConnectionRequest1 poll_ready error: {err}, retrying",
Expand All @@ -91,16 +88,20 @@ where
);
continue;
}
*this.state = State::SendOpenConnReq1Flush;
}
State::SendOpenConnReq1Flush => {
if let Err(err) = ready!(frame.poll_flush_unpin(cx)) {
debug!(
"[{}] SendingOpenConnectionRequest1 poll_flush error: {err}, retrying",
this.role
);
continue;
}
*this.state = State::WaitOpenConnectionReply1;
*this.state = State::WaitOpenConnReply1;
}
State::WaitOpenConnectionReply1 => {
State::WaitOpenConnReply1 => {
// TODO: Add timeout
let Some((pack, addr)) = ready!(frame.poll_next_unpin(cx)) else {
return Poll::Ready(Err(Error::ConnectionClosed));
};
Expand All @@ -111,17 +112,17 @@ where
Packet::Unconnected(unconnected::Packet::OpenConnectionReply1 {
mtu,
..
}) => Packet::Unconnected(unconnected::Packet::OpenConnectionRequest2 {
}) => unconnected::Packet::OpenConnectionRequest2 {
magic: (),
server_address: *this.server_addr,
mtu,
client_guid: this.config.client_guid,
}),
},
_ => continue,
};
*this.state = State::SendOpenConnectionRequest2(next);
*this.state = State::SendOpenConnReq2(next);
}
State::SendOpenConnectionRequest2(pack) => {
State::SendOpenConnReq2(pack) => {
if let Err(err) = ready!(frame.poll_ready_unpin(cx)) {
debug!(
"[{}] SendOpenConnectionRequest2 poll_ready error: {err}, retrying",
Expand All @@ -136,16 +137,20 @@ where
);
continue;
}
*this.state = State::SendOpenConnReq2Flush;
}
State::SendOpenConnReq2Flush => {
if let Err(err) = ready!(frame.poll_flush_unpin(cx)) {
debug!(
"[{}] SendOpenConnectionRequest2 poll_flush error: {err}, retrying",
this.role
);
continue;
}
*this.state = State::WaitOpenConnectionReply2;
*this.state = State::WaitOpenConnReply2;
}
State::WaitOpenConnectionReply2 => {
State::WaitOpenConnReply2 => {
// TODO: Add timeout
let Some((pack, addr)) = ready!(frame.poll_next_unpin(cx)) else {
return Poll::Ready(Err(Error::ConnectionClosed));
};
Expand Down
16 changes: 8 additions & 8 deletions src/codec/decoder/frame.rs → src/codec/decoder/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@ use crate::errors::CodecError;
use crate::packet::connected::{Frame, FrameBody, FrameSet};

pin_project! {
pub(crate) struct FrameDecoder<F> {
pub(crate) struct BodyDecoder<F> {
#[pin]
frame: F
}
}

pub(crate) trait FrameDecoded: Sized {
fn frame_decoded(self) -> FrameDecoder<Self>;
pub(crate) trait BodyDecoded: Sized {
fn body_decoded(self) -> BodyDecoder<Self>;
}

impl<F> FrameDecoded for F
impl<F> BodyDecoded for F
where
F: Stream<Item = Result<FrameSet<Frame>, CodecError>>,
{
fn frame_decoded(self) -> FrameDecoder<Self> {
FrameDecoder { frame: self }
fn body_decoded(self) -> BodyDecoder<Self> {
BodyDecoder { frame: self }
}
}

impl<F> Stream for FrameDecoder<F>
impl<F> Stream for BodyDecoder<F>
where
F: Stream<Item = Result<FrameSet<Frame>, CodecError>>,
{
Expand All @@ -42,7 +42,7 @@ where
return Poll::Ready(None);
};

let span = LocalSpan::enter_with_local_parent("codec.reframe")
let span = LocalSpan::enter_with_local_parent("codec.body_decoder")
.with_properties(|| [("frame_seq_num", frame_set.seq_num.to_string())]);

match FrameBody::read(frame_set.set.body) {
Expand Down
4 changes: 2 additions & 2 deletions src/codec/decoder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
mod body;
mod dedup;
mod fragment;
mod frame;
mod ordered;

pub(super) use body::*;
pub(super) use dedup::*;
pub(super) use fragment::*;
pub(super) use frame::*;
pub(super) use ordered::*;
22 changes: 11 additions & 11 deletions src/codec/encoder/frame.rs → src/codec/encoder/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,24 @@ use crate::packet::connected::{FrameBody, Reliability};
use crate::Message;

pin_project! {
// FrameEncoder encodes internal frame body into Message
pub(crate) struct FrameEncoder<F> {
// BodyEncoder encodes internal frame body into Message
pub(crate) struct BodyEncoder<F> {
#[pin]
frame: F,
link: SharedLink,
}
}

pub(crate) trait FrameEncoded: Sized {
fn frame_encoded(self, link: SharedLink) -> FrameEncoder<Self>;
pub(crate) trait BodyEncoded: Sized {
fn body_encoded(self, link: SharedLink) -> BodyEncoder<Self>;
}

impl<F> FrameEncoded for F
impl<F> BodyEncoded for F
where
F: Sink<Message, Error = CodecError>,
{
fn frame_encoded(self, link: SharedLink) -> FrameEncoder<Self> {
FrameEncoder { frame: self, link }
fn body_encoded(self, link: SharedLink) -> BodyEncoder<Self> {
BodyEncoder { frame: self, link }
}
}

Expand All @@ -45,7 +45,7 @@ fn encode(body: FrameBody) -> Message {
FrameBody::DisconnectNotification => Reliability::Reliable,
FrameBody::DetectLostConnections => Reliability::Reliable,
FrameBody::User(_) => {
panic!("you should not send user packet into FrameEncoder, please send `Message`")
panic!("you should not send user packet into BodyEncoder, please send `Message`")
}
};
let mut data = BytesMut::new();
Expand All @@ -57,7 +57,7 @@ fn encode(body: FrameBody) -> Message {
)
}

impl<F> FrameEncoder<F>
impl<F> BodyEncoder<F>
where
F: Sink<Message, Error = CodecError>,
{
Expand All @@ -83,7 +83,7 @@ where
}
}

impl<F> Sink<Message> for FrameEncoder<F>
impl<F> Sink<Message> for BodyEncoder<F>
where
F: Sink<Message, Error = CodecError>,
{
Expand All @@ -107,7 +107,7 @@ where
}
}

impl<F> Sink<FrameBody> for FrameEncoder<F>
impl<F> Sink<FrameBody> for BodyEncoder<F>
where
F: Sink<Message, Error = CodecError>,
{
Expand Down
Loading

0 comments on commit 866f59b

Please sign in to comment.