This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a9040c8113b [improve](load) limit flush thread num by CPU count (#33325)
a9040c8113b is described below

commit a9040c8113b9f41ac480720ef98713d8dd550d5e
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Sat Apr 27 16:24:45 2024 +0800

    [improve](load) limit flush thread num by CPU count (#33325)
---
 be/src/common/config.cpp                |  3 +++
 be/src/common/config.h                  |  3 +++
 be/src/olap/memtable_flush_executor.cpp | 11 ++++++++---
 be/src/runtime/load_stream_mgr.cpp      |  8 ++++++--
 4 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ea0e0bb3aad..0c257e48a8f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -662,6 +662,9 @@ DEFINE_mInt64(storage_flood_stage_left_capacity_bytes, "1073741824"); // 1GB
 DEFINE_Int32(flush_thread_num_per_store, "6");
 // number of thread for flushing memtable per store, for high priority load 
task
 DEFINE_Int32(high_priority_flush_thread_num_per_store, "6");
+// number of threads = min(flush_thread_num_per_store * num_store,
+//                         max_flush_thread_num_per_cpu * num_cpu)
+DEFINE_Int32(max_flush_thread_num_per_cpu, "4");
 
 // config for tablet meta checkpoint
 DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 40d336d2308..e1ec93ff63f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -715,6 +715,9 @@ DECLARE_mInt64(storage_flood_stage_left_capacity_bytes); // 1GB
 DECLARE_Int32(flush_thread_num_per_store);
 // number of thread for flushing memtable per store, for high priority load 
task
 DECLARE_Int32(high_priority_flush_thread_num_per_store);
+// number of threads = min(flush_thread_num_per_store * num_store,
+//                         max_flush_thread_num_per_cpu * num_cpu)
+DECLARE_Int32(max_flush_thread_num_per_cpu);
 
 // config for tablet meta checkpoint
 DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index db14b9acaee..b52a87dff07 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -202,15 +202,20 @@ void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t
 
 void MemTableFlushExecutor::init(int num_disk) {
     num_disk = std::max(1, num_disk);
-    size_t min_threads = std::max(1, config::flush_thread_num_per_store);
-    size_t max_threads = num_disk * min_threads;
+    int num_cpus = std::thread::hardware_concurrency();
+    int min_threads = std::max(1, config::flush_thread_num_per_store);
+    int max_threads = num_cpus == 0 ? num_disk * min_threads
+                                    : std::min(num_disk * min_threads,
+                                               num_cpus * 
config::max_flush_thread_num_per_cpu);
     static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
                               .set_min_threads(min_threads)
                               .set_max_threads(max_threads)
                               .build(&_flush_pool));
 
     min_threads = std::max(1, 
config::high_priority_flush_thread_num_per_store);
-    max_threads = num_disk * min_threads;
+    max_threads = num_cpus == 0 ? num_disk * min_threads
+                                : std::min(num_disk * min_threads,
+                                           num_cpus * 
config::max_flush_thread_num_per_cpu);
     static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
                               .set_min_threads(min_threads)
                               .set_max_threads(max_threads)
diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp
index 7d9de12d078..1964cf257e3 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -37,9 +37,13 @@ LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num,
         : _num_threads(segment_file_writer_thread_num),
           _heavy_work_pool(heavy_work_pool),
           _light_work_pool(light_work_pool) {
+    uint32_t num_cpu = std::thread::hardware_concurrency();
+    uint32_t thread_num = num_cpu == 0 ? segment_file_writer_thread_num
+                                       : 
std::min(segment_file_writer_thread_num,
+                                                  num_cpu * 
config::max_flush_thread_num_per_cpu);
     static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
-                              .set_min_threads(segment_file_writer_thread_num)
-                              .set_max_threads(segment_file_writer_thread_num)
+                              .set_min_threads(thread_num)
+                              .set_max_threads(thread_num)
                               .build(&_file_writer_thread_pool));
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to