This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 7a85cd1e4f [enhancement](execute model) using thread pool to execute
report or join task instead of staring too many thread (#17212)
7a85cd1e4f is described below
commit 7a85cd1e4f7bb0dcc34014b7bf6e41eba78a5dc5
Author: yiguolei <[email protected]>
AuthorDate: Wed Mar 1 08:35:27 2023 +0800
[enhancement](execute model) using thread pool to execute report or join
task instead of staring too many thread (#17212)
* [enhancement](execute model) using thread pool to execute report or join
task instead of staring too many thread
Doris will start a report thread and a join thread during fragment execution.
There are many problems if threads are created and destroyed very frequently. Jemalloc
may not behave very well; it may crash.
jemalloc/jemalloc#1405
It is better to use a thread pool to do these tasks.
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/runtime/exec_env.h | 7 +++++++
be/src/runtime/exec_env_init.cpp | 15 +++++++++++++++
be/src/runtime/plan_fragment_executor.cpp | 14 +++++++++++---
be/src/runtime/plan_fragment_executor.h | 4 +++-
be/src/vec/exec/join/vjoin_node_base.cpp | 11 ++++++-----
5 files changed, 42 insertions(+), 9 deletions(-)
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index a0a132cfc4..f1bb540326 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -126,6 +126,9 @@ public:
ThreadPool* limited_scan_thread_pool() { return
_limited_scan_thread_pool.get(); }
ThreadPool* send_batch_thread_pool() { return
_send_batch_thread_pool.get(); }
ThreadPool* download_cache_thread_pool() { return
_download_cache_thread_pool.get(); }
+ ThreadPool* send_report_thread_pool() { return
_send_report_thread_pool.get(); }
+ ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get();
}
+
void set_serial_download_cache_thread_token() {
_serial_download_cache_thread_token =
download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1);
@@ -228,6 +231,10 @@ private:
std::unique_ptr<ThreadPool> _download_cache_thread_pool;
// A token used to submit download cache task serially
std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
+ // Pool used by fragment manager to send profile or status to FE
coordinator
+ std::unique_ptr<ThreadPool> _send_report_thread_pool;
+ // Pool used by join node to build hash table
+ std::unique_ptr<ThreadPool> _join_node_thread_pool;
// ThreadPoolToken -> buffer
std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>>
_download_cache_buf_map;
CgroupsMgr* _cgroups_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index a9a1eac159..779b3fba98 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -128,6 +128,21 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
init_download_cache_required_components();
+ // min num equal to fragment pool's min num
+ // max num is useless because it will start as many as requested in the
past
+ // queue size is useless because the max thread num is very large
+ ThreadPoolBuilder("SendReportThreadPool")
+ .set_min_threads(config::fragment_pool_thread_num_min)
+ .set_max_threads(std::numeric_limits<int>::max())
+ .set_max_queue_size(config::fragment_pool_queue_size)
+ .build(&_send_report_thread_pool);
+
+ ThreadPoolBuilder("JoinNodeThreadPool")
+ .set_min_threads(config::fragment_pool_thread_num_min)
+ .set_max_threads(std::numeric_limits<int>::max())
+ .set_max_queue_size(config::fragment_pool_queue_size)
+ .build(&_join_node_thread_pool);
+
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
_cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 020b6ae9e4..41a64916e8 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -66,7 +66,9 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
_is_report_success(false),
_is_report_on_cancel(true),
_collect_query_statistics_with_every_batch(false),
- _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {}
+ _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
+ _report_thread_future = _report_thread_promise.get_future();
+}
PlanFragmentExecutor::~PlanFragmentExecutor() {
close();
@@ -243,7 +245,10 @@ Status PlanFragmentExecutor::open() {
// at end, otherwise the coordinator hangs in case we finish w/ an error
if (_is_report_success && _report_status_cb &&
config::status_report_interval > 0) {
std::unique_lock<std::mutex> l(_report_thread_lock);
- _report_thread = std::thread(&PlanFragmentExecutor::report_profile,
this);
+ _exec_env->send_report_thread_pool()->submit_func([this] {
+ Defer defer {[&]() { this->_report_thread_promise.set_value(true);
}};
+ this->report_profile();
+ });
// make sure the thread started up, otherwise report_profile() might
get into a race
// with stop_report_thread()
_report_thread_started_cv.wait(l);
@@ -560,7 +565,10 @@ void PlanFragmentExecutor::stop_report_thread() {
_report_thread_active = false;
_stop_report_thread_cv.notify_one();
- _report_thread.join();
+ // Wait infinitly until the thread is stopped and the future is set.
+ // The reporting thread depends on the PlanFragmentExecutor object, if not
wait infinitly here, the reporting
+ // thread may crashed because the PlanFragmentExecutor is destroyed.
+ _report_thread_future.wait();
}
Status PlanFragmentExecutor::get_next(RowBatch** batch) {
diff --git a/be/src/runtime/plan_fragment_executor.h
b/be/src/runtime/plan_fragment_executor.h
index 215ff0973b..41c744bc3a 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -22,6 +22,7 @@
#include <condition_variable>
#include <functional>
+#include <future>
#include <vector>
#include "common/object_pool.h"
@@ -147,7 +148,8 @@ private:
// profile reporting-related
report_status_callback _report_status_cb;
- std::thread _report_thread;
+ std::promise<bool> _report_thread_promise;
+ std::future<bool> _report_thread_future;
std::mutex _report_thread_lock;
// Indicates that profile reporting thread should stop.
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp
b/be/src/vec/exec/join/vjoin_node_base.cpp
index d7f9c96c6a..f477b32c74 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -176,11 +176,12 @@ Status VJoinNodeBase::open(RuntimeState* state) {
std::promise<Status> thread_status;
try {
- std::thread([this, state, thread_status_p = &thread_status,
- parent_span =
opentelemetry::trace::Tracer::GetCurrentSpan()] {
- OpentelemetryScope scope {parent_span};
- this->_probe_side_open_thread(state, thread_status_p);
- }).detach();
+ state->exec_env()->join_node_thread_pool()->submit_func(
+ [this, state, thread_status_p = &thread_status,
+ parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()]
{
+ OpentelemetryScope scope {parent_span};
+ this->_probe_side_open_thread(state, thread_status_p);
+ });
} catch (const std::system_error& e) {
LOG(WARNING) << "In VJoinNodeBase::open create thread fail, " <<
e.what();
return Status::InternalError(e.what());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]