This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3775b419 Support FastPthreadMutex contention profiler && expose FastPthreadMutex to user (#2589)
3775b419 is described below

commit 3775b41918a0e8c27ea5ba9ba414f0eb2b84e283
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jun 3 16:48:02 2024 +0800

    Support FastPthreadMutex contention profiler && expose FastPthreadMutex to user (#2589)
---
 src/bthread/butex.cpp           |  6 ++--
 src/bthread/id.cpp              |  2 +-
 src/bthread/mutex.cpp           | 80 +++++++++++++++++++++++++++++++++--------
 src/bthread/mutex.h             | 15 +++++++-
 src/bthread/timer_thread.cpp    |  2 +-
 src/bthread/timer_thread.h      |  2 +-
 test/bthread_mutex_unittest.cpp | 40 +++++++++++++++++++--
 7 files changed, 122 insertions(+), 25 deletions(-)

diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index 1dbd8930..2b7c78b8 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -121,7 +121,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Butex {
 
     butil::atomic<int> value;
     ButexWaiterList waiters;
-    internal::FastPthreadMutex waiter_lock;
+    FastPthreadMutex waiter_lock;
 };
 
 BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
@@ -460,8 +460,8 @@ int butex_requeue(void* arg, void* arg2) {
 
     ButexWaiter* front = NULL;
     {
-        std::unique_lock<internal::FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
-        std::unique_lock<internal::FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
+        std::unique_lock<FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
+        std::unique_lock<FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
         butil::double_lock(lck1, lck2);
         if (b->waiters.empty()) {
             return 0;
diff --git a/src/bthread/id.cpp b/src/bthread/id.cpp
index ba77580a..7aabed68 100644
--- a/src/bthread/id.cpp
+++ b/src/bthread/id.cpp
@@ -114,7 +114,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Id {
     // contended_ver: locked and contended
     uint32_t first_ver;
     uint32_t locked_ver;
-    internal::FastPthreadMutex mutex;
+    FastPthreadMutex mutex;
     void* data;
     int (*on_error)(bthread_id_t, void*, int);
     int (*on_error2)(bthread_id_t, void*, int, const std::string&);
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index d22c8753..357452ee 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -448,7 +448,8 @@ int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex) {
     return sys_pthread_mutex_unlock(mutex);
 }
 
-inline uint64_t hash_mutex_ptr(const pthread_mutex_t* m) {
+template <typename Mutex>
+inline uint64_t hash_mutex_ptr(const Mutex* m) {
     return butil::fmix64((uint64_t)m);
 }
 
@@ -468,7 +469,7 @@ static __thread bool tls_inside_lock = false;
 #ifndef DONT_SPEEDUP_PTHREAD_CONTENTION_PROFILER_WITH_TLS
 const int TLS_MAX_COUNT = 3;
 struct MutexAndContentionSite {
-    pthread_mutex_t* mutex;
+    void* mutex;
     bthread_contention_site_t csite;
 };
 struct TLSPthreadContentionSites {
@@ -482,8 +483,9 @@ static __thread TLSPthreadContentionSites tls_csites = {0,0,{}};
 // Guaranteed in linux/win.
 const int PTR_BITS = 48;
 
+template <typename Mutex>
 inline bthread_contention_site_t*
-add_pthread_contention_site(pthread_mutex_t* mutex) {
+add_pthread_contention_site(const Mutex* mutex) {
     MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE - 1)];
     butil::static_atomic<uint64_t>& m = entry.versioned_mutex;
     uint64_t expected = m.load(butil::memory_order_relaxed);
@@ -500,8 +502,9 @@ add_pthread_contention_site(pthread_mutex_t* mutex) {
     return NULL;
 }
 
-inline bool remove_pthread_contention_site(
-    pthread_mutex_t* mutex, bthread_contention_site_t* saved_csite) {
+template <typename Mutex>
+inline bool remove_pthread_contention_site(const Mutex* mutex,
+                                           bthread_contention_site_t* saved_csite) {
     MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE - 1)];
     butil::static_atomic<uint64_t>& m = entry.versioned_mutex;
     if ((m.load(butil::memory_order_relaxed) & ((((uint64_t)1) << PTR_BITS) - 1))
@@ -538,16 +541,44 @@ void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns) {
     tls_inside_lock = false;
 }
 
-BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
+namespace internal {
+BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex) {
+    return sys_pthread_mutex_lock(mutex);
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(pthread_mutex_t* mutex) {
+    return ::pthread_mutex_trylock(mutex);
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(pthread_mutex_t* mutex) {
+    return sys_pthread_mutex_unlock(mutex);
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex) {
+    mutex->lock();
+    return 0;
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(FastPthreadMutex* mutex) {
+    return mutex->try_lock() ? 0 : EBUSY;
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(FastPthreadMutex* mutex) {
+    mutex->unlock();
+    return 0;
+}
+
+template <typename Mutex>
+BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
     // Don't change behavior of lock when profiler is off.
     if (!g_cp ||
         // collecting code including backtrace() and submit() may call
         // pthread_mutex_lock and cause deadlock. Don't sample.
         tls_inside_lock) {
-        return sys_pthread_mutex_lock(mutex);
+        return pthread_mutex_lock_internal(mutex);
     }
     // Don't slow down non-contended locks.
-    int rc = pthread_mutex_trylock(mutex);
+    int rc = pthread_mutex_trylock_internal(mutex);
     if (rc != EBUSY) {
         return rc;
     }
@@ -567,16 +598,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
         csite = &entry.csite;
         if (!sampling_range) {
             make_contention_site_invalid(&entry.csite);
-            return sys_pthread_mutex_lock(mutex);
+            return pthread_mutex_lock_internal(mutex);
         }
     }
 #endif
     if (!sampling_range) {  // don't sample
-        return sys_pthread_mutex_lock(mutex);
+        return pthread_mutex_lock_internal(mutex);
     }
     // Lock and monitor the waiting time.
     const int64_t start_ns = butil::cpuwide_time_ns();
-    rc = sys_pthread_mutex_lock(mutex);
+    rc = pthread_mutex_lock_internal(mutex);
     if (!rc) { // Inside lock
         if (!csite) {
             csite = add_pthread_contention_site(mutex);
@@ -590,13 +621,14 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
     return rc;
 }
 
-BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
+template <typename Mutex>
+BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(Mutex* mutex) {
     // Don't change behavior of unlock when profiler is off.
     if (!g_cp || tls_inside_lock) {
         // This branch brings an issue that an entry created by
-        // add_pthread_contention_site may not be cleared. Thus we add a 
+        // add_pthread_contention_site may not be cleared. Thus we add a
         // 16-bit rolling version in the entry to find out such entry.
-        return sys_pthread_mutex_unlock(mutex);
+        return pthread_mutex_unlock_internal(mutex);
     }
     int64_t unlock_start_ns = 0;
     bool miss_in_tls = true;
@@ -622,7 +654,7 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
             unlock_start_ns = butil::cpuwide_time_ns();
         }
     }
-    const int rc = sys_pthread_mutex_unlock(mutex);
+    const int rc = pthread_mutex_unlock_internal(mutex);
     // [Outside lock]
     if (unlock_start_ns) {
         const int64_t unlock_end_ns = butil::cpuwide_time_ns();
@@ -632,6 +664,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
     return rc;
 }
 
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
+    return internal::pthread_mutex_lock_impl(mutex);
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
+    return internal::pthread_mutex_unlock_impl(mutex);
+}
+
 // Implement bthread_mutex_t related functions
 struct MutexInternal {
     butil::static_atomic<unsigned char> locked;
@@ -714,6 +756,14 @@ void FastPthreadMutex::unlock() {
 } // namespace internal
 #endif // BTHREAD_USE_FAST_PTHREAD_MUTEX
 
+void FastPthreadMutex::lock() {
+    internal::pthread_mutex_lock_impl(&_mutex);
+}
+
+void FastPthreadMutex::unlock() {
+    internal::pthread_mutex_unlock_impl(&_mutex);
+}
+
 } // namespace bthread
 
 extern "C" {
diff --git a/src/bthread/mutex.h b/src/bthread/mutex.h
index 242a620f..ad6d2e5c 100644
--- a/src/bthread/mutex.h
+++ b/src/bthread/mutex.h
@@ -72,7 +72,7 @@ namespace internal {
 class FastPthreadMutex {
 public:
     FastPthreadMutex() : _futex(0) {}
-    ~FastPthreadMutex() {}
+    ~FastPthreadMutex() = default;
     void lock();
     void unlock();
     bool try_lock();
@@ -86,6 +86,19 @@ typedef butil::Mutex FastPthreadMutex;
 #endif
 }
 
+class FastPthreadMutex {
+public:
+    FastPthreadMutex() = default;
+    ~FastPthreadMutex() = default;
+    DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
+
+    void lock();
+    void unlock();
+    bool try_lock() { return _mutex.try_lock(); }
+private:
+    internal::FastPthreadMutex _mutex;
+};
+
 }  // namespace bthread
 
 // Specialize std::lock_guard and std::unique_lock for bthread_mutex_t
diff --git a/src/bthread/timer_thread.cpp b/src/bthread/timer_thread.cpp
index 3b2f8a76..ee80568c 100644
--- a/src/bthread/timer_thread.cpp
+++ b/src/bthread/timer_thread.cpp
@@ -92,7 +92,7 @@ public:
     Task* consume_tasks();
 
 private:
-    internal::FastPthreadMutex _mutex;
+    FastPthreadMutex _mutex;
     int64_t _nearest_run_time;
     Task* _task_head;
 };
diff --git a/src/bthread/timer_thread.h b/src/bthread/timer_thread.h
index 139c2e98..1be061cc 100644
--- a/src/bthread/timer_thread.h
+++ b/src/bthread/timer_thread.h
@@ -95,7 +95,7 @@ private:
 
     TimerThreadOptions _options;
     Bucket* _buckets;        // list of tasks to be run
-    internal::FastPthreadMutex _mutex;    // protect _nearest_run_time
+    FastPthreadMutex _mutex;    // protect _nearest_run_time
     int64_t _nearest_run_time;
     // the futex for wake up timer thread. can't use _nearest_run_time because
     // it's 64-bit.
diff --git a/test/bthread_mutex_unittest.cpp b/test/bthread_mutex_unittest.cpp
index 38c43eed..21bd6044 100644
--- a/test/bthread_mutex_unittest.cpp
+++ b/test/bthread_mutex_unittest.cpp
@@ -229,8 +229,9 @@ TEST(MutexTest, performance) {
     PerfTest(&bth_mutex, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join);
 }
 
+template <typename Mutex>
 void* loop_until_stopped(void* arg) {
-    bthread::Mutex *m = (bthread::Mutex*)arg;
+    auto m = (Mutex*)arg;
     while (!g_stopped) {
         BAIDU_SCOPED_LOCK(*m);
         bthread_usleep(20);
@@ -251,11 +252,11 @@ TEST(MutexTest, mix_thread_types) {
     // true, thus loop_until_stopped spins forever)
     bthread_setconcurrency(M);
     for (int i = 0; i < N; ++i) {
-        ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &m));
+        ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped<bthread::Mutex>, &m));
     }
     for (int i = 0; i < M; ++i) {
         const bthread_attr_t *attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD;
-        ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &m));
+        ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped<bthread::Mutex>, &m));
     }
     bthread_usleep(1000L * 1000);
     g_stopped = true;
@@ -266,4 +267,37 @@ TEST(MutexTest, mix_thread_types) {
         pthread_join(pthreads[i], NULL);
     }
 }
+
+TEST(MutexTest, fast_pthread_mutex) {
+    bthread::FastPthreadMutex mutex;
+    ASSERT_TRUE(mutex.try_lock());
+    mutex.unlock();
+    mutex.lock();
+    mutex.unlock();
+    {
+        BAIDU_SCOPED_LOCK(mutex);
+    }
+    {
+        std::unique_lock<bthread::FastPthreadMutex> lck1;
+        std::unique_lock<bthread::FastPthreadMutex> lck2(mutex);
+        lck1.swap(lck2);
+        lck1.unlock();
+        lck1.lock();
+    }
+    ASSERT_TRUE(mutex.try_lock());
+    mutex.unlock();
+
+    const int N = 16;
+    pthread_t pthreads[N];
+    for (int i = 0; i < N; ++i) {
+        ASSERT_EQ(0, pthread_create(&pthreads[i], NULL,
+            loop_until_stopped<bthread::FastPthreadMutex>, &mutex));
+    }
+    bthread_usleep(1000L * 1000);
+    g_stopped = true;
+    for (int i = 0; i < N; ++i) {
+        pthread_join(pthreads[i], NULL);
+    }
+}
+
 } // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to