This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new fea2952a Active Spinning and queue old bthread at the head for bthread 
mutex (#2749)
fea2952a is described below

commit fea2952aaf7b2ee8ef1953294321b78f94ec683f
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Fri Sep 6 10:42:35 2024 +0800

    Active Spinning and queue old bthread at the head for bthread mutex (#2749)
---
 src/bthread/butex.cpp              | 29 +++++++++++++-----
 src/bthread/butex.h                |  7 ++++-
 src/bthread/mutex.cpp              | 63 ++++++++++++++++++++++++--------------
 src/bthread/task_group.h           |  5 +++
 src/butil/containers/linked_list.h |  5 +++
 src/butil/thread_local.h           | 18 +++++++++--
 6 files changed, 93 insertions(+), 34 deletions(-)

diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index 2b7c78b8..b603d89c 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -537,8 +537,14 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, 
WaiterState state) {
     return erased;
 }
 
+struct WaitForButexArgs {
+    ButexBthreadWaiter* bw;
+    bool prepend;
+};
+
 static void wait_for_butex(void* arg) {
-    ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
+    auto args = static_cast<WaitForButexArgs*>(arg);
+    ButexBthreadWaiter* const bw = args->bw;
     Butex* const b = bw->initial_butex;
     // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
     //    before they're queued, otherwise the waiter is already timedout
@@ -560,7 +566,11 @@ static void wait_for_butex(void* arg) {
             bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
         } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
                    !bw->task_meta->interrupted) {
-            b->waiters.Append(bw);
+            if (args->prepend) {
+                b->waiters.Prepend(bw);
+            } else {
+                b->waiters.Append(bw);
+            }
             bw->container.store(b, butil::memory_order_relaxed);
             if (bw->abstime != NULL) {
                 bw->sleep_id = get_global_timer_thread()->schedule(
@@ -593,7 +603,7 @@ static void wait_for_butex(void* arg) {
 }
 
 static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
-                                   const timespec* abstime) {
+                                   const timespec* abstime, bool prepend) {
     TaskMeta* task = NULL;
     ButexPthreadWaiter pw;
     pw.tid = 0;
@@ -616,7 +626,11 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, 
int expected_value,
         errno = EINTR;
         rc = -1;
     } else {
-        b->waiters.Append(&pw);
+        if (prepend) {
+            b->waiters.Prepend(&pw);
+        } else {
+            b->waiters.Append(&pw);
+        }
         pw.container.store(b, butil::memory_order_relaxed);
         b->waiter_lock.unlock();
 
@@ -646,7 +660,7 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, 
int expected_value,
     return rc;
 }
 
-int butex_wait(void* arg, int expected_value, const timespec* abstime) {
+int butex_wait(void* arg, int expected_value, const timespec* abstime, bool 
prepend) {
     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, 
value);
     if (b->value.load(butil::memory_order_relaxed) != expected_value) {
         errno = EWOULDBLOCK;
@@ -657,7 +671,7 @@ int butex_wait(void* arg, int expected_value, const 
timespec* abstime) {
     }
     TaskGroup* g = tls_task_group;
     if (NULL == g || g->is_current_pthread_task()) {
-        return butex_wait_from_pthread(g, b, expected_value, abstime);
+        return butex_wait_from_pthread(g, b, expected_value, abstime, prepend);
     }
     ButexBthreadWaiter bbw;
     // tid is 0 iff the thread is non-bthread
@@ -690,7 +704,8 @@ int butex_wait(void* arg, int expected_value, const 
timespec* abstime) {
     // release fence matches with acquire fence in 
interrupt_and_consume_waiters
     // in task_group.cpp to guarantee visibility of `interrupted'.
     bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
-    g->set_remained(wait_for_butex, &bbw);
+    WaitForButexArgs args{ &bbw, prepend};
+    g->set_remained(wait_for_butex, &args);
     TaskGroup::sched(&g);
 
     // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
diff --git a/src/bthread/butex.h b/src/bthread/butex.h
index 93a1f6ec..b40ec1e0 100644
--- a/src/bthread/butex.h
+++ b/src/bthread/butex.h
@@ -67,8 +67,13 @@ int butex_requeue(void* butex1, void* butex2);
 // abstime is not NULL.
 // About |abstime|:
 //   Different from FUTEX_WAIT, butex_wait uses absolute time.
+// About |prepend|:
+//   If |prepend| is true, queue the bthread at the head of the queue,
+//   otherwise at the tail.
 // Returns 0 on success, -1 otherwise and errno is set.
-int butex_wait(void* butex, int expected_value, const timespec* abstime);
+int butex_wait(void* butex, int expected_value,
+               const timespec* abstime,
+               bool prepend = false);
 
 }  // namespace bthread
 
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index f2606bb9..403f6bb8 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -39,17 +39,22 @@
 #include "butil/logging.h"
 #include "butil/object_pool.h"
 #include "butil/debug/stack_trace.h"
+#include "butil/thread_local.h"
 #include "bthread/butex.h"                       // butex_*
 #include "bthread/mutex.h"                       // bthread_mutex_t
 #include "bthread/sys_futex.h"
 #include "bthread/log.h"
-#include "butil/debug/stack_trace.h"
+#include "bthread/processor.h"
+#include "bthread/task_group.h"
 
 extern "C" {
 extern void* BAIDU_WEAK _dl_sym(void* handle, const char* symbol, void* 
caller);
 }
 
 namespace bthread {
+
+EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
+
 // Warm up backtrace before main().
 const butil::debug::StackTrace ALLOW_UNUSED dummy_bt;
 
@@ -772,29 +777,41 @@ const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
 BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),
               sizeof_mutex_internal_must_equal_unsigned);
 
-inline int mutex_lock_contended(bthread_mutex_t* m) {
-    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
-    while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
-        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
-            errno != EWOULDBLOCK && errno != EINTR/*note*/) {
-            // a mutex lock should ignore interruptions in general since
-            // user code is unlikely to check the return value.
-            return errno;
+const int MAX_SPIN_ITER = 4;
+
+inline int mutex_lock_contended_impl(
+    bthread_mutex_t* m, const struct timespec* __restrict abstime) {
+    // When a bthread first contends for a lock, active spinning makes sense.
+    // Spin only few times and only if local `rq' is empty.
+    TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
+    if (BAIDU_UNLIKELY(NULL == g || g->rq_size() == 0)) {
+        for (int i = 0; i < MAX_SPIN_ITER; ++i) {
+            cpu_relax();
         }
     }
-    return 0;
-}
 
-inline int mutex_timedlock_contended(
-    bthread_mutex_t* m, const struct timespec* __restrict abstime) {
-    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
+    bool queue_lifo = false;
+    bool first_wait = true;
+    auto whole = (butil::atomic<unsigned>*)m->butex;
     while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
-        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime) < 0 &&
+        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime, 
queue_lifo) < 0 &&
             errno != EWOULDBLOCK && errno != EINTR/*note*/) {
-            // a mutex lock should ignore interrruptions in general since
+            // A mutex lock should ignore interruptions in general since
             // user code is unlikely to check the return value.
             return errno;
         }
+        // Ignore EWOULDBLOCK and EINTR.
+        if (first_wait && 0 == errno) {
+            first_wait = false;
+        }
+        if (!first_wait) {
+            // Normally, bthreads are queued in FIFO order. But competing with 
new
+            // arriving bthreads over the ownership of mutex, a woken up 
bthread
+            // has good chances of losing. Because new arriving bthreads are 
already
+            // running on CPU and there can be lots of them. In such case, for 
fairness,
+            // to avoid starvation, it is queued at the head of the waiter 
queue.
+            queue_lifo = true;
+        }
     }
     return 0;
 }
@@ -880,7 +897,7 @@ int bthread_mutex_trylock(bthread_mutex_t* m) {
 }
 
 int bthread_mutex_lock_contended(bthread_mutex_t* m) {
-    return bthread::mutex_lock_contended(m);
+    return bthread::mutex_lock_contended_impl(m, NULL);
 }
 
 int bthread_mutex_lock(bthread_mutex_t* m) {
@@ -890,18 +907,18 @@ int bthread_mutex_lock(bthread_mutex_t* m) {
     }
     // Don't sample when contention profiler is off.
     if (!bthread::g_cp) {
-        return bthread::mutex_lock_contended(m);
+        return bthread::mutex_lock_contended_impl(m, NULL);
     }
     // Ask Collector if this (contended) locking should be sampled.
     const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
     if (!sampling_range) { // Don't sample
-        return bthread::mutex_lock_contended(m);
+        return bthread::mutex_lock_contended_impl(m, NULL);
     }
     // Start sampling.
     const int64_t start_ns = butil::cpuwide_time_ns();
     // NOTE: Don't modify m->csite outside lock since multiple threads are
     // still contending with each other.
-    const int rc = bthread::mutex_lock_contended(m);
+    const int rc = bthread::mutex_lock_contended_impl(m, NULL);
     if (!rc) { // Inside lock
         m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
         m->csite.sampling_range = sampling_range;
@@ -917,18 +934,18 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
     }
     // Don't sample when contention profiler is off.
     if (!bthread::g_cp) {
-        return bthread::mutex_timedlock_contended(m, abstime);
+        return bthread::mutex_lock_contended_impl(m, abstime);
     }
     // Ask Collector if this (contended) locking should be sampled.
     const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
     if (!sampling_range) { // Don't sample
-        return bthread::mutex_timedlock_contended(m, abstime);
+        return bthread::mutex_lock_contended_impl(m, abstime);
     }
     // Start sampling.
     const int64_t start_ns = butil::cpuwide_time_ns();
     // NOTE: Don't modify m->csite outside lock since multiple threads are
     // still contending with each other.
-    const int rc = bthread::mutex_timedlock_contended(m, abstime);
+    const int rc = bthread::mutex_lock_contended_impl(m, abstime);
     if (!rc) { // Inside lock
         m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
         m->csite.sampling_range = sampling_range;
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index b71994a0..a19bd023 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -182,6 +182,11 @@ public:
     // process make go on indefinitely.
     void push_rq(bthread_t tid);
 
+    // Returns size of local run queue.
+    size_t rq_size() const {
+        return _rq.volatile_size();
+    }
+
     bthread_tag_t tag() const { return _tag; }
 
 private:
diff --git a/src/butil/containers/linked_list.h 
b/src/butil/containers/linked_list.h
index 7130c046..7874b65a 100644
--- a/src/butil/containers/linked_list.h
+++ b/src/butil/containers/linked_list.h
@@ -171,6 +171,11 @@ class LinkedList {
     e->InsertBefore(&root_);
   }
 
+  // Prepend |e| to the head of the linked list.
+  void Prepend(LinkNode<T>* e) {
+    e->InsertAfter(&root_);
+  }
+
   LinkNode<T>* head() const {
     return root_.next();
   }
diff --git a/src/butil/thread_local.h b/src/butil/thread_local.h
index a3cb1ff0..a67327c6 100644
--- a/src/butil/thread_local.h
+++ b/src/butil/thread_local.h
@@ -32,20 +32,25 @@
 
 #define BAIDU_VOLATILE_THREAD_LOCAL(type, var_name, default_value)             
\
   BAIDU_THREAD_LOCAL type var_name = default_value;                            
\
-  static __attribute__((noinline, unused)) type get_##var_name(void) {         
\
+  __attribute__((noinline, unused)) type get_##var_name(void) {                
\
     asm volatile("");                                                          
\
     return var_name;                                                           
\
   }                                                                            
\
-  static __attribute__((noinline, unused)) type *get_ptr_##var_name(void) {    
\
+  __attribute__((noinline, unused)) type *get_ptr_##var_name(void) {           
\
     type *ptr = &var_name;                                                     
\
     asm volatile("" : "+rm"(ptr));                                             
\
     return ptr;                                                                
\
   }                                                                            
\
-  static __attribute__((noinline, unused)) void set_##var_name(type v) {       
\
+  __attribute__((noinline, unused)) void set_##var_name(type v) {              
\
     asm volatile("");                                                          
\
     var_name = v;                                                              
\
   }
 
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name)                     
\
+    type get_##var_name(void);                                                 
\
+    type *get_ptr_##var_name(void);                                            
\
+    void set_##var_name(type v)
+
 #if (defined (__aarch64__) && defined (__GNUC__)) || defined(__clang__)
 // GNU compiler under aarch and Clang compiler is incorrectly caching the 
 // address of thread_local variables across a suspend-point. The following
@@ -53,10 +58,17 @@
 #define BAIDU_GET_VOLATILE_THREAD_LOCAL(var_name) get_##var_name()
 #define BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(var_name) get_ptr_##var_name()
 #define BAIDU_SET_VOLATILE_THREAD_LOCAL(var_name, value) set_##var_name(value)
+
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name)                     
\
+    type get_##var_name(void);                                                 
\
+    type *get_ptr_##var_name(void);                                            
\
+    void set_##var_name(type v)
 #else
 #define BAIDU_GET_VOLATILE_THREAD_LOCAL(var_name) var_name
 #define BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(var_name) &##var_name
 #define BAIDU_SET_VOLATILE_THREAD_LOCAL(var_name, value) var_name = value
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name)                     
\
+    extern BAIDU_THREAD_LOCAL type var_name
 #endif
 
 namespace butil {


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to