From 9660e59c666c945a72f98cd8c5049550eb8a156c Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Tue, 19 Dec 2023 18:16:29 +0100 Subject: [PATCH 1/8] [#51] Add condition variable timed wait support for mac os --- .../posix/tests/condition_variable_tests.rs | 252 +++++++++--------- iceoryx2-pal/posix/src/macos/pthread.rs | 33 ++- 2 files changed, 157 insertions(+), 128 deletions(-) diff --git a/iceoryx2-bb/posix/tests/condition_variable_tests.rs b/iceoryx2-bb/posix/tests/condition_variable_tests.rs index b48099683..21b9c8bf7 100644 --- a/iceoryx2-bb/posix/tests/condition_variable_tests.rs +++ b/iceoryx2-bb/posix/tests/condition_variable_tests.rs @@ -198,46 +198,46 @@ fn condition_variable_trigger_all_signals_all_waiters() { }); } -#[test] -fn condition_variable_trigger_one_signals_one_waiter() { - let handle = MutexHandle::>::new(); - thread::scope(|s| { - let counter = Arc::new(AtomicI32::new(0)); - let sut = Arc::new( - ConditionVariableBuilder::new() - .create_condition_variable(500, |v| *v > 1000, &handle) - .unwrap(), - ); - - let sut_thread1 = Arc::clone(&sut); - let counter_thread1 = Arc::clone(&counter); - let t1 = s.spawn(move || { - sut_thread1.wait().unwrap(); - counter_thread1.fetch_add(1, Ordering::Relaxed); - }); - - let sut_thread2 = Arc::clone(&sut); - let counter_thread2 = Arc::clone(&counter); - let t2 = s.spawn(move || { - sut_thread2.wait().unwrap(); - counter_thread2.fetch_add(1, Ordering::Relaxed); - }); - - thread::sleep(TIMEOUT); - let counter_old_1 = counter.load(Ordering::Relaxed); - sut.trigger_one(); - thread::sleep(TIMEOUT); - let counter_old_2 = counter.load(Ordering::Relaxed); - sut.trigger_one(); - - t1.join().unwrap(); - t2.join().unwrap(); - - assert_that!(counter_old_1, eq 0); - assert_that!(counter_old_2, eq 1); - assert_that!(counter.load(Ordering::Relaxed), eq 2); - }); -} +// #[test] +// fn condition_variable_trigger_one_signals_one_waiter() { +// let handle = MutexHandle::>::new(); +// thread::scope(|s| { +// let counter = Arc::new(AtomicI32::new(0)); +// let sut = Arc::new( +// ConditionVariableBuilder::new() +// .create_condition_variable(500, |v| *v > 1000, &handle) +// .unwrap(), +// ); +// +// let sut_thread1 = Arc::clone(&sut); +// let counter_thread1 = Arc::clone(&counter); +// let t1 = s.spawn(move || { +// sut_thread1.wait().unwrap(); +// counter_thread1.fetch_add(1, Ordering::Relaxed); +// }); +// +// let sut_thread2 = Arc::clone(&sut); +// let counter_thread2 = Arc::clone(&counter); +// let t2 = s.spawn(move || { +// sut_thread2.wait().unwrap(); +// counter_thread2.fetch_add(1, Ordering::Relaxed); +// }); +// +// thread::sleep(TIMEOUT); +// let counter_old_1 = counter.load(Ordering::Relaxed); +// sut.trigger_one(); +// thread::sleep(TIMEOUT); +// let counter_old_2 = counter.load(Ordering::Relaxed); +// sut.trigger_one(); +// +// t1.join().unwrap(); +// t2.join().unwrap(); +// +// assert_that!(counter_old_1, eq 0); +// assert_that!(counter_old_2, eq 1); +// assert_that!(counter.load(Ordering::Relaxed), eq 2); +// }); +// } #[test] fn condition_variable_notify_all_signals_all_waiters() { @@ -285,51 +285,51 @@ fn condition_variable_notify_all_signals_all_waiters() { }); } -#[test] -fn condition_variable_notify_one_signals_one_waiter() { - let handle = MutexHandle::>::new(); - thread::scope(|s| { - let counter = Arc::new(AtomicI32::new(0)); - let sut = Arc::new( - ConditionVariableBuilder::new() - .create_condition_variable(500, |v| *v > 1000, &handle) - .unwrap(), - ); - - let sut_thread1 = Arc::clone(&sut); - let counter_thread1 = Arc::clone(&counter); - let t1 = s.spawn(move || { - sut_thread1.wait_while().unwrap(); - counter_thread1.fetch_add(1, Ordering::Relaxed); - }); - - let sut_thread2 = Arc::clone(&sut); - let counter_thread2 = Arc::clone(&counter); - let t2 = s.spawn(move || { - sut_thread2.wait_while().unwrap(); - counter_thread2.fetch_add(1, Ordering::Relaxed); - }); - - thread::sleep(TIMEOUT); - let counter_old_1 = counter.load(Ordering::Relaxed); - let mut guard = sut.notify_one().unwrap(); - *guard = 1750; - drop(guard); - - thread::sleep(TIMEOUT); - let counter_old_2 = counter.load(Ordering::Relaxed); - let mut guard = sut.notify_one().unwrap(); - *guard = 1500; - drop(guard); - - t1.join().unwrap(); - t2.join().unwrap(); - - assert_that!(counter_old_1, eq 0); - assert_that!(counter_old_2, eq 1); - assert_that!(counter.load(Ordering::Relaxed), eq 2); - }); -} +// #[test] +// fn condition_variable_notify_one_signals_one_waiter() { +// let handle = MutexHandle::>::new(); +// thread::scope(|s| { +// let counter = Arc::new(AtomicI32::new(0)); +// let sut = Arc::new( +// ConditionVariableBuilder::new() +// .create_condition_variable(500, |v| *v > 1000, &handle) +// .unwrap(), +// ); +// +// let sut_thread1 = Arc::clone(&sut); +// let counter_thread1 = Arc::clone(&counter); +// let t1 = s.spawn(move || { +// sut_thread1.wait_while().unwrap(); +// counter_thread1.fetch_add(1, Ordering::Relaxed); +// }); +// +// let sut_thread2 = Arc::clone(&sut); +// let counter_thread2 = Arc::clone(&counter); +// let t2 = s.spawn(move || { +// sut_thread2.wait_while().unwrap(); +// counter_thread2.fetch_add(1, Ordering::Relaxed); +// }); +// +// thread::sleep(TIMEOUT); +// let counter_old_1 = counter.load(Ordering::Relaxed); +// let mut guard = sut.notify_one().unwrap(); +// *guard = 1750; +// drop(guard); +// +// thread::sleep(TIMEOUT); +// let counter_old_2 = counter.load(Ordering::Relaxed); +// let mut guard = sut.notify_one().unwrap(); +// *guard = 1500; +// drop(guard); +// +// t1.join().unwrap(); +// t2.join().unwrap(); +// +// assert_that!(counter_old_1, eq 0); +// assert_that!(counter_old_2, eq 1); +// assert_that!(counter.load(Ordering::Relaxed), eq 2); +// }); +// } #[test] fn condition_variable_modify_notify_all_signals_all_waiters() { @@ -367,47 +367,47 @@ fn condition_variable_modify_notify_all_signals_all_waiters() { }); } -#[test] -fn condition_variable_modify_notify_one_signals_one_waiter() { - let handle = MutexHandle::>::new(); - thread::scope(|s| { - let counter = Arc::new(AtomicI32::new(0)); - let sut = Arc::new( - ConditionVariableBuilder::new() - .create_condition_variable(500, |v| *v > 1000, &handle) - .unwrap(), - ); - - let sut_thread1 = Arc::clone(&sut); - let counter_thread1 = Arc::clone(&counter); - let t1 = s.spawn(move || { - sut_thread1.timed_wait_while(TIMEOUT * 10).unwrap(); - counter_thread1.fetch_add(1, Ordering::Relaxed); - }); - - let sut_thread2 = Arc::clone(&sut); - let counter_thread2 = Arc::clone(&counter); - let t2 = s.spawn(move || { - sut_thread2.timed_wait_while(TIMEOUT * 10).unwrap(); - counter_thread2.fetch_add(1, Ordering::Relaxed); - }); - - thread::sleep(TIMEOUT); - let counter_old_1 = counter.load(Ordering::Relaxed); - sut.modify_notify_one(|value| *value = 2213).unwrap(); - - thread::sleep(TIMEOUT); - let counter_old_2 = counter.load(Ordering::Relaxed); - sut.modify_notify_one(|value| *value = 2213).unwrap(); - - t1.join().unwrap(); - t2.join().unwrap(); - - assert_that!(counter_old_1, eq 0); - assert_that!(counter_old_2, eq 1); - assert_that!(counter.load(Ordering::Relaxed), eq 2); - }); -} +// #[test] +// fn condition_variable_modify_notify_one_signals_one_waiter() { +// let handle = MutexHandle::>::new(); +// thread::scope(|s| { +// let counter = Arc::new(AtomicI32::new(0)); +// let sut = Arc::new( +// ConditionVariableBuilder::new() +// .create_condition_variable(500, |v| *v > 1000, &handle) +// .unwrap(), +// ); +// +// let sut_thread1 = Arc::clone(&sut); +// let counter_thread1 = Arc::clone(&counter); +// let t1 = s.spawn(move || { +// sut_thread1.timed_wait_while(TIMEOUT * 10).unwrap(); +// counter_thread1.fetch_add(1, Ordering::Relaxed); +// }); +// +// let sut_thread2 = Arc::clone(&sut); +// let counter_thread2 = Arc::clone(&counter); +// let t2 = s.spawn(move || { +// sut_thread2.timed_wait_while(TIMEOUT * 10).unwrap(); +// counter_thread2.fetch_add(1, Ordering::Relaxed); +// }); +// +// thread::sleep(TIMEOUT); +// let counter_old_1 = counter.load(Ordering::Relaxed); +// sut.modify_notify_one(|value| *value = 2213).unwrap(); +// +// thread::sleep(TIMEOUT); +// let counter_old_2 = counter.load(Ordering::Relaxed); +// sut.modify_notify_one(|value| *value = 2213).unwrap(); +// +// t1.join().unwrap(); +// t2.join().unwrap(); +// +// assert_that!(counter_old_1, eq 0); +// assert_that!(counter_old_2, eq 1); +// assert_that!(counter.load(Ordering::Relaxed), eq 2); +// }); +// } #[test] fn condition_variable_timed_wait_waits_at_least_given_amount_of_time() { diff --git a/iceoryx2-pal/posix/src/macos/pthread.rs b/iceoryx2-pal/posix/src/macos/pthread.rs index 06c8ed513..268e6013c 100644 --- a/iceoryx2-pal/posix/src/macos/pthread.rs +++ b/iceoryx2-pal/posix/src/macos/pthread.rs @@ -169,6 +169,35 @@ pub fn wait(atomic: &AtomicU32, expected: &u32) { unsafe { __libcpp_atomic_wait(ptr, monitor) }; } +pub fn timed_wait(atomic: &AtomicU32, expected: &u32, timeout: timespec) { + let sleep_time = timespec { + tv_sec: 0, + tv_nsec: 1000000, + }; + let mut now = timespec::new(); + loop { + if atomic.load(Ordering::Relaxed) != *expected { + return; + } + + unsafe { clock_gettime(CLOCK_REALTIME, &mut now) }; + if now.tv_sec > timeout.tv_sec + || (now.tv_sec == timeout.tv_sec && now.tv_nsec > timeout.tv_nsec) + { + return; + } + + unsafe { + clock_nanosleep( + CLOCK_REALTIME, + 0, + &sleep_time, + core::ptr::null_mut::().cast(), + ) + }; + } +} + pub fn wake_one(atomic: &AtomicU32) { let ptr = (atomic as *const AtomicU32) as *const void; unsafe { __cxx_atomic_notify_one(ptr) }; @@ -611,11 +640,11 @@ pub unsafe fn pthread_cond_timedwait( &(*mutex).mtx, wake_one, |atomic, value| { - wait(atomic, value); + timed_wait(atomic, value, *abstime); true }, |atomic, value| { - wait(atomic, value); + timed_wait(atomic, value, *abstime); false }, ); From 37d89a4c18612bdc1197b7f5e8f09205e085d7a0 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Tue, 19 Dec 2023 19:33:14 +0100 Subject: [PATCH 2/8] [#51] Condition variable handles notify one and notify all correctly --- .../posix/tests/condition_variable_tests.rs | 252 +++++++++--------- .../src/condition_variable.rs | 26 +- .../concurrency-sync/src/semaphore.rs | 4 +- iceoryx2-pal/posix/src/macos/pthread.rs | 6 +- iceoryx2-pal/posix/src/macos/semaphore.rs | 2 +- 5 files changed, 151 insertions(+), 139 deletions(-) diff --git a/iceoryx2-bb/posix/tests/condition_variable_tests.rs b/iceoryx2-bb/posix/tests/condition_variable_tests.rs index 21b9c8bf7..b48099683 100644 --- a/iceoryx2-bb/posix/tests/condition_variable_tests.rs +++ b/iceoryx2-bb/posix/tests/condition_variable_tests.rs @@ -198,46 +198,46 @@ fn condition_variable_trigger_all_signals_all_waiters() { }); } -// #[test] -// fn condition_variable_trigger_one_signals_one_waiter() { -// let handle = MutexHandle::>::new(); -// thread::scope(|s| { -// let counter = Arc::new(AtomicI32::new(0)); -// let sut = Arc::new( -// ConditionVariableBuilder::new() -// .create_condition_variable(500, |v| *v > 1000, &handle) -// .unwrap(), -// ); -// -// let sut_thread1 = Arc::clone(&sut); -// let counter_thread1 = Arc::clone(&counter); -// let t1 = s.spawn(move || { -// sut_thread1.wait().unwrap(); -// counter_thread1.fetch_add(1, Ordering::Relaxed); -// }); -// -// let sut_thread2 = Arc::clone(&sut); -// let counter_thread2 = Arc::clone(&counter); -// let t2 = s.spawn(move || { -// sut_thread2.wait().unwrap(); -// counter_thread2.fetch_add(1, Ordering::Relaxed); -// }); -// -// thread::sleep(TIMEOUT); -// let counter_old_1 = counter.load(Ordering::Relaxed); -// sut.trigger_one(); -// thread::sleep(TIMEOUT); -// let counter_old_2 = counter.load(Ordering::Relaxed); -// sut.trigger_one(); -// -// t1.join().unwrap(); -// t2.join().unwrap(); -// -// assert_that!(counter_old_1, eq 0); -// assert_that!(counter_old_2, eq 1); -// assert_that!(counter.load(Ordering::Relaxed), eq 2); -// }); -// } +#[test] +fn condition_variable_trigger_one_signals_one_waiter() { + let handle = MutexHandle::>::new(); + thread::scope(|s| { + let counter = Arc::new(AtomicI32::new(0)); + let sut = Arc::new( + ConditionVariableBuilder::new() + .create_condition_variable(500, |v| *v > 1000, &handle) + .unwrap(), + ); + + let sut_thread1 = Arc::clone(&sut); + let counter_thread1 = Arc::clone(&counter); + let t1 = s.spawn(move || { + sut_thread1.wait().unwrap(); + counter_thread1.fetch_add(1, Ordering::Relaxed); + }); + + let sut_thread2 = Arc::clone(&sut); + let counter_thread2 = Arc::clone(&counter); + let t2 = s.spawn(move || { + sut_thread2.wait().unwrap(); + counter_thread2.fetch_add(1, Ordering::Relaxed); + }); + + thread::sleep(TIMEOUT); + let counter_old_1 = counter.load(Ordering::Relaxed); + sut.trigger_one(); + thread::sleep(TIMEOUT); + let counter_old_2 = counter.load(Ordering::Relaxed); + sut.trigger_one(); + + t1.join().unwrap(); + t2.join().unwrap(); + + assert_that!(counter_old_1, eq 0); + assert_that!(counter_old_2, eq 1); + assert_that!(counter.load(Ordering::Relaxed), eq 2); + }); +} #[test] fn condition_variable_notify_all_signals_all_waiters() { @@ -285,51 +285,51 @@ fn condition_variable_notify_all_signals_all_waiters() { }); } -// #[test] -// fn condition_variable_notify_one_signals_one_waiter() { -// let handle = MutexHandle::>::new(); -// thread::scope(|s| { -// let counter = Arc::new(AtomicI32::new(0)); -// let sut = Arc::new( -// ConditionVariableBuilder::new() -// .create_condition_variable(500, |v| *v > 1000, &handle) -// .unwrap(), -// ); -// -// let sut_thread1 = Arc::clone(&sut); -// let counter_thread1 = Arc::clone(&counter); -// let t1 = s.spawn(move || { -// sut_thread1.wait_while().unwrap(); -// counter_thread1.fetch_add(1, Ordering::Relaxed); -// }); -// -// let sut_thread2 = Arc::clone(&sut); -// let counter_thread2 = Arc::clone(&counter); -// let t2 = s.spawn(move || { -// sut_thread2.wait_while().unwrap(); -// counter_thread2.fetch_add(1, Ordering::Relaxed); -// }); -// -// thread::sleep(TIMEOUT); -// let counter_old_1 = counter.load(Ordering::Relaxed); -// let mut guard = sut.notify_one().unwrap(); -// *guard = 1750; -// drop(guard); -// -// thread::sleep(TIMEOUT); -// let counter_old_2 = counter.load(Ordering::Relaxed); -// let mut guard = sut.notify_one().unwrap(); -// *guard = 1500; -// drop(guard); -// -// t1.join().unwrap(); -// t2.join().unwrap(); -// -// assert_that!(counter_old_1, eq 0); -// assert_that!(counter_old_2, eq 1); -// assert_that!(counter.load(Ordering::Relaxed), eq 2); -// }); -// } +#[test] +fn condition_variable_notify_one_signals_one_waiter() { + let handle = MutexHandle::>::new(); + thread::scope(|s| { + let counter = Arc::new(AtomicI32::new(0)); + let sut = Arc::new( + ConditionVariableBuilder::new() + .create_condition_variable(500, |v| *v > 1000, &handle) + .unwrap(), + ); + + let sut_thread1 = Arc::clone(&sut); + let counter_thread1 = Arc::clone(&counter); + let t1 = s.spawn(move || { + sut_thread1.wait_while().unwrap(); + counter_thread1.fetch_add(1, Ordering::Relaxed); + }); + + let sut_thread2 = Arc::clone(&sut); + let counter_thread2 = Arc::clone(&counter); + let t2 = s.spawn(move || { + sut_thread2.wait_while().unwrap(); + counter_thread2.fetch_add(1, Ordering::Relaxed); + }); + + thread::sleep(TIMEOUT); + let counter_old_1 = counter.load(Ordering::Relaxed); + let mut guard = sut.notify_one().unwrap(); + *guard = 1750; + drop(guard); + + thread::sleep(TIMEOUT); + let counter_old_2 = counter.load(Ordering::Relaxed); + let mut guard = sut.notify_one().unwrap(); + *guard = 1500; + drop(guard); + + t1.join().unwrap(); + t2.join().unwrap(); + + assert_that!(counter_old_1, eq 0); + assert_that!(counter_old_2, eq 1); + assert_that!(counter.load(Ordering::Relaxed), eq 2); + }); +} #[test] fn condition_variable_modify_notify_all_signals_all_waiters() { @@ -367,47 +367,47 @@ fn condition_variable_modify_notify_all_signals_all_waiters() { }); } -// #[test] -// fn condition_variable_modify_notify_one_signals_one_waiter() { -// let handle = MutexHandle::>::new(); -// thread::scope(|s| { -// let counter = Arc::new(AtomicI32::new(0)); -// let sut = Arc::new( -// ConditionVariableBuilder::new() -// .create_condition_variable(500, |v| *v > 1000, &handle) -// .unwrap(), -// ); -// -// let sut_thread1 = Arc::clone(&sut); -// let counter_thread1 = Arc::clone(&counter); -// let t1 = s.spawn(move || { -// sut_thread1.timed_wait_while(TIMEOUT * 10).unwrap(); -// counter_thread1.fetch_add(1, Ordering::Relaxed); -// }); -// -// let sut_thread2 = Arc::clone(&sut); -// let counter_thread2 = Arc::clone(&counter); -// let t2 = s.spawn(move || { -// sut_thread2.timed_wait_while(TIMEOUT * 10).unwrap(); -// counter_thread2.fetch_add(1, Ordering::Relaxed); -// }); -// -// thread::sleep(TIMEOUT); -// let counter_old_1 = counter.load(Ordering::Relaxed); -// sut.modify_notify_one(|value| *value = 2213).unwrap(); -// -// thread::sleep(TIMEOUT); -// let counter_old_2 = counter.load(Ordering::Relaxed); -// sut.modify_notify_one(|value| *value = 2213).unwrap(); -// -// t1.join().unwrap(); -// t2.join().unwrap(); -// -// assert_that!(counter_old_1, eq 0); -// assert_that!(counter_old_2, eq 1); -// assert_that!(counter.load(Ordering::Relaxed), eq 2); -// }); -// } +#[test] +fn condition_variable_modify_notify_one_signals_one_waiter() { + let handle = MutexHandle::>::new(); + thread::scope(|s| { + let counter = Arc::new(AtomicI32::new(0)); + let sut = Arc::new( + ConditionVariableBuilder::new() + .create_condition_variable(500, |v| *v > 1000, &handle) + .unwrap(), + ); + + let sut_thread1 = Arc::clone(&sut); + let counter_thread1 = Arc::clone(&counter); + let t1 = s.spawn(move || { + sut_thread1.timed_wait_while(TIMEOUT * 10).unwrap(); + counter_thread1.fetch_add(1, Ordering::Relaxed); + }); + + let sut_thread2 = Arc::clone(&sut); + let counter_thread2 = Arc::clone(&counter); + let t2 = s.spawn(move || { + sut_thread2.timed_wait_while(TIMEOUT * 10).unwrap(); + counter_thread2.fetch_add(1, Ordering::Relaxed); + }); + + thread::sleep(TIMEOUT); + let counter_old_1 = counter.load(Ordering::Relaxed); + sut.modify_notify_one(|value| *value = 2213).unwrap(); + + thread::sleep(TIMEOUT); + let counter_old_2 = counter.load(Ordering::Relaxed); + sut.modify_notify_one(|value| *value = 2213).unwrap(); + + t1.join().unwrap(); + t2.join().unwrap(); + + assert_that!(counter_old_1, eq 0); + assert_that!(counter_old_2, eq 1); + assert_that!(counter.load(Ordering::Relaxed), eq 2); + }); +} #[test] fn condition_variable_timed_wait_waits_at_least_given_amount_of_time() { diff --git a/iceoryx2-pal/concurrency-sync/src/condition_variable.rs b/iceoryx2-pal/concurrency-sync/src/condition_variable.rs index 32b10b81d..e31c14ae1 100644 --- a/iceoryx2-pal/concurrency-sync/src/condition_variable.rs +++ b/iceoryx2-pal/concurrency-sync/src/condition_variable.rs @@ -13,15 +13,18 @@ use core::sync::atomic::{AtomicU32, Ordering}; pub use crate::mutex::Mutex; +use crate::semaphore::Semaphore; pub struct ConditionVariable { - counter: AtomicU32, + number_of_waiters: AtomicU32, + semaphore: Semaphore, } impl Default for ConditionVariable { fn default() -> Self { Self { - counter: AtomicU32::new(0), + semaphore: Semaphore::new(0), + number_of_waiters: AtomicU32::new(0), } } } @@ -31,9 +34,16 @@ impl ConditionVariable { Self::default() } - pub fn notify(&self, wake_one_or_all: WakeOneOrAll) { - self.counter.fetch_add(1, Ordering::Relaxed); - wake_one_or_all(&self.counter); + pub fn notify_one(&self, wake_one: WakeOne) { + self.semaphore.post( + wake_one, + 1.min(self.number_of_waiters.load(Ordering::Relaxed)), + ); + } + + pub fn notify_all(&self, wake_all: WakeAll) { + self.semaphore + .post(wake_all, self.number_of_waiters.load(Ordering::Relaxed)); } pub fn wait< @@ -47,12 +57,14 @@ impl ConditionVariable { wait: Wait, mtx_wait: MtxWait, ) -> bool { - let counter_value = self.counter.load(Ordering::Relaxed); mtx.unlock(mtx_wake_one); - if !wait(&self.counter, &counter_value) { + self.number_of_waiters.fetch_add(1, Ordering::Relaxed); + if !self.semaphore.wait(wait) { + self.number_of_waiters.fetch_sub(1, Ordering::Relaxed); return false; } + self.number_of_waiters.fetch_sub(1, Ordering::Relaxed); // this maybe problematic when the wait has a timeout. it is possible that // the timeout is nearly doubled when wait is waken up at the end of the timeout diff --git a/iceoryx2-pal/concurrency-sync/src/semaphore.rs b/iceoryx2-pal/concurrency-sync/src/semaphore.rs index 70ff2acfb..45072d3bd 100644 --- a/iceoryx2-pal/concurrency-sync/src/semaphore.rs +++ b/iceoryx2-pal/concurrency-sync/src/semaphore.rs @@ -32,8 +32,8 @@ impl Semaphore { self.value.load(Ordering::Relaxed) } - pub fn post(&self, wakeup: WakeUp) { - self.value.fetch_add(1, Ordering::Acquire); + pub fn post(&self, wakeup: WakeUp, value: u32) { + self.value.fetch_add(value, Ordering::Acquire); wakeup(&self.value); } diff --git a/iceoryx2-pal/posix/src/macos/pthread.rs b/iceoryx2-pal/posix/src/macos/pthread.rs index 268e6013c..6c79b9af9 100644 --- a/iceoryx2-pal/posix/src/macos/pthread.rs +++ b/iceoryx2-pal/posix/src/macos/pthread.rs @@ -596,12 +596,12 @@ pub unsafe fn pthread_rwlock_timedrdlock( } pub unsafe fn pthread_cond_broadcast(cond: *mut pthread_cond_t) -> int { - (*cond).cv.notify(wake_all); + (*cond).cv.notify_all(wake_all); Errno::ESUCCES as _ } pub unsafe fn pthread_cond_signal(cond: *mut pthread_cond_t) -> int { - (*cond).cv.notify(wake_one); + (*cond).cv.notify_one(wake_one); Errno::ESUCCES as _ } @@ -641,7 +641,7 @@ pub unsafe fn pthread_cond_timedwait( wake_one, |atomic, value| { timed_wait(atomic, value, *abstime); - true + false }, |atomic, value| { timed_wait(atomic, value, *abstime); diff --git a/iceoryx2-pal/posix/src/macos/semaphore.rs b/iceoryx2-pal/posix/src/macos/semaphore.rs index 4a6e0227f..5793c8081 100644 --- a/iceoryx2-pal/posix/src/macos/semaphore.rs +++ b/iceoryx2-pal/posix/src/macos/semaphore.rs @@ -30,7 +30,7 @@ pub unsafe fn sem_post(sem: *mut sem_t) -> int { return -1; } - (*sem).semaphore.post(wake_one); + (*sem).semaphore.post(wake_one, 1); Errno::set(Errno::ESUCCES); 0 From cd408aa927e4ec268686826f07fa31756604c573 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Tue, 19 Dec 2023 22:40:42 +0100 Subject: [PATCH 3/8] [#51] Fix condition variable for windows --- .../tests/condition_variable_tests.rs | 14 ++++++-------- .../concurrency-sync/tests/semaphore_tests.rs | 10 +++------- iceoryx2-pal/posix/src/windows/pthread.rs | 6 +++--- iceoryx2-pal/posix/src/windows/semaphore.rs | 9 ++++++--- 4 files changed, 18 insertions(+), 21 deletions(-) diff --git a/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs b/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs index aeb490b75..1e3f85972 100644 --- a/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs @@ -94,7 +94,7 @@ fn condition_variable_notify_one_unblocks_one() { let counter_old = counter.load(Ordering::Relaxed); for i in 0..NUMBER_OF_THREADS { - sut.notify(|_| { + sut.notify_one(|_| { triggered_thread.fetch_add(1, Ordering::Relaxed); }); @@ -142,7 +142,7 @@ fn condition_variable_notify_all_unblocks_all() { std::thread::sleep(TIMEOUT); let counter_old = counter.load(Ordering::Relaxed); - sut.notify(|_| { + sut.notify_all(|_| { triggered_thread.fetch_add(1, Ordering::Relaxed); }); @@ -192,12 +192,10 @@ fn condition_variable_mutex_is_locked_when_wait_returns() { std::thread::sleep(TIMEOUT); let counter_old = counter.load(Ordering::Relaxed); - for _ in 0..NUMBER_OF_THREADS { - sut.notify(|_| { - triggered_thread.fetch_add(1, Ordering::Relaxed); - }); - std::thread::sleep(TIMEOUT); - } + sut.notify_all(|_| { + triggered_thread.fetch_add(1, Ordering::Relaxed); + }); + std::thread::sleep(TIMEOUT); assert_that!(counter_old, eq 0); }); diff --git a/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs b/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs index a3d6eeada..b02551389 100644 --- a/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs @@ -30,9 +30,7 @@ fn semaphore_post_and_try_wait_works() { } assert_that!(!sut.try_wait(), eq true); - for _ in 0..initial_value { - sut.post(|_| {}); - } + sut.post(|_| {}, initial_value); for _ in 0..initial_value { assert_that!(sut.try_wait(), eq true); @@ -50,9 +48,7 @@ fn semaphore_post_and_wait_works() { } assert_that!(!sut.wait(|_, _| false), eq true); - for _ in 0..initial_value { - sut.post(|_| {}); - } + sut.post(|_| {}, initial_value); for _ in 0..initial_value { assert_that!(sut.wait(|_, _| false), eq true); @@ -74,7 +70,7 @@ fn semaphore_wait_blocks() { std::thread::sleep(TIMEOUT); let old_counter = counter.load(Ordering::Relaxed); - sut.post(|_| {}); + sut.post(|_| {}, 1); assert_that!(old_counter, eq 0); }); diff --git a/iceoryx2-pal/posix/src/windows/pthread.rs b/iceoryx2-pal/posix/src/windows/pthread.rs index b3f909381..e214c4a30 100644 --- a/iceoryx2-pal/posix/src/windows/pthread.rs +++ b/iceoryx2-pal/posix/src/windows/pthread.rs @@ -693,14 +693,14 @@ pub unsafe fn pthread_rwlock_timedrdlock( } pub unsafe fn pthread_cond_broadcast(cond: *mut pthread_cond_t) -> int { - (*cond).cv.notify(|atomic| { + (*cond).cv.notify_all(|atomic| { WakeByAddressAll((atomic as *const AtomicU32).cast()); }); Errno::ESUCCES as _ } pub unsafe fn pthread_cond_signal(cond: *mut pthread_cond_t) -> int { - (*cond).cv.notify(|atomic| { + (*cond).cv.notify_one(|atomic| { WakeByAddressSingle((atomic as *const AtomicU32).cast()); }); Errno::ESUCCES as _ @@ -767,7 +767,7 @@ pub unsafe fn pthread_cond_timedwait( 4, timeout as _, ), ignore ERROR_TIMEOUT }; - true + false }, |atomic, value| { win32call! { WaitOnAddress( diff --git a/iceoryx2-pal/posix/src/windows/semaphore.rs b/iceoryx2-pal/posix/src/windows/semaphore.rs index aa3a6536d..540d53b3f 100644 --- a/iceoryx2-pal/posix/src/windows/semaphore.rs +++ b/iceoryx2-pal/posix/src/windows/semaphore.rs @@ -37,9 +37,12 @@ pub unsafe fn sem_post(sem: *mut sem_t) -> int { return -1; } - (*sem).semaphore.post(|atomic| { - WakeByAddressSingle((atomic as *const AtomicU32).cast()); - }); + (*sem).semaphore.post( + |atomic| { + WakeByAddressSingle((atomic as *const AtomicU32).cast()); + }, + 1, + ); Errno::set(Errno::ESUCCES); 0 From 0953e88b4572459235d692ee8d072ecd289d6446 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Wed, 20 Dec 2023 12:36:25 +0100 Subject: [PATCH 4/8] [#51] Add watchdog to potentially blocking tests and adjust semantics of wait call (was wrong before, corrected but the tests weren't adjusted) --- .../tests/condition_variable_tests.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs b/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs index 1e3f85972..f9c062933 100644 --- a/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs @@ -42,7 +42,7 @@ fn condition_variable_notify_one_unblocks_one() { while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } - true + false }, |_, _| true, ); @@ -61,7 +61,7 @@ fn condition_variable_notify_one_unblocks_one() { while triggered_thread.load(Ordering::Relaxed) < 2 { spin_loop() } - true + false }, |_, _| true, ); @@ -80,7 +80,7 @@ fn condition_variable_notify_one_unblocks_one() { while triggered_thread.load(Ordering::Relaxed) < 3 { spin_loop() } - true + false }, |_, _| true, ); @@ -109,6 +109,7 @@ fn condition_variable_notify_one_unblocks_one() { #[test] fn condition_variable_notify_all_unblocks_all() { const NUMBER_OF_THREADS: u32 = 5; + let _watchdog = Watchdog::new(Duration::from_secs(10)); let barrier = Barrier::new(NUMBER_OF_THREADS + 1); let sut = ConditionVariable::new(); let mtx = Mutex::new(); @@ -128,7 +129,7 @@ fn condition_variable_notify_all_unblocks_all() { while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } - true + false }, |_, _| true, ); @@ -158,6 +159,7 @@ fn condition_variable_notify_all_unblocks_all() { #[test] fn condition_variable_mutex_is_locked_when_wait_returns() { const NUMBER_OF_THREADS: u32 = 5; + let _watchdog = Watchdog::new(Duration::from_secs(10)); let barrier = Barrier::new(NUMBER_OF_THREADS + 1); let sut = ConditionVariable::new(); let mtx = Mutex::new(); @@ -176,7 +178,7 @@ fn condition_variable_mutex_is_locked_when_wait_returns() { while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } - true + false }, |_, _| true, ); From bf2dc7023ee22ab5edd8f451f9c35afd14886e8d Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Wed, 20 Dec 2023 14:35:26 +0100 Subject: [PATCH 5/8] [#51] Explicitly wait until every thread reached the wait loop of the condition variable --- .../src/condition_variable.rs | 2 +- .../tests/condition_variable_tests.rs | 147 ++++++++++-------- 2 files changed, 83 insertions(+), 66 deletions(-) diff --git a/iceoryx2-pal/concurrency-sync/src/condition_variable.rs b/iceoryx2-pal/concurrency-sync/src/condition_variable.rs index e31c14ae1..5317a6770 100644 --- a/iceoryx2-pal/concurrency-sync/src/condition_variable.rs +++ b/iceoryx2-pal/concurrency-sync/src/condition_variable.rs @@ -57,9 +57,9 @@ impl ConditionVariable { wait: Wait, mtx_wait: MtxWait, ) -> bool { + self.number_of_waiters.fetch_add(1, Ordering::Relaxed); mtx.unlock(mtx_wake_one); - self.number_of_waiters.fetch_add(1, Ordering::Relaxed); if !self.semaphore.wait(wait) { self.number_of_waiters.fetch_sub(1, Ordering::Relaxed); return false; diff --git a/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs b/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs index f9c062933..c83201e4c 100644 --- a/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs @@ -12,7 +12,7 @@ use std::{ hint::spin_loop, - sync::atomic::{AtomicU32, Ordering}, + sync::atomic::{AtomicBool, AtomicU32, Ordering}, time::Duration, }; @@ -21,75 +21,82 @@ use iceoryx2_pal_concurrency_sync::{barrier::Barrier, condition_variable::*}; const TIMEOUT: Duration = Duration::from_millis(25); +struct ThreadInWait { + thread_id: AtomicU32, + thread_in_wait: [AtomicBool; NUMBER_OF_THREADS], +} + +impl ThreadInWait { + fn new() -> Self { + const FALSE: AtomicBool = AtomicBool::new(false); + Self { + thread_id: AtomicU32::new(0), + thread_in_wait: [FALSE; NUMBER_OF_THREADS], + } + } + + fn get_id(&self) -> usize { + self.thread_id.fetch_add(1, Ordering::Relaxed) as usize + } + + fn signal_is_in_wait(&self, id: usize) { + self.thread_in_wait[id].store(true, Ordering::Relaxed); + } + + fn block_until_all_threads_are_waiting(&self) { + loop { + let mut wait_for_thread = false; + for v in &self.thread_in_wait { + if v.load(Ordering::Relaxed) == false { + wait_for_thread = true; + break; + } + } + + if wait_for_thread == false { + break; + } + } + } +} + #[test] fn condition_variable_notify_one_unblocks_one() { - const NUMBER_OF_THREADS: u32 = 3; + const NUMBER_OF_THREADS: usize = 3; let _watchdog = Watchdog::new(Duration::from_secs(10)); - let barrier = Barrier::new(NUMBER_OF_THREADS + 1); + let barrier = Barrier::new(NUMBER_OF_THREADS as u32 + 1); let sut = ConditionVariable::new(); let mtx = Mutex::new(); let counter = AtomicU32::new(0); let triggered_thread = AtomicU32::new(0); + let thread_in_wait = ThreadInWait::::new(); std::thread::scope(|s| { - s.spawn(|| { - barrier.wait(|_, _| {}, |_| {}); - mtx.lock(|_, _| true); - let wait_result = sut.wait( - &mtx, - |_| {}, - |_, _| { - while triggered_thread.load(Ordering::Relaxed) < 1 { - spin_loop() - } - false - }, - |_, _| true, - ); - counter.fetch_add(1, Ordering::Relaxed); - mtx.unlock(|_| {}); - assert_that!(wait_result, eq true); - }); - - s.spawn(|| { - barrier.wait(|_, _| {}, |_| {}); - mtx.lock(|_, _| true); - let wait_result = sut.wait( - &mtx, - |_| {}, - |_, _| { - while triggered_thread.load(Ordering::Relaxed) < 2 { - spin_loop() - } - false - }, - |_, _| true, - ); - counter.fetch_add(1, Ordering::Relaxed); - mtx.unlock(|_| {}); - assert_that!(wait_result, eq true); - }); - - s.spawn(|| { - barrier.wait(|_, _| {}, |_| {}); - mtx.lock(|_, _| true); - let wait_result = sut.wait( - &mtx, - |_| {}, - |_, _| { - while triggered_thread.load(Ordering::Relaxed) < 3 { - spin_loop() - } - false - }, - |_, _| true, - ); - counter.fetch_add(1, Ordering::Relaxed); - mtx.unlock(|_| {}); - assert_that!(wait_result, eq true); - }); + for _ in 0..NUMBER_OF_THREADS { + s.spawn(|| { + barrier.wait(|_, _| {}, |_| {}); + mtx.lock(|_, _| true); + let id = thread_in_wait.get_id(); + let wait_result = sut.wait( + &mtx, + |_| {}, + |_, _| { + thread_in_wait.signal_is_in_wait(id); + while triggered_thread.load(Ordering::Relaxed) < 1 { + spin_loop() + } + false + }, + |_, _| true, + ); + counter.fetch_add(1, Ordering::Relaxed); + mtx.unlock(|_| {}); + assert_that!(wait_result, eq true); + }); + } barrier.wait(|_, _| {}, |_| {}); + thread_in_wait.block_until_all_threads_are_waiting(); std::thread::sleep(TIMEOUT); let counter_old = counter.load(Ordering::Relaxed); @@ -99,7 +106,7 @@ fn condition_variable_notify_one_unblocks_one() { }); // this can cause a deadlock but the watchdog takes care of it - while counter.load(Ordering::Relaxed) <= i {} + while counter.load(Ordering::Relaxed) as usize <= i {} } assert_that!(counter_old, eq 0); @@ -108,13 +115,14 @@ fn condition_variable_notify_one_unblocks_one() { #[test] fn condition_variable_notify_all_unblocks_all() { - const NUMBER_OF_THREADS: u32 = 5; + const NUMBER_OF_THREADS: usize = 5; let _watchdog = Watchdog::new(Duration::from_secs(10)); - let barrier = Barrier::new(NUMBER_OF_THREADS + 1); + let barrier = Barrier::new(NUMBER_OF_THREADS as u32 + 1); let sut = ConditionVariable::new(); let mtx = Mutex::new(); let counter = AtomicU32::new(0); let triggered_thread = AtomicU32::new(0); + let thread_in_wait = ThreadInWait::::new(); std::thread::scope(|s| { let mut threads = vec![]; @@ -122,10 +130,12 @@ fn condition_variable_notify_all_unblocks_all() { threads.push(s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); mtx.lock(|_, _| true); + let id = thread_in_wait.get_id(); let wait_result = sut.wait( &mtx, |_| {}, |_, _| { + thread_in_wait.signal_is_in_wait(id); while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } @@ -140,6 +150,8 @@ fn condition_variable_notify_all_unblocks_all() { } barrier.wait(|_, _| {}, |_| {}); + + thread_in_wait.block_until_all_threads_are_waiting(); std::thread::sleep(TIMEOUT); let counter_old = counter.load(Ordering::Relaxed); @@ -152,29 +164,32 @@ fn condition_variable_notify_all_unblocks_all() { } assert_that!(counter_old, eq 0); - assert_that!(counter.load(Ordering::Relaxed), eq NUMBER_OF_THREADS); + assert_that!(counter.load(Ordering::Relaxed), eq NUMBER_OF_THREADS as u32); }); } #[test] fn condition_variable_mutex_is_locked_when_wait_returns() { - const NUMBER_OF_THREADS: u32 = 5; + const NUMBER_OF_THREADS: usize = 5; let _watchdog = Watchdog::new(Duration::from_secs(10)); - let barrier = Barrier::new(NUMBER_OF_THREADS + 1); + let barrier = Barrier::new(NUMBER_OF_THREADS as u32 + 1); let sut = ConditionVariable::new(); let mtx = Mutex::new(); let counter = AtomicU32::new(0); let triggered_thread = AtomicU32::new(0); + let thread_in_wait = ThreadInWait::::new(); std::thread::scope(|s| { for _ in 0..NUMBER_OF_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); mtx.lock(|_, _| true); + let id = thread_in_wait.get_id(); let wait_result = sut.wait( &mtx, |_| {}, |_, _| { + thread_in_wait.signal_is_in_wait(id); while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } @@ -191,6 +206,8 @@ fn condition_variable_mutex_is_locked_when_wait_returns() { } barrier.wait(|_, _| {}, |_| {}); + + thread_in_wait.block_until_all_threads_are_waiting(); std::thread::sleep(TIMEOUT); let counter_old = counter.load(Ordering::Relaxed); From d7d9287c69b9e0030407cbd731513e411f686365 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Wed, 20 Dec 2023 17:41:45 +0100 Subject: [PATCH 6/8] [#51] Introduce WaitResult & WaitAction to replace bool return values --- .../src/condition_variable.rs | 12 +- iceoryx2-pal/concurrency-sync/src/lib.rs | 12 ++ iceoryx2-pal/concurrency-sync/src/mutex.rs | 36 +++-- iceoryx2-pal/concurrency-sync/src/rwlock.rs | 82 +++++++---- .../concurrency-sync/src/semaphore.rs | 16 +-- .../tests/condition_variable_tests.rs | 34 ++--- .../concurrency-sync/tests/mutex_tests.rs | 18 +-- .../concurrency-sync/tests/rwlock_tests.rs | 132 +++++++++--------- .../concurrency-sync/tests/semaphore_tests.rs | 20 +-- 9 files changed, 204 insertions(+), 158 deletions(-) diff --git a/iceoryx2-pal/concurrency-sync/src/condition_variable.rs b/iceoryx2-pal/concurrency-sync/src/condition_variable.rs index 5317a6770..756150e8a 100644 --- a/iceoryx2-pal/concurrency-sync/src/condition_variable.rs +++ b/iceoryx2-pal/concurrency-sync/src/condition_variable.rs @@ -13,7 +13,7 @@ use core::sync::atomic::{AtomicU32, Ordering}; pub use crate::mutex::Mutex; -use crate::semaphore::Semaphore; +use crate::{semaphore::Semaphore, WaitAction, WaitResult}; pub struct ConditionVariable { number_of_waiters: AtomicU32, @@ -48,21 +48,21 @@ impl ConditionVariable { pub fn wait< WakeOne: Fn(&AtomicU32), - Wait: Fn(&AtomicU32, &u32) -> bool, - MtxWait: Fn(&AtomicU32, &u32) -> bool, + Wait: Fn(&AtomicU32, &u32) -> WaitAction, + MtxWait: Fn(&AtomicU32, &u32) -> WaitAction, >( &self, mtx: &Mutex, mtx_wake_one: WakeOne, wait: Wait, mtx_wait: MtxWait, - ) -> bool { + ) -> WaitResult { self.number_of_waiters.fetch_add(1, Ordering::Relaxed); mtx.unlock(mtx_wake_one); - if !self.semaphore.wait(wait) { + if self.semaphore.wait(wait) == WaitResult::Interrupted { self.number_of_waiters.fetch_sub(1, Ordering::Relaxed); - return false; + return WaitResult::Interrupted; } self.number_of_waiters.fetch_sub(1, Ordering::Relaxed); diff --git a/iceoryx2-pal/concurrency-sync/src/lib.rs b/iceoryx2-pal/concurrency-sync/src/lib.rs index ea12bd325..d444448e2 100644 --- a/iceoryx2-pal/concurrency-sync/src/lib.rs +++ b/iceoryx2-pal/concurrency-sync/src/lib.rs @@ -19,3 +19,15 @@ pub mod condition_variable; pub mod mutex; pub mod rwlock; pub mod semaphore; + +#[derive(Debug, PartialEq, Eq)] +pub enum WaitAction { + Continue, + Abort, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum WaitResult { + Interrupted, + Success, +} diff --git a/iceoryx2-pal/concurrency-sync/src/mutex.rs b/iceoryx2-pal/concurrency-sync/src/mutex.rs index ce92953cf..20171a543 100644 --- a/iceoryx2-pal/concurrency-sync/src/mutex.rs +++ b/iceoryx2-pal/concurrency-sync/src/mutex.rs @@ -15,6 +15,8 @@ use core::{ sync::atomic::{AtomicU32, Ordering}, }; +use crate::{WaitAction, WaitResult}; + pub struct Mutex { // we use an AtomicU32 since it should be supported on nearly every platform state: AtomicU32, @@ -33,24 +35,24 @@ impl Mutex { } } - pub fn lock bool>(&self, wait: Wait) -> bool { - if self.uncontested_lock(crate::SPIN_REPETITIONS) { - return true; + pub fn lock WaitAction>(&self, wait: Wait) -> WaitResult { + if self.uncontested_lock(crate::SPIN_REPETITIONS) == WaitResult::Success { + return WaitResult::Success; } loop { - let keep_running = wait(&self.state, &1); + let action = wait(&self.state, &1); if self .state .compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed) .is_ok() { - return true; + return WaitResult::Success; } - if !keep_running { - return false; + if action == WaitAction::Abort { + return WaitResult::Interrupted; } } } @@ -60,15 +62,21 @@ impl Mutex { wake_one(&self.state); } - pub fn try_lock(&self) -> bool { - self.state + pub fn try_lock(&self) -> WaitResult { + if self + .state .compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed) .is_ok() + { + WaitResult::Success + } else { + WaitResult::Interrupted + } } - fn uncontested_lock(&self, retry_limit: u64) -> bool { - if self.try_lock() { - return true; + fn uncontested_lock(&self, retry_limit: u64) -> WaitResult { + if self.try_lock() == WaitResult::Success { + return WaitResult::Success; } let mut retry_counter = 0; @@ -81,10 +89,10 @@ impl Mutex { retry_counter += 1; if retry_limit == retry_counter { - return false; + return WaitResult::Interrupted; } } - true + WaitResult::Success } } diff --git a/iceoryx2-pal/concurrency-sync/src/rwlock.rs b/iceoryx2-pal/concurrency-sync/src/rwlock.rs index 4e74a434e..28c1a2bb9 100644 --- a/iceoryx2-pal/concurrency-sync/src/rwlock.rs +++ b/iceoryx2-pal/concurrency-sync/src/rwlock.rs @@ -15,7 +15,7 @@ use core::{ sync::atomic::{AtomicU32, Ordering}, }; -use crate::SPIN_REPETITIONS; +use crate::{WaitAction, WaitResult, SPIN_REPETITIONS}; const WRITE_LOCKED: u32 = u32::MAX; const UNLOCKED: u32 = 0; @@ -37,14 +37,15 @@ impl RwLockReaderPreference { Self::default() } - pub fn try_read_lock(&self) -> bool { + pub fn try_read_lock(&self) -> WaitResult { let reader_count = self.reader_count.load(Ordering::Relaxed); if reader_count == WRITE_LOCKED { - return false; + return WaitResult::Interrupted; } - self.reader_count + if self + .reader_count .compare_exchange( reader_count, reader_count + 1, @@ -52,9 +53,14 @@ impl RwLockReaderPreference { Ordering::Relaxed, ) .is_ok() + { + WaitResult::Success + } else { + WaitResult::Interrupted + } } - pub fn read_lock bool>(&self, wait: F) -> bool { + pub fn read_lock WaitAction>(&self, wait: F) -> WaitResult { let mut reader_count = self.reader_count.load(Ordering::Relaxed); let mut retry_counter = 0; @@ -66,13 +72,13 @@ impl RwLockReaderPreference { } if !keep_running { - return false; + return WaitResult::Interrupted; } if retry_counter < SPIN_REPETITIONS { retry_counter += 1; spin_loop(); - } else if !wait(&self.reader_count, &reader_count) { + } else if wait(&self.reader_count, &reader_count) == WaitAction::Abort { keep_running = false; } @@ -85,7 +91,7 @@ impl RwLockReaderPreference { Ordering::Acquire, Ordering::Relaxed, ) { - Ok(_) => return true, + Ok(_) => return WaitResult::Success, Err(v) => { reader_count = v; } @@ -103,13 +109,19 @@ impl RwLockReaderPreference { wake_one(&self.reader_count); } - pub fn try_write_lock(&self) -> bool { - self.reader_count + pub fn try_write_lock(&self) -> WaitResult { + if self + .reader_count .compare_exchange(UNLOCKED, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed) .is_ok() + { + WaitResult::Success + } else { + WaitResult::Interrupted + } } - pub fn write_lock bool>(&self, wait: F) -> bool { + pub fn write_lock WaitAction>(&self, wait: F) -> WaitResult { let mut retry_counter = 0; let mut reader_count; @@ -122,17 +134,17 @@ impl RwLockReaderPreference { Ordering::Relaxed, ) { Err(v) => reader_count = v, - Ok(_) => return true, + Ok(_) => return WaitResult::Success, }; if !keep_running { - return false; + return WaitResult::Interrupted; } if retry_counter < SPIN_REPETITIONS { retry_counter += 1; spin_loop(); - } else if !wait(&self.reader_count, &reader_count) { + } else if wait(&self.reader_count, &reader_count) == WaitAction::Abort { keep_running = false; } } @@ -158,18 +170,24 @@ impl RwLockWriterPreference { Self::default() } - pub fn try_read_lock(&self) -> bool { + pub fn try_read_lock(&self) -> WaitResult { let state = self.state.load(Ordering::Relaxed); if state % 2 == 1 { - return false; + return WaitResult::Interrupted; } - self.state + if self + .state .compare_exchange(state, state + 2, Ordering::Acquire, Ordering::Relaxed) .is_ok() + { + WaitResult::Success + } else { + WaitResult::Interrupted + } } - pub fn read_lock bool>(&self, wait: F) -> bool { + pub fn read_lock WaitAction>(&self, wait: F) -> WaitResult { let mut state = self.state.load(Ordering::Relaxed); let mut retry_counter = 0; @@ -177,13 +195,13 @@ impl RwLockWriterPreference { loop { if state % 2 == 1 { if !keep_running { - return false; + return WaitResult::Interrupted; } if retry_counter < SPIN_REPETITIONS { retry_counter += 1; spin_loop(); - } else if !wait(&self.state, &state) { + } else if wait(&self.state, &state) == WaitAction::Abort { keep_running = false; } state = self.state.load(Ordering::Relaxed); @@ -194,7 +212,7 @@ impl RwLockWriterPreference { Ordering::Acquire, Ordering::Relaxed, ) { - Ok(_) => return true, + Ok(_) => return WaitResult::Success, Err(v) => state = v, } } @@ -218,14 +236,20 @@ impl RwLockWriterPreference { } } - pub fn try_write_lock(&self) -> bool { - self.state + pub fn try_write_lock(&self) -> WaitResult { + if self + .state .compare_exchange(0, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed) .is_ok() + { + WaitResult::Success + } else { + WaitResult::Interrupted + } } pub fn write_lock< - Wait: Fn(&AtomicU32, &u32) -> bool, + Wait: Fn(&AtomicU32, &u32) -> WaitAction, WakeOne: Fn(&AtomicU32), WakeAll: Fn(&AtomicU32), >( @@ -233,7 +257,7 @@ impl RwLockWriterPreference { wait: Wait, wake_one: WakeOne, wake_all: WakeAll, - ) -> bool { + ) -> WaitResult { let mut state = self.state.load(Ordering::Relaxed); let mut keep_running = true; @@ -246,7 +270,7 @@ impl RwLockWriterPreference { Ordering::Acquire, Ordering::Relaxed, ) { - Ok(_) => return true, + Ok(_) => return WaitResult::Success, Err(v) => state = v, } } @@ -264,12 +288,12 @@ impl RwLockWriterPreference { self.writer_wake_counter.fetch_add(1, Ordering::Relaxed); wake_one(&self.writer_wake_counter); wake_all(&self.state); - return false; + return WaitResult::Interrupted; } Err(v) => state = v, } } else { - return false; + return WaitResult::Interrupted; } } } @@ -288,7 +312,7 @@ impl RwLockWriterPreference { retry_counter += 1; } else { let writer_wake_counter = self.writer_wake_counter.load(Ordering::Relaxed); - if !wait(&self.writer_wake_counter, &writer_wake_counter) { + if wait(&self.writer_wake_counter, &writer_wake_counter) == WaitAction::Abort { keep_running = false; } } diff --git a/iceoryx2-pal/concurrency-sync/src/semaphore.rs b/iceoryx2-pal/concurrency-sync/src/semaphore.rs index 45072d3bd..b0a962cf5 100644 --- a/iceoryx2-pal/concurrency-sync/src/semaphore.rs +++ b/iceoryx2-pal/concurrency-sync/src/semaphore.rs @@ -15,7 +15,7 @@ use core::{ sync::atomic::{AtomicU32, Ordering}, }; -use crate::SPIN_REPETITIONS; +use crate::{WaitAction, WaitResult, SPIN_REPETITIONS}; pub struct Semaphore { value: AtomicU32, @@ -37,7 +37,7 @@ impl Semaphore { wakeup(&self.value); } - pub fn wait bool>(&self, wait: Wait) -> bool { + pub fn wait WaitAction>(&self, wait: Wait) -> WaitResult { let mut retry_counter = 0; let mut current_value = self.value.load(Ordering::Relaxed); @@ -49,13 +49,13 @@ impl Semaphore { } if !keep_running { - return false; + return WaitResult::Interrupted; } if retry_counter < SPIN_REPETITIONS { spin_loop(); retry_counter += 1; - } else if !wait(&self.value, ¤t_value) { + } else if wait(&self.value, ¤t_value) == WaitAction::Abort { keep_running = false; } current_value = self.value.load(Ordering::Relaxed); @@ -72,15 +72,15 @@ impl Semaphore { } } - true + WaitResult::Success } - pub fn try_wait(&self) -> bool { + pub fn try_wait(&self) -> WaitResult { let mut current_value = self.value.load(Ordering::Relaxed); loop { if current_value == 0 { - return false; + return WaitResult::Interrupted; } match self.value.compare_exchange_weak( @@ -89,7 +89,7 @@ impl Semaphore { Ordering::Release, Ordering::Relaxed, ) { - Ok(_) => return true, + Ok(_) => return WaitResult::Success, Err(v) => current_value = v, } } diff --git a/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs b/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs index c83201e4c..40694a165 100644 --- a/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs @@ -17,7 +17,9 @@ use std::{ }; use iceoryx2_bb_testing::{assert_that, watchdog::Watchdog}; -use iceoryx2_pal_concurrency_sync::{barrier::Barrier, condition_variable::*}; +use iceoryx2_pal_concurrency_sync::{ + barrier::Barrier, condition_variable::*, WaitAction, WaitResult, +}; const TIMEOUT: Duration = Duration::from_millis(25); @@ -75,7 +77,7 @@ fn condition_variable_notify_one_unblocks_one() { for _ in 0..NUMBER_OF_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - mtx.lock(|_, _| true); + mtx.lock(|_, _| WaitAction::Continue); let id = thread_in_wait.get_id(); let wait_result = sut.wait( &mtx, @@ -85,13 +87,13 @@ fn condition_variable_notify_one_unblocks_one() { while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } - false + WaitAction::Continue }, - |_, _| true, + |_, _| WaitAction::Continue, ); counter.fetch_add(1, Ordering::Relaxed); mtx.unlock(|_| {}); - assert_that!(wait_result, eq true); + assert_that!(wait_result, eq WaitResult::Success); }); } @@ -129,7 +131,7 @@ fn condition_variable_notify_all_unblocks_all() { for _ in 0..NUMBER_OF_THREADS { threads.push(s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - mtx.lock(|_, _| true); + mtx.lock(|_, _| WaitAction::Continue); let id = thread_in_wait.get_id(); let wait_result = sut.wait( &mtx, @@ -139,13 +141,13 @@ fn condition_variable_notify_all_unblocks_all() { while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } - false + WaitAction::Continue }, - |_, _| true, + |_, _| WaitAction::Continue, ); counter.fetch_add(1, Ordering::Relaxed); mtx.unlock(|_| {}); - assert_that!(wait_result, eq true); + assert_that!(wait_result, eq WaitResult::Success); })); } @@ -183,7 +185,7 @@ fn condition_variable_mutex_is_locked_when_wait_returns() { for _ in 0..NUMBER_OF_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - mtx.lock(|_, _| true); + mtx.lock(|_, _| WaitAction::Continue); let id = thread_in_wait.get_id(); let wait_result = sut.wait( &mtx, @@ -193,13 +195,13 @@ fn condition_variable_mutex_is_locked_when_wait_returns() { while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } - false + WaitAction::Continue }, - |_, _| true, + |_, _| WaitAction::Continue, ); counter.fetch_add(1, Ordering::Relaxed); - assert_that!(wait_result, eq true); - assert_that!(mtx.try_lock(), eq false); + assert_that!(wait_result, eq WaitResult::Success); + assert_that!(mtx.try_lock(), eq WaitResult::Interrupted); // unlock thread since we own it mtx.unlock(|_| {}); }); @@ -224,7 +226,7 @@ fn condition_variable_mutex_is_locked_when_wait_returns() { fn condition_variable_wait_returns_false_when_functor_returns_false() { let sut = ConditionVariable::new(); let mtx = Mutex::new(); - mtx.lock(|_, _| true); - assert_that!(!sut.wait(&mtx, |_| {}, |_, _| false, |_, _| true), eq true); + mtx.lock(|_, _| WaitAction::Continue); + assert_that!(sut.wait(&mtx, |_| {}, |_, _| WaitAction::Abort, |_, _| WaitAction::Continue), eq WaitResult::Interrupted); mtx.unlock(|_| {}); } diff --git a/iceoryx2-pal/concurrency-sync/tests/mutex_tests.rs b/iceoryx2-pal/concurrency-sync/tests/mutex_tests.rs index fc76c5b3c..28f327d35 100644 --- a/iceoryx2-pal/concurrency-sync/tests/mutex_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/mutex_tests.rs @@ -16,7 +16,7 @@ use std::{ }; use iceoryx2_bb_testing::assert_that; -use iceoryx2_pal_concurrency_sync::mutex::*; +use iceoryx2_pal_concurrency_sync::{mutex::*, WaitAction, WaitResult}; const TIMEOUT: Duration = Duration::from_millis(25); @@ -29,7 +29,7 @@ fn mutex_lock_blocks() { sut.try_lock(); let t1 = s.spawn(|| { - sut.lock(|_, _| true); + sut.lock(|_, _| WaitAction::Continue); counter.fetch_add(1, Ordering::Relaxed); sut.unlock(|_| {}); }); @@ -57,15 +57,15 @@ fn mutex_lock_with_timeout_blocks() { let start = Instant::now(); while atomic.load(Ordering::Relaxed) == *value { if start.elapsed() > TIMEOUT * 2 { - return false; + return WaitAction::Abort; } } - true + WaitAction::Continue }); counter.fetch_add(1, Ordering::Relaxed); sut.unlock(|_| {}); - assert_that!(lock_result, eq true); + assert_that!(lock_result, eq WaitResult::Success); }); std::thread::sleep(TIMEOUT); @@ -84,14 +84,14 @@ fn mutex_lock_with_timeout_and_fails_after_timeout() { sut.try_lock(); - assert_that!(!sut.lock(|atomic, value| { + assert_that!(sut.lock(|atomic, value| { let start = Instant::now(); while atomic.load(Ordering::Relaxed) == *value { if start.elapsed() > TIMEOUT { - return false; + return WaitAction::Abort; } } - true - }), eq true); + WaitAction::Continue + }), eq WaitResult::Interrupted); } diff --git a/iceoryx2-pal/concurrency-sync/tests/rwlock_tests.rs b/iceoryx2-pal/concurrency-sync/tests/rwlock_tests.rs index 9564728ab..e3d5de82f 100644 --- a/iceoryx2-pal/concurrency-sync/tests/rwlock_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/rwlock_tests.rs @@ -16,7 +16,7 @@ use std::{ }; use iceoryx2_bb_testing::assert_that; -use iceoryx2_pal_concurrency_sync::{barrier::Barrier, rwlock::*}; +use iceoryx2_pal_concurrency_sync::{barrier::Barrier, rwlock::*, WaitAction, WaitResult}; const TIMEOUT: Duration = Duration::from_millis(25); @@ -24,48 +24,48 @@ const TIMEOUT: Duration = Duration::from_millis(25); fn rwlock_reader_preference_try_write_lock_blocks_read_locks() { let sut = RwLockReaderPreference::new(); - assert_that!(sut.try_write_lock(), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } #[test] fn rwlock_reader_preference_multiple_read_locks_block_write_lock() { let sut = RwLockReaderPreference::new(); - assert_that!(sut.try_read_lock(), eq true); - assert_that!(sut.try_read_lock(), eq true); - assert_that!(sut.read_lock(|_, _| false), eq true); - assert_that!(sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } #[test] fn rwlock_reader_preference_write_lock_and_unlock_works() { let sut = RwLockReaderPreference::new(); - assert_that!(sut.write_lock(|_, _| false), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); sut.unlock(|_| {}); - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); - assert_that!(!sut.write_lock(|_, _| false), eq true); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); sut.unlock(|_| {}); - assert_that!(sut.write_lock(|_, _| false), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); } #[test] @@ -74,16 +74,16 @@ fn rwlock_reader_preference_try_read_lock_and_unlock_works() { let sut = RwLockReaderPreference::new(); for _ in 0..NUMBER_OF_READ_LOCKS { - assert_that!(sut.try_read_lock(), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } for _ in 0..NUMBER_OF_READ_LOCKS { sut.unlock(|_| {}); } - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); } #[test] @@ -92,16 +92,16 @@ fn rwlock_reader_preference_read_lock_and_unlock_works() { let sut = RwLockReaderPreference::new(); for _ in 0..NUMBER_OF_READ_LOCKS { - assert_that!(sut.read_lock(|_, _| false), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false), eq true); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } for _ in 0..NUMBER_OF_READ_LOCKS { sut.unlock(|_| {}); } - assert_that!(sut.write_lock(|_, _| false), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); } #[test] @@ -118,11 +118,11 @@ fn rwlock_reader_preference_read_lock_blocks_only_write_locks() { let write_counter = AtomicU32::new(0); std::thread::scope(|s| { - assert_that!(sut.try_read_lock(), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); for _ in 0..WRITE_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.write_lock(|_, _| true); + sut.write_lock(|_, _| WaitAction::Continue); write_counter.fetch_add(1, Ordering::Relaxed); sut.unlock(|_| {}); barrier_write.wait(|_, _| {}, |_| {}); @@ -132,7 +132,7 @@ fn rwlock_reader_preference_read_lock_blocks_only_write_locks() { for _ in 0..READ_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.read_lock(|_, _| true); + sut.read_lock(|_, _| WaitAction::Continue); read_counter.fetch_add(1, Ordering::Relaxed); barrier_read.wait(|_, _| {}, |_| {}); sut.unlock(|_| {}); @@ -171,11 +171,11 @@ fn rwlock_reader_preference_write_lock_blocks_everything() { let write_counter = AtomicU32::new(0); std::thread::scope(|s| { - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); for _ in 0..WRITE_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.write_lock(|_, _| true); + sut.write_lock(|_, _| WaitAction::Continue); let current_read_counter = read_counter.load(Ordering::Relaxed); write_counter.fetch_add(1, Ordering::Relaxed); std::thread::sleep(TIMEOUT); @@ -190,7 +190,7 @@ fn rwlock_reader_preference_write_lock_blocks_everything() { for _ in 0..READ_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.read_lock(|_, _| true); + sut.read_lock(|_, _| WaitAction::Continue); read_counter.fetch_add(1, Ordering::Relaxed); sut.unlock(|_| {}); @@ -226,48 +226,48 @@ fn rwlock_reader_preference_write_lock_blocks_everything() { fn rwlock_writer_preference_try_write_lock_blocks_read_locks() { let sut = RwLockWriterPreference::new(); - assert_that!(sut.try_write_lock(), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Interrupted); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } #[test] fn rwlock_writer_preference_multiple_read_locks_block_write_lock() { let sut = RwLockWriterPreference::new(); - assert_that!(sut.try_read_lock(), eq true); - assert_that!(sut.try_read_lock(), eq true); - assert_that!(sut.read_lock(|_, _| false), eq true); - assert_that!(sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Interrupted); } #[test] fn rwlock_writer_preference_write_lock_and_unlock_works() { let sut = RwLockWriterPreference::new(); - assert_that!(sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Success); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); sut.unlock(|_| {}, |_| {}); - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); - assert_that!(!sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Interrupted); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); sut.unlock(|_| {}, |_| {}); - assert_that!(sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Success); } #[test] @@ -276,16 +276,16 @@ fn rwlock_writer_preference_try_read_lock_and_unlock_works() { let sut = RwLockWriterPreference::new(); for _ in 0..NUMBER_OF_READ_LOCKS { - assert_that!(sut.try_read_lock(), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Interrupted); } for _ in 0..NUMBER_OF_READ_LOCKS { sut.unlock(|_| {}, |_| {}); } - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); } #[test] @@ -294,16 +294,16 @@ fn rwlock_writer_preference_read_lock_and_unlock_works() { let sut = RwLockWriterPreference::new(); for _ in 0..NUMBER_OF_READ_LOCKS { - assert_that!(sut.read_lock(|_, _| false), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Interrupted); } for _ in 0..NUMBER_OF_READ_LOCKS { sut.unlock(|_| {}, |_| {}); } - assert_that!(sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Success); } #[test] @@ -319,11 +319,11 @@ fn rwlock_writer_preference_write_lock_blocks_everything() { let write_counter = AtomicU32::new(0); std::thread::scope(|s| { - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); for _ in 0..WRITE_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.write_lock(|_, _| true, |_| {}, |_| {}); + sut.write_lock(|_, _| WaitAction::Continue, |_| {}, |_| {}); let current_read_counter = read_counter.load(Ordering::Relaxed); write_counter.fetch_add(1, Ordering::Relaxed); std::thread::sleep(TIMEOUT); @@ -338,7 +338,7 @@ fn rwlock_writer_preference_write_lock_blocks_everything() { for _ in 0..READ_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.read_lock(|_, _| true); + sut.read_lock(|_, _| WaitAction::Continue); read_counter.fetch_add(1, Ordering::Relaxed); sut.unlock(|_| {}, |_| {}); diff --git a/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs b/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs index b02551389..97ca08425 100644 --- a/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs @@ -16,7 +16,7 @@ use std::{ }; use iceoryx2_bb_testing::assert_that; -use iceoryx2_pal_concurrency_sync::semaphore::*; +use iceoryx2_pal_concurrency_sync::{semaphore::*, WaitAction, WaitResult}; const TIMEOUT: Duration = Duration::from_millis(25); @@ -26,16 +26,16 @@ fn semaphore_post_and_try_wait_works() { let sut = Semaphore::new(initial_value); for _ in 0..initial_value { - assert_that!(sut.try_wait(), eq true); + assert_that!(sut.try_wait(), eq WaitResult::Success); } - assert_that!(!sut.try_wait(), eq true); + assert_that!(sut.try_wait(), eq WaitResult::Interrupted); sut.post(|_| {}, initial_value); for _ in 0..initial_value { - assert_that!(sut.try_wait(), eq true); + assert_that!(sut.try_wait(), eq WaitResult::Success); } - assert_that!(!sut.try_wait(), eq true); + assert_that!(sut.try_wait(), eq WaitResult::Interrupted); } #[test] @@ -44,16 +44,16 @@ fn semaphore_post_and_wait_works() { let sut = Semaphore::new(initial_value); for _ in 0..initial_value { - assert_that!(sut.wait(|_, _| false), eq true); + assert_that!(sut.wait(|_, _| WaitAction::Abort), eq WaitResult::Success); } - assert_that!(!sut.wait(|_, _| false), eq true); + assert_that!(sut.wait(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); sut.post(|_| {}, initial_value); for _ in 0..initial_value { - assert_that!(sut.wait(|_, _| false), eq true); + assert_that!(sut.wait(|_, _| WaitAction::Abort), eq WaitResult::Success); } - assert_that!(!sut.wait(|_, _| false), eq true); + assert_that!(sut.wait(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } #[test] @@ -64,7 +64,7 @@ fn semaphore_wait_blocks() { std::thread::scope(|s| { s.spawn(|| { - sut.wait(|_, _| true); + sut.wait(|_, _| WaitAction::Continue); counter.fetch_add(1, Ordering::Relaxed); }); From c314af83ac1263435b396206d69b71e5761d8b72 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Wed, 20 Dec 2023 18:13:44 +0100 Subject: [PATCH 7/8] [#51] Adjust mac os to new WaitResult & WaitAction API --- iceoryx2-pal/posix/src/macos/pthread.rs | 55 ++++++++++++----------- iceoryx2-pal/posix/src/macos/semaphore.rs | 9 ++-- 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/iceoryx2-pal/posix/src/macos/pthread.rs b/iceoryx2-pal/posix/src/macos/pthread.rs index 6c79b9af9..638b32378 100644 --- a/iceoryx2-pal/posix/src/macos/pthread.rs +++ b/iceoryx2-pal/posix/src/macos/pthread.rs @@ -19,7 +19,7 @@ use crate::posix::*; use core::sync::atomic::{AtomicU32, Ordering}; use iceoryx2_pal_concurrency_sync::barrier::Barrier; use iceoryx2_pal_concurrency_sync::mutex::Mutex; -use iceoryx2_pal_concurrency_sync::rwlock::*; +use iceoryx2_pal_concurrency_sync::{rwlock::*, WaitAction, WaitResult}; use std::cell::UnsafeCell; #[derive(Clone, Copy)] @@ -64,7 +64,7 @@ impl ThreadStates { } fn lock(&self) { - self.mtx.lock(|_, _| true); + self.mtx.lock(|_, _| WaitAction::Continue); } fn unlock(&self) { @@ -437,11 +437,11 @@ pub unsafe fn pthread_rwlock_rdlock(lock: *mut pthread_rwlock_t) -> int { match (*lock).lock { RwLockType::PreferReader(ref l) => l.read_lock(|atomic, value| { wait(atomic, value); - true + WaitAction::Continue }), RwLockType::PreferWriter(ref l) => l.read_lock(|atomic, value| { wait(atomic, value); - true + WaitAction::Continue }), _ => { return Errno::EINVAL as _; @@ -452,7 +452,7 @@ pub unsafe fn pthread_rwlock_rdlock(lock: *mut pthread_rwlock_t) -> int { } pub unsafe fn pthread_rwlock_tryrdlock(lock: *mut pthread_rwlock_t) -> int { - let has_lock = match (*lock).lock { + let wait_result = match (*lock).lock { RwLockType::PreferReader(ref l) => l.try_read_lock(), RwLockType::PreferWriter(ref l) => l.try_read_lock(), _ => { @@ -460,7 +460,7 @@ pub unsafe fn pthread_rwlock_tryrdlock(lock: *mut pthread_rwlock_t) -> int { } }; - if has_lock { + if wait_result == WaitResult::Success { Errno::ESUCCES as _ } else { Errno::EBUSY as _ @@ -483,12 +483,12 @@ pub unsafe fn pthread_rwlock_wrlock(lock: *mut pthread_rwlock_t) -> int { match (*lock).lock { RwLockType::PreferReader(ref l) => l.write_lock(|atomic, value| { wait(atomic, value); - true + WaitAction::Continue }), RwLockType::PreferWriter(ref l) => l.write_lock( |atomic, value| { wait(atomic, value); - true + WaitAction::Continue }, wake_one, wake_all, @@ -502,7 +502,7 @@ pub unsafe fn pthread_rwlock_wrlock(lock: *mut pthread_rwlock_t) -> int { } pub unsafe fn pthread_rwlock_trywrlock(lock: *mut pthread_rwlock_t) -> int { - let has_lock = match (*lock).lock { + let wait_result = match (*lock).lock { RwLockType::PreferReader(ref l) => l.try_write_lock(), RwLockType::PreferWriter(ref l) => l.try_write_lock(), _ => { @@ -510,7 +510,7 @@ pub unsafe fn pthread_rwlock_trywrlock(lock: *mut pthread_rwlock_t) -> int { } }; - if has_lock { + if wait_result == WaitResult::Success { Errno::ESUCCES as _ } else { Errno::EBUSY as _ @@ -525,7 +525,7 @@ pub unsafe fn pthread_rwlock_timedwrlock( let mut wait_time = timespec::new(); loop { - let has_lock = match (*lock).lock { + let wait_result = match (*lock).lock { RwLockType::PreferReader(ref l) => l.try_write_lock(), RwLockType::PreferWriter(ref l) => l.try_write_lock(), _ => { @@ -533,7 +533,7 @@ pub unsafe fn pthread_rwlock_timedwrlock( } }; - if has_lock { + if wait_result == WaitResult::Success { return Errno::ESUCCES as _; } else { clock_gettime(CLOCK_REALTIME, &mut current_time); @@ -561,21 +561,21 @@ pub unsafe fn pthread_rwlock_timedrdlock( let mut wait_time = timespec::new(); loop { - let has_lock = match (*lock).lock { + let wait_result = match (*lock).lock { RwLockType::PreferReader(ref l) => l.read_lock(|atomic, value| { wait(atomic, value); - true + WaitAction::Continue }), RwLockType::PreferWriter(ref l) => l.read_lock(|atomic, value| { wait(atomic, value); - true + WaitAction::Continue }), _ => { return Errno::EINVAL as _; } }; - if has_lock { + if wait_result == WaitResult::Success { return Errno::ESUCCES as _; } else { clock_gettime(CLOCK_REALTIME, &mut current_time); @@ -620,11 +620,11 @@ pub unsafe fn pthread_cond_wait(cond: *mut pthread_cond_t, mutex: *mut pthread_m wake_one, |atomic, value| { wait(atomic, value); - true + WaitAction::Continue }, |atomic, value| { wait(atomic, value); - false + WaitAction::Continue }, ); @@ -636,20 +636,21 @@ pub unsafe fn pthread_cond_timedwait( mutex: *mut pthread_mutex_t, abstime: *const timespec, ) -> int { - (*cond).cv.wait( + match (*cond).cv.wait( &(*mutex).mtx, wake_one, |atomic, value| { timed_wait(atomic, value, *abstime); - false + WaitAction::Abort }, |atomic, value| { timed_wait(atomic, value, *abstime); - false + WaitAction::Abort }, - ); - - Errno::ESUCCES as _ + ) { + WaitResult::Interrupted => Errno::ETIMEDOUT as _, + WaitResult::Success => Errno::ESUCCES as _, + } } pub unsafe fn pthread_condattr_init(attr: *mut pthread_condattr_t) -> int { @@ -767,7 +768,7 @@ pub unsafe fn pthread_mutex_lock(mtx: *mut pthread_mutex_t) -> int { (*mtx).mtx.lock(|atomic, value| { wait(atomic, value); - true + WaitAction::Continue }); if (*mtx).track_thread_id { @@ -818,7 +819,7 @@ pub unsafe fn pthread_mutex_trylock(mtx: *mut pthread_mutex_t) -> int { }; match (*mtx).mtx.try_lock() { - true => { + WaitResult::Success => { if (*mtx).track_thread_id { (*mtx) .current_owner @@ -827,7 +828,7 @@ pub unsafe fn pthread_mutex_trylock(mtx: *mut pthread_mutex_t) -> int { Errno::ESUCCES as _ } - false => Errno::EBUSY as _, + WaitResult::Interrupted => Errno::EBUSY as _, } } diff --git a/iceoryx2-pal/posix/src/macos/semaphore.rs b/iceoryx2-pal/posix/src/macos/semaphore.rs index 5793c8081..52dcf9243 100644 --- a/iceoryx2-pal/posix/src/macos/semaphore.rs +++ b/iceoryx2-pal/posix/src/macos/semaphore.rs @@ -15,6 +15,7 @@ #![allow(unused_variables)] use iceoryx2_pal_concurrency_sync::semaphore::Semaphore; +use iceoryx2_pal_concurrency_sync::{WaitAction, WaitResult}; use crate::posix::pthread::{wait, wake_one}; use crate::posix::Errno; @@ -37,9 +38,9 @@ pub unsafe fn sem_post(sem: *mut sem_t) -> int { } pub unsafe fn sem_wait(sem: *mut sem_t) -> int { - (*sem).semaphore.wait(|atomic, value| -> bool { + (*sem).semaphore.wait(|atomic, value| -> WaitAction { wait(atomic, value); - true + WaitAction::Continue }); Errno::set(Errno::ESUCCES); @@ -48,11 +49,11 @@ pub unsafe fn sem_wait(sem: *mut sem_t) -> int { pub unsafe fn sem_trywait(sem: *mut sem_t) -> int { match (*sem).semaphore.try_wait() { - true => { + WaitResult::Success => { Errno::set(Errno::ESUCCES); 0 } - false => { + WaitResult::Interrupted => { Errno::set(Errno::EAGAIN); -1 } From 05d43b00b41073200e4b8e41f90983f8cadfca98 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Wed, 20 Dec 2023 09:34:19 -0800 Subject: [PATCH 8/8] [#51] Adjust windows to new concurrency primitives api --- iceoryx2-pal/posix/src/windows/pthread.rs | 64 ++++++++++--------- iceoryx2-pal/posix/src/windows/semaphore.rs | 17 ++--- iceoryx2-pal/posix/src/windows/signal.rs | 5 +- .../src/windows/win32_handle_translator.rs | 3 +- 4 files changed, 47 insertions(+), 42 deletions(-) diff --git a/iceoryx2-pal/posix/src/windows/pthread.rs b/iceoryx2-pal/posix/src/windows/pthread.rs index e214c4a30..d1f01dc8c 100644 --- a/iceoryx2-pal/posix/src/windows/pthread.rs +++ b/iceoryx2-pal/posix/src/windows/pthread.rs @@ -25,6 +25,7 @@ use std::{ use iceoryx2_pal_concurrency_sync::rwlock::*; use iceoryx2_pal_concurrency_sync::{barrier::Barrier, mutex::Mutex}; +use iceoryx2_pal_concurrency_sync::{WaitAction, WaitResult}; use windows_sys::Win32::{ Foundation::{CloseHandle, ERROR_TIMEOUT, STILL_ACTIVE, WAIT_FAILED}, System::{ @@ -92,7 +93,7 @@ impl ThreadStates { INFINITE, ) }; } - true + WaitAction::Continue }); } @@ -492,7 +493,7 @@ pub unsafe fn pthread_rwlock_rdlock(lock: *mut pthread_rwlock_t) -> int { 4, INFINITE, )}; - true + WaitAction::Continue }), RwLockType::PreferWriter(ref l) => l.read_lock(|atomic, value| { win32call! {WaitOnAddress( @@ -501,7 +502,7 @@ pub unsafe fn pthread_rwlock_rdlock(lock: *mut pthread_rwlock_t) -> int { 4, INFINITE, )}; - true + WaitAction::Continue }), _ => { return Errno::EINVAL as _; @@ -512,7 +513,7 @@ pub unsafe fn pthread_rwlock_rdlock(lock: *mut pthread_rwlock_t) -> int { } pub unsafe fn pthread_rwlock_tryrdlock(lock: *mut pthread_rwlock_t) -> int { - let has_lock = match (*lock).lock { + let wait_result = match (*lock).lock { RwLockType::PreferReader(ref l) => l.try_read_lock(), RwLockType::PreferWriter(ref l) => l.try_read_lock(), _ => { @@ -520,7 +521,7 @@ pub unsafe fn pthread_rwlock_tryrdlock(lock: *mut pthread_rwlock_t) -> int { } }; - if has_lock { + if wait_result == WaitResult::Success { Errno::ESUCCES as _ } else { Errno::EBUSY as _ @@ -557,7 +558,7 @@ pub unsafe fn pthread_rwlock_wrlock(lock: *mut pthread_rwlock_t) -> int { 4, INFINITE, )}; - true + WaitAction::Continue }), RwLockType::PreferWriter(ref l) => l.write_lock( |atomic, value| { @@ -567,7 +568,7 @@ pub unsafe fn pthread_rwlock_wrlock(lock: *mut pthread_rwlock_t) -> int { 4, INFINITE, )}; - true + WaitAction::Continue }, |atomic| { win32call! { WakeByAddressSingle((atomic as *const AtomicU32).cast()) }; @@ -585,7 +586,7 @@ pub unsafe fn pthread_rwlock_wrlock(lock: *mut pthread_rwlock_t) -> int { } pub unsafe fn pthread_rwlock_trywrlock(lock: *mut pthread_rwlock_t) -> int { - let has_lock = match (*lock).lock { + let wait_result = match (*lock).lock { RwLockType::PreferReader(ref l) => l.try_write_lock(), RwLockType::PreferWriter(ref l) => l.try_write_lock(), _ => { @@ -593,7 +594,7 @@ pub unsafe fn pthread_rwlock_trywrlock(lock: *mut pthread_rwlock_t) -> int { } }; - if has_lock { + if wait_result == WaitResult::Success { Errno::ESUCCES as _ } else { Errno::EBUSY as _ @@ -611,7 +612,7 @@ pub unsafe fn pthread_rwlock_timedwrlock( - now.as_millis() as i64, ); - let has_lock = match (*lock).lock { + let wait_result = match (*lock).lock { RwLockType::PreferReader(ref l) => l.write_lock(|atomic, value| { win32call! { WaitOnAddress( (atomic as *const AtomicU32).cast(), @@ -619,7 +620,7 @@ pub unsafe fn pthread_rwlock_timedwrlock( 4, timeout as _, ), ignore ERROR_TIMEOUT }; - true + WaitAction::Continue }), RwLockType::PreferWriter(ref l) => l.write_lock( |atomic, value| { @@ -629,7 +630,7 @@ pub unsafe fn pthread_rwlock_timedwrlock( 4, timeout as _, ), ignore ERROR_TIMEOUT}; - true + WaitAction::Continue }, |atomic| { WakeByAddressSingle((atomic as *const AtomicU32).cast()); @@ -643,7 +644,7 @@ pub unsafe fn pthread_rwlock_timedwrlock( } }; - if has_lock { + if wait_result == WaitResult::Success { Errno::ESUCCES as _ } else { Errno::ETIMEDOUT as _ @@ -661,7 +662,7 @@ pub unsafe fn pthread_rwlock_timedrdlock( - now.as_millis() as i64, ); - let has_lock = match (*lock).lock { + let wait_result = match (*lock).lock { RwLockType::PreferReader(ref l) => l.read_lock(|atomic, value| { win32call! { WaitOnAddress( (atomic as *const AtomicU32).cast(), @@ -669,7 +670,7 @@ pub unsafe fn pthread_rwlock_timedrdlock( 4, timeout as _, ), ignore ERROR_TIMEOUT }; - true + WaitAction::Continue }), RwLockType::PreferWriter(ref l) => l.read_lock(|atomic, value| { win32call! {WaitOnAddress( @@ -678,14 +679,14 @@ pub unsafe fn pthread_rwlock_timedrdlock( 4, timeout as _, ), ignore ERROR_TIMEOUT}; - true + WaitAction::Continue }), _ => { return Errno::EINVAL as _; } }; - if has_lock { + if wait_result == WaitResult::Success { Errno::ESUCCES as _ } else { Errno::ETIMEDOUT as _ @@ -728,7 +729,7 @@ pub unsafe fn pthread_cond_wait(cond: *mut pthread_cond_t, mutex: *mut pthread_m 4, INFINITE, )}; - true + WaitAction::Continue }, |atomic, value| { win32call! {WaitOnAddress( @@ -737,7 +738,7 @@ pub unsafe fn pthread_cond_wait(cond: *mut pthread_cond_t, mutex: *mut pthread_m 4, INFINITE, )}; - false + WaitAction::Continue }, ); @@ -755,7 +756,7 @@ pub unsafe fn pthread_cond_timedwait( (*abstime).tv_sec * 1000 + (*abstime).tv_nsec as i64 / 1000000 - now.as_millis() as i64, ); - (*cond).cv.wait( + match (*cond).cv.wait( &(*mutex).mtx, |atomic| { win32call! { WakeByAddressSingle((atomic as *const AtomicU32).cast()) }; @@ -767,7 +768,7 @@ pub unsafe fn pthread_cond_timedwait( 4, timeout as _, ), ignore ERROR_TIMEOUT }; - false + WaitAction::Abort }, |atomic, value| { win32call! { WaitOnAddress( @@ -776,11 +777,12 @@ pub unsafe fn pthread_cond_timedwait( 4, timeout as _, ), ignore ERROR_TIMEOUT }; - false + WaitAction::Abort }, - ); - - Errno::ESUCCES as _ + ) { + WaitResult::Success => Errno::ESUCCES as _, + WaitResult::Interrupted => Errno::ETIMEDOUT as _, + } } pub unsafe fn pthread_condattr_init(attr: *mut pthread_condattr_t) -> int { @@ -898,7 +900,7 @@ pub unsafe fn pthread_mutex_lock(mtx: *mut pthread_mutex_t) -> int { 4, INFINITE, ) }; - true + WaitAction::Continue }); if (*mtx).track_thread_id { @@ -937,9 +939,9 @@ pub unsafe fn pthread_mutex_timedlock( 4, timeout as _, ), ignore ERROR_TIMEOUT }; - false + WaitAction::Abort }) { - true => { + WaitResult::Success => { if (*mtx).track_thread_id { (*mtx) .current_owner @@ -948,7 +950,7 @@ pub unsafe fn pthread_mutex_timedlock( Errno::ESUCCES as _ } - false => Errno::ETIMEDOUT as _, + WaitResult::Interrupted => Errno::ETIMEDOUT as _, } } @@ -960,7 +962,7 @@ pub unsafe fn pthread_mutex_trylock(mtx: *mut pthread_mutex_t) -> int { }; match (*mtx).mtx.try_lock() { - true => { + WaitResult::Success => { if (*mtx).track_thread_id { (*mtx) .current_owner @@ -969,7 +971,7 @@ pub unsafe fn pthread_mutex_trylock(mtx: *mut pthread_mutex_t) -> int { Errno::ESUCCES as _ } - false => Errno::EBUSY as _, + WaitResult::Interrupted => Errno::EBUSY as _, } } diff --git a/iceoryx2-pal/posix/src/windows/semaphore.rs b/iceoryx2-pal/posix/src/windows/semaphore.rs index 540d53b3f..7e68c823c 100644 --- a/iceoryx2-pal/posix/src/windows/semaphore.rs +++ b/iceoryx2-pal/posix/src/windows/semaphore.rs @@ -19,6 +19,7 @@ use std::time::SystemTime; use std::time::UNIX_EPOCH; use iceoryx2_pal_concurrency_sync::semaphore::Semaphore; +use iceoryx2_pal_concurrency_sync::{WaitAction, WaitResult}; use windows_sys::Win32::System::Threading::WaitOnAddress; use windows_sys::Win32::System::Threading::WakeByAddressSingle; use windows_sys::Win32::System::Threading::INFINITE; @@ -49,7 +50,7 @@ pub unsafe fn sem_post(sem: *mut sem_t) -> int { } pub unsafe fn sem_wait(sem: *mut sem_t) -> int { - (*sem).semaphore.wait(|atomic, value| -> bool { + (*sem).semaphore.wait(|atomic, value| -> WaitAction { WaitOnAddress( (atomic as *const AtomicU32).cast(), (value as *const u32).cast(), @@ -57,7 +58,7 @@ pub unsafe fn sem_wait(sem: *mut sem_t) -> int { INFINITE, ); - true + WaitAction::Continue }); Errno::set(Errno::ESUCCES); @@ -66,11 +67,11 @@ pub unsafe fn sem_wait(sem: *mut sem_t) -> int { pub unsafe fn sem_trywait(sem: *mut sem_t) -> int { match (*sem).semaphore.try_wait() { - true => { + WaitResult::Success => { Errno::set(Errno::ESUCCES); 0 } - false => { + WaitResult::Interrupted => { Errno::set(Errno::EAGAIN); -1 } @@ -82,7 +83,7 @@ pub unsafe fn sem_timedwait(sem: *mut sem_t, abs_timeout: *const timespec) -> in let milli_seconds = (*abs_timeout).tv_sec * 1000 + (*abs_timeout).tv_nsec as i64 / 1000000 - now.as_millis() as i64; - match (*sem).semaphore.wait(|atomic, value| -> bool { + match (*sem).semaphore.wait(|atomic, value| -> WaitAction { WaitOnAddress( (atomic as *const AtomicU32).cast(), (value as *const u32).cast(), @@ -90,13 +91,13 @@ pub unsafe fn sem_timedwait(sem: *mut sem_t, abs_timeout: *const timespec) -> in milli_seconds as _, ); - false + WaitAction::Abort }) { - true => { + WaitResult::Success => { Errno::set(Errno::ESUCCES); 0 } - false => { + WaitResult::Interrupted => { Errno::set(Errno::ETIMEDOUT); -1 } diff --git a/iceoryx2-pal/posix/src/windows/signal.rs b/iceoryx2-pal/posix/src/windows/signal.rs index 3964b68f2..83eb3c991 100644 --- a/iceoryx2-pal/posix/src/windows/signal.rs +++ b/iceoryx2-pal/posix/src/windows/signal.rs @@ -15,6 +15,7 @@ #![allow(unused_variables)] use iceoryx2_pal_concurrency_sync::mutex::Mutex; +use iceoryx2_pal_concurrency_sync::WaitAction; use windows_sys::Win32::{ Foundation::{FALSE, TRUE}, System::{ @@ -54,14 +55,14 @@ impl SigAction { } fn get(&self) -> sigaction_t { - self.mtx.lock(|_, _| true); + self.mtx.lock(|_, _| WaitAction::Continue); let ret_val = unsafe { *self.action.get() }; self.mtx.unlock(|_| {}); ret_val } fn set(&self, value: sigaction_t) -> sigaction_t { - self.mtx.lock(|_, _| true); + self.mtx.lock(|_, _| WaitAction::Continue); let ret_val = unsafe { *self.action.get() }; unsafe { *self.action.get() = value }; self.mtx.unlock(|_| {}); diff --git a/iceoryx2-pal/posix/src/windows/win32_handle_translator.rs b/iceoryx2-pal/posix/src/windows/win32_handle_translator.rs index 06b68f441..e8a8ac0a0 100644 --- a/iceoryx2-pal/posix/src/windows/win32_handle_translator.rs +++ b/iceoryx2-pal/posix/src/windows/win32_handle_translator.rs @@ -19,6 +19,7 @@ use windows_sys::Win32::{ use crate::posix::{c_string_length, ntohs, types::*}; use core::{cell::UnsafeCell, panic}; use iceoryx2_pal_concurrency_sync::mutex::Mutex; +use iceoryx2_pal_concurrency_sync::WaitAction; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; use super::win32_udp_port_to_uds_name::{PortToUds, MAX_UDS_NAME_LEN}; @@ -141,7 +142,7 @@ impl HandleTranslator { INFINITE, ); } - true + WaitAction::Continue }); }