Skip to content

Commit

Permalink
Merge pull request #52 from elfenpiff/iox2-51-finalize-mac-os-support
Browse files Browse the repository at this point in the history
iox2-51 integrate posix condition variable for mac os
  • Loading branch information
elfenpiff authored Dec 21, 2023
2 parents d041800 + 05d43b0 commit 5d54039
Show file tree
Hide file tree
Showing 15 changed files with 434 additions and 324 deletions.
34 changes: 23 additions & 11 deletions iceoryx2-pal/concurrency-sync/src/condition_variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
use core::sync::atomic::{AtomicU32, Ordering};

pub use crate::mutex::Mutex;
use crate::{semaphore::Semaphore, WaitAction, WaitResult};

pub struct ConditionVariable {
counter: AtomicU32,
number_of_waiters: AtomicU32,
semaphore: Semaphore,
}

impl Default for ConditionVariable {
fn default() -> Self {
Self {
counter: AtomicU32::new(0),
semaphore: Semaphore::new(0),
number_of_waiters: AtomicU32::new(0),
}
}
}
Expand All @@ -31,28 +34,37 @@ impl ConditionVariable {
Self::default()
}

pub fn notify<WakeOneOrAll: Fn(&AtomicU32)>(&self, wake_one_or_all: WakeOneOrAll) {
self.counter.fetch_add(1, Ordering::Relaxed);
wake_one_or_all(&self.counter);
pub fn notify_one<WakeOne: Fn(&AtomicU32)>(&self, wake_one: WakeOne) {
self.semaphore.post(
wake_one,
1.min(self.number_of_waiters.load(Ordering::Relaxed)),
);
}

pub fn notify_all<WakeAll: Fn(&AtomicU32)>(&self, wake_all: WakeAll) {
self.semaphore
.post(wake_all, self.number_of_waiters.load(Ordering::Relaxed));
}

pub fn wait<
WakeOne: Fn(&AtomicU32),
Wait: Fn(&AtomicU32, &u32) -> bool,
MtxWait: Fn(&AtomicU32, &u32) -> bool,
Wait: Fn(&AtomicU32, &u32) -> WaitAction,
MtxWait: Fn(&AtomicU32, &u32) -> WaitAction,
>(
&self,
mtx: &Mutex,
mtx_wake_one: WakeOne,
wait: Wait,
mtx_wait: MtxWait,
) -> bool {
let counter_value = self.counter.load(Ordering::Relaxed);
) -> WaitResult {
self.number_of_waiters.fetch_add(1, Ordering::Relaxed);
mtx.unlock(mtx_wake_one);

if !wait(&self.counter, &counter_value) {
return false;
if self.semaphore.wait(wait) == WaitResult::Interrupted {
self.number_of_waiters.fetch_sub(1, Ordering::Relaxed);
return WaitResult::Interrupted;
}
self.number_of_waiters.fetch_sub(1, Ordering::Relaxed);

// this maybe problematic when the wait has a timeout. it is possible that
// the timeout is nearly doubled when wait is waken up at the end of the timeout
Expand Down
12 changes: 12 additions & 0 deletions iceoryx2-pal/concurrency-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,15 @@ pub mod condition_variable;
pub mod mutex;
pub mod rwlock;
pub mod semaphore;

#[derive(Debug, PartialEq, Eq)]
pub enum WaitAction {
Continue,
Abort,
}

#[derive(Debug, PartialEq, Eq)]
pub enum WaitResult {
Interrupted,
Success,
}
36 changes: 22 additions & 14 deletions iceoryx2-pal/concurrency-sync/src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use core::{
sync::atomic::{AtomicU32, Ordering},
};

use crate::{WaitAction, WaitResult};

pub struct Mutex {
// we use an AtomicU32 since it should be supported on nearly every platform
state: AtomicU32,
Expand All @@ -33,24 +35,24 @@ impl Mutex {
}
}

pub fn lock<Wait: Fn(&AtomicU32, &u32) -> bool>(&self, wait: Wait) -> bool {
if self.uncontested_lock(crate::SPIN_REPETITIONS) {
return true;
pub fn lock<Wait: Fn(&AtomicU32, &u32) -> WaitAction>(&self, wait: Wait) -> WaitResult {
if self.uncontested_lock(crate::SPIN_REPETITIONS) == WaitResult::Success {
return WaitResult::Success;
}

loop {
let keep_running = wait(&self.state, &1);
let action = wait(&self.state, &1);

if self
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
return true;
return WaitResult::Success;
}

if !keep_running {
return false;
if action == WaitAction::Abort {
return WaitResult::Interrupted;
}
}
}
Expand All @@ -60,15 +62,21 @@ impl Mutex {
wake_one(&self.state);
}

pub fn try_lock(&self) -> bool {
self.state
pub fn try_lock(&self) -> WaitResult {
if self
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
WaitResult::Success
} else {
WaitResult::Interrupted
}
}

fn uncontested_lock(&self, retry_limit: u64) -> bool {
if self.try_lock() {
return true;
fn uncontested_lock(&self, retry_limit: u64) -> WaitResult {
if self.try_lock() == WaitResult::Success {
return WaitResult::Success;
}

let mut retry_counter = 0;
Expand All @@ -81,10 +89,10 @@ impl Mutex {
retry_counter += 1;

if retry_limit == retry_counter {
return false;
return WaitResult::Interrupted;
}
}

true
WaitResult::Success
}
}
82 changes: 53 additions & 29 deletions iceoryx2-pal/concurrency-sync/src/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use core::{
sync::atomic::{AtomicU32, Ordering},
};

use crate::SPIN_REPETITIONS;
use crate::{WaitAction, WaitResult, SPIN_REPETITIONS};

const WRITE_LOCKED: u32 = u32::MAX;
const UNLOCKED: u32 = 0;
Expand All @@ -37,24 +37,30 @@ impl RwLockReaderPreference {
Self::default()
}

pub fn try_read_lock(&self) -> bool {
pub fn try_read_lock(&self) -> WaitResult {
let reader_count = self.reader_count.load(Ordering::Relaxed);

if reader_count == WRITE_LOCKED {
return false;
return WaitResult::Interrupted;
}

self.reader_count
if self
.reader_count
.compare_exchange(
reader_count,
reader_count + 1,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
WaitResult::Success
} else {
WaitResult::Interrupted
}
}

pub fn read_lock<F: Fn(&AtomicU32, &u32) -> bool>(&self, wait: F) -> bool {
pub fn read_lock<F: Fn(&AtomicU32, &u32) -> WaitAction>(&self, wait: F) -> WaitResult {
let mut reader_count = self.reader_count.load(Ordering::Relaxed);
let mut retry_counter = 0;

Expand All @@ -66,13 +72,13 @@ impl RwLockReaderPreference {
}

if !keep_running {
return false;
return WaitResult::Interrupted;
}

if retry_counter < SPIN_REPETITIONS {
retry_counter += 1;
spin_loop();
} else if !wait(&self.reader_count, &reader_count) {
} else if wait(&self.reader_count, &reader_count) == WaitAction::Abort {
keep_running = false;
}

Expand All @@ -85,7 +91,7 @@ impl RwLockReaderPreference {
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return true,
Ok(_) => return WaitResult::Success,
Err(v) => {
reader_count = v;
}
Expand All @@ -103,13 +109,19 @@ impl RwLockReaderPreference {
wake_one(&self.reader_count);
}

pub fn try_write_lock(&self) -> bool {
self.reader_count
pub fn try_write_lock(&self) -> WaitResult {
if self
.reader_count
.compare_exchange(UNLOCKED, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
WaitResult::Success
} else {
WaitResult::Interrupted
}
}

pub fn write_lock<F: Fn(&AtomicU32, &u32) -> bool>(&self, wait: F) -> bool {
pub fn write_lock<F: Fn(&AtomicU32, &u32) -> WaitAction>(&self, wait: F) -> WaitResult {
let mut retry_counter = 0;
let mut reader_count;

Expand All @@ -122,17 +134,17 @@ impl RwLockReaderPreference {
Ordering::Relaxed,
) {
Err(v) => reader_count = v,
Ok(_) => return true,
Ok(_) => return WaitResult::Success,
};

if !keep_running {
return false;
return WaitResult::Interrupted;
}

if retry_counter < SPIN_REPETITIONS {
retry_counter += 1;
spin_loop();
} else if !wait(&self.reader_count, &reader_count) {
} else if wait(&self.reader_count, &reader_count) == WaitAction::Abort {
keep_running = false;
}
}
Expand All @@ -158,32 +170,38 @@ impl RwLockWriterPreference {
Self::default()
}

pub fn try_read_lock(&self) -> bool {
pub fn try_read_lock(&self) -> WaitResult {
let state = self.state.load(Ordering::Relaxed);
if state % 2 == 1 {
return false;
return WaitResult::Interrupted;
}

self.state
if self
.state
.compare_exchange(state, state + 2, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
WaitResult::Success
} else {
WaitResult::Interrupted
}
}

pub fn read_lock<F: Fn(&AtomicU32, &u32) -> bool>(&self, wait: F) -> bool {
pub fn read_lock<F: Fn(&AtomicU32, &u32) -> WaitAction>(&self, wait: F) -> WaitResult {
let mut state = self.state.load(Ordering::Relaxed);

let mut retry_counter = 0;
let mut keep_running = true;
loop {
if state % 2 == 1 {
if !keep_running {
return false;
return WaitResult::Interrupted;
}

if retry_counter < SPIN_REPETITIONS {
retry_counter += 1;
spin_loop();
} else if !wait(&self.state, &state) {
} else if wait(&self.state, &state) == WaitAction::Abort {
keep_running = false;
}
state = self.state.load(Ordering::Relaxed);
Expand All @@ -194,7 +212,7 @@ impl RwLockWriterPreference {
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return true,
Ok(_) => return WaitResult::Success,
Err(v) => state = v,
}
}
Expand All @@ -218,22 +236,28 @@ impl RwLockWriterPreference {
}
}

pub fn try_write_lock(&self) -> bool {
self.state
pub fn try_write_lock(&self) -> WaitResult {
if self
.state
.compare_exchange(0, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
WaitResult::Success
} else {
WaitResult::Interrupted
}
}

pub fn write_lock<
Wait: Fn(&AtomicU32, &u32) -> bool,
Wait: Fn(&AtomicU32, &u32) -> WaitAction,
WakeOne: Fn(&AtomicU32),
WakeAll: Fn(&AtomicU32),
>(
&self,
wait: Wait,
wake_one: WakeOne,
wake_all: WakeAll,
) -> bool {
) -> WaitResult {
let mut state = self.state.load(Ordering::Relaxed);

let mut keep_running = true;
Expand All @@ -246,7 +270,7 @@ impl RwLockWriterPreference {
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return true,
Ok(_) => return WaitResult::Success,
Err(v) => state = v,
}
}
Expand All @@ -264,12 +288,12 @@ impl RwLockWriterPreference {
self.writer_wake_counter.fetch_add(1, Ordering::Relaxed);
wake_one(&self.writer_wake_counter);
wake_all(&self.state);
return false;
return WaitResult::Interrupted;
}
Err(v) => state = v,
}
} else {
return false;
return WaitResult::Interrupted;
}
}
}
Expand All @@ -288,7 +312,7 @@ impl RwLockWriterPreference {
retry_counter += 1;
} else {
let writer_wake_counter = self.writer_wake_counter.load(Ordering::Relaxed);
if !wait(&self.writer_wake_counter, &writer_wake_counter) {
if wait(&self.writer_wake_counter, &writer_wake_counter) == WaitAction::Abort {
keep_running = false;
}
}
Expand Down
Loading

0 comments on commit 5d54039

Please sign in to comment.