This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new f17048e1 Support semaphore and rwlock for bthread (#2752) f17048e1 is described below commit f17048e18011d842bcaf3f37e50dc45307573a35 Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Thu Sep 26 10:49:23 2024 +0800 Support semaphore and rwlock for bthread (#2752) * Support bthread semaphore * Support bthread rwlock * Support contention profiler for semaphore and rwlock --- src/bthread/bthread.h | 66 +++++- src/bthread/butex.cpp | 8 +- src/bthread/butex.h | 5 + src/bthread/mutex.cpp | 87 ++++---- src/bthread/mutex.h | 14 +- src/bthread/rwlock.cpp | 368 +++++++++++++++++++++++++++++++++ src/bthread/rwlock.h | 214 ++++++++++++++++++++ src/bthread/semaphore.cpp | 173 ++++++++++++++++ src/bthread/types.h | 14 ++ src/bvar/collector.h | 8 +- test/BUILD.bazel | 2 + test/bthread_rwlock_unittest.cpp | 393 +++++++++++++++++++++++++++++++++++- test/bthread_semaphore_unittest.cpp | 208 +++++++++++++++++++ 13 files changed, 1493 insertions(+), 67 deletions(-) diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h index 68734e05..8532b3b3 100644 --- a/src/bthread/bthread.h +++ b/src/bthread/bthread.h @@ -170,7 +170,7 @@ extern int bthread_usleep(uint64_t microseconds); // NOTE: mutexattr is not used in current mutex implementation. User shall // always pass a NULL attribute. extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex, - const bthread_mutexattr_t* __restrict mutex_attr); + const bthread_mutexattr_t* __restrict attr); // Destroy `mutex'. extern int bthread_mutex_destroy(bthread_mutex_t* mutex); @@ -188,6 +188,13 @@ extern int bthread_mutex_timedlock(bthread_mutex_t* __restrict mutex, // Unlock `mutex'. extern int bthread_mutex_unlock(bthread_mutex_t* mutex); +extern int bthread_mutexattr_init(bthread_mutexattr_t* attr); + +// Disable the contention profile of the mutex. +extern int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr); + +extern int bthread_mutexattr_destroy(bthread_mutexattr_t* attr); + // ----------------------------------------------- // Functions for handling conditional variables. // ----------------------------------------------- @@ -241,9 +248,8 @@ extern int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock); extern int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock); // Try to acquire read lock for `rwlock' or return after specified time. -extern int bthread_rwlock_timedrdlock( - bthread_rwlock_t* __restrict rwlock, - const struct timespec* __restrict abstime); +extern int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock, + const struct timespec* __restrict abstime); // Acquire write lock for `rwlock'. extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock); @@ -252,9 +258,8 @@ extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock); extern int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock); // Try to acquire write lock for `rwlock' or return after specified time. -extern int bthread_rwlock_timedwrlock( - bthread_rwlock_t* __restrict rwlock, - const struct timespec* __restrict abstime); +extern int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock, + const struct timespec* __restrict abstime); // Unlock `rwlock'. extern int bthread_rwlock_unlock(bthread_rwlock_t* rwlock); @@ -277,6 +282,53 @@ extern int bthread_rwlockattr_getkind_np(const bthread_rwlockattr_t* attr, extern int bthread_rwlockattr_setkind_np(bthread_rwlockattr_t* attr, int pref); +// ------------------------------------------- +// Functions for handling semaphore.
+// ------------------------------------------- + +// Initialize the semaphore referred to by `sem'. The value of the +// initialized semaphore shall be `value'. +// Return 0 on success, errno otherwise. +extern int bthread_sem_init(bthread_sem_t* sem, unsigned value); + +// Disable the contention profile of the semaphore referred to by `sem'. +extern int bthread_sem_disable_csite(bthread_sem_t* sem); + +// Destroy the semaphore indicated by `sem'. +// Return 0 on success, errno otherwise. +extern int bthread_sem_destroy(bthread_sem_t* semaphore); + +// Lock the semaphore referenced by `sem' by performing a semaphore +// lock operation on that semaphore. If the semaphore value is currently +// zero, then the calling (b)thread shall not return from the call to +// bthread_sem_wait() function until it locks the semaphore. +// Return 0 on success, errno otherwise. +extern int bthread_sem_wait(bthread_sem_t* sem); + +// Lock the semaphore referenced by `sem' as in the bthread_sem_wait() +// function. However, if the semaphore cannot be locked without waiting +// for another (b)thread to unlock the semaphore by performing a +// bthread_sem_post() function, this wait shall be terminated when the +// specified timeout expires. +// Return 0 on success, errno otherwise. +extern int bthread_sem_timedwait(bthread_sem_t* sem, const struct timespec* abstime); + +// Lock the semaphore referenced by `sem' only if the semaphore is +// currently not locked; that is, if the semaphore value is currently +// positive. Otherwise, it shall not lock the semaphore. +// Return 0 on success, errno otherwise. +extern int bthread_sem_trywait(bthread_sem_t* sem); + +// Unlock the semaphore referenced by `sem' by performing +// a semaphore unlock operation on that semaphore. +// Return 0 on success, errno otherwise. +extern int bthread_sem_post(bthread_sem_t* sem); + +// Unlock the semaphore referenced by `sem' by performing +// `n' semaphore unlock operations on that semaphore. +// Return 0 on success, errno otherwise. +extern int bthread_sem_post_n(bthread_sem_t* sem, size_t n); + // ---------------------------------------------------------------------- // Functions for handling barrier which is a new feature in 1003.1j-2000. diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp index b603d89c..4a0f9c37 100644 --- a/src/bthread/butex.cpp +++ b/src/bthread/butex.cpp @@ -329,14 +329,14 @@ int butex_wake(void* arg, bool nosignal) { return 1; } -int butex_wake_all(void* arg, bool nosignal) { +int butex_wake_n(void* arg, size_t n, bool nosignal) { Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value); ButexWaiterList bthread_waiters; ButexWaiterList pthread_waiters; { BAIDU_SCOPED_LOCK(b->waiter_lock); - while (!b->waiters.empty()) { + for (size_t i = 0; (n == 0 || i < n) && !b->waiters.empty(); ++i) { ButexWaiter* bw = b->waiters.head()->value(); bw->RemoveFromList(); bw->container.store(NULL, butil::memory_order_relaxed); @@ -393,6 +393,10 @@ int butex_wake_all(void* arg, bool nosignal) { return nwakeup; } +int butex_wake_all(void* arg, bool nosignal) { + return butex_wake_n(arg, 0, nosignal); +} + int butex_wake_except(void* arg, bthread_t excluded_bthread) { Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value); diff --git a/src/bthread/butex.h b/src/bthread/butex.h index b40ec1e0..2786ef68 100644 --- a/src/bthread/butex.h +++ b/src/bthread/butex.h @@ -48,6 +48,11 @@ void butex_destroy(void* butex); // Returns # of threads woken up.
int butex_wake(void* butex, bool nosignal = false); +// Wake up all threads waiting on |butex| if n is zero. +// Otherwise, wake up at most n threads waiting on |butex|. +// Returns # of threads woken up. +int butex_wake_n(void* butex, size_t n, bool nosignal = false); + // Wake up all threads waiting on |butex|. // Returns # of threads woken up. int butex_wake_all(void* butex, bool nosignal = false); diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp index 403f6bb8..fa2f91c6 100644 --- a/src/bthread/mutex.cpp +++ b/src/bthread/mutex.cpp @@ -59,7 +59,7 @@ EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group); const butil::debug::StackTrace ALLOW_UNUSED dummy_bt; // For controlling contentions collected per second. -static bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER; +bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER; const size_t MAX_CACHED_CONTENTIONS = 512; // Skip frames which are always same: the unlock function and submit_contention() @@ -267,7 +267,7 @@ void ContentionProfiler::flush_to_disk(bool ending) { // If contention profiler is on, this variable will be set with a valid // instance. NULL otherwise. -BAIDU_CACHELINE_ALIGNMENT static ContentionProfiler* g_cp = NULL; +BAIDU_CACHELINE_ALIGNMENT ContentionProfiler* g_cp = NULL; // Need this version to solve an issue that non-empty entries left by // previous contention profilers should be detected and overwritten. static uint64_t g_cp_version = 0; @@ -369,13 +369,11 @@ void ContentionProfilerStop() { LOG(ERROR) << "Contention profiler is not started!"; } -BUTIL_FORCE_INLINE bool -is_contention_site_valid(const bthread_contention_site_t& cs) { - return cs.sampling_range; +bool is_contention_site_valid(const bthread_contention_site_t& cs) { + return bvar::is_sampling_range_valid(cs.sampling_range); } -BUTIL_FORCE_INLINE void -make_contention_site_invalid(bthread_contention_site_t* cs) { +void make_contention_site_invalid(bthread_contention_site_t* cs) { cs->sampling_range = 0; } @@ -671,13 +669,13 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) { MutexAndContentionSite& entry = fast_alt.list[fast_alt.count++]; entry.mutex = mutex; csite = &entry.csite; - if (!sampling_range) { + if (!bvar::is_sampling_range_valid(sampling_range)) { make_contention_site_invalid(&entry.csite); return pthread_mutex_lock_internal(mutex); } } #endif - if (!sampling_range) { // don't sample + if (!bvar::is_sampling_range_valid(sampling_range)) { // don't sample return pthread_mutex_lock_internal(mutex); } // Lock and monitor the waiting time. @@ -873,13 +871,14 @@ void FastPthreadMutex::unlock() { extern "C" { int bthread_mutex_init(bthread_mutex_t* __restrict m, - const bthread_mutexattr_t* __restrict) { + const bthread_mutexattr_t* __restrict attr) { bthread::make_contention_site_invalid(&m->csite); m->butex = bthread::butex_create_checked<unsigned>(); if (!m->butex) { return ENOMEM; } *m->butex = 0; + m->enable_csite = NULL == attr ? true : attr->enable_csite; return 0; } @@ -900,35 +899,9 @@ int bthread_mutex_lock_contended(bthread_mutex_t* m) { return bthread::mutex_lock_contended_impl(m, NULL); } -int bthread_mutex_lock(bthread_mutex_t* m) { - bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex; - if (!split->locked.exchange(1, butil::memory_order_acquire)) { - return 0; - } - // Don't sample when contention profiler is off.
- if (!bthread::g_cp) { - return bthread::mutex_lock_contended_impl(m, NULL); - } - // Ask Collector if this (contended) locking should be sampled. - const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl); - if (!sampling_range) { // Don't sample - return bthread::mutex_lock_contended_impl(m, NULL); - } - // Start sampling. - const int64_t start_ns = butil::cpuwide_time_ns(); - // NOTE: Don't modify m->csite outside lock since multiple threads are - // still contending with each other. - const int rc = bthread::mutex_lock_contended_impl(m, NULL); - if (!rc) { // Inside lock - m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns; - m->csite.sampling_range = sampling_range; - } // else rare - return rc; -} - -int bthread_mutex_timedlock(bthread_mutex_t* __restrict m, - const struct timespec* __restrict abstime) { - bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex; +static int bthread_mutex_lock_impl(bthread_mutex_t* __restrict m, + const struct timespec* __restrict abstime) { + auto split = (bthread::MutexInternal*)m->butex; if (!split->locked.exchange(1, butil::memory_order_acquire)) { return 0; } @@ -937,8 +910,9 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m, return bthread::mutex_lock_contended_impl(m, abstime); } // Ask Collector if this (contended) locking should be sampled. - const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl); - if (!sampling_range) { // Don't sample + const size_t sampling_range = + m->enable_csite ? bvar::is_collectable(&bthread::g_cp_sl) : bvar::INVALID_SAMPLING_RANGE; + if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample return bthread::mutex_lock_contended_impl(m, abstime); } // Start sampling. @@ -958,10 +932,20 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m, return rc; } +int bthread_mutex_lock(bthread_mutex_t* m) { + return bthread_mutex_lock_impl(m, NULL); +} + +int bthread_mutex_timedlock(bthread_mutex_t* __restrict m, + const struct timespec* __restrict abstime) { + return bthread_mutex_lock_impl(m, abstime); +} + int bthread_mutex_unlock(bthread_mutex_t* m) { - butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex; + auto whole = (butil::atomic<unsigned>*)m->butex; bthread_contention_site_t saved_csite = {0, 0}; - if (bthread::is_contention_site_valid(m->csite)) { + bool is_valid = bthread::is_contention_site_valid(m->csite); + if (is_valid) { saved_csite = m->csite; bthread::make_contention_site_invalid(&m->csite); } @@ -971,7 +955,7 @@ int bthread_mutex_unlock(bthread_mutex_t* m) { return 0; } // Wakeup one waiter - if (!bthread::is_contention_site_valid(saved_csite)) { + if (!is_valid) { bthread::butex_wake(whole); return 0; } @@ -983,6 +967,21 @@ int bthread_mutex_unlock(bthread_mutex_t* m) { return 0; } +int bthread_mutexattr_init(bthread_mutexattr_t* attr) { + attr->enable_csite = true; + return 0; +} + +int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr) { + attr->enable_csite = false; + return 0; +} + +int bthread_mutexattr_destroy(bthread_mutexattr_t* attr) { + attr->enable_csite = true; + return 0; +} + #ifndef NO_PTHREAD_MUTEX_HOOK int pthread_mutex_lock(pthread_mutex_t* __mutex) { return bthread::pthread_mutex_lock_impl(__mutex); diff --git a/src/bthread/mutex.h b/src/bthread/mutex.h index ad6d2e5c..f1d1029b 100644 --- a/src/bthread/mutex.h +++ b/src/bthread/mutex.h @@ -28,7 +28,7 @@ __BEGIN_DECLS extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex, - const bthread_mutexattr_t* __restrict mutex_attr); + 
const bthread_mutexattr_t* __restrict attr); extern int bthread_mutex_destroy(bthread_mutex_t* mutex); extern int bthread_mutex_trylock(bthread_mutex_t* mutex); extern int bthread_mutex_lock(bthread_mutex_t* mutex); @@ -48,7 +48,8 @@ public: Mutex() { int ec = bthread_mutex_init(&_mutex, NULL); if (ec != 0) { - throw std::system_error(std::error_code(ec, std::system_category()), "Mutex constructor failed"); + throw std::system_error(std::error_code(ec, std::system_category()), + "Mutex constructor failed"); } } ~Mutex() { CHECK_EQ(0, bthread_mutex_destroy(&_mutex)); } @@ -56,11 +57,12 @@ public: void lock() { int ec = bthread_mutex_lock(&_mutex); if (ec != 0) { - throw std::system_error(std::error_code(ec, std::system_category()), "Mutex lock failed"); + throw std::system_error(std::error_code(ec, std::system_category()), + "Mutex lock failed"); } } - void unlock() { bthread_mutex_unlock(&_mutex); } - bool try_lock() { return !bthread_mutex_trylock(&_mutex); } + void unlock() { (bthread_mutex_unlock(&_mutex)); } + bool try_lock() { return 0 == bthread_mutex_trylock(&_mutex); } // TODO(chenzhangyi01): Complement interfaces for C++11 private: DISALLOW_COPY_AND_ASSIGN(Mutex); @@ -107,7 +109,7 @@ namespace std { template <> class lock_guard<bthread_mutex_t> { public: - explicit lock_guard(bthread_mutex_t & mutex) : _pmutex(&mutex) { + explicit lock_guard(bthread_mutex_t& mutex) : _pmutex(&mutex) { #if !defined(NDEBUG) const int rc = bthread_mutex_lock(_pmutex); if (rc) { diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp new file mode 100644 index 00000000..e6356683 --- /dev/null +++ b/src/bthread/rwlock.cpp @@ -0,0 +1,368 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "bvar/collector.h" +#include "bthread/rwlock.h" +#include "bthread/butex.h" + +namespace bthread { + +// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock, +// which is a bthread implementation of golang RWMutex. +// The lock can be held by an arbitrary number of readers or a single writer. +// For details, see https://github.com/golang/go/blob/master/src/sync/rwmutex.go + +// Defined in bthread/mutex.cpp +class ContentionProfiler; +extern ContentionProfiler* g_cp; +extern bvar::CollectorSpeedLimit g_cp_sl; +extern bool is_contention_site_valid(const bthread_contention_site_t& cs); +extern void make_contention_site_invalid(bthread_contention_site_t* cs); +extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns); + +// It is enough for readers. If the number of readers exceeds this value, +// `int64_t' needs to be used instead of `int'. +const int RWLockMaxReaders = 1 << 30; + +// For reading.
+static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock, + const struct timespec* __restrict abstime) { + int reader_count = ((butil::atomic<int>*)&rwlock->reader_count) + ->fetch_add(1, butil::memory_order_acquire) + 1; + // Fast path. + if (reader_count >= 0) { + CHECK_LT(reader_count, RWLockMaxReaders); + return 0; + } + + // Slow path. + + // Don't sample when contention profiler is off. + if (NULL == bthread::g_cp) { + return bthread_sem_timedwait(&rwlock->reader_sema, abstime); + } + // Ask Collector if this (contended) locking should be sampled. + const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl); + if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample. + return bthread_sem_timedwait(&rwlock->reader_sema, abstime); + } + + // Sample. + const int64_t start_ns = butil::cpuwide_time_ns(); + int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime); + const int64_t end_ns = butil::cpuwide_time_ns(); + const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; + // Submit `csite' for each reader immediately after + // owning rdlock to avoid the contention of `csite'. + bthread::submit_contention(csite, end_ns); + + return rc; +} + +static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) { + return rwlock_rdlock_impl(rwlock, NULL); +} + +static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock, + const struct timespec* __restrict abstime) { + return rwlock_rdlock_impl(rwlock, abstime); +} + +// Returns 0 if the lock was acquired, otherwise errno. +static inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) { + while (true) { + int reader_count = ((butil::atomic<int>*)&rwlock->reader_count) + ->load(butil::memory_order_relaxed); + if (reader_count < 0) { + // Failed to acquire the read lock because there is a writer. + return EBUSY; + } + if (((butil::atomic<int>*)&rwlock->reader_count) + ->compare_exchange_weak(reader_count, reader_count + 1, + butil::memory_order_acquire, + butil::memory_order_relaxed)) { + return 0; + } + } +} + +static inline int rwlock_unrdlock(bthread_rwlock_t* rwlock) { + int reader_count = ((butil::atomic<int>*)&rwlock->reader_count) + ->fetch_add(-1, butil::memory_order_relaxed) - 1; + // Fast path. + if (reader_count >= 0) { + return 0; + } + // Slow path. + + if (BAIDU_UNLIKELY(reader_count + 1 == 0 || reader_count + 1 == -RWLockMaxReaders)) { + CHECK(false) << "rwlock_unrdlock of unlocked rwlock"; + return EINVAL; + } + + // A writer is pending. + int reader_wait = ((butil::atomic<int>*)&rwlock->reader_wait) + ->fetch_add(-1, butil::memory_order_relaxed) - 1; + if (reader_wait != 0) { + return 0; + } + + // The last reader unblocks the writer. + + if (NULL == bthread::g_cp) { + bthread_sem_post(&rwlock->writer_sema); + return 0; + } + // Ask Collector if this (contended) locking should be sampled. + const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl); + if (!sampling_range) { // Don't sample + bthread_sem_post(&rwlock->writer_sema); + return 0; + } + + // Sampling. + const int64_t start_ns = butil::cpuwide_time_ns(); + bthread_sem_post(&rwlock->writer_sema); + const int64_t end_ns = butil::cpuwide_time_ns(); + const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; + // Submit `csite' for each reader immediately after + // releasing rdlock to avoid the contention of `csite'. + bthread::submit_contention(csite, end_ns); + return 0; +} + +#define DO_CSITE_IF_NEED \ + do { \ + /* Don't sample when contention profiler is off. 
*/ \ + if (NULL != bthread::g_cp) { \ + /* Ask Collector if this (contended) locking should be sampled. */ \ + sampling_range = bvar::is_collectable(&bthread::g_cp_sl); \ + start_ns = bvar::is_sampling_range_valid(sampling_range) ? \ + butil::cpuwide_time_ns() : -1; \ + } else { \ + start_ns = -1; \ + } \ + } while (0) + +#define SUBMIT_CSITE_IF_NEED \ + do { \ + if (ETIMEDOUT == rc && start_ns > 0) { \ + /* Failed to lock due to ETIMEDOUT, submit the elapse directly. */ \ + const int64_t end_ns = butil::cpuwide_time_ns(); \ + const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; \ + bthread::submit_contention(csite, end_ns); \ + } \ + } while (0) + +// For writing. +static inline int rwlock_wrlock_impl(bthread_rwlock_t* __restrict rwlock, + const struct timespec* __restrict abstime) { + // First, resolve competition with other writers. + int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex); + size_t sampling_range = bvar::INVALID_SAMPLING_RANGE; + // -1: don't sample. + // 0: default value. + // > 0: Start time of sampling. + int64_t start_ns = 0; + if (0 != rc) { + DO_CSITE_IF_NEED; + + rc = bthread_mutex_timedlock(&rwlock->write_queue_mutex, abstime); + if (0 != rc) { + SUBMIT_CSITE_IF_NEED; + return rc; + } + } + + // Announce to readers there is a pending writer. + int reader_count = ((butil::atomic<int>*)&rwlock->reader_count) + ->fetch_add(-RWLockMaxReaders, butil::memory_order_release); + // Wait for active readers. + if (reader_count != 0 && + ((butil::atomic<int>*)&rwlock->reader_wait) + ->fetch_add(reader_count) + reader_count != 0) { + rc = bthread_sem_trywait(&rwlock->writer_sema); + if (0 != rc) { + if (0 == start_ns) { + DO_CSITE_IF_NEED; + } + + rc = bthread_sem_timedwait(&rwlock->writer_sema, abstime); + if (0 != rc) { + SUBMIT_CSITE_IF_NEED; + bthread_mutex_unlock(&rwlock->write_queue_mutex); + return rc; + } + } + } + if (start_ns > 0) { + rwlock->writer_csite.duration_ns = butil::cpuwide_time_ns() - start_ns; + rwlock->writer_csite.sampling_range = sampling_range; + } + rwlock->wlock_flag = true; + return 0; +} +#undef DO_CSITE_IF_NEED +#undef SUBMIT_CSITE_IF_NEED + +static inline int rwlock_wrlock(bthread_rwlock_t* rwlock) { + return rwlock_wrlock_impl(rwlock, NULL); +} + +static inline int rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock, + const struct timespec* __restrict abstime) { + return rwlock_wrlock_impl(rwlock, abstime); +} + +static inline int rwlock_trywrlock(bthread_rwlock_t* rwlock) { + int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex); + if (0 != rc) { + return rc; + } + + int expected = 0; + if (!((butil::atomic<int>*)&rwlock->reader_count) + ->compare_exchange_strong(expected, -RWLockMaxReaders, + butil::memory_order_acquire, + butil::memory_order_relaxed)) { + // Failed to acquire the write lock because there are active readers. + bthread_mutex_unlock(&rwlock->write_queue_mutex); + return EBUSY; + } + rwlock->wlock_flag = true; + + return 0; +} + +static inline void rwlock_unwrlock_slow(bthread_rwlock_t* rwlock, int reader_count) { + bthread_sem_post_n(&rwlock->reader_sema, reader_count); + // Allow other writers to proceed. + bthread_mutex_unlock(&rwlock->write_queue_mutex); +} + +static inline int rwlock_unwrlock(bthread_rwlock_t* rwlock) { + rwlock->wlock_flag = false; + + // Announce to readers there is no active writer. 
+ int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)->fetch_add( + RWLockMaxReaders, butil::memory_order_release) + RWLockMaxReaders; + if (BAIDU_UNLIKELY(reader_count >= RWLockMaxReaders)) { + CHECK(false) << "rwlock_unwrlock of unlocked rwlock"; + return EINVAL; + } + + bool is_valid = bthread::is_contention_site_valid(rwlock->writer_csite); + if (BAIDU_UNLIKELY(is_valid)) { + bthread_contention_site_t saved_csite = rwlock->writer_csite; + bthread::make_contention_site_invalid(&rwlock->writer_csite); + + const int64_t unlock_start_ns = butil::cpuwide_time_ns(); + rwlock_unwrlock_slow(rwlock, reader_count); + const int64_t unlock_end_ns = butil::cpuwide_time_ns(); + saved_csite.duration_ns += unlock_end_ns - unlock_start_ns; + bthread::submit_contention(saved_csite, unlock_end_ns); + } else { + rwlock_unwrlock_slow(rwlock, reader_count); + } + + return 0; +} + +static inline int rwlock_unlock(bthread_rwlock_t* rwlock) { + if (rwlock->wlock_flag) { + return rwlock_unwrlock(rwlock); + } else { + return rwlock_unrdlock(rwlock); + } +} + +} // namespace bthread + +__BEGIN_DECLS + +int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock, + const bthread_rwlockattr_t* __restrict) { + int rc = bthread_sem_init(&rwlock->reader_sema, 0); + if (BAIDU_UNLIKELY(0 != rc)) { + return rc; + } + bthread_sem_disable_csite(&rwlock->reader_sema); + rc = bthread_sem_init(&rwlock->writer_sema, 0); + if (BAIDU_UNLIKELY(0 != rc)) { + bthread_sem_destroy(&rwlock->reader_sema); + return rc; + } + bthread_sem_disable_csite(&rwlock->writer_sema); + + rwlock->reader_count = 0; + rwlock->reader_wait = 0; + rwlock->wlock_flag = false; + + bthread_mutexattr_t attr; + bthread_mutexattr_init(&attr); + bthread_mutexattr_disable_csite(&attr); + rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr); + if (BAIDU_UNLIKELY(0 != rc)) { + bthread_sem_destroy(&rwlock->reader_sema); + bthread_sem_destroy(&rwlock->writer_sema); + return rc; + } + bthread_mutexattr_destroy(&attr); + + bthread::make_contention_site_invalid(&rwlock->writer_csite); + + return 0; } + +int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) { + bthread_sem_destroy(&rwlock->reader_sema); + bthread_sem_destroy(&rwlock->writer_sema); + bthread_mutex_destroy(&rwlock->write_queue_mutex); + return 0; +} + +int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) { + return bthread::rwlock_rdlock(rwlock); +} + +int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock) { + return bthread::rwlock_tryrdlock(rwlock); +} + +int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock, + const struct timespec* __restrict abstime) { + return bthread::rwlock_timedrdlock(rwlock, abstime); +} + +int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) { + return bthread::rwlock_wrlock(rwlock); +} + +int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock) { + return bthread::rwlock_trywrlock(rwlock); +} + +int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock, + const struct timespec* __restrict abstime) { + return bthread::rwlock_timedwrlock(rwlock, abstime); +} + +int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) { + return bthread::rwlock_unlock(rwlock); +} + +__END_DECLS diff --git a/src/bthread/rwlock.h b/src/bthread/rwlock.h new file mode 100644 index 00000000..a2708b99 --- /dev/null +++ b/src/bthread/rwlock.h @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "bthread/types.h" +#include "bthread/bthread.h" +#include "butil/scoped_lock.h" + +namespace bthread { + +// The C++ Wrapper of bthread_rwlock + +// NOTE: Not aligned to cacheline as the container of RWLock is practically aligned. + +class RWLock { +public: + typedef bthread_rwlock_t* native_handler_type; + + RWLock() { + int rc = bthread_rwlock_init(&_rwlock, NULL); + if (rc) { + throw std::system_error(std::error_code(rc, std::system_category()), + "RWLock constructor failed"); + } + } + + ~RWLock() { + CHECK_EQ(0, bthread_rwlock_destroy(&_rwlock)); + } + + DISALLOW_COPY_AND_ASSIGN(RWLock); + + native_handler_type native_handler() { return &_rwlock; } + + void rdlock() { + int rc = bthread_rwlock_rdlock(&_rwlock); + if (rc) { + throw std::system_error(std::error_code(rc, std::system_category()), + "RWLock rdlock failed"); + } + } + + bool try_rdlock() { + return 0 == bthread_rwlock_tryrdlock(&_rwlock); + } + + bool timed_rdlock(const struct timespec* abstime) { + return 0 == bthread_rwlock_timedrdlock(&_rwlock, abstime); + } + + void wrlock() { + int rc = bthread_rwlock_wrlock(&_rwlock); + if (rc) { + throw std::system_error(std::error_code(rc, std::system_category()), + "RWLock wrlock failed"); + } + } + + bool try_wrlock() { + return 0 == bthread_rwlock_trywrlock(&_rwlock); + } + + bool timed_wrlock(const struct timespec* abstime) { + return 0 == bthread_rwlock_timedwrlock(&_rwlock, abstime); + } + + void unlock() { bthread_rwlock_unlock(&_rwlock); } + +private: + bthread_rwlock_t _rwlock{}; +}; + +// Read lock guard of rwlock. +class RWLockRdGuard { +public: + explicit RWLockRdGuard(bthread_rwlock_t& rwlock) + : _rwlock(&rwlock) { +#if !defined(NDEBUG) + const int rc = bthread_rwlock_rdlock(_rwlock); + if (rc) { + LOG(FATAL) << "Fail to rdlock bthread_rwlock_t=" << _rwlock << ", " << berror(rc); + _rwlock = NULL; + } +#else + bthread_rwlock_rdlock(_rwlock); +#endif // NDEBUG + } + + explicit RWLockRdGuard(RWLock& rwlock) + : RWLockRdGuard(*rwlock.native_handler()) {} + + ~RWLockRdGuard() { +#ifndef NDEBUG + if (NULL != _rwlock) { + bthread_rwlock_unlock(_rwlock); + } +#else + bthread_rwlock_unlock(_rwlock); +#endif // NDEBUG + } + + DISALLOW_COPY_AND_ASSIGN(RWLockRdGuard); + +private: + bthread_rwlock_t* _rwlock; +}; + +// Write lock guard of rwlock. 
+class RWLockWrGuard { +public: + explicit RWLockWrGuard(bthread_rwlock_t& rwlock) + : _rwlock(&rwlock) { +#if !defined(NDEBUG) + const int rc = bthread_rwlock_wrlock(_rwlock); + if (rc) { + LOG(FATAL) << "Fail to wrlock bthread_rwlock_t=" << _rwlock << ", " << berror(rc); + _rwlock = NULL; + } +#else + bthread_rwlock_wrlock(_rwlock); +#endif // NDEBUG + } + + explicit RWLockWrGuard(RWLock& rwlock) + : RWLockWrGuard(*rwlock.native_handler()) {} + + ~RWLockWrGuard() { +#ifndef NDEBUG + if (NULL != _rwlock) { + bthread_rwlock_unlock(_rwlock); + } +#else + bthread_rwlock_unlock(_rwlock); +#endif // NDEBUG + } + + DISALLOW_COPY_AND_ASSIGN(RWLockWrGuard); + +private: + bthread_rwlock_t* _rwlock; +}; + +} // namespace bthread + +namespace std { + +template <> +class lock_guard<bthread_rwlock_t> { +public: + lock_guard(bthread_rwlock_t& rwlock, bool read) + :_rwlock(&rwlock), _read(read) { +#if !defined(NDEBUG) + int rc; + if (_read) { + rc = bthread_rwlock_rdlock(_rwlock); + } else { + rc = bthread_rwlock_wrlock(_rwlock); + } + if (rc) { + LOG(FATAL) << "Fail to lock bthread_rwlock_t=" << _rwlock << ", " << berror(rc); + _rwlock = NULL; + } +#else + if (_read) { + bthread_rwlock_rdlock(_rwlock); + } else { + bthread_rwlock_wrlock(_rwlock); + } +#endif // NDEBUG + } + + ~lock_guard() { +#ifndef NDEBUG + if (NULL != _rwlock) { + bthread_rwlock_unlock(_rwlock); + } +#else + bthread_rwlock_unlock(_rwlock); +#endif // NDEBUG + } + + DISALLOW_COPY_AND_ASSIGN(lock_guard); + +private: + bthread_rwlock_t* _rwlock; + bool _read; +}; + +template <> +class lock_guard<bthread::RWLock> { +public: + lock_guard(bthread::RWLock& rwlock, bool read) + :_rwlock_guard(*rwlock.native_handler(), read) {} + + DISALLOW_COPY_AND_ASSIGN(lock_guard); + +private: + std::lock_guard<bthread_rwlock_t> _rwlock_guard; +}; + +} // namespace std diff --git a/src/bthread/semaphore.cpp b/src/bthread/semaphore.cpp new file mode 100644 index 00000000..3813a8a6 --- /dev/null +++ b/src/bthread/semaphore.cpp @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "butil/memory/scope_guard.h" +#include "bvar/collector.h" +#include "bthread/bthread.h" +#include "bthread/butex.h" + +namespace bthread { + +// Defined in bthread/mutex.cpp +class ContentionProfiler; +extern ContentionProfiler* g_cp; +extern bvar::CollectorSpeedLimit g_cp_sl; +extern bool is_contention_site_valid(const bthread_contention_site_t& cs); +extern void make_contention_site_invalid(bthread_contention_site_t* cs); +extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns); + +static inline int bthread_sem_trywait(bthread_sem_t* sema) { + auto whole = (butil::atomic<unsigned>*)sema->butex; + while (true) { + unsigned num = whole->load(butil::memory_order_relaxed); + if (num == 0) { + return EAGAIN; + } + if (whole->compare_exchange_weak(num, num - 1, + butil::memory_order_acquire, + butil::memory_order_relaxed)) { + return 0; + } + } + +} + +static int bthread_sem_wait_impl(bthread_sem_t* sem, const struct timespec* abstime) { + bool queue_lifo = false; + bool first_wait = true; + size_t sampling_range = bvar::INVALID_SAMPLING_RANGE; + // -1: don't sample. + // 0: default value. + // > 0: Start time of sampling. + int64_t start_ns = 0; + auto whole = (butil::atomic<unsigned>*)sem->butex; + while (true) { + unsigned num = whole->load(butil::memory_order_relaxed); + if (num > 0) { + if (whole->compare_exchange_weak(num, num - 1, + butil::memory_order_acquire, + butil::memory_order_relaxed)) { + if (start_ns > 0) { + const int64_t end_ns = butil::cpuwide_time_ns(); + const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; + bthread::submit_contention(csite, end_ns); + } + + return 0; + } + } + // Don't sample when contention profiler is off. + if (NULL != bthread::g_cp && start_ns == 0 && sem->enable_csite && + !bvar::is_sampling_range_valid(sampling_range)) { + // Ask Collector if this (contended) sem waiting should be sampled. + sampling_range = bvar::is_collectable(&bthread::g_cp_sl); + start_ns = bvar::is_sampling_range_valid(sampling_range) ? + butil::cpuwide_time_ns() : -1; + } else { + start_ns = -1; + } + if (bthread::butex_wait(sem->butex, 0, abstime, queue_lifo) < 0 && + errno != EWOULDBLOCK && errno != EINTR) { + // A sema should ignore interruptions in general since + // user code is unlikely to check the return value. + if (ETIMEDOUT == errno && start_ns > 0) { + // Failed to lock due to ETIMEDOUT, submit the elapse directly. + const int64_t end_ns = butil::cpuwide_time_ns(); + const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; + bthread::submit_contention(csite, end_ns); + } + + return errno; + } + // Ignore EWOULDBLOCK and EINTR. + if (first_wait && 0 == errno) { + first_wait = false; + } + if (!first_wait) { + // Normally, bthreads are queued in FIFO order. But competing with new + // arriving bthreads over sema, a woken up bthread has good chances of + // losing. Because new arriving bthreads are already running on CPU and + // there can be lots of them. In such case, for fairness, to avoid + // starvation, it is queued at the head of the waiter queue. + queue_lifo = true; + } + } +} + +static inline int bthread_sem_post(bthread_sem_t* sem, size_t num) { + if (num > 0) { + unsigned n = ((butil::atomic<unsigned>*)sem->butex) + ->fetch_add(num, butil::memory_order_relaxed); + const size_t sampling_range = NULL != bthread::g_cp && sem->enable_csite ?
bvar::is_collectable(&bthread::g_cp_sl) : bvar::INVALID_SAMPLING_RANGE; + const int64_t start_ns = bvar::is_sampling_range_valid(sampling_range) ? + butil::cpuwide_time_ns() : -1; + bthread::butex_wake_n(sem->butex, n); + if (start_ns > 0) { + const int64_t end_ns = butil::cpuwide_time_ns(); + const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; + bthread::submit_contention(csite, end_ns); + } + } + return 0; +} + +} // namespace bthread + +__BEGIN_DECLS + +int bthread_sem_init(bthread_sem_t* sem, unsigned value) { + sem->butex = bthread::butex_create_checked<unsigned>(); + if (!sem->butex) { + return ENOMEM; + } + *sem->butex = value; + sem->enable_csite = true; + return 0; +} + +int bthread_sem_disable_csite(bthread_sem_t* sema) { + sema->enable_csite = false; + return 0; +} + +int bthread_sem_destroy(bthread_sem_t* semaphore) { + bthread::butex_destroy(semaphore->butex); + return 0; +} + +int bthread_sem_trywait(bthread_sem_t* sem) { + return bthread::bthread_sem_trywait(sem); +} + +int bthread_sem_wait(bthread_sem_t* sem) { + return bthread::bthread_sem_wait_impl(sem, NULL); +} + +int bthread_sem_timedwait(bthread_sem_t* sem, const struct timespec* abstime) { + return bthread::bthread_sem_wait_impl(sem, abstime); +} + +int bthread_sem_post(bthread_sem_t* sem) { + return bthread::bthread_sem_post(sem, 1); +} + +int bthread_sem_post_n(bthread_sem_t* sem, size_t n) { + return bthread::bthread_sem_post(sem, n); +} + +__END_DECLS \ No newline at end of file diff --git a/src/bthread/types.h b/src/bthread/types.h index 0aad64c4..d177ea72 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -172,9 +172,11 @@ typedef struct bthread_mutex_t { #endif unsigned* butex; bthread_contention_site_t csite; + bool enable_csite; } bthread_mutex_t; typedef struct { + bool enable_csite; } bthread_mutexattr_t; typedef struct bthread_cond_t { @@ -190,6 +192,18 @@ typedef struct { } bthread_condattr_t; typedef struct { + unsigned* butex; + bool enable_csite; +} bthread_sem_t; + +typedef struct { + bthread_sem_t reader_sema; // Semaphore for readers to wait for completing writers. + bthread_sem_t writer_sema; // Semaphore for writers to wait for completing readers. + int reader_count; // Number of pending readers. + int reader_wait; // Number of departing readers. + bool wlock_flag; // Flag used to indicate that a write lock has been held. + bthread_mutex_t write_queue_mutex; // Held if there are pending writers. + bthread_contention_site_t writer_csite; } bthread_rwlock_t; typedef struct { diff --git a/src/bvar/collector.h b/src/bvar/collector.h index 56db7214..a603d96b 100644 --- a/src/bvar/collector.h +++ b/src/bvar/collector.h @@ -28,6 +28,12 @@ namespace bvar { +static const size_t INVALID_SAMPLING_RANGE = 0; + +inline bool is_sampling_range_valid(size_t sampling_range) { + return sampling_range > 0; +} + // Containing the context for limiting sampling speed. struct CollectorSpeedLimit { // [Managed by Collector, don't change!]
@@ -115,7 +121,7 @@ inline size_t is_collectable(CollectorSpeedLimit* speed_limit) { const size_t sampling_range = speed_limit->sampling_range; // fast_rand is faster than fast_rand_in if ((butil::fast_rand() & (COLLECTOR_SAMPLING_BASE - 1)) >= sampling_range) { - return 0; + return INVALID_SAMPLING_RANGE; } return sampling_range; } diff --git a/test/BUILD.bazel b/test/BUILD.bazel index d9af2ae7..b8c3b3d2 100644 --- a/test/BUILD.bazel +++ b/test/BUILD.bazel @@ -226,6 +226,8 @@ cc_test( # glog CHECK die with a fatal error "bthread_key_unittest.cpp", "bthread_butex_multi_tag_unittest.cpp", + "bthread_rwlock_unittest.cpp", + "bthread_semaphore_unittest.cpp", ], ), copts = COPTS, diff --git a/test/bthread_rwlock_unittest.cpp b/test/bthread_rwlock_unittest.cpp index 60cbfbe2..3318956b 100644 --- a/test/bthread_rwlock_unittest.cpp +++ b/test/bthread_rwlock_unittest.cpp @@ -15,15 +15,394 @@ // specific language governing permissions and limitations // under the License. -#include <stdlib.h> -#include <unistd.h> -#include <stdio.h> -#include <signal.h> #include <gtest/gtest.h> -#include "butil/time.h" -#include "butil/macros.h" +#include <butil/gperftools_profiler.h> +#include <bthread/rwlock.h> namespace { + +long start_time = butil::cpuwide_time_ms(); +int c = 0; +void* rdlocker(void* arg) { + auto rw = (bthread_rwlock_t*)arg; + bthread_rwlock_rdlock(rw); + LOG(INFO) << butil::string_printf("[%" PRIu64 "] I'm rdlocker, %d, %" PRId64 "ms\n", + pthread_numeric_id(), ++c, + butil::cpuwide_time_ms() - start_time); + bthread_usleep(10000); + bthread_rwlock_unlock(rw); + return NULL; +} + +void* wrlocker(void* arg) { + auto rw = (bthread_rwlock_t*)arg; + bthread_rwlock_wrlock(rw); + LOG(INFO) << butil::string_printf("[%" PRIu64 "] I'm wrlocker, %d, %" PRId64 "ms\n", + pthread_numeric_id(), ++c, + butil::cpuwide_time_ms() - start_time); + bthread_usleep(10000); + bthread_rwlock_unlock(rw); + return NULL; +} + +TEST(RWLockTest, sanity) { + bthread_rwlock_t rw; + ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL)); + ASSERT_EQ(0, bthread_rwlock_rdlock(&rw)); + ASSERT_EQ(0, bthread_rwlock_unlock(&rw)); + ASSERT_EQ(0, bthread_rwlock_wrlock(&rw)); + ASSERT_EQ(0, bthread_rwlock_unlock(&rw)); + + bthread_t rdth; + bthread_t rwth; + ASSERT_EQ(0, bthread_start_urgent(&rdth, NULL, rdlocker, &rw)); + ASSERT_EQ(0, bthread_start_urgent(&rwth, NULL, wrlocker, &rw)); + + ASSERT_EQ(0, bthread_join(rdth, NULL)); + ASSERT_EQ(0, bthread_join(rwth, NULL)); + ASSERT_EQ(0, bthread_rwlock_destroy(&rw)); +} + +TEST(RWLockTest, used_in_pthread) { + bthread_rwlock_t rw; + ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL)); + pthread_t rdth[8]; + pthread_t wrth[8]; + for (size_t i = 0; i < ARRAY_SIZE(rdth); ++i) { + ASSERT_EQ(0, pthread_create(&rdth[i], NULL, rdlocker, &rw)); + } + for (size_t i = 0; i < ARRAY_SIZE(wrth); ++i) { + ASSERT_EQ(0, pthread_create(&wrth[i], NULL, wrlocker, &rw)); + } + + for (size_t i = 0; i < ARRAY_SIZE(rdth); ++i) { + pthread_join(rdth[i], NULL); + } + for (size_t i = 0; i < ARRAY_SIZE(wrth); ++i) { + pthread_join(wrth[i], NULL); + } + ASSERT_EQ(0, bthread_rwlock_destroy(&rw)); +} + +void* do_timedrdlock(void *arg) { + struct timespec t = { -2, 0 }; + EXPECT_EQ(ETIMEDOUT, bthread_rwlock_timedrdlock((bthread_rwlock_t*)arg, &t)); + return NULL; +} + +void* do_timedwrlock(void *arg) { + struct timespec t = { -2, 0 }; + EXPECT_EQ(ETIMEDOUT, bthread_rwlock_timedwrlock((bthread_rwlock_t*)arg, &t)); + LOG(INFO) << 10; + return NULL; +} + +TEST(RWLockTest, timedlock) { + bthread_rwlock_t rw; + ASSERT_EQ(0,
bthread_rwlock_init(&rw, NULL)); + + ASSERT_EQ(0, bthread_rwlock_rdlock(&rw)); + bthread_t th; + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_timedwrlock, &rw)); + ASSERT_EQ(0, bthread_join(th, NULL)); + ASSERT_EQ(0, bthread_rwlock_unlock(&rw)); + + ASSERT_EQ(0, bthread_rwlock_wrlock(&rw)); + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_timedwrlock, &rw)); + ASSERT_EQ(0, bthread_join(th, NULL)); + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_timedrdlock, &rw)); + ASSERT_EQ(0, bthread_join(th, NULL)); + ASSERT_EQ(0, bthread_rwlock_unlock(&rw)); + ASSERT_EQ(0, bthread_rwlock_destroy(&rw)); +} + +struct TrylockArgs { + bthread_rwlock_t* rw; + int rc; +}; + +void* do_tryrdlock(void *arg) { + auto trylock_args = (TrylockArgs*)arg; + EXPECT_EQ(trylock_args->rc, bthread_rwlock_tryrdlock(trylock_args->rw)); + if (0 != trylock_args->rc) { + return NULL; + } + EXPECT_EQ(trylock_args->rc, bthread_rwlock_unlock(trylock_args->rw)); + return NULL; +} + +void* do_trywrlock(void *arg) { + auto trylock_args = (TrylockArgs*)arg; + EXPECT_EQ(trylock_args->rc, bthread_rwlock_trywrlock(trylock_args->rw)); + if (0 != trylock_args->rc) { + return NULL; + } + EXPECT_EQ(trylock_args->rc, bthread_rwlock_unlock(trylock_args->rw)); + return NULL; +} + +TEST(RWLockTest, trylock) { + bthread_rwlock_t rw; + ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL)); + + ASSERT_EQ(0, bthread_rwlock_tryrdlock(&rw)); + ASSERT_EQ(0, bthread_rwlock_unlock(&rw)); + ASSERT_EQ(0, bthread_rwlock_rdlock(&rw)); + bthread_t th; + TrylockArgs args{&rw, 0}; + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_tryrdlock, &args)); + ASSERT_EQ(0, bthread_join(th, NULL)); + args.rc = EBUSY; + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_trywrlock, &args)); + ASSERT_EQ(0, bthread_join(th, NULL)); + ASSERT_EQ(0, bthread_rwlock_unlock(&rw)); + + ASSERT_EQ(0, bthread_rwlock_trywrlock(&rw)); + ASSERT_EQ(0, bthread_rwlock_unlock(&rw)); + ASSERT_EQ(0, bthread_rwlock_wrlock(&rw)); + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_tryrdlock, &args)); + ASSERT_EQ(0, bthread_join(th, NULL)); + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_trywrlock, &args)); + ASSERT_EQ(0, bthread_join(th, NULL)); + ASSERT_EQ(0, bthread_rwlock_unlock(&rw)); + + ASSERT_EQ(0, bthread_rwlock_destroy(&rw)); +} + +TEST(RWLockTest, cpp_wrapper) { + bthread::RWLock rw; + ASSERT_TRUE(rw.try_rdlock()); + rw.unlock(); + rw.rdlock(); + rw.unlock(); + ASSERT_TRUE(rw.try_wrlock()); + rw.unlock(); + rw.wrlock(); + rw.unlock(); + + struct timespec t = { -2, 0 }; + ASSERT_TRUE(rw.timed_rdlock(&t)); + rw.unlock(); + ASSERT_TRUE(rw.timed_wrlock(&t)); + rw.unlock(); + + { + bthread::RWLockRdGuard guard(rw); + } + { + bthread::RWLockWrGuard guard(rw); + } + { + std::lock_guard<bthread::RWLock> guard(rw, true); + } + { + std::lock_guard<bthread::RWLock> guard(rw, false); + } + { + std::lock_guard<bthread_rwlock_t> guard(*rw.native_handler(), true); + } + { + std::lock_guard<bthread_rwlock_t> guard(*rw.native_handler(), false); + } +} + +bool g_started = false; +bool g_stopped = false; + +void read_op(bthread_rwlock_t* rw, int64_t sleep_us) { + ASSERT_EQ(0, bthread_rwlock_rdlock(rw)); + if (0 != sleep_us) { + bthread_usleep(sleep_us); + } + ASSERT_EQ(0, bthread_rwlock_unlock(rw)); +} + +void write_op(bthread_rwlock_t* rw, int64_t sleep_us) { + ASSERT_EQ(0, bthread_rwlock_wrlock(rw)); + if (0 != sleep_us) { + bthread_usleep(sleep_us); + } + ASSERT_EQ(0, bthread_rwlock_unlock(rw)); +} + +typedef void (*OP)(bthread_rwlock_t* rw, int64_t sleep_us); + +struct MixThreadArg { + bthread_rwlock_t* 
rw; + OP op; +}; + +void* loop_until_stopped(void* arg) { + auto args = (MixThreadArg*)arg; + while (!g_stopped) { + args->op(args->rw, 20); + } + return NULL; +} + +TEST(RWLockTest, mix_thread_types) { + g_stopped = false; + bthread_rwlock_t rw; + ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL)); + + const int N = 16; + const int M = N * 2; + pthread_t pthreads[N]; + bthread_t bthreads[M]; + // reserve enough workers for test. This is a must since we have + // BTHREAD_ATTR_PTHREAD bthreads which may cause deadlocks (the + // bthread_usleep below can't be scheduled and g_stopped is never + // true, thus loop_until_stopped spins forever) + bthread_setconcurrency(M); + std::vector<MixThreadArg> args; + args.reserve(N + M); + for (int i = 0; i < N; ++i) { + if (i % 2 == 0) { + args.push_back({&rw, read_op}); + } else { + args.push_back({&rw, write_op}); + } + ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &args.back())); + } + + for (int i = 0; i < M; ++i) { + if (i % 2 == 0) { + args.push_back({&rw, read_op}); + } else { + args.push_back({&rw, write_op}); + } + const bthread_attr_t* attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD; + ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &args.back())); + } + bthread_usleep(1000L * 1000); + g_stopped = true; + for (int i = 0; i < M; ++i) { + bthread_join(bthreads[i], NULL); + } + for (int i = 0; i < N; ++i) { + pthread_join(pthreads[i], NULL); + } + + ASSERT_EQ(0, bthread_rwlock_destroy(&rw)); +} + +struct BAIDU_CACHELINE_ALIGNMENT PerfArgs { + bthread_rwlock_t* rw; + int64_t counter; + int64_t elapse_ns; + bool ready; + + PerfArgs() : rw(NULL), counter(0), elapse_ns(0), ready(false) {} +}; + +template <bool Reader> +void* add_with_mutex(void* void_arg) { + auto args = (PerfArgs*)void_arg; + args->ready = true; + butil::Timer t; + while (!g_stopped) { + if (g_started) { + break; + } + bthread_usleep(10); + } + t.start(); + while (!g_stopped) { + if (Reader) { + bthread_rwlock_rdlock(args->rw); + } else { + bthread_rwlock_wrlock(args->rw); + } + ++args->counter; + bthread_rwlock_unlock(args->rw); + } + t.stop(); + args->elapse_ns = t.n_elapsed(); + return NULL; +} + +int g_prof_name_counter = 0; + +template <typename ThreadId, typename ThreadCreateFn, typename ThreadJoinFn> +void PerfTest(uint32_t writer_ratio, ThreadId* /*dummy*/, int thread_num, + const ThreadCreateFn& create_fn, const ThreadJoinFn& join_fn) { + ASSERT_LE(writer_ratio, 100U); + + g_started = false; + g_stopped = false; + bthread_setconcurrency(thread_num + 4); + std::vector<ThreadId> threads(thread_num); + std::vector<PerfArgs> args(thread_num); + bthread_rwlock_t rw; + bthread_rwlock_init(&rw, NULL); + int writer_num = thread_num * writer_ratio / 100; + int reader_num = thread_num - writer_num; + for (int i = 0; i < thread_num; ++i) { + args[i].rw = &rw; + if (i < writer_num) { + ASSERT_EQ(0, create_fn(&threads[i], NULL, add_with_mutex<false>, &args[i])); + } else { + ASSERT_EQ(0, create_fn(&threads[i], NULL, add_with_mutex<true>, &args[i])); + } + } + while (true) { + bool all_ready = true; + for (int i = 0; i < thread_num; ++i) { + if (!args[i].ready) { + all_ready = false; + break; + } + } + if (all_ready) { + break; + } + usleep(1000); + } + g_started = true; + char prof_name[32]; + snprintf(prof_name, sizeof(prof_name), "bthread_rwlock_perf_%d.prof", ++g_prof_name_counter); + ProfilerStart(prof_name); + usleep(1000 * 1000); + ProfilerStop(); + g_stopped = true; + + int64_t read_wait_time = 0; + int64_t read_count = 0; + int64_t
write_wait_time = 0; + int64_t write_count = 0; + for (int i = 0; i < thread_num; ++i) { + ASSERT_EQ(0, join_fn(threads[i], NULL)); + if (i < writer_num) { + write_wait_time += args[i].elapse_ns; + write_count += args[i].counter; + } else { + read_wait_time += args[i].elapse_ns; + read_count += args[i].counter; + } + } + LOG(INFO) << "bthread rwlock in " + << ((void*)create_fn == (void*)pthread_create ? "pthread" : "bthread") + << " thread_num=" << thread_num + << " writer_ratio=" << writer_ratio + << " reader_num=" << reader_num + << " read_count=" << read_count + << " read_average_time=" << (read_count == 0 ? 0 : read_wait_time / (double)read_count) + << " writer_num=" << writer_num + << " write_count=" << write_count + << " write_average_time=" << (write_count == 0 ? 0 : write_wait_time / (double)write_count); +} + +TEST(RWLockTest, performance) { + const int thread_num = 12; + PerfTest(0, (pthread_t*)NULL, thread_num, pthread_create, pthread_join); + PerfTest(0, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join); + PerfTest(10, (pthread_t*)NULL, thread_num, pthread_create, pthread_join); + PerfTest(20, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join); + PerfTest(100, (pthread_t*)NULL, thread_num, pthread_create, pthread_join); + PerfTest(100, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join); +} + + void* read_thread(void* arg) { const size_t N = 10000; #ifdef CHECK_RWLOCK @@ -49,7 +428,7 @@ void* write_thread(void*) { return NULL; } -TEST(RWLockTest, rdlock_performance) { +TEST(RWLockTest, pthread_rdlock_performance) { #ifdef CHECK_RWLOCK pthread_rwlock_t lock1; ASSERT_EQ(0, pthread_rwlock_init(&lock1, NULL)); diff --git a/test/bthread_semaphore_unittest.cpp b/test/bthread_semaphore_unittest.cpp new file mode 100644 index 00000000..cc598a4c --- /dev/null +++ b/test/bthread_semaphore_unittest.cpp @@ -0,0 +1,208 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <gtest/gtest.h> +#include <bthread/bthread.h> + +namespace { + +const size_t SEM_COUNT = 10000; + +void* sem_waiter(void* arg) { + bthread_usleep(10 * 1000); + auto sem = (bthread_sem_t*)arg; + for (size_t i = 0; i < SEM_COUNT; ++i) { + bthread_sem_wait(sem); + } + return NULL; +} + +void* sem_poster(void* arg) { + bthread_usleep(10 * 1000); + auto sem = (bthread_sem_t*)arg; + for (size_t i = 0; i < SEM_COUNT; ++i) { + bthread_sem_post(sem); + } + return NULL; +} + +TEST(SemaphoreTest, sanity) { + bthread_sem_t sem; + ASSERT_EQ(0, bthread_sem_init(&sem, 1)); + ASSERT_EQ(0, bthread_sem_wait(&sem)); + ASSERT_EQ(0, bthread_sem_post(&sem)); + ASSERT_EQ(0, bthread_sem_wait(&sem)); + + bthread_t waiter_th; + bthread_t poster_th; + ASSERT_EQ(0, bthread_start_urgent(&waiter_th, NULL, sem_waiter, &sem)); + ASSERT_EQ(0, bthread_start_urgent(&poster_th, NULL, sem_poster, &sem)); + ASSERT_EQ(0, bthread_join(waiter_th, NULL)); + ASSERT_EQ(0, bthread_join(poster_th, NULL)); + + ASSERT_EQ(0, bthread_sem_destroy(&sem)); +} + + + +TEST(SemaphoreTest, used_in_pthread) { + bthread_sem_t sem; + ASSERT_EQ(0, bthread_sem_init(&sem, 0)); + + pthread_t waiter_th[8]; + pthread_t poster_th[8]; + for (auto& th : waiter_th) { + ASSERT_EQ(0, pthread_create(&th, NULL, sem_waiter, &sem)); + } + for (auto& th : poster_th) { + ASSERT_EQ(0, pthread_create(&th, NULL, sem_poster, &sem)); + } + for (auto& th : waiter_th) { + pthread_join(th, NULL); + } + for (auto& th : poster_th) { + pthread_join(th, NULL); + } + + ASSERT_EQ(0, bthread_sem_destroy(&sem)); +} + +void* do_timedwait(void *arg) { + struct timespec t = { -2, 0 }; + EXPECT_EQ(ETIMEDOUT, bthread_sem_timedwait((bthread_sem_t*)arg, &t)); + return NULL; +} + +TEST(SemaphoreTest, timedwait) { + bthread_sem_t sem; + ASSERT_EQ(0, bthread_sem_init(&sem, 0)); + bthread_t th; + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_timedwait, &sem)); + ASSERT_EQ(0, bthread_join(th, NULL)); + ASSERT_EQ(0, bthread_sem_destroy(&sem)); +} + + +struct TryWaitArgs { + bthread_sem_t* sem; + int rc; +}; + +void* do_trywait(void *arg) { + auto trylock_args = (TryWaitArgs*)arg; + EXPECT_EQ(trylock_args->rc, bthread_sem_trywait(trylock_args->sem)); + return NULL; +} + +TEST(SemaphoreTest, trywait) { + bthread_sem_t sem; + ASSERT_EQ(0, bthread_sem_init(&sem, 0)); + + ASSERT_EQ(EAGAIN, bthread_sem_trywait(&sem)); + ASSERT_EQ(0, bthread_sem_post(&sem)); + ASSERT_EQ(0, bthread_sem_trywait(&sem)); + ASSERT_EQ(EAGAIN, bthread_sem_trywait(&sem)); + + ASSERT_EQ(0, bthread_sem_post(&sem)); + bthread_t th; + TryWaitArgs args{ &sem, 0}; + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_trywait, &args)); + ASSERT_EQ(0, bthread_join(th, NULL)); + args.rc = EAGAIN; + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_trywait, &args)); + ASSERT_EQ(0, bthread_join(th, NULL)); + + ASSERT_EQ(0, bthread_sem_destroy(&sem)); +} + +bool g_started = false; +bool g_stopped = false; + +void wait_op(bthread_sem_t* sem, int64_t sleep_us) { + ASSERT_EQ(0, bthread_sem_wait(sem)); + if (0 != sleep_us) { + bthread_usleep(sleep_us); + } +} + +void post_op(bthread_sem_t* rw, int64_t sleep_us) { + ASSERT_EQ(0, bthread_sem_post(rw)); + if (0 != sleep_us) { + bthread_usleep(sleep_us); + } +} + +typedef void (*OP)(bthread_sem_t* sem, int64_t sleep_us); + +struct MixThreadArg { + bthread_sem_t* sem; + OP op; +}; + +void* loop_until_stopped(void* arg) { + auto args = (MixThreadArg*)arg; + for (size_t i = 0; i < SEM_COUNT; ++i) { + args->op(args->sem, 20); + } + return NULL; +} + +TEST(SemaphoreTest, mix_thread_types) { + 
g_stopped = false; + bthread_sem_t sem; + ASSERT_EQ(0, bthread_sem_init(&sem, 0)); + + const int N = 16; + const int M = N * 2; + pthread_t pthreads[N]; + bthread_t bthreads[M]; + // reserve enough workers for test. This is a must since we have + // BTHREAD_ATTR_PTHREAD bthreads which may cause deadlocks (the + // bthread_usleep below can't be scheduled and g_stopped is never + // true, thus loop_until_stopped spins forever) + bthread_setconcurrency(M); + std::vector<MixThreadArg> args; + args.reserve(N + M); + for (int i = 0; i < N; ++i) { + if (i % 2 == 0) { + args.push_back({ &sem, wait_op }); + } else { + args.push_back({ &sem, post_op }); + } + ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &args.back())); + } + + for (int i = 0; i < M; ++i) { + if (i % 2 == 0) { + args.push_back({ &sem, wait_op }); + } else { + args.push_back({ &sem, post_op }); + } + const bthread_attr_t* attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD; + ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &args.back())); + } + for (bthread_t bthread : bthreads) { + bthread_join(bthread, NULL); + } + for (pthread_t pthread : pthreads) { + pthread_join(pthread, NULL); + } + + ASSERT_EQ(0, bthread_sem_destroy(&sem)); +} + +} // namespace
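
For reference, a minimal usage sketch of the APIs introduced by this commit follows: the bthread semaphore, the contention-profile (csite) mutex attribute, and the rwlock's C++ wrappers. This sketch is not part of the patch; the main() scaffolding, the slot counts, and the names sem/mu/rw are illustrative only, and it assumes a brpc build that includes this change.

    #include <bthread/bthread.h>
    #include <bthread/rwlock.h>

    int main() {
        // Semaphore: start with two available slots.
        bthread_sem_t sem;
        if (bthread_sem_init(&sem, 2) != 0) {
            return 1;
        }
        bthread_sem_wait(&sem);       // Take one slot (value 2 -> 1).
        bthread_sem_trywait(&sem);    // Take the other without blocking (1 -> 0).
        bthread_sem_post_n(&sem, 2);  // Return both slots in a single wake-up call.
        bthread_sem_destroy(&sem);

        // Mutex attribute: opt this mutex out of contention profiling.
        bthread_mutexattr_t attr;
        bthread_mutexattr_init(&attr);
        bthread_mutexattr_disable_csite(&attr);
        bthread_mutex_t mu;
        bthread_mutex_init(&mu, &attr);
        bthread_mutexattr_destroy(&attr);
        bthread_mutex_destroy(&mu);

        // RWLock: RAII wrappers around bthread_rwlock_t. Construction and
        // lock acquisition throw std::system_error on failure.
        bthread::RWLock rw;
        {
            bthread::RWLockRdGuard guard(rw);  // Shared (reader) lock.
            // ... read shared state ...
        }
        {
            bthread::RWLockWrGuard guard(rw);  // Exclusive (writer) lock.
            // ... mutate shared state ...
        }
        return 0;
    }

As the used_in_pthread and mix_thread_types tests above exercise, both the semaphore and the rwlock can be used from plain pthreads as well as bthreads.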