Skip to content

Commit

Permalink
[#51] Adjust windows to new concurrency primitives api
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Dec 20, 2023
1 parent c314af8 commit 05d43b0
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 42 deletions.
64 changes: 33 additions & 31 deletions iceoryx2-pal/posix/src/windows/pthread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{

use iceoryx2_pal_concurrency_sync::rwlock::*;
use iceoryx2_pal_concurrency_sync::{barrier::Barrier, mutex::Mutex};
use iceoryx2_pal_concurrency_sync::{WaitAction, WaitResult};
use windows_sys::Win32::{
Foundation::{CloseHandle, ERROR_TIMEOUT, STILL_ACTIVE, WAIT_FAILED},
System::{
Expand Down Expand Up @@ -92,7 +93,7 @@ impl ThreadStates {
INFINITE,
) };
}
true
WaitAction::Continue
});
}

Expand Down Expand Up @@ -492,7 +493,7 @@ pub unsafe fn pthread_rwlock_rdlock(lock: *mut pthread_rwlock_t) -> int {
4,
INFINITE,
)};
true
WaitAction::Continue
}),
RwLockType::PreferWriter(ref l) => l.read_lock(|atomic, value| {
win32call! {WaitOnAddress(
Expand All @@ -501,7 +502,7 @@ pub unsafe fn pthread_rwlock_rdlock(lock: *mut pthread_rwlock_t) -> int {
4,
INFINITE,
)};
true
WaitAction::Continue
}),
_ => {
return Errno::EINVAL as _;
Expand All @@ -512,15 +513,15 @@ pub unsafe fn pthread_rwlock_rdlock(lock: *mut pthread_rwlock_t) -> int {
}

pub unsafe fn pthread_rwlock_tryrdlock(lock: *mut pthread_rwlock_t) -> int {
let has_lock = match (*lock).lock {
let wait_result = match (*lock).lock {
RwLockType::PreferReader(ref l) => l.try_read_lock(),
RwLockType::PreferWriter(ref l) => l.try_read_lock(),
_ => {
return Errno::EINVAL as _;
}
};

if has_lock {
if wait_result == WaitResult::Success {
Errno::ESUCCES as _
} else {
Errno::EBUSY as _
Expand Down Expand Up @@ -557,7 +558,7 @@ pub unsafe fn pthread_rwlock_wrlock(lock: *mut pthread_rwlock_t) -> int {
4,
INFINITE,
)};
true
WaitAction::Continue
}),
RwLockType::PreferWriter(ref l) => l.write_lock(
|atomic, value| {
Expand All @@ -567,7 +568,7 @@ pub unsafe fn pthread_rwlock_wrlock(lock: *mut pthread_rwlock_t) -> int {
4,
INFINITE,
)};
true
WaitAction::Continue
},
|atomic| {
win32call! { WakeByAddressSingle((atomic as *const AtomicU32).cast()) };
Expand All @@ -585,15 +586,15 @@ pub unsafe fn pthread_rwlock_wrlock(lock: *mut pthread_rwlock_t) -> int {
}

pub unsafe fn pthread_rwlock_trywrlock(lock: *mut pthread_rwlock_t) -> int {
let has_lock = match (*lock).lock {
let wait_result = match (*lock).lock {
RwLockType::PreferReader(ref l) => l.try_write_lock(),
RwLockType::PreferWriter(ref l) => l.try_write_lock(),
_ => {
return Errno::EINVAL as _;
}
};

if has_lock {
if wait_result == WaitResult::Success {
Errno::ESUCCES as _
} else {
Errno::EBUSY as _
Expand All @@ -611,15 +612,15 @@ pub unsafe fn pthread_rwlock_timedwrlock(
- now.as_millis() as i64,
);

let has_lock = match (*lock).lock {
let wait_result = match (*lock).lock {
RwLockType::PreferReader(ref l) => l.write_lock(|atomic, value| {
win32call! { WaitOnAddress(
(atomic as *const AtomicU32).cast(),
(value as *const u32).cast(),
4,
timeout as _,
), ignore ERROR_TIMEOUT };
true
WaitAction::Continue
}),
RwLockType::PreferWriter(ref l) => l.write_lock(
|atomic, value| {
Expand All @@ -629,7 +630,7 @@ pub unsafe fn pthread_rwlock_timedwrlock(
4,
timeout as _,
), ignore ERROR_TIMEOUT};
true
WaitAction::Continue
},
|atomic| {
WakeByAddressSingle((atomic as *const AtomicU32).cast());
Expand All @@ -643,7 +644,7 @@ pub unsafe fn pthread_rwlock_timedwrlock(
}
};

if has_lock {
if wait_result == WaitResult::Success {
Errno::ESUCCES as _
} else {
Errno::ETIMEDOUT as _
Expand All @@ -661,15 +662,15 @@ pub unsafe fn pthread_rwlock_timedrdlock(
- now.as_millis() as i64,
);

let has_lock = match (*lock).lock {
let wait_result = match (*lock).lock {
RwLockType::PreferReader(ref l) => l.read_lock(|atomic, value| {
win32call! { WaitOnAddress(
(atomic as *const AtomicU32).cast(),
(value as *const u32).cast(),
4,
timeout as _,
), ignore ERROR_TIMEOUT };
true
WaitAction::Continue
}),
RwLockType::PreferWriter(ref l) => l.read_lock(|atomic, value| {
win32call! {WaitOnAddress(
Expand All @@ -678,14 +679,14 @@ pub unsafe fn pthread_rwlock_timedrdlock(
4,
timeout as _,
), ignore ERROR_TIMEOUT};
true
WaitAction::Continue
}),
_ => {
return Errno::EINVAL as _;
}
};

if has_lock {
if wait_result == WaitResult::Success {
Errno::ESUCCES as _
} else {
Errno::ETIMEDOUT as _
Expand Down Expand Up @@ -728,7 +729,7 @@ pub unsafe fn pthread_cond_wait(cond: *mut pthread_cond_t, mutex: *mut pthread_m
4,
INFINITE,
)};
true
WaitAction::Continue
},
|atomic, value| {
win32call! {WaitOnAddress(
Expand All @@ -737,7 +738,7 @@ pub unsafe fn pthread_cond_wait(cond: *mut pthread_cond_t, mutex: *mut pthread_m
4,
INFINITE,
)};
false
WaitAction::Continue
},
);

Expand All @@ -755,7 +756,7 @@ pub unsafe fn pthread_cond_timedwait(
(*abstime).tv_sec * 1000 + (*abstime).tv_nsec as i64 / 1000000 - now.as_millis() as i64,
);

(*cond).cv.wait(
match (*cond).cv.wait(
&(*mutex).mtx,
|atomic| {
win32call! { WakeByAddressSingle((atomic as *const AtomicU32).cast()) };
Expand All @@ -767,7 +768,7 @@ pub unsafe fn pthread_cond_timedwait(
4,
timeout as _,
), ignore ERROR_TIMEOUT };
false
WaitAction::Abort
},
|atomic, value| {
win32call! { WaitOnAddress(
Expand All @@ -776,11 +777,12 @@ pub unsafe fn pthread_cond_timedwait(
4,
timeout as _,
), ignore ERROR_TIMEOUT };
false
WaitAction::Abort
},
);

Errno::ESUCCES as _
) {
WaitResult::Success => Errno::ESUCCES as _,
WaitResult::Interrupted => Errno::ETIMEDOUT as _,
}
}

pub unsafe fn pthread_condattr_init(attr: *mut pthread_condattr_t) -> int {
Expand Down Expand Up @@ -898,7 +900,7 @@ pub unsafe fn pthread_mutex_lock(mtx: *mut pthread_mutex_t) -> int {
4,
INFINITE,
) };
true
WaitAction::Continue
});

if (*mtx).track_thread_id {
Expand Down Expand Up @@ -937,9 +939,9 @@ pub unsafe fn pthread_mutex_timedlock(
4,
timeout as _,
), ignore ERROR_TIMEOUT };
false
WaitAction::Abort
}) {
true => {
WaitResult::Success => {
if (*mtx).track_thread_id {
(*mtx)
.current_owner
Expand All @@ -948,7 +950,7 @@ pub unsafe fn pthread_mutex_timedlock(

Errno::ESUCCES as _
}
false => Errno::ETIMEDOUT as _,
WaitResult::Interrupted => Errno::ETIMEDOUT as _,
}
}

Expand All @@ -960,7 +962,7 @@ pub unsafe fn pthread_mutex_trylock(mtx: *mut pthread_mutex_t) -> int {
};

match (*mtx).mtx.try_lock() {
true => {
WaitResult::Success => {
if (*mtx).track_thread_id {
(*mtx)
.current_owner
Expand All @@ -969,7 +971,7 @@ pub unsafe fn pthread_mutex_trylock(mtx: *mut pthread_mutex_t) -> int {

Errno::ESUCCES as _
}
false => Errno::EBUSY as _,
WaitResult::Interrupted => Errno::EBUSY as _,
}
}

Expand Down
17 changes: 9 additions & 8 deletions iceoryx2-pal/posix/src/windows/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use iceoryx2_pal_concurrency_sync::semaphore::Semaphore;
use iceoryx2_pal_concurrency_sync::{WaitAction, WaitResult};
use windows_sys::Win32::System::Threading::WaitOnAddress;
use windows_sys::Win32::System::Threading::WakeByAddressSingle;
use windows_sys::Win32::System::Threading::INFINITE;
Expand Down Expand Up @@ -49,15 +50,15 @@ pub unsafe fn sem_post(sem: *mut sem_t) -> int {
}

pub unsafe fn sem_wait(sem: *mut sem_t) -> int {
(*sem).semaphore.wait(|atomic, value| -> bool {
(*sem).semaphore.wait(|atomic, value| -> WaitAction {
WaitOnAddress(
(atomic as *const AtomicU32).cast(),
(value as *const u32).cast(),
4,
INFINITE,
);

true
WaitAction::Continue
});

Errno::set(Errno::ESUCCES);
Expand All @@ -66,11 +67,11 @@ pub unsafe fn sem_wait(sem: *mut sem_t) -> int {

pub unsafe fn sem_trywait(sem: *mut sem_t) -> int {
match (*sem).semaphore.try_wait() {
true => {
WaitResult::Success => {
Errno::set(Errno::ESUCCES);
0
}
false => {
WaitResult::Interrupted => {
Errno::set(Errno::EAGAIN);
-1
}
Expand All @@ -82,21 +83,21 @@ pub unsafe fn sem_timedwait(sem: *mut sem_t, abs_timeout: *const timespec) -> in
let milli_seconds = (*abs_timeout).tv_sec * 1000 + (*abs_timeout).tv_nsec as i64 / 1000000
- now.as_millis() as i64;

match (*sem).semaphore.wait(|atomic, value| -> bool {
match (*sem).semaphore.wait(|atomic, value| -> WaitAction {
WaitOnAddress(
(atomic as *const AtomicU32).cast(),
(value as *const u32).cast(),
4,
milli_seconds as _,
);

false
WaitAction::Abort
}) {
true => {
WaitResult::Success => {
Errno::set(Errno::ESUCCES);
0
}
false => {
WaitResult::Interrupted => {
Errno::set(Errno::ETIMEDOUT);
-1
}
Expand Down
5 changes: 3 additions & 2 deletions iceoryx2-pal/posix/src/windows/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#![allow(unused_variables)]

use iceoryx2_pal_concurrency_sync::mutex::Mutex;
use iceoryx2_pal_concurrency_sync::WaitAction;
use windows_sys::Win32::{
Foundation::{FALSE, TRUE},
System::{
Expand Down Expand Up @@ -54,14 +55,14 @@ impl SigAction {
}

fn get(&self) -> sigaction_t {
self.mtx.lock(|_, _| true);
self.mtx.lock(|_, _| WaitAction::Continue);
let ret_val = unsafe { *self.action.get() };
self.mtx.unlock(|_| {});
ret_val
}

fn set(&self, value: sigaction_t) -> sigaction_t {
self.mtx.lock(|_, _| true);
self.mtx.lock(|_, _| WaitAction::Continue);
let ret_val = unsafe { *self.action.get() };
unsafe { *self.action.get() = value };
self.mtx.unlock(|_| {});
Expand Down
3 changes: 2 additions & 1 deletion iceoryx2-pal/posix/src/windows/win32_handle_translator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use windows_sys::Win32::{
use crate::posix::{c_string_length, ntohs, types::*};
use core::{cell::UnsafeCell, panic};
use iceoryx2_pal_concurrency_sync::mutex::Mutex;
use iceoryx2_pal_concurrency_sync::WaitAction;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};

use super::win32_udp_port_to_uds_name::{PortToUds, MAX_UDS_NAME_LEN};
Expand Down Expand Up @@ -141,7 +142,7 @@ impl HandleTranslator {
INFINITE,
);
}
true
WaitAction::Continue
});
}

Expand Down

0 comments on commit 05d43b0

Please sign in to comment.