This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 95b7de3d bugfix: runtime extend block pool (#2965) 95b7de3d is described below commit 95b7de3d40ae579d5cc28ea242d0d5f7114a0200 Author: Yang,Liming <liming.y...@139.com> AuthorDate: Wed Jul 23 10:20:53 2025 +0800 bugfix: runtime extend block pool (#2965) --- src/brpc/rdma/block_pool.cpp | 128 +++++++++++++++++++++++++++----------- src/brpc/rdma/block_pool.h | 10 ++- src/brpc/rdma/rdma_helper.cpp | 5 ++ test/brpc_block_pool_unittest.cpp | 12 ++-- 4 files changed, 111 insertions(+), 44 deletions(-) diff --git a/src/brpc/rdma/block_pool.cpp b/src/brpc/rdma/block_pool.cpp index 46435c3c..543cdb6d 100644 --- a/src/brpc/rdma/block_pool.cpp +++ b/src/brpc/rdma/block_pool.cpp @@ -36,9 +36,12 @@ DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024, "Initial size of memory pool for RDMA (MB)"); DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024, "Increased size of memory pool for RDMA (MB)"); -DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions"); +DEFINE_int32(rdma_memory_pool_max_regions, 1, "Max number of regions"); DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race"); DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls"); +DEFINE_bool(rdma_memory_pool_user_specified_memory, false, + "If true, the user must call UserExtendBlockPool() to extend " + "memory. 
bRPC will not handle memory extension."); static RegisterCallback g_cb = NULL; @@ -125,31 +128,13 @@ uint32_t GetRegionId(const void* buf) { return r->id; } -// Extend the block pool with a new region (with different region ID) -static void* ExtendBlockPool(size_t region_size, int block_type) { - if (region_size < 1) { - errno = EINVAL; - return NULL; - } - +static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, + int block_type) { if (g_region_num == FLAGS_rdma_memory_pool_max_regions) { LOG(INFO) << "Memory pool reaches max regions"; errno = ENOMEM; return NULL; } - - // Regularize region size - region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets; - region_size *= g_block_size[block_type] * g_buckets; - - LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB"; - - void* region_base = NULL; - if (posix_memalign(&region_base, 4096, region_size) != 0) { - PLOG_EVERY_SECOND(ERROR) << "Memory not enough"; - return NULL; - } - uint32_t id = g_cb(region_base, region_size); if (id == 0) { free(region_base); @@ -168,7 +153,7 @@ static void* ExtendBlockPool(size_t region_size, int block_type) { return NULL; } } - + Region* region = &g_regions[g_region_num++]; region->start = (uintptr_t)region_base; region->size = region_size; @@ -178,7 +163,7 @@ static void* ExtendBlockPool(size_t region_size, int block_type) { for (size_t i = 0; i < g_buckets; ++i) { node[i]->start = (void*)(region->start + i * (region_size / g_buckets)); node[i]->len = region_size / g_buckets; - node[i]->next = NULL; + node[i]->next = g_info->idle_list[block_type][i]; g_info->idle_list[block_type][i] = node[i]; g_info->idle_size[block_type][i] += node[i]->len; } @@ -186,15 +171,71 @@ static void* ExtendBlockPool(size_t region_size, int block_type) { return region_base; } -void* InitBlockPool(RegisterCallback cb) { - if (!cb) { +// Extend the block pool with a new region (with different region ID) +static void* ExtendBlockPool(size_t 
region_size, int block_type) { + if (region_size < 1) { errno = EINVAL; return NULL; } + + if (FLAGS_rdma_memory_pool_user_specified_memory) { + LOG_EVERY_SECOND(ERROR) << "Fail to extend new region, " + "rdma_memory_pool_user_specified_memory is " + "true, ExtendBlockPool is disabled"; + return NULL; + } + + // Regularize region size + region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets; + region_size *= g_block_size[block_type] * g_buckets; + + LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB"; + + void* region_base = NULL; + if (posix_memalign(&region_base, 4096, region_size) != 0) { + PLOG_EVERY_SECOND(ERROR) << "Memory not enough"; + return NULL; + } + + return ExtendBlockPoolImpl(region_base, region_size, block_type); +} + +void* ExtendBlockPoolByUser(void* region_base, size_t region_size, + int block_type) { + if (FLAGS_rdma_memory_pool_user_specified_memory == false) { + LOG_EVERY_SECOND(ERROR) << "User extend memory is disabled"; + return NULL; + } + if (reinterpret_cast<uintptr_t>(region_base) % 4096 != 0) { + LOG_EVERY_SECOND(ERROR) << "region_base must be 4096 aligned"; + return NULL; + } + + uint64_t index = butil::fast_rand() % g_buckets; + BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]); + BAIDU_SCOPED_LOCK(g_info->extend_lock); + + if (g_region_num > 1 && FLAGS_rdma_memory_pool_buckets > 1) { + LOG_EVERY_SECOND(ERROR) + << "Runtime extend memory only support single bucket"; + return NULL; + } + region_size = + region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets; + region_size *= g_block_size[block_type] * g_buckets; + + return ExtendBlockPoolImpl(region_base, region_size, block_type); +} + +bool InitBlockPool(RegisterCallback cb) { + if (!cb) { + errno = EINVAL; + return false; + } if (g_cb) { LOG(WARNING) << "Do not initialize block pool repeatedly"; errno = EINVAL; - return NULL; + return false; } g_cb = cb; if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS || 
@@ -204,7 +245,7 @@ void* InitBlockPool(RegisterCallback cb) { << RDMA_MEMORY_POOL_MIN_REGIONS << "," << RDMA_MEMORY_POOL_MAX_REGIONS << "]!"; errno = EINVAL; - return NULL; + return false; } if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE || FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) { @@ -213,7 +254,7 @@ void* InitBlockPool(RegisterCallback cb) { << RDMA_MEMORY_POOL_MIN_SIZE << "," << RDMA_MEMORY_POOL_MAX_SIZE << "]!"; errno = EINVAL; - return NULL; + return false; } if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE || FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) { @@ -222,7 +263,7 @@ void* InitBlockPool(RegisterCallback cb) { << RDMA_MEMORY_POOL_MIN_SIZE << "," << RDMA_MEMORY_POOL_MAX_SIZE << "]!"; errno = EINVAL; - return NULL; + return false; } if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS || FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) { @@ -231,32 +272,38 @@ void* InitBlockPool(RegisterCallback cb) { << RDMA_MEMORY_POOL_MIN_BUCKETS << "," << RDMA_MEMORY_POOL_MAX_BUCKETS << "]!"; errno = EINVAL; - return NULL; + return false; + } + // runtime extend block pool only support 1 bucket + if (FLAGS_rdma_memory_pool_max_regions > 1 && + FLAGS_rdma_memory_pool_buckets > 1) { + LOG(WARNING) << "rdma runtime extend block pool only support 1 bucket"; + return false; } g_buckets = FLAGS_rdma_memory_pool_buckets; g_info = new (std::nothrow) GlobalInfo; if (!g_info) { - return NULL; + return false; } for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) { g_info->idle_list[i].resize(g_buckets, NULL); if (g_info->idle_list[i].size() != g_buckets) { - return NULL; + return false; } g_info->lock[i].resize(g_buckets, NULL); if (g_info->lock[i].size() != g_buckets) { - return NULL; + return false; } g_info->idle_size[i].resize(g_buckets, 0); if (g_info->idle_size[i].size() != g_buckets) { - return NULL; + return false; } for (size_t j = 0; j < g_buckets; 
++j) { g_info->lock[i][j] = new (std::nothrow) butil::Mutex; if (!g_info->lock[i][j]) { - return NULL; + return false; } } } @@ -264,8 +311,15 @@ void* InitBlockPool(RegisterCallback cb) { g_dump_mutex = new butil::Mutex; g_tls_info_mutex = new butil::Mutex; - return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb, - BLOCK_DEFAULT); + if (FLAGS_rdma_memory_pool_user_specified_memory) { + return true; + } + + if (ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb, + BLOCK_DEFAULT) != NULL) { + return true; + } + return false; } static void* AllocBlockFrom(int block_type) { diff --git a/src/brpc/rdma/block_pool.h b/src/brpc/rdma/block_pool.h index 9edfb837..00a31082 100644 --- a/src/brpc/rdma/block_pool.h +++ b/src/brpc/rdma/block_pool.h @@ -73,7 +73,15 @@ typedef uint32_t (*RegisterCallback)(void*, size_t); // region. It should be the memory registration in brpc. However, // in block_pool, we just abstract it into a function to get region id. // Return the first region's address, NULL if failed and errno is set. -void* InitBlockPool(RegisterCallback cb); +bool InitBlockPool(RegisterCallback cb); + +// In scenarios where users need to manually specify memory regions (e.g., using +// hugepages or custom memory pools), when +// FLAGS_rdma_memory_pool_user_specified_memory is true, user is responsibility +// of extending memory blocks , this ensuring flexibility for advanced use +// cases. +void* ExtendBlockPoolByUser(void* region_base, size_t region_size, + int block_type); // Allocate a buf with length at least @a size (require: size>0) // Return the address allocated, NULL if failed and errno is set. 
diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index f7cd9ecd..c755bc8f 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -168,6 +168,11 @@ static void GlobalRelease() { } } +void* UserExtendBlockPool(void* region_base, size_t region_size, + int block_type) { + return ExtendBlockPoolByUser(region_base, region_size, block_type); +} + uint32_t RdmaRegisterMemory(void* buf, size_t size) { // Register the memory as callback in block_pool // The thread-safety should be guaranteed by the caller diff --git a/test/brpc_block_pool_unittest.cpp b/test/brpc_block_pool_unittest.cpp index 76e8c325..f65f2a00 100644 --- a/test/brpc_block_pool_unittest.cpp +++ b/test/brpc_block_pool_unittest.cpp @@ -54,7 +54,7 @@ TEST_F(BlockPoolTest, single_thread) { FLAGS_rdma_memory_pool_increase_size_mb = 1024; FLAGS_rdma_memory_pool_max_regions = 16; FLAGS_rdma_memory_pool_buckets = 4; - EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL); + EXPECT_TRUE(InitBlockPool(DummyCallback)); size_t num = 1024; void* buf[num]; @@ -108,7 +108,7 @@ TEST_F(BlockPoolTest, multiple_thread) { FLAGS_rdma_memory_pool_increase_size_mb = 1024; FLAGS_rdma_memory_pool_max_regions = 16; FLAGS_rdma_memory_pool_buckets = 4; - EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL); + EXPECT_TRUE(InitBlockPool(DummyCallback)); uintptr_t thread_num = 32; bthread_t tid[thread_num]; @@ -130,7 +130,7 @@ TEST_F(BlockPoolTest, extend) { FLAGS_rdma_memory_pool_increase_size_mb = 64; FLAGS_rdma_memory_pool_max_regions = 16; FLAGS_rdma_memory_pool_buckets = 1; - EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL); + EXPECT_TRUE(InitBlockPool(DummyCallback)); EXPECT_EQ(1, GetRegionNum()); size_t num = 15 * 64 * 1024 * 1024 / GetBlockSize(2); @@ -153,7 +153,7 @@ TEST_F(BlockPoolTest, memory_not_enough) { FLAGS_rdma_memory_pool_increase_size_mb = 64; FLAGS_rdma_memory_pool_max_regions = 2; FLAGS_rdma_memory_pool_buckets = 1; - EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL); + 
EXPECT_TRUE(InitBlockPool(DummyCallback)); EXPECT_EQ(1, GetRegionNum()); size_t num = 64 * 1024 * 1024 / GetBlockSize(2); @@ -179,7 +179,7 @@ TEST_F(BlockPoolTest, invalid_use) { FLAGS_rdma_memory_pool_increase_size_mb = 64; FLAGS_rdma_memory_pool_max_regions = 2; FLAGS_rdma_memory_pool_buckets = 1; - EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL); + EXPECT_TRUE(InitBlockPool(DummyCallback)); void* buf = AllocBlock(0); EXPECT_EQ(NULL, buf); @@ -201,7 +201,7 @@ TEST_F(BlockPoolTest, dump_info) { FLAGS_rdma_memory_pool_increase_size_mb = 64; FLAGS_rdma_memory_pool_max_regions = 2; FLAGS_rdma_memory_pool_buckets = 4; - EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL); + EXPECT_TRUE(InitBlockPool(DummyCallback)); DumpMemoryPoolInfo(std::cout); void* buf = AllocBlock(8192); DumpMemoryPoolInfo(std::cout); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org