Skip to content

Commit

Permalink
Fix memory pool waiters management (#334)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Apr 2, 2024
1 parent 351f699 commit 975f64c
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 99 deletions.
4 changes: 4 additions & 0 deletions ntex-bytes/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.1.25] (2024-04-02)

* Fix pool waiters management

## [0.1.24] (2024-02-01)

* Add `checked` api
Expand Down
14 changes: 8 additions & 6 deletions ntex-bytes/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[package]
name = "ntex-bytes"
version = "0.1.24"
version = "0.1.25"
authors = ["Nikolay Kim <[email protected]>", "Carl Lerche <[email protected]>"]
description = "Types and traits for working with bytes (bytes crate fork)"
documentation = "https://docs.rs/ntex-bytes"
repository = "https://github.com/ntex-rs/ntex.git"
repository = "https://github.com/ntex-rs/ntex"
readme = "README.md"
keywords = ["buffers", "zero-copy", "io"]
categories = ["network-programming", "data-structures"]
Expand All @@ -18,13 +18,15 @@ default = []
simd = ["simdutf8"]

[dependencies]
bitflags = "2.4"
bitflags = "2"
bytes = "1"
serde = "1"
futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3", default-features = false }
simdutf8 = { version = "0.1.4", optional = true }

backtrace = "*"

[dev-dependencies]
serde_test = "1.0"
serde_json = "1.0"
serde_test = "1"
serde_json = "1"
ntex = { version = "1", features = ["tokio"] }
176 changes: 85 additions & 91 deletions ntex-bytes/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl PoolId {
#[inline]
pub fn pool(self) -> Pool {
POOLS.with(|pools| Pool {
idx: Cell::new(0),
idx: Cell::new(usize::MAX),
inner: pools[self.0 as usize],
})
}
Expand Down Expand Up @@ -462,7 +462,7 @@ impl Clone for Pool {
#[inline]
fn clone(&self) -> Pool {
Pool {
idx: Cell::new(0),
idx: Cell::new(usize::MAX),
inner: self.inner,
}
}
Expand All @@ -484,12 +484,10 @@ impl From<PoolRef> for Pool {

impl Drop for Pool {
fn drop(&mut self) {
// cleanup waiter
let idx = self.idx.get();
if idx > 0 {
// cleanup waiter
let mut waiters = self.inner.waiters.borrow_mut();
waiters.remove(idx - 1);
waiters.truncate();
if idx != usize::MAX {
self.inner.waiters.borrow_mut().remove(idx);
}
}
}
Expand All @@ -515,10 +513,8 @@ impl Pool {
/// Check if pool is ready
pub fn is_ready(&self) -> bool {
let idx = self.idx.get();
if idx > 0 {
if let Some(Entry::Occupied(_)) =
self.inner.waiters.borrow().entries.get(idx - 1)
{
if idx != usize::MAX {
if let Some(Entry::Occupied(_)) = self.inner.waiters.borrow().entries.get(idx) {
return false;
}
}
Expand All @@ -543,26 +539,26 @@ impl Pool {
let allocated = self.inner.size.load(Relaxed);
if allocated < window_l {
let idx = self.idx.get();
if idx > 0 {
if idx != usize::MAX {
// cleanup waiter
let mut waiters = self.inner.waiters.borrow_mut();
waiters.remove(idx - 1);
waiters.truncate();
self.idx.set(0);
self.inner.waiters.borrow_mut().remove(idx);
self.idx.set(usize::MAX);
}
return Poll::Ready(());
}

// register waiter only if spawn fn is provided
if let Some(spawn) = &*self.inner.spawn.borrow() {
let idx = self.idx.get();
let mut flags = self.inner.flags.get();
let mut waiters = self.inner.waiters.borrow_mut();
let new = if idx == 0 {
self.idx.set(waiters.append(ctx.waker().clone()) + 1);
true
} else {
waiters.update(idx - 1, ctx.waker().clone())
let new = {
let idx = self.idx.get();
if idx == usize::MAX {
self.idx.set(waiters.append(ctx.waker().clone()));
true
} else {
waiters.update(idx, ctx.waker().clone())
}
};

// if memory usage has increased since last window change,
Expand Down Expand Up @@ -600,7 +596,7 @@ impl Driver {
fn release(&self, waiters_num: usize) {
let mut waiters = self.pool.waiters.borrow_mut();

let mut to_release = waiters.occupied_len / 100 * 5;
let mut to_release = waiters.occupied_len >> 4;
if waiters_num > to_release {
to_release += waiters_num >> 1;
} else {
Expand Down Expand Up @@ -654,7 +650,7 @@ impl Future for Driver {
pool.flags.set(Flags::INCREASED);
return Poll::Ready(());
} else {
// release 5% of pending waiters
// release 6% of pending waiters
self.release(waiters);

if allocated > windows[idx].0 {
Expand Down Expand Up @@ -725,15 +721,6 @@ impl Waiters {
}
}

fn truncate(&mut self) {
if self.len == 0 {
self.entries.truncate(0);
self.root = usize::MAX;
self.tail = usize::MAX;
self.free = 0;
}
}

fn get_node(&mut self, key: usize) -> &mut Node {
if let Some(Entry::Occupied(ref mut node)) = self.entries.get_mut(key) {
return node;
Expand All @@ -745,11 +732,10 @@ impl Waiters {
fn consume(&mut self) -> Option<Waker> {
if self.root != usize::MAX {
self.occupied_len -= 1;
let entry =
mem::replace(self.entries.get_mut(self.root).unwrap(), Entry::Consumed);

let entry = self.entries.get_mut(self.root).unwrap();
let prev = mem::replace(entry, Entry::Consumed);

match prev {
match entry {
Entry::Occupied(node) => {
debug_assert!(node.prev == usize::MAX);

Expand All @@ -760,57 +746,63 @@ impl Waiters {
} else {
// remove from root
self.root = node.next;
self.get_node(self.root).prev = usize::MAX;
if self.root != usize::MAX {
self.get_node(self.root).prev = usize::MAX;
}
}
Some(node.item)
}
_ => {
unreachable!()
}
_ => unreachable!(),
}
} else {
None
}
}

fn update(&mut self, key: usize, val: Waker) -> bool {
if let Some(entry) = self.entries.get_mut(key) {
match entry {
Entry::Occupied(ref mut node) => {
node.item = val;
return false;
fn update(&mut self, idx: usize, val: Waker) -> bool {
let entry = self
.entries
.get_mut(idx)
.expect("Entry is expected to exist");
match entry {
Entry::Occupied(ref mut node) => {
node.item = val;
false
}
Entry::Consumed => {
// append to the tail
*entry = Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
});

self.occupied_len += 1;
if self.root == usize::MAX {
self.root = idx;
}
Entry::Consumed => {
*entry = Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
});
if self.tail != usize::MAX {
self.get_node(self.tail).next = idx;
}
_ => unreachable!(),
self.tail = idx;
true
}
Entry::Vacant(_) => unreachable!(),
}
self.occupied_len += 1;
if self.root == usize::MAX {
self.root = key;
}
if self.tail != usize::MAX {
self.get_node(self.tail).next = key;
}
self.tail = key;
true
}

fn remove(&mut self, key: usize) {
if let Some(entry) = self.entries.get_mut(key) {
// Swap the entry at the provided value
let prev = mem::replace(entry, Entry::Vacant(self.free));
let entry = mem::replace(entry, Entry::Vacant(self.free));

self.len -= 1;
self.free = key;

match prev {
match entry {
Entry::Occupied(node) => {
self.len -= 1;
self.occupied_len -= 1;
self.free = key;

// remove from root
if self.root == key {
self.root = node.next;
Expand All @@ -826,52 +818,54 @@ impl Waiters {
}
}
}
Entry::Consumed => {
self.len -= 1;
self.free = key;
}
_ => {
unreachable!()
}
Entry::Consumed => {}
Entry::Vacant(_) => unreachable!(),
}

if self.len == 0 {
self.entries.truncate(128);
}
}
}

fn append(&mut self, val: Waker) -> usize {
let idx = self.free;

self.len += 1;
self.occupied_len += 1;
let key = self.free;

if key == self.entries.len() {
if self.root == usize::MAX {
self.root = key;
}
if self.tail != usize::MAX {
self.get_node(self.tail).next = key;
}
// root points to first entry, append to empty list
if self.root == usize::MAX {
self.root = idx;
}
// tail points to last entry
if self.tail != usize::MAX {
self.get_node(self.tail).next = idx;
}

// append item to entries, first free item is not allocated yet
if idx == self.entries.len() {
self.entries.push(Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
}));
self.tail = key;
self.free = key + 1;
self.tail = idx;
self.free = idx + 1;
} else {
self.free = match self.entries.get(key) {
// entries has enough capacity
self.free = match self.entries.get(idx) {
Some(&Entry::Vacant(next)) => next,
_ => unreachable!(),
};
if self.tail != usize::MAX {
self.get_node(self.tail).next = key;
}
self.entries[key] = Entry::Occupied(Node {
self.entries[idx] = Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
});
self.tail = key;
self.tail = idx;
}
key

idx
}
}
4 changes: 2 additions & 2 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["ntex contributors <[email protected]>"]
description = "Framework for composable network services"
readme = "README.md"
keywords = ["ntex", "networking", "framework", "async", "futures"]
repository = "https://github.com/ntex-rs/ntex.git"
repository = "https://github.com/ntex-rs/ntex"
documentation = "https://docs.rs/ntex/"
categories = [
"network-programming",
Expand Down Expand Up @@ -63,7 +63,7 @@ ntex-router = "0.5.3"
ntex-service = "2.0.1"
ntex-macros = "0.1.3"
ntex-util = "1.0.1"
ntex-bytes = "0.1.24"
ntex-bytes = "0.1.25"
ntex-server = "1.0.5"
ntex-h2 = "0.5.2"
ntex-rt = "0.4.12"
Expand Down

0 comments on commit 975f64c

Please sign in to comment.