This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new f949962e Support custom ParkingLot number (#3033)
f949962e is described below

commit f949962e141fecba92bbd399d1da75fe2ac6862d
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Mon Aug 18 10:18:27 2025 +0800

    Support custom ParkingLot number (#3033)
    
    * Support custom ParkingLot number
    
    * Validate bthread_parking_lot_of_each_tag flag
    
    * Fast path, no need to futex_wait
---
 src/bthread/bthread.cpp      | 17 ++++++++++++++++-
 src/bthread/parking_lot.h    |  4 ++++
 src/bthread/task_control.cpp | 21 ++++++++++-----------
 src/bthread/task_control.h   |  8 ++++----
 src/bthread/types.h          |  3 +++
 5 files changed, 37 insertions(+), 16 deletions(-)

diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index 35d87477..a5f178e3 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -59,6 +59,21 @@ DEFINE_int32(bthread_concurrency_by_tag, 8 + BTHREAD_EPOLL_THREAD_NUM,
              "Number of pthread workers of FLAGS_bthread_current_tag");
 BUTIL_VALIDATE_GFLAG(bthread_concurrency_by_tag, validate_bthread_concurrency_by_tag);
 
+DEFINE_int32(bthread_parking_lot_of_each_tag, 4, "Number of parking lots of each tag");
+BUTIL_VALIDATE_GFLAG(bthread_parking_lot_of_each_tag, [](const char*, int32_t val) {
+    if (val < BTHREAD_MIN_PARKINGLOT) {
+        LOG(ERROR) << "bthread_parking_lot_of_each_tag must be greater than or equal to "
+                   << BTHREAD_MIN_PARKINGLOT;
+        return false;
+    }
+    if (val > BTHREAD_MAX_PARKINGLOT) {
+        LOG(ERROR) << "bthread_parking_lot_of_each_tag must be less than or equal to "
+                   << BTHREAD_MAX_PARKINGLOT;
+        return false;
+    }
+    return true;
+});
+
 static bool never_set_bthread_concurrency = true;
 
 BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match);
@@ -216,7 +231,7 @@ static bool validate_bthread_current_tag(const char*, int32_t val) {
         return false;
     }
     BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
-    auto c = bthread::get_task_control();
+    auto c = get_task_control();
     if (c == NULL) {
         FLAGS_bthread_concurrency_by_tag = 8 + BTHREAD_EPOLL_THREAD_NUM;
         return true;
diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h
index 620e3c89..315e9956 100644
--- a/src/bthread/parking_lot.h
+++ b/src/bthread/parking_lot.h
@@ -60,6 +60,10 @@ public:
     // Wait for tasks.
     // If the `expected_state' does not match, wait() may finish directly.
     void wait(const State& expected_state) {
+        if (get_state().val != expected_state.val) {
+            // Fast path, no need to futex_wait.
+            return;
+        }
         _waiter_num.fetch_add(1, butil::memory_order_relaxed);
         futex_wait_private(&_pending_signal, expected_state.val, NULL);
         _waiter_num.fetch_sub(1, butil::memory_order_relaxed);
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 0cc3754a..05ceec09 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -46,6 +46,7 @@ namespace bthread {
 
 DECLARE_int32(bthread_concurrency);
 DECLARE_int32(bthread_min_concurrency);
+DECLARE_int32(bthread_parking_lot_of_each_tag);
 
 extern pthread_mutex_t g_task_control_mutex;
 extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
@@ -186,7 +187,8 @@ TaskControl::TaskControl()
     , _status(print_rq_sizes_in_the_tc, this)
     , _nbthreads("bthread_count")
     , _priority_queues(FLAGS_task_group_ntags)
-    , _pl(FLAGS_task_group_ntags)
+    , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
+    , _tagged_pl(FLAGS_task_group_ntags)
 {}
 
 int TaskControl::init(int concurrency) {
@@ -326,7 +328,7 @@ void TaskControl::stop_and_join() {
             [](butil::atomic<size_t>& index) { index.store(0, butil::memory_order_relaxed); });
     }
     for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
-        for (auto& pl : _pl[i]) {
+        for (auto& pl : _tagged_pl[i]) {
             pl.stop();
         }
     }
@@ -367,7 +369,7 @@ int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) {
         return -1;
     }
     g->set_tag(tag);
-    g->set_pl(&_pl[tag][butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM]);
+    g->set_pl(&_tagged_pl[tag][butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag]);
     size_t ngroup = _tagged_ngroup[tag].load(butil::memory_order_relaxed);
     if (ngroup < (size_t)BTHREAD_MAX_CONCURRENCY) {
         _tagged_groups[tag][ngroup] = g;
@@ -482,14 +484,11 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
         num_task = 2;
     }
     auto& pl = tag_pl(tag);
-    int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
-    num_task -= pl[start_index].signal(1);
-    if (num_task > 0) {
-        for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
-            if (++start_index >= PARKING_LOT_NUM) {
-                start_index = 0;
-            }
-            num_task -= pl[start_index].signal(1);
+    size_t start_index = butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag;
+    for (size_t i = 0; i < _pl_num_of_each_tag && num_task > 0; ++i) {
+        num_task -= pl[start_index].signal(1);
+        if (++start_index >= _pl_num_of_each_tag) {
+            start_index = 0;
         }
     }
     if (num_task > 0 &&
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 11587b29..2426b00c 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -103,8 +103,7 @@ public:
 
 private:
     typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
-    static const int PARKING_LOT_NUM = 4;
-    typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;
+    typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
     // Add/Remove a TaskGroup.
     // Returns 0 on success, -1 otherwise.
     int _add_group(TaskGroup*, bthread_tag_t tag);
@@ -117,7 +116,7 @@ private:
     butil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; }
 
     // Tag parking slot
-    TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; }
+    TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; }
 
     static void delete_task_group(void* arg);
 
@@ -159,7 +158,8 @@ private:
     std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
     std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
 
-    std::vector<TaggedParkingLot> _pl;
+    size_t _pl_num_of_each_tag;
+    std::vector<TaggedParkingLot> _tagged_pl;
 
 #ifdef BRPC_BTHREAD_TRACER
     TaskTracer _task_tracer;
diff --git a/src/bthread/types.h b/src/bthread/types.h
index c0f23f1c..30368f68 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -154,6 +154,9 @@ static const bthread_t BTHREAD_ATOMIC_INIT = 0;
 // Min/Max number of work pthreads.
 static const int BTHREAD_MIN_CONCURRENCY = 3 + BTHREAD_EPOLL_THREAD_NUM;
 static const int BTHREAD_MAX_CONCURRENCY = 1024;
+// Min/max number of ParkingLot.
+static const int BTHREAD_MIN_PARKINGLOT = 4;
+static const int BTHREAD_MAX_PARKINGLOT = 1024;
 
 typedef struct {
     void* impl;


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to