This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 95b7de3d bugfix: runtime extend block pool (#2965)
95b7de3d is described below

commit 95b7de3d40ae579d5cc28ea242d0d5f7114a0200
Author: Yang,Liming <liming.y...@139.com>
AuthorDate: Wed Jul 23 10:20:53 2025 +0800

    bugfix: runtime extend block pool (#2965)
---
 src/brpc/rdma/block_pool.cpp      | 128 +++++++++++++++++++++++++++-----------
 src/brpc/rdma/block_pool.h        |  10 ++-
 src/brpc/rdma/rdma_helper.cpp     |   5 ++
 test/brpc_block_pool_unittest.cpp |  12 ++--
 4 files changed, 111 insertions(+), 44 deletions(-)

diff --git a/src/brpc/rdma/block_pool.cpp b/src/brpc/rdma/block_pool.cpp
index 46435c3c..543cdb6d 100644
--- a/src/brpc/rdma/block_pool.cpp
+++ b/src/brpc/rdma/block_pool.cpp
@@ -36,9 +36,12 @@ DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
              "Initial size of memory pool for RDMA (MB)");
 DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
              "Increased size of memory pool for RDMA (MB)");
-DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_max_regions, 1, "Max number of regions");
 DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
 DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
+DEFINE_bool(rdma_memory_pool_user_specified_memory, false,
+            "If true, the user must call UserExtendBlockPool() to extend "
+            "memory. bRPC will not handle memory extension.");
 
 static RegisterCallback g_cb = NULL;
 
@@ -125,31 +128,13 @@ uint32_t GetRegionId(const void* buf) {
     return r->id;
 }
 
-// Extend the block pool with a new region (with different region ID)
-static void* ExtendBlockPool(size_t region_size, int block_type) {
-    if (region_size < 1) {
-        errno = EINVAL;
-        return NULL;
-    }
-
+static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
+                                 int block_type) {
     if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
         LOG(INFO) << "Memory pool reaches max regions";
         errno = ENOMEM;
         return NULL;
     }
-
-    // Regularize region size
-    region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
-    region_size *= g_block_size[block_type] * g_buckets;
-
-    LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB";
-
-    void* region_base = NULL;
-    if (posix_memalign(&region_base, 4096, region_size) != 0) {
-        PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
-        return NULL;
-    }
-
     uint32_t id = g_cb(region_base, region_size);
     if (id == 0) {
         free(region_base);
@@ -168,7 +153,7 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
             return NULL;
         }
     }
- 
+
     Region* region = &g_regions[g_region_num++];
     region->start = (uintptr_t)region_base;
     region->size = region_size;
@@ -178,7 +163,7 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
     for (size_t i = 0; i < g_buckets; ++i) {
         node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
         node[i]->len = region_size / g_buckets;
-        node[i]->next = NULL;
+        node[i]->next = g_info->idle_list[block_type][i];
         g_info->idle_list[block_type][i] = node[i];
         g_info->idle_size[block_type][i] += node[i]->len;
     }
@@ -186,15 +171,71 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
     return region_base;
 }
 
-void* InitBlockPool(RegisterCallback cb) {
-    if (!cb) {
+// Extend the block pool with a new region (with different region ID)
+static void* ExtendBlockPool(size_t region_size, int block_type) {
+    if (region_size < 1) {
         errno = EINVAL;
         return NULL;
     }
+
+    if (FLAGS_rdma_memory_pool_user_specified_memory) {
+        LOG_EVERY_SECOND(ERROR) << "Fail to extend new region, "
+                                   "rdma_memory_pool_user_specified_memory is "
+                                   "true,  ExtendBlockPool is disabled";
+        return NULL;
+    }
+
+    // Regularize region size
+    region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
+    region_size *= g_block_size[block_type] * g_buckets;
+
+    LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB";
+
+    void* region_base = NULL;
+    if (posix_memalign(&region_base, 4096, region_size) != 0) {
+        PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+        return NULL;
+    }
+
+    return ExtendBlockPoolImpl(region_base, region_size, block_type);
+}
+
+void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
+                            int block_type) {
+    if (FLAGS_rdma_memory_pool_user_specified_memory == false) {
+        LOG_EVERY_SECOND(ERROR) << "User extend memory is disabled";
+        return NULL;
+    }
+    if (reinterpret_cast<uintptr_t>(region_base) % 4096 != 0) {
+        LOG_EVERY_SECOND(ERROR) << "region_base must be 4096 aligned";
+        return NULL;
+    }
+
+    uint64_t index = butil::fast_rand() % g_buckets;
+    BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
+    BAIDU_SCOPED_LOCK(g_info->extend_lock);
+
+    if (g_region_num > 1 && FLAGS_rdma_memory_pool_buckets > 1) {
+        LOG_EVERY_SECOND(ERROR)
+            << "Runtime extend memory only support single bucket";
+        return NULL;
+    }
+    region_size =
+        region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
+    region_size *= g_block_size[block_type] * g_buckets;
+
+    return ExtendBlockPoolImpl(region_base, region_size, block_type);
+}
+
+bool InitBlockPool(RegisterCallback cb) {
+    if (!cb) {
+        errno = EINVAL;
+        return false;
+    }
     if (g_cb) {
         LOG(WARNING) << "Do not initialize block pool repeatedly";
         errno = EINVAL;
-        return NULL;
+        return false;
     }
     g_cb = cb;
     if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS ||
@@ -204,7 +245,7 @@ void* InitBlockPool(RegisterCallback cb) {
                      << RDMA_MEMORY_POOL_MIN_REGIONS << ","
                      << RDMA_MEMORY_POOL_MAX_REGIONS << "]!";
         errno = EINVAL;
-        return NULL;
+        return false;
     }
     if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
         FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
@@ -213,7 +254,7 @@ void* InitBlockPool(RegisterCallback cb) {
                      << RDMA_MEMORY_POOL_MIN_SIZE << ","
                      << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
         errno = EINVAL;
-        return NULL;
+        return false;
     }
     if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
         FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
@@ -222,7 +263,7 @@ void* InitBlockPool(RegisterCallback cb) {
                      << RDMA_MEMORY_POOL_MIN_SIZE << ","
                      << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
         errno = EINVAL;
-        return NULL;
+        return false;
     }
     if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS ||
         FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) {
@@ -231,32 +272,38 @@ void* InitBlockPool(RegisterCallback cb) {
                      << RDMA_MEMORY_POOL_MIN_BUCKETS << ","
                      << RDMA_MEMORY_POOL_MAX_BUCKETS << "]!";
         errno = EINVAL;
-        return NULL;
+        return false;
+    }
+    // runtime extend block pool only support 1 bucket
+    if (FLAGS_rdma_memory_pool_max_regions > 1 &&
+        FLAGS_rdma_memory_pool_buckets > 1) {
+        LOG(WARNING) << "rdma runtime extend block pool only support 1 bucket";
+        return false;
     }
     g_buckets = FLAGS_rdma_memory_pool_buckets;
 
     g_info = new (std::nothrow) GlobalInfo;
     if (!g_info) {
-        return NULL;
+        return false;
     }
 
     for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
         g_info->idle_list[i].resize(g_buckets, NULL);
         if (g_info->idle_list[i].size() != g_buckets) {
-            return NULL;
+            return false;
         }
         g_info->lock[i].resize(g_buckets, NULL);
         if (g_info->lock[i].size() != g_buckets) {
-            return NULL;
+            return false;
         }
         g_info->idle_size[i].resize(g_buckets, 0);
         if (g_info->idle_size[i].size() != g_buckets) {
-            return NULL;
+            return false;
         }
         for (size_t j = 0; j < g_buckets; ++j) {
             g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
             if (!g_info->lock[i][j]) {
-                return NULL;
+                return false;
             }
         }
     }
@@ -264,8 +311,15 @@ void* InitBlockPool(RegisterCallback cb) {
     g_dump_mutex = new butil::Mutex;
     g_tls_info_mutex = new butil::Mutex;
 
-    return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
-                           BLOCK_DEFAULT);
+    if (FLAGS_rdma_memory_pool_user_specified_memory) {
+        return true;
+    }
+
+    if (ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
+                        BLOCK_DEFAULT) != NULL) {
+        return true;
+    }
+    return false;
 }
 
 static void* AllocBlockFrom(int block_type) {
diff --git a/src/brpc/rdma/block_pool.h b/src/brpc/rdma/block_pool.h
index 9edfb837..00a31082 100644
--- a/src/brpc/rdma/block_pool.h
+++ b/src/brpc/rdma/block_pool.h
@@ -73,7 +73,15 @@ typedef uint32_t (*RegisterCallback)(void*, size_t);
 // region. It should be the memory registration in brpc. However,
 // in block_pool, we just abstract it into a function to get region id.
 // Return the first region's address, NULL if failed and errno is set.
-void* InitBlockPool(RegisterCallback cb);
+bool InitBlockPool(RegisterCallback cb);
+
+// In scenarios where users need to manually specify memory regions (e.g., using
+// hugepages or custom memory pools), when
+// FLAGS_rdma_memory_pool_user_specified_memory is true, user is  responsibility
+// of extending memory blocks , this ensuring flexibility for advanced use
+// cases.
+void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
+                            int block_type);
 
 // Allocate a buf with length at least @a size (require: size>0)
 // Return the address allocated, NULL if failed and errno is set.
diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp
index f7cd9ecd..c755bc8f 100644
--- a/src/brpc/rdma/rdma_helper.cpp
+++ b/src/brpc/rdma/rdma_helper.cpp
@@ -168,6 +168,11 @@ static void GlobalRelease() {
     }
 }
 
+void* UserExtendBlockPool(void* region_base, size_t region_size,
+                          int block_type) {
+    return ExtendBlockPoolByUser(region_base, region_size, block_type);
+}
+
 uint32_t RdmaRegisterMemory(void* buf, size_t size) {
     // Register the memory as callback in block_pool
     // The thread-safety should be guaranteed by the caller
diff --git a/test/brpc_block_pool_unittest.cpp b/test/brpc_block_pool_unittest.cpp
index 76e8c325..f65f2a00 100644
--- a/test/brpc_block_pool_unittest.cpp
+++ b/test/brpc_block_pool_unittest.cpp
@@ -54,7 +54,7 @@ TEST_F(BlockPoolTest, single_thread) {
     FLAGS_rdma_memory_pool_increase_size_mb = 1024;
     FLAGS_rdma_memory_pool_max_regions = 16;
     FLAGS_rdma_memory_pool_buckets = 4;
-    EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
+    EXPECT_TRUE(InitBlockPool(DummyCallback));
 
     size_t num = 1024;
     void* buf[num];
@@ -108,7 +108,7 @@ TEST_F(BlockPoolTest, multiple_thread) {
     FLAGS_rdma_memory_pool_increase_size_mb = 1024;
     FLAGS_rdma_memory_pool_max_regions = 16;
     FLAGS_rdma_memory_pool_buckets = 4;
-    EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
+    EXPECT_TRUE(InitBlockPool(DummyCallback));
 
     uintptr_t thread_num = 32;
     bthread_t tid[thread_num];
@@ -130,7 +130,7 @@ TEST_F(BlockPoolTest, extend) {
     FLAGS_rdma_memory_pool_increase_size_mb = 64;
     FLAGS_rdma_memory_pool_max_regions = 16;
     FLAGS_rdma_memory_pool_buckets = 1;
-    EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
+    EXPECT_TRUE(InitBlockPool(DummyCallback));
 
     EXPECT_EQ(1, GetRegionNum());
     size_t num = 15 * 64 * 1024 * 1024 / GetBlockSize(2);
@@ -153,7 +153,7 @@ TEST_F(BlockPoolTest, memory_not_enough) {
     FLAGS_rdma_memory_pool_increase_size_mb = 64;
     FLAGS_rdma_memory_pool_max_regions = 2;
     FLAGS_rdma_memory_pool_buckets = 1;
-    EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
+    EXPECT_TRUE(InitBlockPool(DummyCallback));
 
     EXPECT_EQ(1, GetRegionNum());
     size_t num = 64 * 1024 * 1024 / GetBlockSize(2);
@@ -179,7 +179,7 @@ TEST_F(BlockPoolTest, invalid_use) {
     FLAGS_rdma_memory_pool_increase_size_mb = 64;
     FLAGS_rdma_memory_pool_max_regions = 2;
     FLAGS_rdma_memory_pool_buckets = 1;
-    EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
+    EXPECT_TRUE(InitBlockPool(DummyCallback));
 
     void* buf = AllocBlock(0);
     EXPECT_EQ(NULL, buf);
@@ -201,7 +201,7 @@ TEST_F(BlockPoolTest, dump_info) {
     FLAGS_rdma_memory_pool_increase_size_mb = 64;
     FLAGS_rdma_memory_pool_max_regions = 2;
     FLAGS_rdma_memory_pool_buckets = 4;
-    EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
+    EXPECT_TRUE(InitBlockPool(DummyCallback));
     DumpMemoryPoolInfo(std::cout);
     void* buf = AllocBlock(8192);
     DumpMemoryPoolInfo(std::cout);


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to