Skip to content

Commit

Permalink
Support timedlock of fast/hook pthread and bthread::Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Sep 12, 2024
1 parent 9643150 commit 5f8b95e
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 37 deletions.
5 changes: 0 additions & 5 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ inline bvar::Adder<int64_t>& butex_waiter_count() {
}
#endif

// If a thread would suspend for less than so many microseconds, return
// ETIMEDOUT directly.
// Use 1: sleeping for less than 2 microseconds is inefficient and useless.
static const int64_t MIN_SLEEP_US = 2;

enum WaiterState {
WAITER_STATE_NONE,
WAITER_STATE_READY,
Expand Down
5 changes: 5 additions & 0 deletions src/bthread/butex.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@

namespace bthread {

// If a thread would suspend for less than so many microseconds, return
// ETIMEDOUT directly.
// Use 1: sleeping for less than 2 microseconds is inefficient and useless.
static const int64_t MIN_SLEEP_US = 2;

// Create a butex which is a futex-like 32-bit primitive for synchronizing
// bthreads/pthreads.
// Returns a pointer to 32-bit data, NULL on failure.
Expand Down
154 changes: 124 additions & 30 deletions src/bthread/mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

// Date: Sun Aug 3 12:46:15 CST 2014

#include <sys/cdefs.h>
#include <pthread.h>
#include <dlfcn.h> // dlsym
#include <fcntl.h> // O_RDONLY
Expand Down Expand Up @@ -47,9 +48,9 @@
#include "bthread/processor.h"
#include "bthread/task_group.h"

extern "C" {
__BEGIN_DECLS
extern void* BAIDU_WEAK _dl_sym(void* handle, const char* symbol, void* caller);
}
__END_DECLS

namespace bthread {

Expand Down Expand Up @@ -391,6 +392,13 @@ int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex);
static MutexOp sys_pthread_mutex_lock = first_sys_pthread_mutex_lock;
static MutexOp sys_pthread_mutex_trylock = first_sys_pthread_mutex_trylock;
static MutexOp sys_pthread_mutex_unlock = first_sys_pthread_mutex_unlock;
#if HAS_PTHREAD_MUTEX_TIMEDLOCK
typedef int (*TimedMutexOp)(pthread_mutex_t*, const struct timespec*);
int first_sys_pthread_mutex_timedlock(pthread_mutex_t* mutex,
const struct timespec* __abstime);
static TimedMutexOp sys_pthread_mutex_timedlock = first_sys_pthread_mutex_timedlock;
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK

static pthread_once_t init_sys_mutex_lock_once = PTHREAD_ONCE_INIT;

// dlsym may call malloc to allocate space for dlerror and causes contention
Expand Down Expand Up @@ -438,11 +446,18 @@ static void init_sys_mutex_lock() {
RTLD_NEXT, "pthread_mutex_unlock", (void*)init_sys_mutex_lock);
sys_pthread_mutex_trylock = (MutexOp)_dl_sym(
RTLD_NEXT, "pthread_mutex_trylock", (void*)init_sys_mutex_lock);
#if HAS_PTHREAD_MUTEX_TIMEDLOCK
sys_pthread_mutex_timedlock = (TimedMutexOp)_dl_sym(
RTLD_NEXT, "pthread_mutex_timedlock", (void*)init_sys_mutex_lock);
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
} else {
// _dl_sym may be undefined reference in some system, fallback to dlsym
sys_pthread_mutex_lock = (MutexOp)dlsym(RTLD_NEXT, "pthread_mutex_lock");
sys_pthread_mutex_unlock = (MutexOp)dlsym(RTLD_NEXT, "pthread_mutex_unlock");
sys_pthread_mutex_trylock = (MutexOp)dlsym(RTLD_NEXT, "pthread_mutex_trylock");
#if HAS_PTHREAD_MUTEX_TIMEDLOCK
sys_pthread_mutex_timedlock = (TimedMutexOp)dlsym(RTLD_NEXT, "pthread_mutex_timedlock");
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
}
#elif defined(OS_MACOSX)
// TODO: look workaround for dlsym on mac
Expand All @@ -465,6 +480,14 @@ int first_sys_pthread_mutex_trylock(pthread_mutex_t* mutex) {
return sys_pthread_mutex_trylock(mutex);
}

#if HAS_PTHREAD_MUTEX_TIMEDLOCK
int first_sys_pthread_mutex_timedlock(pthread_mutex_t* mutex,
const struct timespec* abstime) {
pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);
return sys_pthread_mutex_timedlock(mutex, abstime);
}
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK

int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex) {
pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);
return sys_pthread_mutex_unlock(mutex);
Expand Down Expand Up @@ -499,7 +522,7 @@ void CheckBthreadScheSafety() {
true, butil::memory_order_relaxed))) {
butil::debug::StackTrace trace(true);
// It can only be checked once because the counter is messed up.
LOG(ERROR) << "bthread is suspended while holding"
LOG(ERROR) << "bthread is suspended while holding "
<< tls_pthread_lock_count << " pthread locks."
<< std::endl << trace.ToString();
}
Expand Down Expand Up @@ -610,10 +633,27 @@ void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns) {

namespace internal {
#ifndef NO_PTHREAD_MUTEX_HOOK
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex) {
++bthread::tls_pthread_lock_count;
return sys_pthread_mutex_lock(mutex);
#if HAS_PTHREAD_MUTEX_TIMEDLOCK
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex,
const struct timespec* abstime) {
int rc = NULL == abstime ?
sys_pthread_mutex_lock(mutex) :
sys_pthread_mutex_timedlock(mutex, abstime);
if (0 == rc) {
++tls_pthread_lock_count;
}
return rc;
}
#else
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex,
const struct timespec*/* Not supported */) {
int rc = sys_pthread_mutex_lock(mutex);
if (0 == rc) {
++tls_pthread_lock_count;
}
return rc;
}
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK

BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(pthread_mutex_t* mutex) {
int rc = sys_pthread_mutex_trylock(mutex);
Expand All @@ -627,11 +667,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(pthread_mutex_t* mutex) {
--tls_pthread_lock_count;
return sys_pthread_mutex_unlock(mutex);
}
#endif
#endif // NO_PTHREAD_MUTEX_HOOK

BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex) {
mutex->lock();
return 0;
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex,
const struct timespec* abstime) {
if (NULL == abstime) {
mutex->lock();
return 0;
} else {
return mutex->timed_lock(abstime) ? 0 : errno;
}
}

BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(FastPthreadMutex* mutex) {
Expand All @@ -644,13 +689,13 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(FastPthreadMutex* mutex) {
}

template <typename Mutex>
BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex, const struct timespec* abstime) {
// Don't change behavior of lock when profiler is off.
if (!g_cp ||
// collecting code including backtrace() and submit() may call
// pthread_mutex_lock and cause deadlock. Don't sample.
tls_inside_lock) {
return pthread_mutex_lock_internal(mutex);
return pthread_mutex_lock_internal(mutex, abstime);
}
// Don't slow down non-contended locks.
int rc = pthread_mutex_trylock_internal(mutex);
Expand All @@ -673,16 +718,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
csite = &entry.csite;
if (!sampling_range) {
make_contention_site_invalid(&entry.csite);
return pthread_mutex_lock_internal(mutex);
return pthread_mutex_lock_internal(mutex, abstime);
}
}
#endif
if (!sampling_range) { // don't sample
return pthread_mutex_lock_internal(mutex);
return pthread_mutex_lock_internal(mutex, abstime);
}
// Lock and monitor the waiting time.
const int64_t start_ns = butil::cpuwide_time_ns();
rc = pthread_mutex_lock_internal(mutex);
rc = pthread_mutex_lock_internal(mutex, abstime);
if (!rc) { // Inside lock
if (!csite) {
csite = add_pthread_contention_site(mutex);
Expand Down Expand Up @@ -748,13 +793,20 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(Mutex* mutex) {

#ifndef NO_PTHREAD_MUTEX_HOOK
BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
return internal::pthread_mutex_lock_impl(mutex);
return internal::pthread_mutex_lock_impl(mutex, NULL);
}

BUTIL_FORCE_INLINE int pthread_mutex_trylock_impl(pthread_mutex_t* mutex) {
return internal::pthread_mutex_trylock_impl(mutex);
}

#if HAS_PTHREAD_MUTEX_TIMEDLOCK
BUTIL_FORCE_INLINE int pthread_mutex_timedlock_impl(pthread_mutex_t* mutex,
const struct timespec* abstime) {
return internal::pthread_mutex_lock_impl(mutex, abstime);
}
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK

BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
return internal::pthread_mutex_unlock_impl(mutex);
}
Expand All @@ -779,8 +831,7 @@ BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),

const int MAX_SPIN_ITER = 4;

inline int mutex_lock_contended_impl(
bthread_mutex_t* m, const struct timespec* __restrict abstime) {
inline int mutex_lock_contended_impl(bthread_mutex_t* m, const struct timespec* abstime) {
// When a bthread first contends for a lock, active spinning makes sense.
// Spin only few times and only if local `rq' is empty.
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
Expand Down Expand Up @@ -819,22 +870,41 @@ inline int mutex_lock_contended_impl(
#ifdef BTHREAD_USE_FAST_PTHREAD_MUTEX
namespace internal {

int FastPthreadMutex::lock_contended() {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
int FastPthreadMutex::lock_contended(const struct timespec* abstime) {
int64_t abstime_us = 0;
if (NULL != abstime) {
abstime_us = butil::timespec_to_microseconds(*abstime);
}
auto whole = (butil::atomic<unsigned>*)&_futex;
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0
&& errno != EWOULDBLOCK) {
timespec* ptimeout = NULL;
timespec timeout{};
if (NULL != abstime) {
timeout = butil::microseconds_to_timespec(
abstime_us - butil::gettimeofday_us());
ptimeout = &timeout;
}
if (NULL == abstime || abstime_us > MIN_SLEEP_US) {
if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, ptimeout) < 0
&& errno != EWOULDBLOCK && errno != EINTR/*note*/) {
// A mutex lock should ignore interruptions in general since
// user code is unlikely to check the return value.
return errno;
}
} else {
errno = ETIMEDOUT;
return errno;
}
}
return 0;
}

void FastPthreadMutex::lock() {
auto split = (bthread::MutexInternal*)&_futex;
if (split->locked.exchange(1, butil::memory_order_acquire)) {
(void)lock_contended();
if (try_lock()) {
return;
}

(void)lock_contended(NULL);
++tls_pthread_lock_count;
}

Expand All @@ -847,30 +917,47 @@ bool FastPthreadMutex::try_lock() {
return lock;
}

bool FastPthreadMutex::timed_lock(const struct timespec* abstime) {
if (try_lock()) {
return true;
}
int rc = lock_contended(abstime);
if (rc == 0) {
++tls_pthread_lock_count;
}
return rc == 0;
}

void FastPthreadMutex::unlock() {
--tls_pthread_lock_count;
auto whole = (butil::atomic<unsigned>*)&_futex;
const unsigned prev = whole->exchange(0, butil::memory_order_release);
// CAUTION: the mutex may be destroyed, check comments before butex_create
if (prev != BTHREAD_MUTEX_LOCKED) {
futex_wake_private(whole, 1);
}
--tls_pthread_lock_count;
}

} // namespace internal
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX

void FastPthreadMutex::lock() {
internal::pthread_mutex_lock_impl(&_mutex);
internal::pthread_mutex_lock_impl(&_mutex, NULL);
}

void FastPthreadMutex::unlock() {
internal::pthread_mutex_unlock_impl(&_mutex);
}

#if defined(BTHREAD_USE_FAST_PTHREAD_MUTEX) || HAS_PTHREAD_MUTEX_TIMEDLOCK
bool FastPthreadMutex::timed_lock(const struct timespec* abstime) {
return internal::pthread_mutex_lock_impl(&_mutex, abstime) == 0;
}
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX HAS_PTHREAD_MUTEX_TIMEDLOCK

} // namespace bthread

extern "C" {
__BEGIN_DECLS

int bthread_mutex_init(bthread_mutex_t* __restrict m,
const bthread_mutexattr_t* __restrict) {
Expand Down Expand Up @@ -990,9 +1077,16 @@ int pthread_mutex_lock(pthread_mutex_t* __mutex) {
int pthread_mutex_trylock(pthread_mutex_t* __mutex) {
return bthread::pthread_mutex_trylock_impl(__mutex);
}
#if defined(OS_LINUX) && defined(OS_POSIX) && defined(__USE_XOPEN2K)
int pthread_mutex_timedlock(pthread_mutex_t *__restrict __mutex,
const struct timespec *__restrict __abstime) {
return bthread::pthread_mutex_timedlock_impl(__mutex, __abstime);
}
#endif // OS_POSIX __USE_XOPEN2K
int pthread_mutex_unlock(pthread_mutex_t* __mutex) {
return bthread::pthread_mutex_unlock_impl(__mutex);
}
#endif
#endif // NO_PTHREAD_MUTEX_HOOK


} // extern "C"
__END_DECLS
10 changes: 9 additions & 1 deletion src/bthread/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class Mutex {
}
void unlock() { bthread_mutex_unlock(&_mutex); }
bool try_lock() { return !bthread_mutex_trylock(&_mutex); }
bool timed_lock(const struct timespec* abstime) {
return !bthread_mutex_timedlock(&_mutex, abstime);
}
// TODO(chenzhangyi01): Complement interfaces for C++11
private:
DISALLOW_COPY_AND_ASSIGN(Mutex);
Expand All @@ -76,9 +79,10 @@ class FastPthreadMutex {
void lock();
void unlock();
bool try_lock();
bool timed_lock(const struct timespec* abstime);
private:
DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
int lock_contended();
int lock_contended(const struct timespec* abstime);
unsigned _futex;
};
#else
Expand All @@ -95,6 +99,10 @@ class FastPthreadMutex {
void lock();
void unlock();
bool try_lock() { return _mutex.try_lock(); }
#if defined(BTHREAD_USE_FAST_PTHREAD_MUTEX) || HAS_PTHREAD_MUTEX_TIMEDLOCK
bool timed_lock(const struct timespec* abstime);
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX HAS_PTHREAD_MUTEX_TIMEDLOCK

private:
internal::FastPthreadMutex _mutex;
};
Expand Down
13 changes: 12 additions & 1 deletion src/butil/synchronization/lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
#include <windows.h>
#elif defined(OS_POSIX)
#include <pthread.h>
#endif
#if defined(OS_LINUX) && defined(__USE_XOPEN2K)
#define HAS_PTHREAD_MUTEX_TIMEDLOCK 1
#else
#define HAS_PTHREAD_MUTEX_TIMEDLOCK 0
#endif // OS_LINUX __USE_XOPEN2K
#endif // OS_POSIX

#include "butil/base_export.h"
#include "butil/macros.h"
Expand Down Expand Up @@ -90,6 +95,12 @@ class BUTIL_EXPORT Mutex {
#endif
}

#if HAS_PTHREAD_MUTEX_TIMEDLOCK
bool timed_lock(const struct timespec* abstime) {
return pthread_mutex_timedlock(&_native_handle, abstime) == 0;
}
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK

// Returns the underlying implementation-defined native handle object.
NativeHandle* native_handle() { return &_native_handle; }

Expand Down
Loading

0 comments on commit 5f8b95e

Please sign in to comment.