Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iox2-51 integrate posix condition variable for mac os #52

Merged
26 changes: 19 additions & 7 deletions iceoryx2-pal/concurrency-sync/src/condition_variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
use core::sync::atomic::{AtomicU32, Ordering};

pub use crate::mutex::Mutex;
use crate::semaphore::Semaphore;

pub struct ConditionVariable {
counter: AtomicU32,
number_of_waiters: AtomicU32,
semaphore: Semaphore,
}

impl Default for ConditionVariable {
fn default() -> Self {
Self {
counter: AtomicU32::new(0),
semaphore: Semaphore::new(0),
number_of_waiters: AtomicU32::new(0),
}
}
}
Expand All @@ -31,9 +34,16 @@ impl ConditionVariable {
Self::default()
}

pub fn notify<WakeOneOrAll: Fn(&AtomicU32)>(&self, wake_one_or_all: WakeOneOrAll) {
self.counter.fetch_add(1, Ordering::Relaxed);
wake_one_or_all(&self.counter);
pub fn notify_one<WakeOne: Fn(&AtomicU32)>(&self, wake_one: WakeOne) {
self.semaphore.post(
wake_one,
1.min(self.number_of_waiters.load(Ordering::Relaxed)),
);
}

pub fn notify_all<WakeAll: Fn(&AtomicU32)>(&self, wake_all: WakeAll) {
self.semaphore
.post(wake_all, self.number_of_waiters.load(Ordering::Relaxed));
}

pub fn wait<
Expand All @@ -47,12 +57,14 @@ impl ConditionVariable {
wait: Wait,
mtx_wait: MtxWait,
) -> bool {
let counter_value = self.counter.load(Ordering::Relaxed);
mtx.unlock(mtx_wake_one);

if !wait(&self.counter, &counter_value) {
self.number_of_waiters.fetch_add(1, Ordering::Relaxed);
if !self.semaphore.wait(wait) {
self.number_of_waiters.fetch_sub(1, Ordering::Relaxed);
return false;
}
self.number_of_waiters.fetch_sub(1, Ordering::Relaxed);

// this maybe problematic when the wait has a timeout. it is possible that
// the timeout is nearly doubled when wait is waken up at the end of the timeout
Expand Down
4 changes: 2 additions & 2 deletions iceoryx2-pal/concurrency-sync/src/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ impl Semaphore {
self.value.load(Ordering::Relaxed)
}

pub fn post<WakeUp: Fn(&AtomicU32)>(&self, wakeup: WakeUp) {
self.value.fetch_add(1, Ordering::Acquire);
pub fn post<WakeUp: Fn(&AtomicU32)>(&self, wakeup: WakeUp, value: u32) {
self.value.fetch_add(value, Ordering::Acquire);
wakeup(&self.value);
}

Expand Down
14 changes: 6 additions & 8 deletions iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn condition_variable_notify_one_unblocks_one() {
let counter_old = counter.load(Ordering::Relaxed);

for i in 0..NUMBER_OF_THREADS {
sut.notify(|_| {
sut.notify_one(|_| {
triggered_thread.fetch_add(1, Ordering::Relaxed);
});

Expand Down Expand Up @@ -142,7 +142,7 @@ fn condition_variable_notify_all_unblocks_all() {
std::thread::sleep(TIMEOUT);
let counter_old = counter.load(Ordering::Relaxed);

sut.notify(|_| {
sut.notify_all(|_| {
triggered_thread.fetch_add(1, Ordering::Relaxed);
});

Expand Down Expand Up @@ -192,12 +192,10 @@ fn condition_variable_mutex_is_locked_when_wait_returns() {
std::thread::sleep(TIMEOUT);
let counter_old = counter.load(Ordering::Relaxed);

for _ in 0..NUMBER_OF_THREADS {
sut.notify(|_| {
triggered_thread.fetch_add(1, Ordering::Relaxed);
});
std::thread::sleep(TIMEOUT);
}
sut.notify_all(|_| {
triggered_thread.fetch_add(1, Ordering::Relaxed);
});
std::thread::sleep(TIMEOUT);

assert_that!(counter_old, eq 0);
});
Expand Down
10 changes: 3 additions & 7 deletions iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ fn semaphore_post_and_try_wait_works() {
}
assert_that!(!sut.try_wait(), eq true);

for _ in 0..initial_value {
sut.post(|_| {});
}
sut.post(|_| {}, initial_value);

for _ in 0..initial_value {
assert_that!(sut.try_wait(), eq true);
Expand All @@ -50,9 +48,7 @@ fn semaphore_post_and_wait_works() {
}
assert_that!(!sut.wait(|_, _| false), eq true);

for _ in 0..initial_value {
sut.post(|_| {});
}
sut.post(|_| {}, initial_value);

for _ in 0..initial_value {
assert_that!(sut.wait(|_, _| false), eq true);
Expand All @@ -74,7 +70,7 @@ fn semaphore_wait_blocks() {

std::thread::sleep(TIMEOUT);
let old_counter = counter.load(Ordering::Relaxed);
sut.post(|_| {});
sut.post(|_| {}, 1);

assert_that!(old_counter, eq 0);
});
Expand Down
39 changes: 34 additions & 5 deletions iceoryx2-pal/posix/src/macos/pthread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,35 @@ pub fn wait(atomic: &AtomicU32, expected: &u32) {
unsafe { __libcpp_atomic_wait(ptr, monitor) };
}

pub fn timed_wait(atomic: &AtomicU32, expected: &u32, timeout: timespec) {
let sleep_time = timespec {
tv_sec: 0,
tv_nsec: 1000000,
};
let mut now = timespec::new();
loop {
if atomic.load(Ordering::Relaxed) != *expected {
return;
}

unsafe { clock_gettime(CLOCK_REALTIME, &mut now) };
if now.tv_sec > timeout.tv_sec
|| (now.tv_sec == timeout.tv_sec && now.tv_nsec > timeout.tv_nsec)
{
return;
}

unsafe {
clock_nanosleep(
CLOCK_REALTIME,
0,
&sleep_time,
core::ptr::null_mut::<timespec>().cast(),
)
};
}
}

pub fn wake_one(atomic: &AtomicU32) {
let ptr = (atomic as *const AtomicU32) as *const void;
unsafe { __cxx_atomic_notify_one(ptr) };
Expand Down Expand Up @@ -567,12 +596,12 @@ pub unsafe fn pthread_rwlock_timedrdlock(
}

pub unsafe fn pthread_cond_broadcast(cond: *mut pthread_cond_t) -> int {
(*cond).cv.notify(wake_all);
(*cond).cv.notify_all(wake_all);
Errno::ESUCCES as _
}

pub unsafe fn pthread_cond_signal(cond: *mut pthread_cond_t) -> int {
(*cond).cv.notify(wake_one);
(*cond).cv.notify_one(wake_one);
Errno::ESUCCES as _
}

Expand Down Expand Up @@ -611,11 +640,11 @@ pub unsafe fn pthread_cond_timedwait(
&(*mutex).mtx,
wake_one,
|atomic, value| {
wait(atomic, value);
true
timed_wait(atomic, value, *abstime);
false
},
|atomic, value| {
wait(atomic, value);
timed_wait(atomic, value, *abstime);
false
},
);
Expand Down
2 changes: 1 addition & 1 deletion iceoryx2-pal/posix/src/macos/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub unsafe fn sem_post(sem: *mut sem_t) -> int {
return -1;
}

(*sem).semaphore.post(wake_one);
(*sem).semaphore.post(wake_one, 1);

Errno::set(Errno::ESUCCES);
0
Expand Down
6 changes: 3 additions & 3 deletions iceoryx2-pal/posix/src/windows/pthread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,14 +693,14 @@ pub unsafe fn pthread_rwlock_timedrdlock(
}

pub unsafe fn pthread_cond_broadcast(cond: *mut pthread_cond_t) -> int {
(*cond).cv.notify(|atomic| {
(*cond).cv.notify_all(|atomic| {
WakeByAddressAll((atomic as *const AtomicU32).cast());
});
Errno::ESUCCES as _
}

pub unsafe fn pthread_cond_signal(cond: *mut pthread_cond_t) -> int {
(*cond).cv.notify(|atomic| {
(*cond).cv.notify_one(|atomic| {
WakeByAddressSingle((atomic as *const AtomicU32).cast());
});
Errno::ESUCCES as _
Expand Down Expand Up @@ -767,7 +767,7 @@ pub unsafe fn pthread_cond_timedwait(
4,
timeout as _,
), ignore ERROR_TIMEOUT };
true
false
},
|atomic, value| {
win32call! { WaitOnAddress(
Expand Down
9 changes: 6 additions & 3 deletions iceoryx2-pal/posix/src/windows/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ pub unsafe fn sem_post(sem: *mut sem_t) -> int {
return -1;
}

(*sem).semaphore.post(|atomic| {
WakeByAddressSingle((atomic as *const AtomicU32).cast());
});
(*sem).semaphore.post(
|atomic| {
WakeByAddressSingle((atomic as *const AtomicU32).cast());
},
1,
);

Errno::set(Errno::ESUCCES);
0
Expand Down