This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new f17048e1 Support semaphore and rwlock for bthread (#2752)
f17048e1 is described below

commit f17048e18011d842bcaf3f37e50dc45307573a35
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Thu Sep 26 10:49:23 2024 +0800

    Support semaphore and rwlock for bthread (#2752)
    
    * Support bthread semaphore
    
    * Support bthread rwlock
    
    * Support contention profiler for semaphore and rwlock
---
 src/bthread/bthread.h               |  66 +++++-
 src/bthread/butex.cpp               |   8 +-
 src/bthread/butex.h                 |   5 +
 src/bthread/mutex.cpp               |  87 ++++----
 src/bthread/mutex.h                 |  14 +-
 src/bthread/rwlock.cpp              | 368 +++++++++++++++++++++++++++++++++
 src/bthread/rwlock.h                | 214 ++++++++++++++++++++
 src/bthread/semaphore.cpp           | 173 ++++++++++++++++
 src/bthread/types.h                 |  14 ++
 src/bvar/collector.h                |   8 +-
 test/BUILD.bazel                    |   2 +
 test/bthread_rwlock_unittest.cpp    | 393 +++++++++++++++++++++++++++++++++++-
 test/bthread_semaphore_unittest.cpp | 208 +++++++++++++++++++
 13 files changed, 1493 insertions(+), 67 deletions(-)

diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h
index 68734e05..8532b3b3 100644
--- a/src/bthread/bthread.h
+++ b/src/bthread/bthread.h
@@ -170,7 +170,7 @@ extern int bthread_usleep(uint64_t microseconds);
 // NOTE: mutexattr is not used in current mutex implementation. User shall
 //       always pass a NULL attribute.
 extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex,
-                              const bthread_mutexattr_t* __restrict mutex_attr);
+                              const bthread_mutexattr_t* __restrict attr);
 
 // Destroy `mutex'.
 extern int bthread_mutex_destroy(bthread_mutex_t* mutex);
@@ -188,6 +188,13 @@ extern int bthread_mutex_timedlock(bthread_mutex_t* __restrict mutex,
 // Unlock `mutex'.
 extern int bthread_mutex_unlock(bthread_mutex_t* mutex);
 
+extern int bthread_mutexattr_init(bthread_mutexattr_t* attr);
+
+// Disable contention profiling for the mutex.
+extern int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr);
+
+extern int bthread_mutexattr_destroy(bthread_mutexattr_t* attr);
+
 // -----------------------------------------------
 // Functions for handling conditional variables.
 // -----------------------------------------------
@@ -241,9 +248,8 @@ extern int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock);
 extern int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock);
 
 // Try to acquire read lock for `rwlock' or return after specified time.
-extern int bthread_rwlock_timedrdlock(
-    bthread_rwlock_t* __restrict rwlock,
-    const struct timespec* __restrict abstime);
+extern int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
+                                      const struct timespec* __restrict abstime);
 
 // Acquire write lock for `rwlock'.
 extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
@@ -252,9 +258,8 @@ extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
 extern int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock);
 
 // Try to acquire write lock for `rwlock' or return after specified time.
-extern int bthread_rwlock_timedwrlock(
-    bthread_rwlock_t* __restrict rwlock,
-    const struct timespec* __restrict abstime);
+extern int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
+                                      const struct timespec* __restrict abstime);
 
 // Unlock `rwlock'.
 extern int bthread_rwlock_unlock(bthread_rwlock_t* rwlock);
@@ -277,6 +282,53 @@ extern int bthread_rwlockattr_getkind_np(const bthread_rwlockattr_t* attr,
 extern int bthread_rwlockattr_setkind_np(bthread_rwlockattr_t* attr,
                                          int pref);
 
+// -------------------------------------------
+// Functions for handling semaphore.
+// -------------------------------------------
+
+// Initialize the semaphore referred to by `sem'. The value of the
+// initialized semaphore shall be `value'.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_init(bthread_sem_t* sem, unsigned value);
+
+// Disable contention profiling for the semaphore referred to by `sem'.
+extern int bthread_sem_disable_csite(bthread_sem_t* sem);
+
+// Destroy the semaphore indicated by `sem'.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_destroy(bthread_sem_t* sem);
+
+// Lock the semaphore referenced by `sem' by performing a semaphore
+// lock operation on that semaphore. If the semaphore value is currently
+// zero, then the calling (b)thread shall not return from the call to
+// bthread_sem_wait() until it locks the semaphore.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_wait(bthread_sem_t* sem);
+
+// Lock the semaphore referenced by `sem' as in the bthread_sem_wait()
+// function. However, if the semaphore cannot be locked without waiting
+// for another (b)thread to unlock the semaphore by calling
+// bthread_sem_post(), the wait shall be terminated when the
+// specified timeout expires.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_timedwait(bthread_sem_t* sem, const struct timespec* abstime);
+
+// Lock the semaphore referenced by `sem' only if the semaphore is
+// currently not locked; that is, if the semaphore value is currently
+// positive. Otherwise, it shall not lock the semaphore.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_trywait(bthread_sem_t* sem);
+
+// Unlock the semaphore referenced by `sem' by performing
+// a semaphore unlock operation on that semaphore.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_post(bthread_sem_t* sem);
+
+// Unlock the semaphore referenced by `sem' by performing
+// `n' semaphore unlock operations on that semaphore.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_post_n(bthread_sem_t* sem, size_t n);
+
 
 // ----------------------------------------------------------------------
 // Functions for handling barrier which is a new feature in 1003.1j-2000.
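
A minimal usage sketch of the semaphore API declared above (illustrative
only, not taken from the patch; error handling is elided, the 10-second
deadline is an arbitrary choice, and the absolute deadline is assumed to
follow the realtime clock as in the other bthread timed waits):

    #include <bthread/bthread.h>

    bthread_sem_t sem;
    bthread_sem_init(&sem, 0);           // initial value is 0

    // Producer side: make one unit available.
    bthread_sem_post(&sem);

    // Consumer side: block until a unit is available,
    // or give up at an absolute deadline (assumed CLOCK_REALTIME).
    struct timespec abstime;
    clock_gettime(CLOCK_REALTIME, &abstime);
    abstime.tv_sec += 10;
    if (0 == bthread_sem_timedwait(&sem, &abstime)) {
        // Acquired one unit.
    }

    bthread_sem_destroy(&sem);
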
diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index b603d89c..4a0f9c37 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -329,14 +329,14 @@ int butex_wake(void* arg, bool nosignal) {
     return 1;
 }
 
-int butex_wake_all(void* arg, bool nosignal) {
+int butex_wake_n(void* arg, size_t n, bool nosignal) {
     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
 
     ButexWaiterList bthread_waiters;
     ButexWaiterList pthread_waiters;
     {
         BAIDU_SCOPED_LOCK(b->waiter_lock);
-        while (!b->waiters.empty()) {
+        for (size_t i = 0; (n == 0 || i < n) && !b->waiters.empty(); ++i) {
             ButexWaiter* bw = b->waiters.head()->value();
             bw->RemoveFromList();
             bw->container.store(NULL, butil::memory_order_relaxed);
@@ -393,6 +393,10 @@ int butex_wake_all(void* arg, bool nosignal) {
     return nwakeup;
 }
 
+int butex_wake_all(void* arg, bool nosignal) {
+    return butex_wake_n(arg, 0, nosignal);
+}
+
 int butex_wake_except(void* arg, bthread_t excluded_bthread) {
     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
 
diff --git a/src/bthread/butex.h b/src/bthread/butex.h
index b40ec1e0..2786ef68 100644
--- a/src/bthread/butex.h
+++ b/src/bthread/butex.h
@@ -48,6 +48,11 @@ void butex_destroy(void* butex);
 // Returns # of threads woken up.
 int butex_wake(void* butex, bool nosignal = false);
 
+// Wake up all threads waiting on |butex| if n is zero.
+// Otherwise, wake up at most n threads waiting on |butex|.
+// Returns # of threads woken up.
+int butex_wake_n(void* butex, size_t n, bool nosignal = false);
+
 // Wake up all threads waiting on |butex|.
 // Returns # of threads woken up.
 int butex_wake_all(void* butex, bool nosignal = false);
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index 403f6bb8..fa2f91c6 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -59,7 +59,7 @@ EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
 const butil::debug::StackTrace ALLOW_UNUSED dummy_bt;
 
 // For controlling contentions collected per second.
-static bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;
+bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;
 
 const size_t MAX_CACHED_CONTENTIONS = 512;
 // Skip frames which are always the same: the unlock function and submit_contention()
@@ -267,7 +267,7 @@ void ContentionProfiler::flush_to_disk(bool ending) {
 
 // If contention profiler is on, this variable will be set with a valid
 // instance. NULL otherwise.
-BAIDU_CACHELINE_ALIGNMENT static ContentionProfiler* g_cp = NULL;
+BAIDU_CACHELINE_ALIGNMENT ContentionProfiler* g_cp = NULL;
 // Need this version to solve an issue that non-empty entries left by
 // previous contention profilers should be detected and overwritten.
 static uint64_t g_cp_version = 0;
@@ -369,13 +369,11 @@ void ContentionProfilerStop() {
     LOG(ERROR) << "Contention profiler is not started!";
 }
 
-BUTIL_FORCE_INLINE bool
-is_contention_site_valid(const bthread_contention_site_t& cs) {
-    return cs.sampling_range;
+bool is_contention_site_valid(const bthread_contention_site_t& cs) {
+    return bvar::is_sampling_range_valid(cs.sampling_range);
 }
 
-BUTIL_FORCE_INLINE void
-make_contention_site_invalid(bthread_contention_site_t* cs) {
+void make_contention_site_invalid(bthread_contention_site_t* cs) {
     cs->sampling_range = 0;
 }
 
@@ -671,13 +669,13 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
         MutexAndContentionSite& entry = fast_alt.list[fast_alt.count++];
         entry.mutex = mutex;
         csite = &entry.csite;
-        if (!sampling_range) {
+        if (!bvar::is_sampling_range_valid(sampling_range)) {
             make_contention_site_invalid(&entry.csite);
             return pthread_mutex_lock_internal(mutex);
         }
     }
 #endif
-    if (!sampling_range) {  // don't sample
+    if (!bvar::is_sampling_range_valid(sampling_range)) {  // don't sample
         return pthread_mutex_lock_internal(mutex);
     }
     // Lock and monitor the waiting time.
@@ -873,13 +871,14 @@ void FastPthreadMutex::unlock() {
 extern "C" {
 
 int bthread_mutex_init(bthread_mutex_t* __restrict m,
-                       const bthread_mutexattr_t* __restrict) {
+                       const bthread_mutexattr_t* __restrict attr) {
     bthread::make_contention_site_invalid(&m->csite);
     m->butex = bthread::butex_create_checked<unsigned>();
     if (!m->butex) {
         return ENOMEM;
     }
     *m->butex = 0;
+    m->enable_csite = NULL == attr ? true : attr->enable_csite;
     return 0;
 }
 
@@ -900,35 +899,9 @@ int bthread_mutex_lock_contended(bthread_mutex_t* m) {
     return bthread::mutex_lock_contended_impl(m, NULL);
 }
 
-int bthread_mutex_lock(bthread_mutex_t* m) {
-    bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
-    if (!split->locked.exchange(1, butil::memory_order_acquire)) {
-        return 0;
-    }
-    // Don't sample when contention profiler is off.
-    if (!bthread::g_cp) {
-        return bthread::mutex_lock_contended_impl(m, NULL);
-    }
-    // Ask Collector if this (contended) locking should be sampled.
-    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
-    if (!sampling_range) { // Don't sample
-        return bthread::mutex_lock_contended_impl(m, NULL);
-    }
-    // Start sampling.
-    const int64_t start_ns = butil::cpuwide_time_ns();
-    // NOTE: Don't modify m->csite outside lock since multiple threads are
-    // still contending with each other.
-    const int rc = bthread::mutex_lock_contended_impl(m, NULL);
-    if (!rc) { // Inside lock
-        m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
-        m->csite.sampling_range = sampling_range;
-    } // else rare
-    return rc;
-}
-
-int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
-                            const struct timespec* __restrict abstime) {
-    bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
+static int bthread_mutex_lock_impl(bthread_mutex_t* __restrict m,
+                                   const struct timespec* __restrict abstime) {
+    auto split = (bthread::MutexInternal*)m->butex;
     if (!split->locked.exchange(1, butil::memory_order_acquire)) {
         return 0;
     }
@@ -937,8 +910,9 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
         return bthread::mutex_lock_contended_impl(m, abstime);
     }
     // Ask Collector if this (contended) locking should be sampled.
-    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
-    if (!sampling_range) { // Don't sample
+    const size_t sampling_range =
+        m->enable_csite ? bvar::is_collectable(&bthread::g_cp_sl) : bvar::INVALID_SAMPLING_RANGE;
+    if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample
         return bthread::mutex_lock_contended_impl(m, abstime);
     }
     // Start sampling.
@@ -958,10 +932,20 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
     return rc;
 }
 
+int bthread_mutex_lock(bthread_mutex_t* m) {
+    return bthread_mutex_lock_impl(m, NULL);
+}
+
+int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
+                            const struct timespec* __restrict abstime) {
+    return bthread_mutex_lock_impl(m, abstime);
+}
+
 int bthread_mutex_unlock(bthread_mutex_t* m) {
-    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
+    auto whole = (butil::atomic<unsigned>*)m->butex;
     bthread_contention_site_t saved_csite = {0, 0};
-    if (bthread::is_contention_site_valid(m->csite)) {
+    bool is_valid = bthread::is_contention_site_valid(m->csite);
+    if (is_valid) {
         saved_csite = m->csite;
         bthread::make_contention_site_invalid(&m->csite);
     }
@@ -971,7 +955,7 @@ int bthread_mutex_unlock(bthread_mutex_t* m) {
         return 0;
     }
     // Wakeup one waiter
-    if (!bthread::is_contention_site_valid(saved_csite)) {
+    if (!is_valid) {
         bthread::butex_wake(whole);
         return 0;
     }
@@ -983,6 +967,21 @@ int bthread_mutex_unlock(bthread_mutex_t* m) {
     return 0;
 }
 
+int bthread_mutexattr_init(bthread_mutexattr_t* attr) {
+    attr->enable_csite = true;
+    return 0;
+}
+
+int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr) {
+    attr->enable_csite = false;
+    return 0;
+}
+
+int bthread_mutexattr_destroy(bthread_mutexattr_t* attr) {
+    attr->enable_csite = true;
+    return 0;
+}
+
 #ifndef NO_PTHREAD_MUTEX_HOOK
 int pthread_mutex_lock(pthread_mutex_t* __mutex) {
     return bthread::pthread_mutex_lock_impl(__mutex);
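
The new mutexattr API above is exercised by bthread_rwlock_init() later in
this patch; here is a condensed sketch of the same pattern for a standalone
mutex (a hedged example, assuming a caller that wants a hot lock excluded
from the contention profiler; not code from the patch itself):

    bthread_mutexattr_t attr;
    bthread_mutexattr_init(&attr);
    bthread_mutexattr_disable_csite(&attr);   // opt out of contention profiling

    bthread_mutex_t mu;
    bthread_mutex_init(&mu, &attr);   // the csite setting is captured at init
    bthread_mutexattr_destroy(&attr); // attr is no longer needed afterwards

    bthread_mutex_lock(&mu);
    // ... critical section, never sampled by the contention profiler ...
    bthread_mutex_unlock(&mu);
    bthread_mutex_destroy(&mu);
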
diff --git a/src/bthread/mutex.h b/src/bthread/mutex.h
index ad6d2e5c..f1d1029b 100644
--- a/src/bthread/mutex.h
+++ b/src/bthread/mutex.h
@@ -28,7 +28,7 @@
 
 __BEGIN_DECLS
 extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex,
-                              const bthread_mutexattr_t* __restrict mutex_attr);
+                              const bthread_mutexattr_t* __restrict attr);
 extern int bthread_mutex_destroy(bthread_mutex_t* mutex);
 extern int bthread_mutex_trylock(bthread_mutex_t* mutex);
 extern int bthread_mutex_lock(bthread_mutex_t* mutex);
@@ -48,7 +48,8 @@ public:
     Mutex() {
         int ec = bthread_mutex_init(&_mutex, NULL);
         if (ec != 0) {
-            throw std::system_error(std::error_code(ec, std::system_category()), "Mutex constructor failed");
+            throw std::system_error(std::error_code(ec, std::system_category()),
+                                    "Mutex constructor failed");
         }
     }
     ~Mutex() { CHECK_EQ(0, bthread_mutex_destroy(&_mutex)); }
@@ -56,11 +57,12 @@ public:
     void lock() {
         int ec = bthread_mutex_lock(&_mutex);
         if (ec != 0) {
-            throw std::system_error(std::error_code(ec, std::system_category()), "Mutex lock failed");
+            throw std::system_error(std::error_code(ec, std::system_category()),
+                                    "Mutex lock failed");
         }
     }
-    void unlock() { bthread_mutex_unlock(&_mutex); }
-    bool try_lock() { return !bthread_mutex_trylock(&_mutex); }
+    void unlock() { (bthread_mutex_unlock(&_mutex)); }
+    bool try_lock() { return 0 == bthread_mutex_trylock(&_mutex); }
     // TODO(chenzhangyi01): Complement interfaces for C++11
 private:
     DISALLOW_COPY_AND_ASSIGN(Mutex);
@@ -107,7 +109,7 @@ namespace std {
 
 template <> class lock_guard<bthread_mutex_t> {
 public:
-    explicit lock_guard(bthread_mutex_t & mutex) : _pmutex(&mutex) {
+    explicit lock_guard(bthread_mutex_t& mutex) : _pmutex(&mutex) {
 #if !defined(NDEBUG)
         const int rc = bthread_mutex_lock(_pmutex);
         if (rc) {
diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp
new file mode 100644
index 00000000..e6356683
--- /dev/null
+++ b/src/bthread/rwlock.cpp
@@ -0,0 +1,368 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bvar/collector.h"
+#include "bthread/rwlock.h"
+#include "bthread/butex.h"
+
+namespace bthread {
+
+// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock,
+// which is a bthread implementation of golang RWMutex.
+// The lock can be held by an arbitrary number of readers or a single writer.
+// For details, see https://github.com/golang/go/blob/master/src/sync/rwmutex.go
+
+// Defined in bthread/mutex.cpp.
+class ContentionProfiler;
+extern ContentionProfiler* g_cp;
+extern bvar::CollectorSpeedLimit g_cp_sl;
+extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
+extern void make_contention_site_invalid(bthread_contention_site_t* cs);
+extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns);
+
+// It is enough for readers. If the number of readers exceeds this value,
+// use `int64_t' instead of `int'.
+const int RWLockMaxReaders = 1 << 30;
+
+// For reading.
+static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock,
+                              const struct timespec* __restrict abstime) {
+    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
+        ->fetch_add(1, butil::memory_order_acquire) + 1;
+    // Fast path.
+    if (reader_count >= 0) {
+        CHECK_LT(reader_count, RWLockMaxReaders);
+        return 0;
+    }
+
+    // Slow path.
+
+    // Don't sample when contention profiler is off.
+    if (NULL == bthread::g_cp) {
+        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+    }
+    // Ask Collector if this (contended) locking should be sampled.
+    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
+    if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample.
+        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+    }
+
+    // Sample.
+    const int64_t start_ns = butil::cpuwide_time_ns();
+    int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+    const int64_t end_ns = butil::cpuwide_time_ns();
+    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+    // Submit `csite' for each reader immediately after
+    // owning rdlock to avoid the contention of `csite'.
+    bthread::submit_contention(csite, end_ns);
+
+    return rc;
+}
+
+static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) {
+    return rwlock_rdlock_impl(rwlock, NULL);
+}
+
+static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
+                                     const struct timespec* __restrict abstime) {
+    return rwlock_rdlock_impl(rwlock, abstime);
+}
+
+// Returns 0 if the lock was acquired, otherwise errno.
+static inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
+    while (true) {
+        int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
+            ->load(butil::memory_order_relaxed);
+        if (reader_count < 0) {
+            // Failed to acquire the read lock because there is a writer.
+            return EBUSY;
+        }
+        if (((butil::atomic<int>*)&rwlock->reader_count)
+                ->compare_exchange_weak(reader_count, reader_count + 1,
+                                        butil::memory_order_acquire,
+                                        butil::memory_order_relaxed)) {
+            return 0;
+        }
+    }
+}
+
+static inline int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
+    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
+        ->fetch_add(-1, butil::memory_order_relaxed) - 1;
+    // Fast path.
+    if (reader_count >= 0) {
+        return 0;
+    }
+    // Slow path.
+
+    if (BAIDU_UNLIKELY(reader_count + 1 == 0 || reader_count + 1 == -RWLockMaxReaders)) {
+        CHECK(false) << "rwlock_unrdlock of unlocked rwlock";
+        return EINVAL;
+    }
+
+    // A writer is pending.
+    int reader_wait = ((butil::atomic<int>*)&rwlock->reader_wait)
+        ->fetch_add(-1, butil::memory_order_relaxed) - 1;
+    if (reader_wait != 0) {
+        return 0;
+    }
+
+    // The last reader unblocks the writer.
+
+    if (NULL == bthread::g_cp) {
+        bthread_sem_post(&rwlock->writer_sema);
+        return 0;
+    }
+    // Ask Collector if this (contended) locking should be sampled.
+    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
+    if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample
+        bthread_sem_post(&rwlock->writer_sema);
+        return 0;
+    }
+
+    // Sampling.
+    const int64_t start_ns = butil::cpuwide_time_ns();
+    bthread_sem_post(&rwlock->writer_sema);
+    const int64_t end_ns = butil::cpuwide_time_ns();
+    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+    // Submit `csite' for each reader immediately after
+    // releasing rdlock to avoid the contention of `csite'.
+    bthread::submit_contention(csite, end_ns);
+    return 0;
+}
+
+#define DO_CSITE_IF_NEED                                                        \
+    do {                                                                        \
+        /* Don't sample when contention profiler is off. */                     \
+        if (NULL != bthread::g_cp) {                                            \
+            /* Ask Collector if this (contended) locking should be sampled. */  \
+            sampling_range = bvar::is_collectable(&bthread::g_cp_sl);           \
+            start_ns = bvar::is_sampling_range_valid(sampling_range) ?          \
+                butil::cpuwide_time_ns() : -1;                                  \
+        } else {                                                                \
+            start_ns = -1;                                                      \
+        }                                                                       \
+    } while (0)
+
+#define SUBMIT_CSITE_IF_NEED                                                          \
+    do {                                                                              \
+        if (ETIMEDOUT == rc && start_ns > 0) {                                        \
+            /* Failed to lock due to ETIMEDOUT, submit the elapse directly. */        \
+            const int64_t end_ns = butil::cpuwide_time_ns();                          \
+            const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; \
+            bthread::submit_contention(csite, end_ns);                                \
+        }                                                                             \
+    } while (0)
+
+// For writing.
+static inline int rwlock_wrlock_impl(bthread_rwlock_t* __restrict rwlock,
+                                     const struct timespec* __restrict abstime) {
+    // First, resolve competition with other writers.
+    int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
+    size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+    // -1: don't sample.
+    // 0: default value.
+    // > 0: Start time of sampling.
+    int64_t start_ns = 0;
+    if (0 != rc) {
+        DO_CSITE_IF_NEED;
+
+        rc = bthread_mutex_timedlock(&rwlock->write_queue_mutex, abstime);
+        if (0 != rc) {
+            SUBMIT_CSITE_IF_NEED;
+            return rc;
+        }
+    }
+
+    // Announce to readers there is a pending writer.
+    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
+        ->fetch_add(-RWLockMaxReaders, butil::memory_order_release);
+    // Wait for active readers.
+    if (reader_count != 0 &&
+        ((butil::atomic<int>*)&rwlock->reader_wait)
+            ->fetch_add(reader_count) + reader_count != 0) {
+        rc = bthread_sem_trywait(&rwlock->writer_sema);
+        if (0 != rc) {
+            if (0 == start_ns) {
+                DO_CSITE_IF_NEED;
+            }
+
+            rc = bthread_sem_timedwait(&rwlock->writer_sema, abstime);
+            if (0 != rc) {
+                SUBMIT_CSITE_IF_NEED;
+                bthread_mutex_unlock(&rwlock->write_queue_mutex);
+                return rc;
+            }
+        }
+    }
+    if (start_ns > 0) {
+        rwlock->writer_csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
+        rwlock->writer_csite.sampling_range = sampling_range;
+    }
+    rwlock->wlock_flag = true;
+    return 0;
+}
+#undef DO_CSITE_IF_NEED
+#undef SUBMIT_CSITE_IF_NEED
+
+static inline int rwlock_wrlock(bthread_rwlock_t* rwlock) {
+    return rwlock_wrlock_impl(rwlock, NULL);
+}
+
+static inline int rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
+                                     const struct timespec* __restrict abstime) {
+    return rwlock_wrlock_impl(rwlock, abstime);
+}
+
+static inline int rwlock_trywrlock(bthread_rwlock_t* rwlock) {
+    int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
+    if (0 != rc) {
+        return rc;
+    }
+
+    int expected = 0;
+    if (!((butil::atomic<int>*)&rwlock->reader_count)
+            ->compare_exchange_strong(expected, -RWLockMaxReaders,
+                                      butil::memory_order_acquire,
+                                      butil::memory_order_relaxed)) {
+        // Failed to acquire the write lock because there are active readers.
+        bthread_mutex_unlock(&rwlock->write_queue_mutex);
+        return EBUSY;
+    }
+    rwlock->wlock_flag = true;
+
+    return 0;
+}
+
+static inline void rwlock_unwrlock_slow(bthread_rwlock_t* rwlock, int reader_count) {
+    bthread_sem_post_n(&rwlock->reader_sema, reader_count);
+    // Allow other writers to proceed.
+    bthread_mutex_unlock(&rwlock->write_queue_mutex);
+}
+
+static inline int rwlock_unwrlock(bthread_rwlock_t* rwlock) {
+    rwlock->wlock_flag = false;
+
+    // Announce to readers there is no active writer.
+    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)->fetch_add(
+        RWLockMaxReaders, butil::memory_order_release) + RWLockMaxReaders;
+    if (BAIDU_UNLIKELY(reader_count >= RWLockMaxReaders)) {
+        CHECK(false) << "rwlock_unwlock of unlocked rwlock";
+        return EINVAL;
+    }
+
+    bool is_valid = bthread::is_contention_site_valid(rwlock->writer_csite);
+    if (BAIDU_UNLIKELY(is_valid)) {
+        bthread_contention_site_t saved_csite = rwlock->writer_csite;
+        bthread::make_contention_site_invalid(&rwlock->writer_csite);
+
+        const int64_t unlock_start_ns = butil::cpuwide_time_ns();
+        rwlock_unwrlock_slow(rwlock, reader_count);
+        const int64_t unlock_end_ns = butil::cpuwide_time_ns();
+        saved_csite.duration_ns += unlock_end_ns - unlock_start_ns;
+        bthread::submit_contention(saved_csite, unlock_end_ns);
+    } else {
+        rwlock_unwrlock_slow(rwlock, reader_count);
+    }
+
+    return 0;
+}
+
+static inline int rwlock_unlock(bthread_rwlock_t* rwlock) {
+    if (rwlock->wlock_flag) {
+        return rwlock_unwrlock(rwlock);
+    } else {
+        return rwlock_unrdlock(rwlock);
+    }
+}
+
+} // namespace bthread
+
+__BEGIN_DECLS
+
+int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
+                        const bthread_rwlockattr_t* __restrict) {
+    int rc = bthread_sem_init(&rwlock->reader_sema, 0);
+    if (BAIDU_UNLIKELY(0 != rc)) {
+        return rc;
+    }
+    bthread_sem_disable_csite(&rwlock->reader_sema);
+    rc = bthread_sem_init(&rwlock->writer_sema, 0);
+    if (BAIDU_UNLIKELY(0 != rc)) {
+        bthread_sem_destroy(&rwlock->reader_sema);
+        return rc;
+    }
+    bthread_sem_disable_csite(&rwlock->writer_sema);
+
+    rwlock->reader_count = 0;
+    rwlock->reader_wait = 0;
+    rwlock->wlock_flag = false;
+
+    bthread_mutexattr_t attr;
+    bthread_mutexattr_init(&attr);
+    bthread_mutexattr_disable_csite(&attr);
+    rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr);
+    if (BAIDU_UNLIKELY(0 != rc)) {
+        bthread_sem_destroy(&rwlock->reader_sema);
+        bthread_sem_destroy(&rwlock->writer_sema);
+        return rc;
+    }
+    bthread_mutexattr_destroy(&attr);
+
+    bthread::make_contention_site_invalid(&rwlock->writer_csite);
+
+    return 0;
+}
+
+int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) {
+    bthread_sem_destroy(&rwlock->reader_sema);
+    bthread_sem_destroy(&rwlock->writer_sema);
+    bthread_mutex_destroy(&rwlock->write_queue_mutex);
+    return 0;
+}
+
+int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_rdlock(rwlock);
+}
+
+int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_tryrdlock(rwlock);
+}
+
+int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
+                               const struct timespec* __restrict abstime) {
+    return bthread::rwlock_timedrdlock(rwlock, abstime);
+}
+
+int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_wrlock(rwlock);
+}
+
+int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_trywrlock(rwlock);
+}
+
+int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
+                               const struct timespec* __restrict abstime) {
+    return bthread::rwlock_timedwrlock(rwlock, abstime);
+}
+
+int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_unlock(rwlock);
+}
+
+__END_DECLS
diff --git a/src/bthread/rwlock.h b/src/bthread/rwlock.h
new file mode 100644
index 00000000..a2708b99
--- /dev/null
+++ b/src/bthread/rwlock.h
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bthread/types.h"
+#include "bthread/bthread.h"
+#include "butil/scoped_lock.h"
+
+namespace bthread {
+
+// The C++ Wrapper of bthread_rwlock
+
+// NOTE: Not aligned to cacheline as the container of RWLock is practically aligned.
+
+class RWLock {
+public:
+    typedef bthread_rwlock_t* native_handler_type;
+
+    RWLock() {
+        int rc = bthread_rwlock_init(&_rwlock, NULL);
+        if (rc) {
+            throw std::system_error(std::error_code(rc, 
std::system_category()),
+                                    "RWLock constructor failed");
+        }
+    }
+
+    ~RWLock() {
+        CHECK_EQ(0, bthread_rwlock_destroy(&_rwlock));
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(RWLock);
+
+    native_handler_type native_handler() { return &_rwlock; }
+
+    void rdlock() {
+        int rc = bthread_rwlock_rdlock(&_rwlock);
+        if (rc) {
+            throw std::system_error(std::error_code(rc, std::system_category()),
+                                    "RWLock rdlock failed");
+        }
+    }
+
+    bool try_rdlock() {
+        return 0 == bthread_rwlock_tryrdlock(&_rwlock);
+    }
+
+    bool timed_rdlock(const struct timespec* abstime) {
+        return 0 == bthread_rwlock_timedrdlock(&_rwlock, abstime);
+    }
+
+    void wrlock() {
+        int rc = bthread_rwlock_wrlock(&_rwlock);
+        if (rc) {
+            throw std::system_error(std::error_code(rc, std::system_category()),
+                                    "RWLock wrlock failed");
+        }
+    }
+
+    bool try_wrlock() {
+        return 0 == bthread_rwlock_trywrlock(&_rwlock);
+    }
+
+    bool timed_wrlock(const struct timespec* abstime) {
+        return 0 == bthread_rwlock_timedwrlock(&_rwlock, abstime);
+    }
+
+    void unlock() { bthread_rwlock_unlock(&_rwlock); }
+
+private:
+    bthread_rwlock_t _rwlock{};
+};
+
+// Read lock guard of rwlock.
+class RWLockRdGuard {
+public:
+    explicit RWLockRdGuard(bthread_rwlock_t& rwlock)
+        : _rwlock(&rwlock) {
+#if !defined(NDEBUG)
+        const int rc = bthread_rwlock_rdlock(_rwlock);
+        if (rc) {
+            LOG(FATAL) << "Fail to rdlock bthread_rwlock_t=" << _rwlock << ", 
" << berror(rc);
+            _rwlock = NULL;
+        }
+#else
+        bthread_rwlock_rdlock(_rwlock);
+#endif // NDEBUG
+    }
+
+    explicit RWLockRdGuard(RWLock& rwlock)
+        : RWLockRdGuard(*rwlock.native_handler()) {}
+
+    ~RWLockRdGuard() {
+#ifndef NDEBUG
+        if (NULL != _rwlock) {
+            bthread_rwlock_unlock(_rwlock);
+        }
+#else
+        bthread_rwlock_unlock(_rwlock);
+#endif // NDEBUG
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(RWLockRdGuard);
+
+private:
+    bthread_rwlock_t* _rwlock;
+};
+
+// Write lock guard of rwlock.
+class RWLockWrGuard {
+public:
+    explicit RWLockWrGuard(bthread_rwlock_t& rwlock)
+        : _rwlock(&rwlock) {
+#if !defined(NDEBUG)
+        const int rc = bthread_rwlock_wrlock(_rwlock);
+        if (rc) {
+            LOG(FATAL) << "Fail to wrlock bthread_rwlock_t=" << _rwlock << ", 
" << berror(rc);
+            _rwlock = NULL;
+        }
+#else
+        bthread_rwlock_wrlock(_rwlock);
+#endif // NDEBUG
+    }
+
+    explicit RWLockWrGuard(RWLock& rwlock)
+        : RWLockWrGuard(*rwlock.native_handler()) {}
+
+    ~RWLockWrGuard() {
+#ifndef NDEBUG
+        if (NULL != _rwlock) {
+            bthread_rwlock_unlock(_rwlock);
+        }
+#else
+        bthread_rwlock_unlock(_rwlock);
+#endif // NDEBUG
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(RWLockWrGuard);
+
+private:
+    bthread_rwlock_t* _rwlock;
+};
+
+} // namespace bthread
+
+namespace std {
+
+template <>
+class lock_guard<bthread_rwlock_t> {
+public:
+    lock_guard(bthread_rwlock_t& rwlock, bool read)
+        :_rwlock(&rwlock), _read(read) {
+#if !defined(NDEBUG)
+        int rc;
+        if (_read) {
+            rc = bthread_rwlock_rdlock(_rwlock);
+        } else {
+            rc = bthread_rwlock_wrlock(_rwlock);
+        }
+        if (rc) {
+            LOG(FATAL) << "Fail to lock bthread_rwlock_t=" << _rwlock << ", " 
<< berror(rc);
+            _rwlock = NULL;
+        }
+#else
+        if (_read) {
+            bthread_rwlock_rdlock(_rwlock);
+        } else {
+            bthread_rwlock_wrlock(_rwlock);
+        }
+#endif // NDEBUG
+    }
+
+    ~lock_guard() {
+#ifndef NDEBUG
+        if (NULL != _rwlock) {
+            bthread_rwlock_unlock(_rwlock);
+        }
+#else
+        bthread_rwlock_unlock(_rwlock);
+#endif // NDEBUG
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(lock_guard);
+
+private:
+    bthread_rwlock_t* _rwlock;
+    bool _read;
+};
+
+template <>
+class lock_guard<bthread::RWLock> {
+public:
+    lock_guard(bthread::RWLock& rwlock, bool read)
+        :_rwlock_guard(*rwlock.native_handler(), read) {}
+
+    DISALLOW_COPY_AND_ASSIGN(lock_guard);
+
+private:
+    std::lock_guard<bthread_rwlock_t> _rwlock_guard;
+};
+
+} // namespace std
+
+#endif // BTHREAD_RWLOCK_H
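
A condensed sketch of the C++ wrappers declared above, mirroring the
cpp_wrapper unit test further down in this patch (illustrative, not part
of the commit):

    bthread::RWLock rw;
    {
        bthread::RWLockRdGuard guard(rw);    // shared (read) lock
        // ... multiple concurrent readers allowed ...
    }
    {
        bthread::RWLockWrGuard guard(rw);    // exclusive (write) lock
        // ... single writer ...
    }
    // The std::lock_guard specialization picks the mode via its second
    // argument: true locks for read, false for write.
    {
        std::lock_guard<bthread::RWLock> guard(rw, true);
    }
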
diff --git a/src/bthread/semaphore.cpp b/src/bthread/semaphore.cpp
new file mode 100644
index 00000000..3813a8a6
--- /dev/null
+++ b/src/bthread/semaphore.cpp
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "butil/memory/scope_guard.h"
+#include "bvar/collector.h"
+#include "bthread/bthread.h"
+#include "bthread/butex.h"
+
+namespace bthread {
+
+// Defined in bthread/mutex.cpp.
+class ContentionProfiler;
+extern ContentionProfiler* g_cp;
+extern bvar::CollectorSpeedLimit g_cp_sl;
+extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
+extern void make_contention_site_invalid(bthread_contention_site_t* cs);
+extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns);
+
+static inline int bthread_sem_trywait(bthread_sem_t* sem) {
+    auto whole = (butil::atomic<unsigned>*)sem->butex;
+    while (true) {
+        unsigned num = whole->load(butil::memory_order_relaxed);
+        if (num == 0) {
+            return EAGAIN;
+        }
+        if (whole->compare_exchange_weak(num, num - 1,
+                                         butil::memory_order_acquire,
+                                         butil::memory_order_relaxed)) {
+            return 0;
+        }
+    }
+}
+
+static int bthread_sem_wait_impl(bthread_sem_t* sem, const struct timespec* abstime) {
+    bool queue_lifo = false;
+    bool first_wait = true;
+    size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+    // -1: don't sample.
+    // 0: default value.
+    // > 0: Start time of sampling.
+    int64_t start_ns = 0;
+    auto whole = (butil::atomic<unsigned>*)sem->butex;
+    while (true) {
+        unsigned num = whole->load(butil::memory_order_relaxed);
+        if (num > 0) {
+            if (whole->compare_exchange_weak(num, num - 1,
+                                             butil::memory_order_acquire,
+                                             butil::memory_order_relaxed)) {
+                if (start_ns > 0) {
+                    const int64_t end_ns = butil::cpuwide_time_ns();
+                    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+                    bthread::submit_contention(csite, end_ns);
+                }
+
+                return 0;
+            }
+        }
+        // Don't sample when contention profiler is off. Ask Collector only
+        // once whether this (contended) sem waiting should be sampled; the
+        // `else' must not clobber an in-progress sample.
+        if (NULL != bthread::g_cp && start_ns == 0 && sem->enable_csite &&
+            !bvar::is_sampling_range_valid(sampling_range)) {
+            sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
+            start_ns = bvar::is_sampling_range_valid(sampling_range) ?
+                       butil::cpuwide_time_ns() : -1;
+        } else if (0 == start_ns) {
+            start_ns = -1;
+        }
+        if (bthread::butex_wait(sem->butex, 0, abstime, queue_lifo) < 0 &&
+            errno != EWOULDBLOCK && errno != EINTR) {
+            // A sema should ignore interruptions in general since
+            // user code is unlikely to check the return value.
+            if (ETIMEDOUT == errno && start_ns > 0) {
+                // Failed to lock due to ETIMEDOUT, submit the elapse directly.
+                const int64_t end_ns = butil::cpuwide_time_ns();
+                const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+                bthread::submit_contention(csite, end_ns);
+            }
+
+            return errno;
+        }
+        // Ignore EWOULDBLOCK and EINTR.
+        if (first_wait && 0 == errno) {
+            first_wait = false;
+        }
+        if (!first_wait) {
+            // Normally, bthreads are queued in FIFO order. But when competing
+            // with newly arriving bthreads over the sema, a woken-up bthread
+            // has a good chance of losing, because new arrivals are already
+            // running on CPU and there can be lots of them. In such cases,
+            // for fairness and to avoid starvation, it is queued at the head
+            // of the waiter queue.
+            queue_lifo = true;
+        }
+    }
+}
+
+static inline int bthread_sem_post(bthread_sem_t* sem, size_t num) {
+    if (num > 0) {
+        unsigned n = ((butil::atomic<unsigned>*)sem->butex)
+            ->fetch_add(num, butil::memory_order_relaxed);
+        const size_t sampling_range = NULL != bthread::g_cp && sem->enable_csite ?
+            bvar::is_collectable(&bthread::g_cp_sl) : bvar::INVALID_SAMPLING_RANGE;
+        const int64_t start_ns = bvar::is_sampling_range_valid(sampling_range) ?
+                                 butil::cpuwide_time_ns() : -1;
+        bthread::butex_wake_n(sem->butex, n);
+        if (start_ns > 0) {
+            const int64_t end_ns = butil::cpuwide_time_ns();
+            const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+            bthread::submit_contention(csite, end_ns);
+        }
+    }
+    return 0;
+}
+
+} // namespace bthread
+
+__BEGIN_DECLS
+
+int bthread_sem_init(bthread_sem_t* sem, unsigned value) {
+    sem->butex = bthread::butex_create_checked<unsigned>();
+    if (!sem->butex) {
+        return ENOMEM;
+    }
+    *sem->butex = value;
+    sem->enable_csite = true;
+    return 0;
+}
+
+int bthread_sem_disable_csite(bthread_sem_t* sem) {
+    sem->enable_csite = false;
+    return 0;
+}
+
+int bthread_sem_destroy(bthread_sem_t* sem) {
+    bthread::butex_destroy(sem->butex);
+    return 0;
+}
+
+int bthread_sem_trywait(bthread_sem_t* sem) {
+    return bthread::bthread_sem_trywait(sem);
+}
+
+int bthread_sem_wait(bthread_sem_t* sem) {
+    return bthread::bthread_sem_wait_impl(sem, NULL);
+}
+
+int bthread_sem_timedwait(bthread_sem_t* sem, const struct timespec* abstime) {
+    return bthread::bthread_sem_wait_impl(sem, abstime);
+}
+
+int bthread_sem_post(bthread_sem_t* sem) {
+    return bthread::bthread_sem_post(sem, 1);
+}
+
+int bthread_sem_post_n(bthread_sem_t* sem, size_t n) {
+    return bthread::bthread_sem_post(sem, n);
+}
+
+__END_DECLS
\ No newline at end of file
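
Note how bthread_sem_post_n() pairs with butex_wake_n() above: the rwlock in
this patch uses it to release every queued reader with a single post when a
writer unlocks. A hedged standalone sketch of the same batch-release pattern
(`N' and `start_sem' are illustrative names, not from the patch):

    const size_t N = 8;                  // illustrative worker count
    bthread_sem_t start_sem;
    bthread_sem_init(&start_sem, 0);
    // ... start N worker bthreads, each blocking in
    //     bthread_sem_wait(&start_sem) until released ...
    bthread_sem_post_n(&start_sem, N);   // release all N workers at once
    // ... join the workers, then clean up ...
    bthread_sem_destroy(&start_sem);
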
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 0aad64c4..d177ea72 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -172,9 +172,11 @@ typedef struct bthread_mutex_t {
 #endif
     unsigned* butex;
     bthread_contention_site_t csite;
+    bool enable_csite;
 } bthread_mutex_t;
 
 typedef struct {
+    bool enable_csite;
 } bthread_mutexattr_t;
 
 typedef struct bthread_cond_t {
@@ -190,6 +192,18 @@ typedef struct {
 } bthread_condattr_t;
 
 typedef struct {
+    unsigned* butex;
+    bool enable_csite;
+} bthread_sem_t;
+
+typedef struct {
+    bthread_sem_t reader_sema; // Semaphore for readers to wait for completing writers.
+    bthread_sem_t writer_sema; // Semaphore for writers to wait for completing readers.
+    int reader_count; // Number of pending readers.
+    int reader_wait; // Number of departing readers.
+    bool wlock_flag; // Flag used to indicate that a write lock is held.
+    bthread_mutex_t write_queue_mutex; // Held if there are pending writers.
+    bthread_contention_site_t writer_csite;
 } bthread_rwlock_t;
 
 typedef struct {
diff --git a/src/bvar/collector.h b/src/bvar/collector.h
index 56db7214..a603d96b 100644
--- a/src/bvar/collector.h
+++ b/src/bvar/collector.h
@@ -28,6 +28,12 @@
 
 namespace bvar {
 
+static const size_t INVALID_SAMPLING_RANGE = 0;
+
+inline bool is_sampling_range_valid(size_t sampling_range) {
+    return sampling_range > 0;
+}
+
 // Containing the context for limiting sampling speed.
 struct CollectorSpeedLimit {
     // [Managed by Collector, don't change!]
@@ -115,7 +121,7 @@ inline size_t is_collectable(CollectorSpeedLimit* speed_limit) {
         const size_t sampling_range = speed_limit->sampling_range;
         // fast_rand is faster than fast_rand_in
         if ((butil::fast_rand() & (COLLECTOR_SAMPLING_BASE - 1)) >= sampling_range) {
-            return 0;
+            return INVALID_SAMPLING_RANGE;
         }
         return sampling_range;
     }
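
The two helpers above capture the sampling idiom used throughout this commit.
Sketched in isolation (do_work() and g_my_sl are illustrative placeholders;
the bvar/butil names and the submit_contention() declaration are taken from
the patch above):

    static bvar::CollectorSpeedLimit g_my_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;

    void timed_section() {
        const size_t sampling_range = bvar::is_collectable(&g_my_sl);
        if (!bvar::is_sampling_range_valid(sampling_range)) {
            do_work();                   // fast path, not sampled
            return;
        }
        const int64_t start_ns = butil::cpuwide_time_ns();
        do_work();                       // timed path
        const int64_t end_ns = butil::cpuwide_time_ns();
        const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
        bthread::submit_contention(csite, end_ns);
    }
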
diff --git a/test/BUILD.bazel b/test/BUILD.bazel
index d9af2ae7..b8c3b3d2 100644
--- a/test/BUILD.bazel
+++ b/test/BUILD.bazel
@@ -226,6 +226,8 @@ cc_test(
             # glog CHECK die with a fatal error
             "bthread_key_unittest.cpp",
             "bthread_butex_multi_tag_unittest.cpp",
+            "bthread_rwlock_unittest.cpp",
+            "bthread_semaphore_unittest.cpp",
         ],
     ),
     copts = COPTS,
diff --git a/test/bthread_rwlock_unittest.cpp b/test/bthread_rwlock_unittest.cpp
index 60cbfbe2..3318956b 100644
--- a/test/bthread_rwlock_unittest.cpp
+++ b/test/bthread_rwlock_unittest.cpp
@@ -15,15 +15,394 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <stdlib.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <signal.h>
 #include <gtest/gtest.h>
-#include "butil/time.h"
-#include "butil/macros.h"
+#include <butil/gperftools_profiler.h>
+#include <bthread/rwlock.h>
 
 namespace {
+
+long start_time = butil::cpuwide_time_ms();
+int c = 0;
+void* rdlocker(void* arg) {
+    auto rw = (bthread_rwlock_t*)arg;
+    bthread_rwlock_rdlock(rw);
+    LOG(INFO) << butil::string_printf("[%" PRIu64 "] I'm rdlocker, %d, %" PRId64 "ms\n",
+        pthread_numeric_id(), ++c,
+        butil::cpuwide_time_ms() - start_time);
+    bthread_usleep(10000);
+    bthread_rwlock_unlock(rw);
+    return NULL;
+}
+
+void* wrlocker(void* arg) {
+    auto rw = (bthread_rwlock_t*)arg;
+    bthread_rwlock_wrlock(rw);
+    LOG(INFO) << butil::string_printf("[%" PRIu64 "] I'm wrlocker, %d, %" PRId64 "ms\n",
+        pthread_numeric_id(), ++c,
+        butil::cpuwide_time_ms() - start_time);
+    bthread_usleep(10000);
+    bthread_rwlock_unlock(rw);
+    return NULL;
+}
+
+TEST(RWLockTest, sanity) {
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_wrlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    bthread_t rdth;
+    bthread_t rwth;
+    ASSERT_EQ(0, bthread_start_urgent(&rdth, NULL, rdlocker, &rw));
+    ASSERT_EQ(0, bthread_start_urgent(&rwth, NULL, wrlocker, &rw));
+
+    ASSERT_EQ(0, bthread_join(rdth, NULL));
+    ASSERT_EQ(0, bthread_join(rwth, NULL));
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+TEST(RWLockTest, used_in_pthread) {
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+    pthread_t rdth[8];
+    pthread_t wrth[8];
+    for (size_t i = 0; i < ARRAY_SIZE(rdth); ++i) {
+        ASSERT_EQ(0, pthread_create(&rdth[i], NULL, rdlocker, &rw));
+    }
+    for (size_t i = 0; i < ARRAY_SIZE(wrth); ++i) {
+        ASSERT_EQ(0, pthread_create(&wrth[i], NULL, wrlocker, &rw));
+    }
+
+    for (size_t i = 0; i < ARRAY_SIZE(rdth); ++i) {
+        pthread_join(rdth[i], NULL);
+    }
+    for (size_t i = 0; i < ARRAY_SIZE(wrth); ++i) {
+        pthread_join(wrth[i], NULL);
+    }
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+void* do_timedrdlock(void *arg) {
+    struct timespec t = { -2, 0 };
+    EXPECT_EQ(ETIMEDOUT, bthread_rwlock_timedrdlock((bthread_rwlock_t*)arg, &t));
+    return NULL;
+}
+
+void* do_timedwrlock(void *arg) {
+    struct timespec t = { -2, 0 };
+    EXPECT_EQ(ETIMEDOUT, bthread_rwlock_timedwrlock((bthread_rwlock_t*)arg, &t));
+    return NULL;
+}
+
+TEST(RWLockTest, timedlock) {
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+    bthread_t th;
+    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_timedwrlock, &rw));
+    ASSERT_EQ(0, bthread_join(th, NULL));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    ASSERT_EQ(0, bthread_rwlock_wrlock(&rw));
+    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_timedwrlock, &rw));
+    ASSERT_EQ(0, bthread_join(th, NULL));
+    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_timedrdlock, &rw));
+    ASSERT_EQ(0, bthread_join(th, NULL));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+struct TrylockArgs {
+    bthread_rwlock_t* rw;
+    int rc;
+};
+
+void* do_tryrdlock(void *arg) {
+    auto trylock_args = (TrylockArgs*)arg;
+    EXPECT_EQ(trylock_args->rc, bthread_rwlock_tryrdlock(trylock_args->rw));
+    if (0 != trylock_args->rc) {
+        return NULL;
+    }
+    EXPECT_EQ(trylock_args->rc, bthread_rwlock_unlock(trylock_args->rw));
+    return NULL;
+}
+
+void* do_trywrlock(void *arg) {
+    auto trylock_args = (TrylockArgs*)arg;
+    EXPECT_EQ(trylock_args->rc, bthread_rwlock_trywrlock(trylock_args->rw));
+    if (0 != trylock_args->rc) {
+        return NULL;
+    }
+    EXPECT_EQ(trylock_args->rc, bthread_rwlock_unlock(trylock_args->rw));
+    return NULL;
+}
+
+TEST(RWLockTest, trylock) {
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    ASSERT_EQ(0, bthread_rwlock_tryrdlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+    bthread_t th;
+    TrylockArgs args{&rw, 0};
+    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_tryrdlock, &args));
+    ASSERT_EQ(0, bthread_join(th, NULL));
+    args.rc = EBUSY;
+    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_trywrlock, &args));
+    ASSERT_EQ(0, bthread_join(th, NULL));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    ASSERT_EQ(0, bthread_rwlock_trywrlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_wrlock(&rw));
+    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_tryrdlock, &args));
+    ASSERT_EQ(0, bthread_join(th, NULL));
+    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_trywrlock, &args));
+    ASSERT_EQ(0, bthread_join(th, NULL));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+TEST(RWLockTest, cpp_wrapper) {
+    bthread::RWLock rw;
+    ASSERT_TRUE(rw.try_rdlock());
+    rw.unlock();
+    rw.rdlock();
+    rw.unlock();
+    ASSERT_TRUE(rw.try_wrlock());
+    rw.unlock();
+    rw.wrlock();
+    rw.unlock();
+
+    struct timespec t = { -2, 0 };
+    ASSERT_TRUE(rw.timed_rdlock(&t));
+    rw.unlock();
+    ASSERT_TRUE(rw.timed_wrlock(&t));
+    rw.unlock();
+
+    {
+        bthread::RWLockRdGuard guard(rw);
+    }
+    {
+        bthread::RWLockWrGuard guard(rw);
+    }
+    {
+        std::lock_guard<bthread::RWLock> guard(rw, true);
+    }
+    {
+        std::lock_guard<bthread::RWLock> guard(rw, false);
+    }
+    {
+        std::lock_guard<bthread_rwlock_t> guard(*rw.native_handler(), true);
+    }
+    {
+        std::lock_guard<bthread_rwlock_t> guard(*rw.native_handler(), false);
+    }
+}
+
+bool g_started = false;
+bool g_stopped = false;
+
+void read_op(bthread_rwlock_t* rw, int64_t sleep_us) {
+    ASSERT_EQ(0, bthread_rwlock_rdlock(rw));
+    if (0 != sleep_us) {
+        bthread_usleep(sleep_us);
+    }
+    ASSERT_EQ(0, bthread_rwlock_unlock(rw));
+}
+
+void write_op(bthread_rwlock_t* rw, int64_t sleep_us) {
+    ASSERT_EQ(0, bthread_rwlock_wrlock(rw));
+    if (0 != sleep_us) {
+        bthread_usleep(sleep_us);
+    }
+    ASSERT_EQ(0, bthread_rwlock_unlock(rw));
+}
+
+typedef void (*OP)(bthread_rwlock_t* rw, int64_t sleep_us);
+
+struct MixThreadArg {
+    bthread_rwlock_t* rw;
+    OP op;
+};
+
+void* loop_until_stopped(void* arg) {
+    auto args = (MixThreadArg*)arg;
+    while (!g_stopped) {
+        args->op(args->rw, 20);
+    }
+    return NULL;
+}
+
+TEST(RWLockTest, mix_thread_types) {
+    g_stopped = false;
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    const int N = 16;
+    const int M = N * 2;
+    pthread_t pthreads[N];
+    bthread_t bthreads[M];
+    // Reserve enough workers for the test. This is a must since we have
+    // BTHREAD_ATTR_PTHREAD bthreads which may cause deadlocks (the
+    // bthread_usleep below can't be scheduled and g_stopped is never
+    // set to true, thus loop_until_stopped spins forever).
+    bthread_setconcurrency(M);
+    std::vector<MixThreadArg> args;
+    args.reserve(N + M);
+    for (int i = 0; i < N; ++i) {
+        if (i % 2 == 0) {
+            args.push_back({&rw, read_op});
+        } else {
+            args.push_back({&rw, write_op});
+        }
+        ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &args.back()));
+    }
+
+    for (int i = 0; i < M; ++i) {
+        if (i % 2 == 0) {
+            args.push_back({&rw, read_op});
+        } else {
+            args.push_back({&rw, write_op});
+        }
+        const bthread_attr_t* attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD;
+        ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &args.back()));
+    }
+    bthread_usleep(1000L * 1000);
+    g_stopped = true;
+    for (int i = 0; i < M; ++i) {
+        bthread_join(bthreads[i], NULL);
+    }
+    for (int i = 0; i < N; ++i) {
+        pthread_join(pthreads[i], NULL);
+    }
+
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
+    bthread_rwlock_t* rw;
+    int64_t counter;
+    int64_t elapse_ns;
+    bool ready;
+
+    PerfArgs() : rw(NULL), counter(0), elapse_ns(0), ready(false) {}
+};
+
+template <bool Reader>
+void* add_with_rwlock(void* void_arg) {
+    auto args = (PerfArgs*)void_arg;
+    args->ready = true;
+    butil::Timer t;
+    while (!g_stopped) {
+        if (g_started) {
+            break;
+        }
+        bthread_usleep(10);
+    }
+    t.start();
+    while (!g_stopped) {
+        if (Reader) {
+            bthread_rwlock_rdlock(args->rw);
+        } else {
+            bthread_rwlock_wrlock(args->rw);
+        }
+        ++args->counter;
+        bthread_rwlock_unlock(args->rw);
+    }
+    t.stop();
+    args->elapse_ns = t.n_elapsed();
+    return NULL;
+}
+
+int g_prof_name_counter = 0;
+
+template <typename ThreadId, typename ThreadCreateFn, typename ThreadJoinFn>
+void PerfTest(uint32_t writer_ratio, ThreadId* /*dummy*/, int thread_num,
+              const ThreadCreateFn& create_fn, const ThreadJoinFn& join_fn) {
+    ASSERT_LE(writer_ratio, 100U);
+
+    g_started = false;
+    g_stopped = false;
+    bthread_setconcurrency(thread_num + 4);
+    std::vector<ThreadId> threads(thread_num);
+    std::vector<PerfArgs> args(thread_num);
+    bthread_rwlock_t rw;
+    bthread_rwlock_init(&rw, NULL);
+    int writer_num = thread_num * writer_ratio / 100;
+    int reader_num = thread_num - writer_num;
+    for (int i = 0; i < thread_num; ++i) {
+        args[i].rw = &rw;
+        if (i < writer_num) {
+            ASSERT_EQ(0, create_fn(&threads[i], NULL, add_with_rwlock<false>, &args[i]));
+        } else {
+            ASSERT_EQ(0, create_fn(&threads[i], NULL, add_with_rwlock<true>, &args[i]));
+        }
+    }
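+    // Spin until every worker has signaled readiness so that all of them
+    // start the measured section together.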
+    while (true) {
+        bool all_ready = true;
+        for (int i = 0; i < thread_num; ++i) {
+            if (!args[i].ready) {
+                all_ready = false;
+                break;
+            }
+        }
+        if (all_ready) {
+            break;
+        }
+        usleep(1000);
+    }
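+    // Release the workers and let them run for one second while profiling.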
+    g_started = true;
+    char prof_name[32];
+    snprintf(prof_name, sizeof(prof_name),
+             "bthread_rwlock_perf_%d.prof", ++g_prof_name_counter);
+    ProfilerStart(prof_name);
+    usleep(1000 * 1000);
+    ProfilerStop();
+    g_stopped = true;
+
+    int64_t read_wait_time = 0;
+    int64_t read_count = 0;
+    int64_t write_wait_time = 0;
+    int64_t write_count = 0;
+    for (int i = 0; i < thread_num; ++i) {
+        ASSERT_EQ(0, join_fn(threads[i], NULL));
+        if (i < writer_num) {
+            write_wait_time += args[i].elapse_ns;
+            write_count += args[i].counter;
+        } else {
+            read_wait_time += args[i].elapse_ns;
+            read_count += args[i].counter;
+        }
+    }
+    LOG(INFO) << "bthread rwlock in "
+              << ((void*)create_fn == (void*)pthread_create ? "pthread" : "bthread")
+              << " thread_num=" << thread_num
+              << " writer_ratio=" << writer_ratio
+              << " reader_num=" << reader_num
+              << " read_count=" << read_count
+              << " read_average_time="
+              << (read_count == 0 ? 0 : read_wait_time / (double)read_count)
+              << " writer_num=" << writer_num
+              << " write_count=" << write_count
+              << " write_average_time="
+              << (write_count == 0 ? 0 : write_wait_time / (double)write_count);
+}
+
+TEST(RWLockTest, performance) {
+    const int thread_num = 12;
+    PerfTest(0, (pthread_t*)NULL, thread_num, pthread_create, pthread_join);
+    PerfTest(0, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join);
+    PerfTest(10, (pthread_t*)NULL, thread_num, pthread_create, pthread_join);
+    PerfTest(20, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join);
+    PerfTest(100, (pthread_t*)NULL, thread_num, pthread_create, pthread_join);
+    PerfTest(100, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join);
+}
+
+
 void* read_thread(void* arg) {
     const size_t N = 10000;
 #ifdef CHECK_RWLOCK
@@ -49,7 +428,7 @@ void* write_thread(void*) {
     return NULL;
 }
 
-TEST(RWLockTest, rdlock_performance) {
+TEST(RWLockTest, pthread_rdlock_performance) {
 #ifdef CHECK_RWLOCK
     pthread_rwlock_t lock1;
     ASSERT_EQ(0, pthread_rwlock_init(&lock1, NULL));
diff --git a/test/bthread_semaphore_unittest.cpp b/test/bthread_semaphore_unittest.cpp
new file mode 100644
index 00000000..cc598a4c
--- /dev/null
+++ b/test/bthread_semaphore_unittest.cpp
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <bthread/bthread.h>
+
+namespace {
+
+const size_t SEM_COUNT = 10000;
+
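+// A waiter downs the semaphore SEM_COUNT times while a poster ups it the
+// same number of times, so every wait is eventually matched by a post.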
+void* sem_waiter(void* arg) {
+    bthread_usleep(10 * 1000);
+    auto sem = (bthread_sem_t*)arg;
+    for (size_t i = 0; i < SEM_COUNT; ++i) {
+        bthread_sem_wait(sem);
+    }
+    return NULL;
+}
+
+void* sem_poster(void* arg) {
+    bthread_usleep(10 * 1000);
+    auto sem = (bthread_sem_t*)arg;
+    for (size_t i = 0; i < SEM_COUNT; ++i) {
+        bthread_sem_post(sem);
+    }
+    return NULL;
+}
+
+TEST(SemaphoreTest, sanity) {
+    bthread_sem_t sem;
+    ASSERT_EQ(0, bthread_sem_init(&sem, 1));
+    ASSERT_EQ(0, bthread_sem_wait(&sem));
+    ASSERT_EQ(0, bthread_sem_post(&sem));
+    ASSERT_EQ(0, bthread_sem_wait(&sem));
+
+    bthread_t waiter_th;
+    bthread_t poster_th;
+    ASSERT_EQ(0, bthread_start_urgent(&waiter_th, NULL, sem_waiter, &sem));
+    ASSERT_EQ(0, bthread_start_urgent(&poster_th, NULL, sem_poster, &sem));
+    ASSERT_EQ(0, bthread_join(waiter_th, NULL));
+    ASSERT_EQ(0, bthread_join(poster_th, NULL));
+
+    ASSERT_EQ(0, bthread_sem_destroy(&sem));
+}
+
+
+TEST(SemaphoreTest, used_in_pthread) {
+    bthread_sem_t sem;
+    ASSERT_EQ(0, bthread_sem_init(&sem, 0));
+
+    pthread_t waiter_th[8];
+    pthread_t poster_th[8];
+    for (auto& th : waiter_th) {
+        ASSERT_EQ(0, pthread_create(&th, NULL, sem_waiter, &sem));
+    }
+    for (auto& th : poster_th) {
+        ASSERT_EQ(0, pthread_create(&th, NULL, sem_poster, &sem));
+    }
+    for (auto& th : waiter_th) {
+        pthread_join(th, NULL);
+    }
+    for (auto& th : poster_th) {
+        pthread_join(th, NULL);
+    }
+
+    ASSERT_EQ(0, bthread_sem_destroy(&sem));
+}
+
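+// An absolute timeout in the past makes the timed wait fail immediately
+// with ETIMEDOUT instead of blocking.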
+void* do_timedwait(void *arg) {
+    struct timespec t = { -2, 0 };
+    EXPECT_EQ(ETIMEDOUT, bthread_sem_timedwait((bthread_sem_t*)arg, &t));
+    return NULL;
+}
+
+TEST(SemaphoreTest, timedwait) {
+    bthread_sem_t sem;
+    ASSERT_EQ(0, bthread_sem_init(&sem, 0));
+    bthread_t th;
+    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_timedwait, &sem));
+    ASSERT_EQ(0, bthread_join(th, NULL));
+    ASSERT_EQ(0, bthread_sem_destroy(&sem));
+}
+
+struct TryWaitArgs {
+    bthread_sem_t* sem;
+    int rc;
+};
+
+void* do_trywait(void *arg) {
+    auto trylock_args = (TryWaitArgs*)arg;
+    EXPECT_EQ(trylock_args->rc, bthread_sem_trywait(trylock_args->sem));
+    return NULL;
+}
+
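+// trywait never blocks: it returns 0 when the semaphore has a spare permit
+// and EAGAIN otherwise, both from bthreads and from the main thread.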
+TEST(SemaphoreTest, trywait) {
+    bthread_sem_t sem;
+    ASSERT_EQ(0, bthread_sem_init(&sem, 0));
+
+    ASSERT_EQ(EAGAIN, bthread_sem_trywait(&sem));
+    ASSERT_EQ(0, bthread_sem_post(&sem));
+    ASSERT_EQ(0, bthread_sem_trywait(&sem));
+    ASSERT_EQ(EAGAIN, bthread_sem_trywait(&sem));
+
+    ASSERT_EQ(0, bthread_sem_post(&sem));
+    bthread_t th;
+    TryWaitArgs args{ &sem, 0};
+    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_trywait, &args));
+    ASSERT_EQ(0, bthread_join(th, NULL));
+    args.rc = EAGAIN;
+    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, do_trywait, &args));
+    ASSERT_EQ(0, bthread_join(th, NULL));
+
+    ASSERT_EQ(0, bthread_sem_destroy(&sem));
+}
+
+bool g_started = false;
+bool g_stopped = false;
+
+void wait_op(bthread_sem_t* sem, int64_t sleep_us) {
+    ASSERT_EQ(0, bthread_sem_wait(sem));
+    if (0 != sleep_us) {
+        bthread_usleep(sleep_us);
+    }
+}
+
+void post_op(bthread_sem_t* sem, int64_t sleep_us) {
+    ASSERT_EQ(0, bthread_sem_post(sem));
+    if (0 != sleep_us) {
+        bthread_usleep(sleep_us);
+    }
+}
+
+typedef void (*OP)(bthread_sem_t* sem, int64_t sleep_us);
+
+struct MixThreadArg {
+    bthread_sem_t* sem;
+    OP op;
+};
+
+void* loop_until_stopped(void* arg) {
+    auto args = (MixThreadArg*)arg;
+    for (size_t i = 0; i < SEM_COUNT; ++i) {
+        args->op(args->sem, 20);
+    }
+    return NULL;
+}
+
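+// Same mixed-thread stress as in the rwlock test, except that every worker
+// performs a fixed SEM_COUNT of operations instead of running until
+// g_stopped is set.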
+TEST(SemaphoreTest, mix_thread_types) {
+    g_stopped = false;
+    bthread_sem_t sem;
+    ASSERT_EQ(0, bthread_sem_init(&sem, 0));
+
+    const int N = 16;
+    const int M = N * 2;
+    pthread_t pthreads[N];
+    bthread_t bthreads[M];
+    // Reserve enough workers for the test. This is a must since we have
+    // BTHREAD_ATTR_PTHREAD bthreads which may cause deadlocks: without
+    // enough workers the plain bthreads could never be scheduled, posters
+    // would stall, and the waiters would block on the semaphore forever.
+    bthread_setconcurrency(M);
+    std::vector<MixThreadArg> args;
+    args.reserve(N + M);
+    for (int i = 0; i < N; ++i) {
+        if (i % 2 == 0) {
+            args.push_back({ &sem, wait_op });
+        } else {
+            args.push_back({ &sem, post_op });
+        }
+        ASSERT_EQ(0, pthread_create(&pthreads[i], NULL,
+                                    loop_until_stopped, &args.back()));
+    }
+
+    for (int i = 0; i < M; ++i) {
+        if (i % 2 == 0) {
+            args.push_back({ &sem, wait_op });
+        } else {
+            args.push_back({ &sem, post_op });
+        }
+        const bthread_attr_t* attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD;
+        ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr,
+                                          loop_until_stopped, &args.back()));
+    }
+    for (bthread_t bthread : bthreads) {
+        bthread_join(bthread, NULL);
+    }
+    for (pthread_t pthread : pthreads) {
+        pthread_join(pthread, NULL);
+    }
+
+    ASSERT_EQ(0, bthread_sem_destroy(&sem));
+}
+
+} // namespace

