This is an automated email from the ASF dual-hosted git repository.
chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new f85f1c57 Reimplement writer-priority RWLock (#3286)
f85f1c57 is described below
commit f85f1c57041f20256ec7dcd5241ad5568391ac99
Author: Bright Chen <[email protected]>
AuthorDate: Tue May 19 16:45:22 2026 +0800
Reimplement writer-priority RWLock (#3286)
The previous go-like RWLock bumps the reader count first and rolls it
back only on the full success path. The state is not reversible on a
partial failure, so a read timeout while a writer holds the lock leaves a
dangling reader credit and permanently blocks future writers (issue #3051).
For the same reason the old implementation could not offer correct
try_/timed_ APIs
at all.
Co-authored-by: hairet
---
src/bthread/rwlock.cpp | 666 ++++++++++++++++++++++++---------------
src/bthread/types.h | 26 +-
test/bthread_rwlock_unittest.cpp | 253 ++++++++++++++-
3 files changed, 681 insertions(+), 264 deletions(-)
diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp
index e6356683..e28f5ccb 100644
--- a/src/bthread/rwlock.cpp
+++ b/src/bthread/rwlock.cpp
@@ -15,350 +15,508 @@
// specific language governing permissions and limitations
// under the License.
+#include <memory>
#include "bvar/collector.h"
+#include "butil/memory/scope_guard.h"
#include "bthread/rwlock.h"
+#include "bthread/mutex.h"
#include "bthread/butex.h"
namespace bthread {
-// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock,
-// which is a bthread implementation of golang RWMutex.
-// The lock can be held by an arbitrary number of readers or a single writer.
-// For details, see
https://github.com/golang/go/blob/master/src/sync/rwmutex.go
-
-// Define in bthread/mutex.cpp
+// Defined in bthread/mutex.cpp; reused here so that bthread_rwlock_t
+// participates in the global ContentionProfiler just like bthread_mutex_t
+// and bthread_sem_t.
class ContentionProfiler;
extern ContentionProfiler* g_cp;
extern bvar::CollectorSpeedLimit g_cp_sl;
-extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
-extern void make_contention_site_invalid(bthread_contention_site_t* cs);
extern void submit_contention(const bthread_contention_site_t& csite, int64_t
now_ns);
-// It is enough for readers. If the reader exceeds this value,
-// need to use `int64_t' instead of `int'.
-const int RWLockMaxReaders = 1 << 30;
-
-// For reading.
-static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock,
- const struct timespec* __restrict abstime) {
- int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
- ->fetch_add(1, butil::memory_order_acquire) + 1;
- // Fast path.
- if (reader_count >= 0) {
- CHECK_LT(reader_count, RWLockMaxReaders);
- return 0;
- }
-
- // Slow path.
+// Lazily arm sampling on first contention. Caller must declare
+// `size_t sampling_range' and `int64_t start_ns' in scope:
+// start_ns == 0 -> not yet decided
+// start_ns == -1 -> decided NOT to sample (profiler off / not selected)
+// start_ns > 0 -> sampling armed; value is the wall-clock start time
+#define BTHREAD_RWLOCK_MAYBE_START_SAMPLING
\
+ do {
\
+ if (start_ns == 0) {
\
+ if (BAIDU_UNLIKELY(g_cp != NULL)) {
\
+ sampling_range = bvar::is_collectable(&g_cp_sl);
\
+ start_ns = bvar::is_sampling_range_valid(sampling_range) ?
\
+ butil::cpuwide_time_ns() : -1;
\
+ } else {
\
+ start_ns = -1;
\
+ }
\
+ }
\
+ } while (0)
- // Don't sample when contention profiler is off.
- if (NULL == bthread::g_cp) {
- return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
- }
- // Ask Collector if this (contended) locking should be sampled.
- const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
- if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample.
- return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+// Submit one contention sample if sampling was armed for this attempt.
+// `start_ns > 0' is the convention used everywhere in this file to indicate
+// that BTHREAD_RWLOCK_MAYBE_START_SAMPLING actually decided to sample.
+// No-op otherwise. Force-inlined so the uncontended fast path stays cheap.
+static BUTIL_FORCE_INLINE void submit_contention_if_sampled(
+ int64_t start_ns, size_t sampling_range) {
+ if (BAIDU_UNLIKELY(start_ns > 0)) {
+ const int64_t end_ns = butil::cpuwide_time_ns();
+ const bthread_contention_site_t csite{end_ns - start_ns,
sampling_range};
+ submit_contention(csite, end_ns);
}
-
- // Sample.
- const int64_t start_ns = butil::cpuwide_time_ns();
- int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime);
- const int64_t end_ns = butil::cpuwide_time_ns();
- const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
- // Submit `csite' for each reader immediately after
- // owning rdlock to avoid the contention of `csite'.
- bthread::submit_contention(csite, end_ns);
-
- return rc;
-}
-
-static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) {
- return rwlock_rdlock_impl(rwlock, NULL);
}
-static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
- const struct timespec* __restrict
abstime) {
- return rwlock_rdlock_impl(rwlock, abstime);
-}
+// bthread RWLock
+// writer-priority implementation overview
+// Three synchronization fields are used:
+//
+// * `lock_word' (32-bit butex):
+// bit 31 : 1 if the write lock is held, 0 otherwise.
+// bit 0~30: number of readers currently holding the read lock.
+// Mutually exclusive: when bit 31 is set, the lower 31 bits are 0.
+//
+// * `writer_wait_count' (32-bit butex):
+// Number of writers that have entered wrlock() but not yet finished
+// (i.e. currently waiting for the mutex / waiting for lock_word==0 /
+// holding the write lock). Each writer accounts for itself: it is
+// incremented at the very beginning of wrlock() and decremented at
+// the very end of unwrlock()/cleanup().
+// Readers consult this field to implement writer-priority: if any
+// writer is "in flight", new readers yield by waiting on it.
+//
+// * `writer_queue_mutex' (bthread_mutex_t):
+// Serializes writers so that at most one writer races for `lock_word'
+// at any time. Other writers queue up on this mutex.
+//
+// Wakeup channels:
+// * Readers waiting on writers -> wait on writer_wait_count, woken by
unwrlock/cleanup
+// * Writers waiting on readers -> wait on lock_word, woken by unrdlock
+// * Writers waiting on writers -> wait on writer_queue_mutex
+
+static int rwlock_rdlock(bthread_rwlock_t* rwlock, bool try_lock,
+ const struct timespec* abstime) {
+ auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+ auto writer_wait_count =
(butil::atomic<unsigned>*)rwlock->writer_wait_count;
+
+ // Sampling state for the contention profiler (lazily armed on first
+ // contention so that the uncontended fast path stays cheap):
+ // start_ns == 0 -> not yet decided
+ // start_ns == -1 -> decided NOT to sample
+ // start_ns > 0 -> sampling armed; submit on exit
+ // Each reader samples independently and submits once on its own way out;
+ // we deliberately do NOT use rwlock->writer_csite here because that field
+ // is exclusively owned by the writer.
+ size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+ int64_t start_ns = 0;
+ int rc = 0;
-// Returns 0 if the lock was acquired, otherwise errno.
-static inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
while (true) {
- int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
- ->load(butil::memory_order_relaxed);
- if (reader_count < 0) {
- // Failed to acquire the read lock because there is a writer.
- return EBUSY;
- }
- if (((butil::atomic<int>*)&rwlock->reader_count)
- ->compare_exchange_weak(reader_count, reader_count + 1,
- butil::memory_order_acquire,
- butil::memory_order_relaxed)) {
- return 0;
+ // Writer-priority: if any writer is in flight, yield to it.
+ // `relaxed' is sufficient here because:
+ // - There is no data published via writer_wait_count;
+ // data visibility is established via the acquire-CAS on
+ // `lock_word' below paired with the release-CAS in unwrlock().
+ // - butex_wait() will re-check the expected value before sleeping,
+ // so we cannot lose a wakeup even if `w' is slightly stale.
+ unsigned w = writer_wait_count->load(butil::memory_order_relaxed);
+ if (w > 0) {
+ if (try_lock) {
+ // Don't sample tryrdlock failures: they are by design a
+ // non-blocking probe, not a contention event.
+ return EBUSY;
+ }
+ // We are about to block on writer_wait_count; arm sampling
+ // before parking so the wait time is included in the report.
+ BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+ if (butex_wait(writer_wait_count, w, abstime) < 0 &&
+ errno != EWOULDBLOCK && errno != EINTR) {
+ rc = errno;
+ break;
+ }
+ continue;
}
- }
-}
-static inline int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
- int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
- ->fetch_add(-1, butil::memory_order_relaxed) - 1;
- // Fast path.
- if (reader_count >= 0) {
- return 0;
+ // No writer in flight: try to add ourselves to the reader count.
+ // 2^31 - 1 readers should be enough for any realistic workload.
+ unsigned l = lock_word->load(butil::memory_order_relaxed);
+ if ((l >> 31) == 0) {
+ // Refuse to increment when the reader count has saturated
+ // the low 31 bits. Otherwise `l + 1' would flip bit 31 and
+ // we would corrupt lock_word into "writer held" state.
+ // POSIX-style: report EAGAIN ("max read locks exceeded").
+ if (BAIDU_UNLIKELY(l == 0x7FFFFFFFu)) {
+ LOG(ERROR) << "Too many readers on bthread_rwlock_t=" <<
rwlock;
+ rc = EAGAIN;
+ break;
+ }
+ // Acquire on success synchronizes-with the release-CAS in
+ // unwrlock(), so any data written by the previous writer is
+ // visible to us before we start reading.
+ if (lock_word->compare_exchange_weak(l, l + 1,
+ butil::memory_order_acquire,
+ butil::memory_order_relaxed))
{
+ rc = 0;
+ break;
+ }
+ // CAS failed (likely another reader bumped r): retry.
+ } else if (try_lock) {
+ // Write lock is currently held.
+ return EBUSY;
+ } else {
+ // Write lock currently held but not yet self-accounted as a
+ // pending writer (very narrow window inside wrlock). Arm
+ // sampling now so the spin/wait until writer_wait_count >= 1
+ // is also accounted for.
+ BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+ }
+ // Otherwise (write lock held but not try_lock): spin once more.
+ // The next iteration will observe writer_wait_count >= 1 (writers
+ // self-account in writer_wait_count for the entire wrlock lifetime),
+ // and we will block on it instead of busy spinning.
}
- // Slow path.
- if (BAIDU_UNLIKELY(reader_count + 1 == 0 || reader_count + 1 ==
-RWLockMaxReaders)) {
- CHECK(false) << "rwlock_unrdlock of unlocked rwlock";
- return EINVAL;
- }
+ // Submit one contention sample for this reader (success or failure).
+ submit_contention_if_sampled(start_ns, sampling_range);
+ return rc;
+}
- // A writer is pending.
- int reader_wait = ((butil::atomic<int>*)&rwlock->reader_wait)
- ->fetch_add(-1, butil::memory_order_relaxed) - 1;
- if (reader_wait != 0) {
+static int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
+ auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+ while (true) {
+ unsigned l = lock_word->load(butil::memory_order_relaxed);
+ // Misuse detection: the caller must currently hold a read lock.
+ // l == 0 -> no lock is held (double unlock?)
+ // (l >> 31) != 0 -> write lock is held, not read lock
+ if (l == 0 || (l >> 31) != 0) {
+ LOG(ERROR) << "Invalid unrdlock on bthread_rwlock_t=" << rwlock
+ << ", lock_word=" << l;
+ return EINVAL;
+ }
+ // Release on success publishes any reads/writes done while holding
+ // the read lock to the next acquirer (typically a writer's
+ // acquire-CAS in wrlock()).
+ if(!(lock_word->compare_exchange_weak(l, l - 1,
+ butil::memory_order_release,
+ butil::memory_order_relaxed))) {
+ continue;
+ }
+ // We were the last reader (lock_word transitioned 1 -> 0). Wake the
+ // single writer (if any) that may be sleeping on `lock_word' inside
+ // wrlock(). At most one writer can be there because writers are
+ // serialized by writer_queue_mutex.
+ // No-op if nobody is waiting; butex_wake() short-circuits cheaply.
+ if (l == 1) {
+ butex_wake(lock_word);
+ }
return 0;
}
+}
- // The last reader unblocks the writer.
-
- if (NULL == bthread::g_cp) {
- bthread_sem_post(&rwlock->writer_sema);
- return 0;
+// Roll back the side effects of a failed wrlock attempt:
+// - Release writer_queue_mutex if we managed to acquire it.
+// - Decrement our share of writer_wait_count.
+// - If we were the last in-flight writer, wake all readers that have
+// been parked by writer-priority (w == 1 means writer_wait_count is now
0).
+// Called on EBUSY (try_lock failed), ETIMEDOUT, EINTR-leading-to-fail.
+static BUTIL_FORCE_INLINE void rwlock_wrlock_cleanup(bthread_rwlock_t* rwlock,
bool write_queue_locked) {
+ if (write_queue_locked) {
+ bthread_mutex_unlock(&rwlock->writer_queue_mutex);
}
- // Ask Collector if this (contended) locking should be sampled.
- const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
- if (!sampling_range) { // Don't sample
- bthread_sem_post(&rwlock->writer_sema);
- return 0;
+ auto writer_wait_count =
(butil::atomic<unsigned>*)rwlock->writer_wait_count;
+ // Withdraw our writer-priority "vote" so readers can make progress.
+ auto w = writer_wait_count->fetch_sub(1, butil::memory_order_relaxed);
+ // w is the value BEFORE the subtraction, so w == 1 means we were the
+ // last writer in flight; wake every reader parked on writer_wait_count.
+ if (w == 1) {
+ butex_wake_all(writer_wait_count);
}
-
- // Sampling.
- const int64_t start_ns = butil::cpuwide_time_ns();
- bthread_sem_post(&rwlock->writer_sema);
- const int64_t end_ns = butil::cpuwide_time_ns();
- const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
- // Submit `csite' for each reader immediately after
- // releasing rdlock to avoid the contention of `csite'.
- bthread::submit_contention(csite, end_ns);
- return 0;
}
-#define DO_CSITE_IF_NEED
\
- do {
\
- /* Don't sample when contention profiler is off. */
\
- if (NULL != bthread::g_cp) {
\
- /* Ask Collector if this (contended) locking should be sampled. */
\
- sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
\
- start_ns = bvar::is_sampling_range_valid(sampling_range) ?
\
- butil::cpuwide_time_ns() : -1;
\
- } else {
\
- start_ns = -1;
\
- }
\
- } while (0)
-
-#define SUBMIT_CSITE_IF_NEED
\
- do {
\
- if (ETIMEDOUT == rc && start_ns > 0) {
\
- /* Failed to lock due to ETIMEDOUT, submit the elapse directly. */
\
- const int64_t end_ns = butil::cpuwide_time_ns();
\
- const bthread_contention_site_t csite{end_ns - start_ns,
sampling_range}; \
- bthread::submit_contention(csite, end_ns);
\
- }
\
- } while (0)
-
-// For writing.
-static inline int rwlock_wrlock_impl(bthread_rwlock_t* __restrict rwlock,
- const struct timespec* __restrict
abstime) {
- // First, resolve competition with other writers.
- int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
+static int rwlock_wrlock(bthread_rwlock_t* rwlock, bool try_lock,
+ const struct timespec* abstime) {
+ auto writer_wait_count =
(butil::atomic<unsigned>*)rwlock->writer_wait_count;
+ // Step 1: announce ourselves before doing anything else, so that
+ // concurrent readers immediately observe writer-priority and back off.
+ // This MUST happen before we try to acquire writer_queue_mutex,
+ // otherwise a flood of readers could starve us indefinitely.
+ // 2^31 in-flight writers should be enough for any realistic workload.
+ writer_wait_count->fetch_add(1, butil::memory_order_relaxed);
+
+ // Sampling state for the contention profiler. Both wrlock() and
+ // unwrlock() sample independently: wrlock() submits its own wait time
+ // on the way out (success or failure); unwrlock() samples its own
+ // CAS-spin / mutex_unlock / butex_wake_all latency separately. We do
+ // NOT use rwlock->writer_csite here -- the two operations are not
+ // forced to share a single sample.
size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
- // -1: don't sample.
- // 0: default value.
- // > 0: Start time of sampling.
int64_t start_ns = 0;
- if (0 != rc) {
- DO_CSITE_IF_NEED;
- rc = bthread_mutex_timedlock(&rwlock->write_queue_mutex, abstime);
+ // Step 2: serialize with other writers. At most one writer holds
+ // `writer_queue_mutex' at a time and races for `lock_word'.
+ int rc = bthread_mutex_trylock(&rwlock->writer_queue_mutex);
+ if (0 != rc) {
+ if (try_lock) {
+ // Fail to acquire the wrlock. Don't sample trywrlock failures:
+ // they are by design a non-blocking probe, not a contention event.
+ rwlock_wrlock_cleanup(rwlock, false);
+ return rc;
+ }
+ // We are about to block on writer_queue_mutex; arm sampling.
+ // Note: the inner mutex itself has csite disabled (see init), so
+ // its blocking time is only counted once -- here, by the rwlock.
+ BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+ rc = bthread_mutex_timedlock(&rwlock->writer_queue_mutex, abstime);
if (0 != rc) {
- SUBMIT_CSITE_IF_NEED;
+ // Fail to acquire the wrlock. Submit the elapsed wait time
+ // directly (no unwrlock() will run for this writer).
+ submit_contention_if_sampled(start_ns, sampling_range);
+ rwlock_wrlock_cleanup(rwlock, false);
return rc;
}
}
- // Announce to readers there is a pending writer.
- int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
- ->fetch_add(-RWLockMaxReaders, butil::memory_order_release);
- // Wait for active readers.
- if (reader_count != 0 &&
- ((butil::atomic<int>*)&rwlock->reader_wait)
- ->fetch_add(reader_count) + reader_count != 0) {
- rc = bthread_sem_trywait(&rwlock->writer_sema);
- if (0 != rc) {
- if (0 == start_ns) {
- DO_CSITE_IF_NEED;
+ // Step 3: with `writer_queue_mutex' held, wait for all readers to drain
+ // and then claim the write bit of `lock_word'.
+ auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+ while (true) {
+ unsigned l = lock_word->load(butil::memory_order_relaxed);
+ if (l != 0) {
+ // Readers still hold the lock. Park on `lock_word' until the last
+ // reader releases (unrdlock will butex_wake on transition 1->0).
+ if (try_lock) {
+ errno = EBUSY;
+ break;
}
-
- rc = bthread_sem_timedwait(&rwlock->writer_sema, abstime);
- if (0 != rc) {
- SUBMIT_CSITE_IF_NEED;
- bthread_mutex_unlock(&rwlock->write_queue_mutex);
- return rc;
+ // Arm sampling before parking so the wait-for-readers time is
+ // counted (in case the queue_mutex acquisition above was
uncontended).
+ BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+ // Use the freshly read `r' as expected; if lock_word changes
+ // before we sleep, butex_wait returns EWOULDBLOCK and we retry.
+ if (butex_wait(lock_word, l, abstime) < 0 &&
+ errno != EWOULDBLOCK && errno != EINTR) {
+ break;
}
+ continue;
}
+ // Acquire on success synchronizes-with release-CAS in
+ // unrdlock()/unwrlock(): we will see all data published by the
+ // previous reader/writer before we start writing.
+ if (lock_word->compare_exchange_weak(l, (unsigned)(1 << 31),
+ butil::memory_order_acquire,
+ butil::memory_order_relaxed)) {
+ // Submit the writer's wait sample immediately on success.
+ // unwrlock() will sample its own latency separately.
+ submit_contention_if_sampled(start_ns, sampling_range);
+ return 0;
+ }
+ // CAS may spuriously fail (weak); retry without sleeping.
}
- if (start_ns > 0) {
- rwlock->writer_csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
- rwlock->writer_csite.sampling_range = sampling_range;
- }
- rwlock->wlock_flag = true;
- return 0;
-}
-#undef DO_CSITE_IF_NEED
-#undef SUBMIT_CSITE_IF_NEED
-static inline int rwlock_wrlock(bthread_rwlock_t* rwlock) {
- return rwlock_wrlock_impl(rwlock, NULL);
+ // Failure path: snapshot errno before cleanup, because
+ // bthread_mutex_unlock / butex_wake_all inside cleanup may invoke
+ // syscalls or yield and clobber errno on this thread.
+ int saved_errno = errno;
+ // Submit the elapsed wait directly; we never reached unwrlock().
+ submit_contention_if_sampled(start_ns, sampling_range);
+ rwlock_wrlock_cleanup(rwlock, true);
+ return saved_errno;
}
-static inline int rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
- const struct timespec* __restrict
abstime) {
- return rwlock_wrlock_impl(rwlock, abstime);
-}
-
-static inline int rwlock_trywrlock(bthread_rwlock_t* rwlock) {
- int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
- if (0 != rc) {
- return rc;
- }
-
- int expected = 0;
- if (!((butil::atomic<int>*)&rwlock->reader_count)
- ->compare_exchange_strong(expected, -RWLockMaxReaders,
- butil::memory_order_acquire,
- butil::memory_order_relaxed)) {
- // Failed to acquire the write lock because there are active readers.
- bthread_mutex_unlock(&rwlock->write_queue_mutex);
- return EBUSY;
- }
- rwlock->wlock_flag = true;
-
- return 0;
-}
+static int rwlock_unwrlock(bthread_rwlock_t* rwlock) {
+ auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+ auto writer_wait_count =
(butil::atomic<unsigned>*)rwlock->writer_wait_count;
-static inline void rwlock_unwrlock_slow(bthread_rwlock_t* rwlock, int
reader_count) {
- bthread_sem_post_n(&rwlock->reader_sema, reader_count);
- // Allow other writers to proceed.
- bthread_mutex_unlock(&rwlock->write_queue_mutex);
-}
-
-static inline int rwlock_unwrlock(bthread_rwlock_t* rwlock) {
- rwlock->wlock_flag = false;
+ // Sampling state for the contention profiler. unwrlock() samples
+ // independently of wrlock(): although the release-CAS itself cannot
+ // fail due to writer-writer contention (writers are serialized by
+ // writer_queue_mutex), the body still does mutex_unlock(),
+ // butex_wake_all() and may spuriously spin on the weak CAS, all of
+ // which contribute to the critical-section tail latency.
+ size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+ int64_t start_ns = 0;
+ BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
- // Announce to readers there is no active writer.
- int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)->fetch_add(
- RWLockMaxReaders, butil::memory_order_release) + RWLockMaxReaders;
- if (BAIDU_UNLIKELY(reader_count >= RWLockMaxReaders)) {
- CHECK(false) << "rwlock_unwlock of unlocked rwlock";
- return EINVAL;
- }
+ while (true) {
+ unsigned l = lock_word->load(butil::memory_order_relaxed);
+ // Misuse detection: we must currently hold the write lock.
+ if (BAIDU_UNLIKELY(l != (unsigned)(1 << 31))) {
+ LOG(ERROR) << "Invalid unwrlock!";
+ return EINVAL;
+ }
+ // Release-CAS publishes all writes performed under the write lock
+ // to the next acquirer (a reader's acquire-CAS or another writer's
+ // acquire-CAS). The CAS itself cannot fail due to contention since
+ // writers are serialized by writer_queue_mutex; weak failure here is
+ // only a spurious CAS failure -- just retry.
+ if (!lock_word->compare_exchange_weak(l, 0,
+ butil::memory_order_release,
+ butil::memory_order_relaxed)) {
+ continue;
+ }
- bool is_valid = bthread::is_contention_site_valid(rwlock->writer_csite);
- if (BAIDU_UNLIKELY(is_valid)) {
- bthread_contention_site_t saved_csite = rwlock->writer_csite;
- bthread::make_contention_site_invalid(&rwlock->writer_csite);
+ // ---- Order of the next two operations is INTENTIONAL ----
+ //
+ // We deliberately:
+ // (1) unlock writer_queue_mutex FIRST, then
+ // (2) fetch_sub(writer_wait_count) and conditionally wake readers.
+ //
+ // Rationale (writer-priority semantics):
+ // * Any writer queued on writer_queue_mutex has already
+ // fetch_add'ed its share into writer_wait_count back in wrlock()
+ // (before it even tried to lock the mutex). So when it wakes
+ // up here and we later fetch_sub, the counter still reflects
+ // "there is at least one more writer in flight": w_old >= 2,
+ // which means w != 1, which means we will NOT wake readers.
+ // Readers must keep yielding to the next writer -- exactly the
+ // writer-priority invariant.
+ // * Only when we are truly the last writer in flight (w_old == 1
+ // after our fetch_sub, i.e. writer_wait_count is now 0) do we
+ // wake_all readers parked on writer_wait_count.
+ //
+ // Subtle but harmless effect:
+ // Between (1) and (2) there is a small window in which our
+ // own "ghost share" is still counted in writer_wait_count even
though
+ // we have effectively left. New readers entering rdlock() during
+ // this window will see writer_wait_count >= 1 and park on it; they
+ // will be woken either by step (2) below (if no successor writer
+ // appeared) or by the successor writer's eventual unwrlock.
+ // No wakeup is ever lost: butex_wait re-checks the expected
+ // value before truly sleeping, and any successor writer will
+ // itself execute this same wake logic on its way out.
+ //
+ // Reversing the order (fetch_sub before unlock mutex) would break
+ // strict writer-priority because woken readers could grab the
+ // read lock before a successor writer queued on the mutex even
+ // gets a chance to CAS lock_word.
+ bthread_mutex_unlock(&rwlock->writer_queue_mutex);
+ unsigned w = writer_wait_count->fetch_sub(1,
butil::memory_order_relaxed);
+ if (w == 1) {
+ butex_wake_all(writer_wait_count);
+ }
- const int64_t unlock_start_ns = butil::cpuwide_time_ns();
- rwlock_unwrlock_slow(rwlock, reader_count);
- const int64_t unlock_end_ns = butil::cpuwide_time_ns();
- saved_csite.duration_ns += unlock_end_ns - unlock_start_ns;
- bthread::submit_contention(saved_csite, unlock_end_ns);
- } else {
- rwlock_unwrlock_slow(rwlock, reader_count);
+ // Submit our own unwrlock-side sample (CAS spin + mutex_unlock +
+ // butex_wake_all). This is independent of the wrlock-side sample.
+ submit_contention_if_sampled(start_ns, sampling_range);
+ return 0;
}
-
- return 0;
}
-static inline int rwlock_unlock(bthread_rwlock_t* rwlock) {
- if (rwlock->wlock_flag) {
+// Generic unlock entry that dispatches to unwrlock/unrdlock by inspecting
+// `lock_word'. This is safe ONLY because the caller must already hold one of
+// the two locks: while holding a read lock the high bit of `lock_word' cannot
+// flip on, and while holding the write lock the low bits cannot be set.
+// Therefore a relaxed load is sufficient to make the dispatch decision.
+static int rwlock_unlock(bthread_rwlock_t* rwlock) {
+ auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+ unsigned r = lock_word->load(butil::memory_order_relaxed);
+ if ((r >> 31) != 0) {
return rwlock_unwrlock(rwlock);
} else {
return rwlock_unrdlock(rwlock);
}
}
-} // namespace bthread
-
-__BEGIN_DECLS
-
-int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
- const bthread_rwlockattr_t* __restrict) {
- int rc = bthread_sem_init(&rwlock->reader_sema, 0);
- if (BAIDU_UNLIKELY(0 != rc)) {
- return rc;
+// Deleter that turns butex_create_checked()'s raw pointer into something
+// std::unique_ptr can clean up automatically. Using RAII here lets the
+// init-error paths just `return rc' without manually unwinding partial
+// allocations; ownership is `release()'d only on the all-success path.
+struct ButexDeleter {
+ void operator()(void* butex) const {
+ if (butex != NULL) {
+ butex_destroy(butex);
+ }
}
- bthread_sem_disable_csite(&rwlock->reader_sema);
- rc = bthread_sem_init(&rwlock->writer_sema, 0);
- if (BAIDU_UNLIKELY(0 != rc)) {
- bthread_sem_destroy(&rwlock->reader_sema);
- return rc;
+};
+
+static int rwlock_init(bthread_rwlock_t* rwlock) {
+ std::unique_ptr<unsigned, ButexDeleter> writer_wait_count(
+ butex_create_checked<unsigned>());
+ if (writer_wait_count == NULL) {
+ LOG(ERROR) << "Fail to create writer_wait_count butex: out of memory";
+ return ENOMEM;
}
- bthread_sem_disable_csite(&rwlock->writer_sema);
-
- rwlock->reader_count = 0;
- rwlock->reader_wait = 0;
- rwlock->wlock_flag = false;
+ std::unique_ptr<unsigned, ButexDeleter>
lock_word(butex_create_checked<unsigned>());
+ if (lock_word == NULL) {
+ LOG(ERROR) << "Fail to create lock_word butex: out of memory";
+ return ENOMEM;
+ }
+ *writer_wait_count = 0;
+ *lock_word = 0;
bthread_mutexattr_t attr;
bthread_mutexattr_init(&attr);
+ BRPC_SCOPE_EXIT { bthread_mutexattr_destroy(&attr); };
+ // Disable csite on the inner queue mutex so the writer's wait time is
+ // accounted exactly once -- by the rwlock layer, not double-counted via
+ // the inner mutex.
bthread_mutexattr_disable_csite(&attr);
- rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr);
- if (BAIDU_UNLIKELY(0 != rc)) {
- bthread_sem_destroy(&rwlock->reader_sema);
- bthread_sem_destroy(&rwlock->writer_sema);
+ const int rc = bthread_mutex_init(&rwlock->writer_queue_mutex, &attr);
+ if (rc != 0) {
+ LOG(ERROR) << "Fail to init writer_queue_mutex, rc=" << rc;
return rc;
}
- bthread_mutexattr_destroy(&attr);
-
- bthread::make_contention_site_invalid(&rwlock->writer_csite);
+ // All resources successfully created; transfer butex ownership to
+ // rwlock. From here on, bthread_rwlock_destroy() is responsible for
+ // releasing them.
+ rwlock->writer_wait_count = writer_wait_count.release();
+ rwlock->lock_word = lock_word.release();
return 0;
}
+static int rwlock_destroy(bthread_rwlock_t* rwlock) {
+ // Destroy the inner mutex first; bthread_mutex_init() allocates an
+ // internal butex which would otherwise leak. Pointers are nulled to
+ // surface accidental double-destroy / use-after-destroy bugs early.
+ int rc = bthread_mutex_destroy(&rwlock->writer_queue_mutex);
+ if (rc != 0) {
+ LOG(ERROR) << "Fail to destroy writer_queue_mutex, rc=" << rc;
+ }
+ if (rwlock->writer_wait_count != NULL) {
+ butex_destroy(rwlock->writer_wait_count);
+ rwlock->writer_wait_count = NULL;
+ }
+ if (rwlock->lock_word != NULL) {
+ butex_destroy(rwlock->lock_word);
+ rwlock->lock_word = NULL;
+ }
+ return rc;
+}
+
+} // namespace bthread
+
+__BEGIN_DECLS
+
+int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
+ const bthread_rwlockattr_t* __restrict) {
+ return bthread::rwlock_init(rwlock);
+}
+
int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) {
- bthread_sem_destroy(&rwlock->reader_sema);
- bthread_sem_destroy(&rwlock->writer_sema);
- bthread_mutex_destroy(&rwlock->write_queue_mutex);
- return 0;
+ return bthread::rwlock_destroy(rwlock);
}
int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) {
- return bthread::rwlock_rdlock(rwlock);
+ return bthread::rwlock_rdlock(rwlock, false, NULL);
}
int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
- return bthread::rwlock_tryrdlock(rwlock);
+ return bthread::rwlock_rdlock(rwlock, true, NULL);
}
int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime) {
- return bthread::rwlock_timedrdlock(rwlock, abstime);
+ return bthread::rwlock_rdlock(rwlock, false, abstime);
}
int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) {
- return bthread::rwlock_wrlock(rwlock);
+ return bthread::rwlock_wrlock(rwlock, false, NULL);
}
int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock) {
- return bthread::rwlock_trywrlock(rwlock);
+ return bthread::rwlock_wrlock(rwlock, true, NULL);
}
int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime) {
- return bthread::rwlock_timedwrlock(rwlock, abstime);
+ return bthread::rwlock_wrlock(rwlock, false, abstime);
}
int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) {
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 86148c93..d46de1e8 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -225,16 +225,26 @@ typedef struct bthread_sem_t {
typedef struct bthread_rwlock_t {
#if defined(__cplusplus)
bthread_rwlock_t()
- : reader_count(0), reader_wait(0), wlock_flag(false), writer_csite{} {}
+ : writer_wait_count(0), lock_word(NULL) {}
DISALLOW_COPY_AND_ASSIGN(bthread_rwlock_t);
#endif
- bthread_sem_t reader_sema; // Semaphore for readers to wait for completing
writers.
- bthread_sem_t writer_sema; // Semaphore for writers to wait for completing
readers.
- int reader_count; // Number of pending readers.
- int reader_wait; // Number of departing readers.
- bool wlock_flag; // Flag used to indicate that a write lock has been held.
- bthread_mutex_t write_queue_mutex; // Held if there are pending writers.
- bthread_contention_site_t writer_csite;
+ // Number of writers currently in flight (used as a butex):
+ // writers waiting on writer_queue_mutex, writers waiting for
+ // lock_word == 0, and the writer currently holding the write lock
+ // are all counted here. Each writer accounts for itself: incremented
+ // at the very beginning of wrlock() and decremented at the very end
+ // of unwrlock()/cleanup(). Readers consult this field to honor
+ // writer-priority: any non-zero value parks new readers.
+ unsigned* writer_wait_count;
+ // Serializes writers so that at most one writer at a time races for
+ // lock_word. Other writers queue up on this mutex.
+ bthread_mutex_t writer_queue_mutex;
+ // Bit-packed atomic lock word (used as a butex):
+ // bit 31 : 1 if the write lock is held, 0 otherwise.
+ // bit 0~30: number of readers currently holding the read lock.
+ // 0 : unlocked.
+ // The high bit and the low 31 bits are mutually exclusive.
+ unsigned* lock_word;
} bthread_rwlock_t;
typedef struct {
diff --git a/test/bthread_rwlock_unittest.cpp b/test/bthread_rwlock_unittest.cpp
index 2da226cb..9a88051c 100644
--- a/test/bthread_rwlock_unittest.cpp
+++ b/test/bthread_rwlock_unittest.cpp
@@ -17,6 +17,7 @@
#include <gtest/gtest.h>
#include "gperftools_helper.h"
+#include "butil/atomicops.h"
#include <bthread/rwlock.h>
namespace {
@@ -286,6 +287,253 @@ TEST(RWLockTest, mix_thread_types) {
ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
}
+// Tests below verify the writer-priority semantics and the cleanup path
+// guarded by the design notes in bthread/rwlock.cpp.
+struct WriterPriorityArgs {
+ bthread_rwlock_t* rw;
+ butil::atomic<int>* order;
+ int my_order; // sequence number captured inside the critical section
+ int hold_us;
+};
+
+void* wp_writer_fn(void* arg) {
+ auto* a = (WriterPriorityArgs*)arg;
+ EXPECT_EQ(0, bthread_rwlock_wrlock(a->rw));
+ a->my_order = a->order->fetch_add(1, butil::memory_order_relaxed);
+ bthread_usleep(a->hold_us);
+ EXPECT_EQ(0, bthread_rwlock_unlock(a->rw));
+ return NULL;
+}
+
+void* wp_reader_fn(void* arg) {
+ auto* a = (WriterPriorityArgs*)arg;
+ EXPECT_EQ(0, bthread_rwlock_rdlock(a->rw));
+ a->my_order = a->order->fetch_add(1, butil::memory_order_relaxed);
+ bthread_usleep(a->hold_us);
+ EXPECT_EQ(0, bthread_rwlock_unlock(a->rw));
+ return NULL;
+}
+
+// Verifies the writer-priority invariant guarded by the order
+// "unlock writer_queue_mutex BEFORE fetch_sub(writer_wait_count)" in
+// rwlock_unwrlock(): once a writer is queued, any new reader arriving
+// later MUST yield to that writer.
+TEST(RWLockTest, writer_priority) {
+ bthread_setconcurrency(8);
+ bthread_rwlock_t rw;
+ ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+ // (1) Main thread holds the read lock first.
+ ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+
+ butil::atomic<int> order(0);
+ WriterPriorityArgs warg {&rw, &order, -1, 5000};
+ WriterPriorityArgs r2arg {&rw, &order, -1, 0};
+
+ // (2) Start a writer; it should park inside wrlock() because the read
+ // lock is held. Sleep long enough for it to fetch_add into
+ // writer_wait_count and reach the butex_wait on `lock_word'.
+ bthread_t wth;
+ ASSERT_EQ(0, bthread_start_urgent(&wth, NULL, wp_writer_fn, &warg));
+ bthread_usleep(50 * 1000);
+
+ // (3) Now spawn a fresh reader. By writer-priority it MUST observe
+ // writer_wait_count > 0 and park on it (NOT join the active read
+ // lock).
+ bthread_t r2th;
+ ASSERT_EQ(0, bthread_start_urgent(&r2th, NULL, wp_reader_fn, &r2arg));
+ bthread_usleep(50 * 1000);
+
+ // (4) Release the original read lock. The writer should win the race
+ // and complete BEFORE the queued reader.
+ ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+ bthread_join(wth, NULL);
+ bthread_join(r2th, NULL);
+
+ EXPECT_GE(warg.my_order, 0);
+ EXPECT_GE(r2arg.my_order, 0);
+ EXPECT_LT(warg.my_order, r2arg.my_order)
+ << "Writer-priority violated: writer entered with order="
+ << warg.my_order << " but late reader entered with order="
+ << r2arg.my_order;
+
+ ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+void* wp_timed_wrlock_short(void* arg) {
+ auto* rw = (bthread_rwlock_t*)arg;
+ timespec ts = butil::milliseconds_from_now(50);
+ EXPECT_EQ(ETIMEDOUT, bthread_rwlock_timedwrlock(rw, &ts));
+ return NULL;
+}
+
+// Verifies the cleanup path of rwlock_wrlock_cleanup(): after multiple
+// writers fail with ETIMEDOUT, writer_wait_count must be back to 0 so
+// that subsequent readers are not blocked by leftover "ghost shares".
+TEST(RWLockTest, wrlock_failure_does_not_leak_writer_count) {
+ bthread_setconcurrency(8);
+ bthread_rwlock_t rw;
+ ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+ // Hold the read lock so every wrlock attempt must block on `lock_word'.
+ ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+
+ const int N = 8;
+ bthread_t wth[N];
+ for (int i = 0; i < N; ++i) {
+ ASSERT_EQ(0, bthread_start_urgent(&wth[i], NULL,
wp_timed_wrlock_short, &rw));
+ }
+ // Wait for all timed wrlock attempts to time out and run cleanup.
+ for (int i = 0; i < N; ++i) {
+ bthread_join(wth[i], NULL);
+ }
+
+ // Release the read lock; from this point on no writer is in flight,
+ // so a new reader MUST acquire the lock immediately.
+ ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+ timespec ts = butil::milliseconds_from_now(500);
+ butil::Timer t;
+ t.start();
+ ASSERT_EQ(0, bthread_rwlock_timedrdlock(&rw, &ts));
+ t.stop();
+ EXPECT_LT(t.m_elapsed(), 100)
+ << "Reader was blocked for " << t.m_elapsed() << "ms; "
+ << "writer_wait_count was likely leaked by the cleanup path.";
+
+ ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+ ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+struct DataConsistencyArgs {
+ bthread_rwlock_t* rw;
+ int64_t* shared; // protected by rw
+ int64_t local_inc; // writer: number of increments this thread did
+ int64_t observed_max; // reader: max value observed
+ bool is_writer;
+};
+
+void* dc_worker(void* arg) {
+ auto* a = (DataConsistencyArgs*)arg;
+ while (!g_stopped) {
+ if (a->is_writer) {
+ EXPECT_EQ(0, bthread_rwlock_wrlock(a->rw));
+ ++(*a->shared);
+ ++a->local_inc;
+ EXPECT_EQ(0, bthread_rwlock_unlock(a->rw));
+ } else {
+ EXPECT_EQ(0, bthread_rwlock_rdlock(a->rw));
+ int64_t v = *a->shared;
+ if (v > a->observed_max) {
+ a->observed_max = v;
+ }
+ EXPECT_EQ(0, bthread_rwlock_unlock(a->rw));
+ }
+ }
+ return NULL;
+}
+
+// Verifies the release/acquire memory ordering pair on `lock_word'.
+// If the CAS in unwrlock()/unrdlock() weren't release-ordered, or the
+// CAS in rdlock()/wrlock() weren't acquire-ordered, writes done inside
+// the critical section could appear lost or inconsistent to other
+// threads, causing the final counter to disagree with total writer ops.
+TEST(RWLockTest, data_consistency) {
+ bthread_rwlock_t rw;
+ ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+ g_stopped = false;
+ const int W = 4;
+ const int R = 8;
+ bthread_setconcurrency(W + R + 4);
+
+ int64_t shared = 0;
+ std::vector<DataConsistencyArgs> args(W + R);
+ std::vector<bthread_t> threads(W + R);
+ for (int i = 0; i < W + R; ++i) {
+ args[i].rw = &rw;
+ args[i].shared = &shared;
+ args[i].local_inc = 0;
+ args[i].observed_max = -1;
+ args[i].is_writer = (i < W);
+ ASSERT_EQ(0, bthread_start_urgent(&threads[i], NULL, dc_worker,
&args[i]));
+ }
+
+ bthread_usleep(500 * 1000);
+ g_stopped = true;
+
+ int64_t total_inc = 0;
+ for (int i = 0; i < W + R; ++i) {
+ bthread_join(threads[i], NULL);
+ if (args[i].is_writer) {
+ total_inc += args[i].local_inc;
+ }
+ }
+
+ // No lost updates: every writer's increment is reflected in `shared'.
+ EXPECT_EQ(total_inc, shared)
+ << "Lost updates: total writer ops=" << total_inc
+ << " but shared counter=" << shared;
+ // No reader saw a value greater than the final counter.
+ for (int i = W; i < W + R; ++i) {
+ EXPECT_LE(args[i].observed_max, shared)
+ << "Reader " << i << " observed_max=" << args[i].observed_max
+ << " > final shared=" << shared;
+ }
+
+ ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+void* ws_reader_loop(void* arg) {
+ auto* rw = (bthread_rwlock_t*)arg;
+ while (!g_stopped) {
+ EXPECT_EQ(0, bthread_rwlock_rdlock(rw));
+ // Hold the read lock briefly to keep the lock continuously busy.
+ bthread_usleep(100);
+ EXPECT_EQ(0, bthread_rwlock_unlock(rw));
+ }
+ return NULL;
+}
+
+// Verifies that under a continuous read load, a writer can still acquire
+// the lock in bounded time. This is the end-to-end guarantee of the
+// writer-priority strategy: any reader arriving AFTER the writer entered
+// wrlock() must yield, ensuring the writer never starves.
+TEST(RWLockTest, no_writer_starvation) {
+ bthread_rwlock_t rw;
+ ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+ g_stopped = false;
+ const int R = 16;
+ bthread_setconcurrency(R + 4);
+ bthread_t rth[R];
+ for (int i = 0; i < R; ++i) {
+ ASSERT_EQ(0, bthread_start_urgent(&rth[i], NULL, ws_reader_loop, &rw));
+ }
+
+ // Let the readers ramp up and saturate the lock.
+ bthread_usleep(50 * 1000);
+
+ // A single writer must succeed within a generous budget.
+ butil::Timer t;
+ t.start();
+ ASSERT_EQ(0, bthread_rwlock_wrlock(&rw));
+ t.stop();
+
+ EXPECT_LT(t.m_elapsed(), 1000)
+ << "Writer starved for " << t.m_elapsed() << "ms under "
+ << R << " concurrent readers; writer-priority is broken.";
+
+ ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+ g_stopped = true;
+ for (int i = 0; i < R; ++i) {
+ bthread_join(rth[i], NULL);
+ }
+ ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
bthread_rwlock_t* rw;
int64_t counter;
@@ -386,13 +634,14 @@ void PerfTest(uint32_t writer_ratio, ThreadId* /*dummy*/,
int thread_num,
<< " writer_ratio=" << writer_ratio
<< " reader_num=" << reader_num
<< " read_count=" << read_count
- << " read_average_time=" << (read_count == 0 ? 0 :
read_wait_time / (double)read_count)
+ << " read_average_time=" << (read_count == 0 ? 0 :
read_wait_time / (double)read_count) << "ns"
<< " writer_num=" << writer_num
<< " write_count=" << write_count
- << " write_average_time=" << (write_count == 0 ? 0 :
write_wait_time / (double)write_count);
+ << " write_average_time=" << (write_count == 0 ? 0 :
write_wait_time / (double)write_count) << "ns";
}
TEST(RWLockTest, performance) {
+ bthread_setconcurrency(16);
const int thread_num = 12;
PerfTest(0, (pthread_t*)NULL, thread_num, pthread_create, pthread_join);
PerfTest(0, (bthread_t*)NULL, thread_num, bthread_start_background,
bthread_join);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]