This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a9040c8113b [improve](load) limit flush thread num by CPU count (#33325) a9040c8113b is described below commit a9040c8113b9f41ac480720ef98713d8dd550d5e Author: Kaijie Chen <c...@apache.org> AuthorDate: Sat Apr 27 16:24:45 2024 +0800 [improve](load) limit flush thread num by CPU count (#33325) --- be/src/common/config.cpp | 3 +++ be/src/common/config.h | 3 +++ be/src/olap/memtable_flush_executor.cpp | 11 ++++++++--- be/src/runtime/load_stream_mgr.cpp | 8 ++++++-- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index ea0e0bb3aad..0c257e48a8f 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -662,6 +662,9 @@ DEFINE_mInt64(storage_flood_stage_left_capacity_bytes, "1073741824"); // 1GB DEFINE_Int32(flush_thread_num_per_store, "6"); // number of thread for flushing memtable per store, for high priority load task DEFINE_Int32(high_priority_flush_thread_num_per_store, "6"); +// number of threads = min(flush_thread_num_per_store * num_store, +// max_flush_thread_num_per_cpu * num_cpu) +DEFINE_Int32(max_flush_thread_num_per_cpu, "4"); // config for tablet meta checkpoint DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 40d336d2308..e1ec93ff63f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -715,6 +715,9 @@ DECLARE_mInt64(storage_flood_stage_left_capacity_bytes); // 1GB DECLARE_Int32(flush_thread_num_per_store); // number of thread for flushing memtable per store, for high priority load task DECLARE_Int32(high_priority_flush_thread_num_per_store); +// number of threads = min(flush_thread_num_per_store * num_store, +// max_flush_thread_num_per_cpu * num_cpu) +DECLARE_Int32(max_flush_thread_num_per_cpu); // config for tablet meta checkpoint DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num); diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index db14b9acaee..b52a87dff07 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -202,15 +202,20 @@ void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t void MemTableFlushExecutor::init(int num_disk) { num_disk = std::max(1, num_disk); - size_t min_threads = std::max(1, config::flush_thread_num_per_store); - size_t max_threads = num_disk * min_threads; + int num_cpus = std::thread::hardware_concurrency(); + int min_threads = std::max(1, config::flush_thread_num_per_store); + int max_threads = num_cpus == 0 ? num_disk * min_threads + : std::min(num_disk * min_threads, + num_cpus * config::max_flush_thread_num_per_cpu); static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool") .set_min_threads(min_threads) .set_max_threads(max_threads) .build(&_flush_pool)); min_threads = std::max(1, config::high_priority_flush_thread_num_per_store); - max_threads = num_disk * min_threads; + max_threads = num_cpus == 0 ? num_disk * min_threads + : std::min(num_disk * min_threads, + num_cpus * config::max_flush_thread_num_per_cpu); static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool") .set_min_threads(min_threads) .set_max_threads(max_threads) diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp index 7d9de12d078..1964cf257e3 100644 --- a/be/src/runtime/load_stream_mgr.cpp +++ b/be/src/runtime/load_stream_mgr.cpp @@ -37,9 +37,13 @@ LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num, : _num_threads(segment_file_writer_thread_num), _heavy_work_pool(heavy_work_pool), _light_work_pool(light_work_pool) { + uint32_t num_cpu = std::thread::hardware_concurrency(); + uint32_t thread_num = num_cpu == 0 ? segment_file_writer_thread_num + : std::min(segment_file_writer_thread_num, + num_cpu * config::max_flush_thread_num_per_cpu); static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool") - .set_min_threads(segment_file_writer_thread_num) - .set_max_threads(segment_file_writer_thread_num) + .set_min_threads(thread_num) + .set_max_threads(thread_num) .build(&_file_writer_thread_pool)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org