Skip to content

Commit

Permalink
Tune compio integration (#412)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Sep 5, 2024
1 parent 4327913 commit 8a3a8f1
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 25 deletions.
4 changes: 4 additions & 0 deletions ntex-compio/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.1.1] - 2024-09-05

* Tune write task

## [0.1.0] - 2024-08-29

* Initial release
2 changes: 1 addition & 1 deletion ntex-compio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-compio"
version = "0.1.0"
version = "0.1.1"
authors = ["ntex contributors <[email protected]>"]
description = "compio runtime intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand Down
36 changes: 21 additions & 15 deletions ntex-compio/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ impl IoStream for crate::TcpStream {
compio::runtime::spawn(async move {
run(&mut io, &read, write).await;

let res = io.close().await;
log::debug!("{} Stream is closed, {:?}", read.tag(), res);
match io.close().await {
Ok(_) => log::debug!("{} Stream is closed", read.tag()),
Err(e) => log::error!("{} Stream is closed, {:?}", read.tag(), e),
}
})
.detach();

Expand All @@ -31,8 +33,10 @@ impl IoStream for crate::UnixStream {
compio::runtime::spawn(async move {
run(&mut io, &read, write).await;

let res = io.close().await;
log::debug!("{} Stream is closed, {:?}", read.tag(), res);
match io.close().await {
Ok(_) => log::debug!("{} Unix stream is closed", read.tag()),
Err(e) => log::error!("{} Unix stream is closed, {:?}", read.tag(), e),
}
})
.detach();

Expand Down Expand Up @@ -221,21 +225,23 @@ async fn write<T: AsyncWrite>(io: &mut T, state: &WriteContext) -> io::Result<()
let BufResult(result, buf1) = io.write(buf).await;
buf = buf1;

match result {
return match result {
Ok(0) => Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)),
Ok(size) => {
if buf.0.len() == size {
return io.flush().await;
}
if size == 0 {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
));
// return io.flush().await;
state.memory_pool().release_write_buf(buf.0);
Ok(())
} else {
buf.0.advance(size);
continue;
}
buf.0.advance(size);
}
Err(e) => return Err(e),
}
Err(e) => Err(e),
};
}
})
.await
Expand Down
4 changes: 4 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.3.1] - 2024-09-05

* Tune async io tasks support

## [2.3.0] - 2024-08-28

* Extend io task contexts, for "compio" runtime compatibility
Expand Down
2 changes: 1 addition & 1 deletion ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "2.3.0"
version = "2.3.1"
authors = ["ntex contributors <[email protected]>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand Down
15 changes: 7 additions & 8 deletions ntex-io/src/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{future::poll_fn, future::Future, io, task::Context, task::Poll};

use ntex_bytes::{BufMut, BytesVec, PoolRef};
use ntex_util::task;

use crate::{Flags, IoRef, ReadStatus, WriteStatus};

Expand Down Expand Up @@ -157,11 +156,11 @@ impl ReadContext {
{
let inner = &self.0 .0;

// we already pushed new data to read buffer,
// we have to wait for dispatcher to read data from buffer
if inner.flags.get().is_read_buf_ready() {
task::yield_to().await;
}
// // we already pushed new data to read buffer,
// // we have to wait for dispatcher to read data from buffer
// if inner.flags.get().is_read_buf_ready() {
// ntex_util::task::yield_to().await;
// }

let mut buf = if inner.flags.get().is_read_buf_ready() {
// read buffer is still not read by dispatcher
Expand All @@ -175,9 +174,9 @@ impl ReadContext {
};

// make sure we've got room
let remaining = buf.remaining_mut();
let (hw, lw) = self.0.memory_pool().read_params().unpack();
if remaining < lw {
let remaining = buf.remaining_mut();
if remaining <= lw {
buf.reserve(hw - remaining);
}
let total = buf.len();
Expand Down

0 comments on commit 8a3a8f1

Please sign in to comment.