This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 9ec4c54c7a3 [Fix]add set thread num config for wg flush pool (#37028)
9ec4c54c7a3 is described below

commit 9ec4c54c7a33a63c2646fb20f2cddea5952ef0e5
Author: wangbo <wan...@apache.org>
AuthorDate: Tue Jul 2 09:58:57 2024 +0800

    [Fix]add set thread num config for wg flush pool (#37028)

    ## Proposed changes

    Calculate the workload group's mem table flush pool thread num from the
    CPU and disk count; otherwise mem table flush may cost more memory than
    when workload groups are not enabled.
---
 be/src/common/config.cpp                         |  3 ++
 be/src/common/config.h                           |  4 ++
 be/src/olap/delta_writer_v2.cpp                  |  2 +-
 be/src/olap/storage_engine.cpp                   |  3 +-
 be/src/olap/storage_engine.h                     |  4 ++
 be/src/runtime/query_context.cpp                 |  6 +--
 be/src/runtime/query_context.h                   |  4 +-
 be/src/runtime/workload_group/workload_group.cpp | 53 ++++++++++++++++--------
 be/src/runtime/workload_group/workload_group.h   |  4 +-
 be/src/vec/sink/writer/async_result_writer.cpp   | 27 ++++--------
 10 files changed, 64 insertions(+), 46 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 7166b39dda8..78afc756af8 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -691,6 +691,9 @@ DEFINE_Int32(high_priority_flush_thread_num_per_store, "6");
 // max_flush_thread_num_per_cpu * num_cpu)
 DEFINE_Int32(max_flush_thread_num_per_cpu, "4");
 
+DEFINE_mInt32(wg_flush_thread_num_per_store, "6");
+DEFINE_mInt32(wg_flush_thread_num_per_cpu, "4");
+
 // config for tablet meta checkpoint
 DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
 DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index dbb5b716b78..1a9e3291db5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -743,6 +743,10 @@ DECLARE_Int32(high_priority_flush_thread_num_per_store);
 // max_flush_thread_num_per_cpu * num_cpu)
 DECLARE_Int32(max_flush_thread_num_per_cpu);
 
+// workload group flush pool params
+DECLARE_mInt32(wg_flush_thread_num_per_store);
+DECLARE_mInt32(wg_flush_thread_num_per_cpu);
+
 // config for tablet meta checkpoint
 DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
 DECLARE_mInt32(tablet_meta_checkpoint_min_interval_secs);
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 80978280b92..3f2f7bf99fa 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -128,7 +128,7 @@ Status DeltaWriterV2::init() {
     RETURN_IF_ERROR(_rowset_writer->init(context));
     ThreadPool* wg_thread_pool_ptr = nullptr;
     if (_state->get_query_ctx()) {
-        wg_thread_pool_ptr = _state->get_query_ctx()->get_non_pipe_exec_thread_pool();
+        wg_thread_pool_ptr = _state->get_query_ctx()->get_memtable_flush_pool();
     }
     RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
                                            wg_thread_pool_ptr,
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 5d50bb5f4df..90093241ad2 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -233,8 +233,9 @@ Status StorageEngine::_open() {
     auto dirs = get_stores();
     RETURN_IF_ERROR(load_data_dirs(dirs));
 
+    _disk_num = dirs.size();
     _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
-    _memtable_flush_executor->init(dirs.size());
+    _memtable_flush_executor->init(_disk_num);
 
     _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
     _calc_delete_bitmap_executor->init();
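For readers skimming the patch, the sizing rule described in the commit message
reduces to the standalone sketch below. It is illustrative, not part of the
patch: the function name is hypothetical, and the defaults 6 and 4 are the
values of the new wg_flush_thread_num_per_store / wg_flush_thread_num_per_cpu
configs added above.

    #include <algorithm>
    #include <thread>

    // Sketch of the wg flush pool sizing rule: one slice of threads per
    // data dir, capped by a per-CPU budget so a machine with many disks
    // cannot oversubscribe its CPUs.
    int wg_flush_pool_max_threads(int num_disk, int num_cpus,
                                  int per_store = 6, int per_cpu = 4) {
        num_disk = std::max(1, num_disk);
        int min_threads = std::max(1, per_store);
        if (num_cpus == 0) { // hardware_concurrency() may report 0 when unknown
            return num_disk * min_threads;
        }
        return std::min(num_disk * min_threads, num_cpus * per_cpu);
    }

    // Example: 12 disks, 16 CPUs => min(12 * 6, 16 * 4) = min(72, 64) = 64.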
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 94cf142a8c1..5ddd888db6d 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -131,6 +131,8 @@ public:
 
     int64_t memory_limitation_bytes_per_thread_for_schema_change() const;
 
+    int get_disk_num() { return _disk_num; }
+
 protected:
     void _evict_querying_rowset();
     void _evict_quring_rowset_thread_callback();
@@ -153,6 +155,8 @@ protected:
     scoped_refptr<Thread> _evict_quering_rowset_thread;
 
     int64_t _memory_limitation_bytes_for_schema_change;
+
+    int _disk_num {-1};
 };
 
 class StorageEngine final : public BaseStorageEngine {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 18d565dcfef..dd7cf4f55b8 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -326,9 +326,9 @@ doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
     return _exec_env->pipeline_task_scheduler();
 }
 
-ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
+ThreadPool* QueryContext::get_memtable_flush_pool() {
     if (_workload_group) {
-        return _non_pipe_thread_pool;
+        return _memtable_flush_pool;
     } else {
         return nullptr;
     }
@@ -340,7 +340,7 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
     // see task_group_manager::delete_workload_group_by_ids
     _workload_group->add_mem_tracker_limiter(query_mem_tracker);
     _workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
-                                         &_non_pipe_thread_pool, &_remote_scan_task_scheduler);
+                                         &_memtable_flush_pool, &_remote_scan_task_scheduler);
     return Status::OK();
 }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index aee05ed3185..b565214ef22 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -205,7 +205,7 @@ public:
 
     doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
 
-    ThreadPool* get_non_pipe_exec_thread_pool();
+    ThreadPool* get_memtable_flush_pool();
 
     std::vector<TUniqueId> get_fragment_instance_ids() const { return fragment_instance_ids; }
 
@@ -298,7 +298,7 @@ private:
 
     doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
     vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
-    ThreadPool* _non_pipe_thread_pool = nullptr;
+    ThreadPool* _memtable_flush_pool = nullptr;
    vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
 
     std::unique_ptr<pipeline::Dependency> _execution_dependency;
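The query_context changes above mean a caller sees the workload group's flush
pool only when a group is attached, and nullptr otherwise (DeltaWriterV2 simply
passes that nullptr through to _memtable_writer). A minimal sketch of the
resulting caller-side fallback; the helper name and default_pool are
hypothetical, only QueryContext::get_memtable_flush_pool() comes from the patch:

    // Pick the per-workload-group flush pool when available, else fall back.
    ThreadPool* pick_flush_pool(QueryContext* query_ctx, ThreadPool* default_pool) {
        if (query_ctx != nullptr) {
            if (ThreadPool* wg_pool = query_ctx->get_memtable_flush_pool()) {
                return wg_pool; // per-group pool, sized by disk/CPU count
            }
        }
        return default_pool; // no workload group: shared engine-wide executor
    }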
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index b7a46090230..843e06440d2 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -27,6 +27,7 @@
 #include <utility>
 
 #include "common/logging.h"
+#include "olap/storage_engine.h"
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/exec_env.h"
@@ -430,19 +431,35 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                                                  min_remote_scan_thread_num);
     }
 
-    if (_non_pipe_thread_pool == nullptr) {
-        std::unique_ptr<ThreadPool> thread_pool = nullptr;
-        auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
-                           .set_min_threads(1)
-                           .set_max_threads(config::fragment_pool_thread_num_max)
-                           .set_max_queue_size(config::fragment_pool_queue_size)
-                           .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
-                           .build(&thread_pool);
-        if (!ret.ok()) {
-            LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed, gid="
-                      << tg_id;
-        } else {
-            _non_pipe_thread_pool = std::move(thread_pool);
+    if (_memtable_flush_pool == nullptr) {
+        int num_disk = ExecEnv::GetInstance()->storage_engine().get_disk_num();
+        // -1 means disk num may not be inited, so not create flush pool
+        if (num_disk != -1) {
+            std::unique_ptr<ThreadPool> thread_pool = nullptr;
+            num_disk = std::max(1, num_disk);
+            int num_cpus = std::thread::hardware_concurrency();
+
+            int min_threads = std::max(1, config::wg_flush_thread_num_per_store);
+            int max_threads = num_cpus == 0
+                                      ? num_disk * min_threads
+                                      : std::min(num_disk * min_threads,
+                                                 num_cpus * config::wg_flush_thread_num_per_cpu);
+
+            std::string pool_name = "wg_flush_" + tg_name;
+            auto ret = ThreadPoolBuilder(pool_name)
+                               .set_min_threads(min_threads)
+                               .set_max_threads(max_threads)
+                               .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
+                               .build(&thread_pool);
+            if (!ret.ok()) {
+                LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " failed, gid="
+                          << tg_id;
+            } else {
+                _memtable_flush_pool = std::move(thread_pool);
+                LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << tg_id
+                          << ", max thread num=" << max_threads
+                          << ", min thread num=" << min_threads;
+            }
         }
     }
 
@@ -470,13 +487,13 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
 
 void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
                                         vectorized::SimplifiedScanScheduler** scan_sched,
-                                        ThreadPool** non_pipe_thread_pool,
+                                        ThreadPool** memtable_flush_pool,
                                         vectorized::SimplifiedScanScheduler** remote_scan_sched) {
     std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
     *exec_sched = _task_sched.get();
     *scan_sched = _scan_task_sched.get();
     *remote_scan_sched = _remote_scan_task_sched.get();
-    *non_pipe_thread_pool = _non_pipe_thread_pool.get();
+    *memtable_flush_pool = _memtable_flush_pool.get();
 }
 
 void WorkloadGroup::try_stop_schedulers() {
@@ -490,9 +507,9 @@ void WorkloadGroup::try_stop_schedulers() {
     if (_remote_scan_task_sched) {
         _remote_scan_task_sched->stop();
     }
-    if (_non_pipe_thread_pool) {
-        _non_pipe_thread_pool->shutdown();
-        _non_pipe_thread_pool->wait();
+    if (_memtable_flush_pool) {
+        _memtable_flush_pool->shutdown();
+        _memtable_flush_pool->wait();
     }
 }
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 8386d778aec..786e297bc29 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -151,7 +151,7 @@ public:
 
     void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
                              vectorized::SimplifiedScanScheduler** scan_sched,
-                             ThreadPool** non_pipe_thread_pool,
+                             ThreadPool** memtable_flush_pool,
                              vectorized::SimplifiedScanScheduler** remote_scan_sched);
 
     void try_stop_schedulers();
@@ -189,7 +189,7 @@ private:
     std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
-    std::unique_ptr<ThreadPool> _non_pipe_thread_pool = nullptr;
+    std::unique_ptr<ThreadPool> _memtable_flush_pool = nullptr;
 };
 
 using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
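One detail worth noting in try_stop_schedulers() above: the pool is shut down
and then awaited, so no queued flush task can outlive the workload group that
owns the pool. As a generic sketch (drain_pool is a hypothetical name;
shutdown()/wait() are the ThreadPool calls used in the patch):

    // Stop accepting new flush tasks, then block until in-flight ones finish.
    void drain_pool(std::unique_ptr<ThreadPool>& pool) {
        if (pool) {
            pool->shutdown(); // reject new submissions
            pool->wait();     // wait for queued and running tasks to complete
        }
    }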
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 42fd8468e86..82c5f4ab288 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -96,25 +96,14 @@ Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* prof
     // This is a async thread, should lock the task ctx, to make sure runtimestate and profile
     // not deconstructed before the thread exit.
     auto task_ctx = state->get_task_execution_context();
-    if (state->get_query_ctx() && state->get_query_ctx()->get_non_pipe_exec_thread_pool()) {
-        ThreadPool* pool_ptr = state->get_query_ctx()->get_non_pipe_exec_thread_pool();
-        RETURN_IF_ERROR(pool_ptr->submit_func([this, state, profile, task_ctx]() {
-            auto task_lock = task_ctx.lock();
-            if (task_lock == nullptr) {
-                return;
-            }
-            this->process_block(state, profile);
-        }));
-    } else {
-        RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
-                [this, state, profile, task_ctx]() {
-                    auto task_lock = task_ctx.lock();
-                    if (task_lock == nullptr) {
-                        return;
-                    }
-                    this->process_block(state, profile);
-                }));
-    }
+    RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
+            [this, state, profile, task_ctx]() {
+                auto task_lock = task_ctx.lock();
+                if (task_lock == nullptr) {
+                    return;
+                }
+                this->process_block(state, profile);
+            }));
     return Status::OK();
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org