This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e22a9ecc3b [enhancement](execute model) use a thread pool to execute report or join tasks instead of starting too many threads (#17212)
e22a9ecc3b is described below
commit e22a9ecc3b116546e4aafeeaeebfa46e15adfb4e
Author: yiguolei <[email protected]>
AuthorDate: Wed Mar 1 08:35:27 2023 +0800
[enhancement](execute model) use a thread pool to execute report or join tasks instead of starting too many threads (#17212)
* [enhancement](execute model) use a thread pool to execute report or join tasks instead of starting too many threads
Doris starts a report thread and a join thread during fragment execution.
Creating and destroying threads this frequently causes many problems: jemalloc
may not behave well and can even crash (see jemalloc/jemalloc#1405).
It is better to use a thread pool for these tasks.
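To make the pattern concrete, here is a minimal, self-contained sketch of the approach this commit adopts: a task submitted to a pool signals completion through a promise/future pair, so the owner can block until the task no longer touches it, which is exactly what std::thread::join() used to guarantee. SimplePool and Reporter are hypothetical stand-ins for illustration, not Doris's actual ThreadPool or executor classes.

    #include <condition_variable>
    #include <functional>
    #include <future>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    // Hypothetical fixed-size pool, a stand-in for Doris's much richer ThreadPool.
    class SimplePool {
    public:
        explicit SimplePool(int n) {
            for (int i = 0; i < n; ++i) {
                _workers.emplace_back([this] {
                    while (true) {
                        std::function<void()> task;
                        {
                            std::unique_lock<std::mutex> l(_mu);
                            _cv.wait(l, [this] { return _stop || !_tasks.empty(); });
                            if (_stop && _tasks.empty()) return;
                            task = std::move(_tasks.front());
                            _tasks.pop();
                        }
                        task();  // workers are reused across tasks instead of destroyed
                    }
                });
            }
        }
        ~SimplePool() {
            {
                std::lock_guard<std::mutex> l(_mu);
                _stop = true;
            }
            _cv.notify_all();
            for (auto& w : _workers) w.join();
        }
        void submit_func(std::function<void()> f) {
            {
                std::lock_guard<std::mutex> l(_mu);
                _tasks.push(std::move(f));
            }
            _cv.notify_one();
        }

    private:
        std::vector<std::thread> _workers;
        std::queue<std::function<void()>> _tasks;
        std::mutex _mu;
        std::condition_variable _cv;
        bool _stop = false;
    };

    // Hypothetical owner mirroring the executor changes below: a promise/future
    // pair replaces std::thread::join() as the shutdown barrier.
    class Reporter {
    public:
        explicit Reporter(SimplePool* pool) : _pool(pool) {
            _done_future = _done_promise.get_future();
        }
        void start() {
            _pool->submit_func([this] {
                std::cout << "reporting profile..." << std::endl;
                _done_promise.set_value(true);  // signal completion, last use of `this`
            });
        }
        void stop() {
            // Block until the task has finished with `this`.
            _done_future.wait();
        }

    private:
        SimplePool* _pool;
        std::promise<bool> _done_promise;
        std::future<bool> _done_future;
    };

    int main() {
        SimplePool pool(2);
        Reporter r(&pool);
        r.start();
        r.stop();  // r can be destroyed safely after this returns
    }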
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/pipeline_fragment_context.cpp | 10 ++++++++--
be/src/pipeline/pipeline_fragment_context.h | 5 ++++-
be/src/runtime/exec_env.h | 7 +++++++
be/src/runtime/exec_env_init.cpp | 15 +++++++++++++++
be/src/runtime/plan_fragment_executor.cpp | 14 +++++++++++---
be/src/runtime/plan_fragment_executor.h | 4 +++-
be/src/vec/exec/join/vjoin_node_base.cpp | 11 ++++++-----
7 files changed, 54 insertions(+), 12 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index f37dd4db7b..df51da373a 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -103,6 +103,7 @@ PipelineFragmentContext::PipelineFragmentContext(
_report_thread_active(false),
_report_status_cb(report_status_cb),
_is_report_on_cancel(true) {
+ _report_thread_future = _report_thread_promise.get_future();
_fragment_watcher.start();
}
@@ -276,7 +277,10 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re
if (_is_report_success && config::status_report_interval > 0) {
std::unique_lock<std::mutex> l(_report_thread_lock);
- _report_thread = std::thread(&PipelineFragmentContext::report_profile, this);
+ _exec_env->send_report_thread_pool()->submit_func([this] {
+ Defer defer {[&]() { this->_report_thread_promise.set_value(true); }};
+ this->report_profile();
+ });
// make sure the thread started up, otherwise report_profile() might get into a race
// with stop_report_thread()
_report_thread_started_cv.wait(l);
@@ -475,7 +479,9 @@ void PipelineFragmentContext::_stop_report_thread() {
_report_thread_active = false;
_stop_report_thread_cv.notify_one();
- _report_thread.join();
+ // Wait indefinitely to ensure that the report task has finished and that `this`
+ // is no longer used by the report task.
+ _report_thread_future.wait();
}
void PipelineFragmentContext::report_profile() {
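For context on the hunk above: stopping the reporter is a two-step handshake. The condition variable wakes the reporting loop so it can observe _report_thread_active == false and exit, and the future then blocks until the task has truly finished with `this`. A minimal hypothetical sketch of that handshake (names invented, not Doris code):

    #include <chrono>
    #include <condition_variable>
    #include <future>
    #include <iostream>
    #include <mutex>
    #include <thread>

    struct ReportLoop {
        std::mutex mu;
        std::condition_variable stop_cv;
        bool active = true;
        std::promise<bool> done;
        std::future<bool> done_future = done.get_future();

        void run() {  // executed by a pooled task in the real code
            {
                std::unique_lock<std::mutex> l(mu);
                while (active) {
                    std::cout << "report profile" << std::endl;
                    // Sleep one reporting interval, or wake early on stop().
                    stop_cv.wait_for(l, std::chrono::milliseconds(100));
                }
            }  // drop the lock before signaling completion
            done.set_value(true);  // last touch of this object
        }

        void stop() {
            {
                std::lock_guard<std::mutex> l(mu);
                active = false;
            }
            stop_cv.notify_one();
            done_future.wait();  // replaces _report_thread.join()
        }
    };

    int main() {
        ReportLoop loop;
        std::thread t([&] { loop.run(); });  // stand-in for a pool submission
        t.detach();
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
        loop.stop();  // safe: the loop object may be destroyed after this
    }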
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 38b07d503e..0a756ed37f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -17,6 +17,8 @@
#pragma once
+#include <future>
+
#include "io/fs/stream_load_pipe.h"
#include "pipeline/pipeline.h"
#include "pipeline/pipeline_task.h"
@@ -179,7 +181,8 @@ private:
bool _report_thread_active;
// profile reporting-related
report_status_callback _report_status_cb;
- std::thread _report_thread;
+ std::promise<bool> _report_thread_promise;
+ std::future<bool> _report_thread_future;
std::mutex _report_thread_lock;
// Indicates that profile reporting thread should stop.
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4d2163cf39..ae402453be 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -124,6 +124,9 @@ public:
MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); }
ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); }
+ ThreadPool* send_report_thread_pool() { return _send_report_thread_pool.get(); }
+ ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); }
+
void set_serial_download_cache_thread_token() {
_serial_download_cache_thread_token =
download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1);
@@ -215,6 +218,10 @@ private:
std::unique_ptr<ThreadPool> _download_cache_thread_pool;
// A token used to submit download cache task serially
std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
+ // Pool used by fragment manager to send profile or status to FE coordinator
+ std::unique_ptr<ThreadPool> _send_report_thread_pool;
+ // Pool used by join node to build hash table
+ std::unique_ptr<ThreadPool> _join_node_thread_pool;
// ThreadPoolToken -> buffer
std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>>
_download_cache_buf_map;
CgroupsMgr* _cgroups_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index a2770ce6f5..64c0c7cb20 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -102,6 +102,21 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
init_download_cache_required_components();
+ // min thread num equals the fragment pool's min num
+ // max thread num is effectively unbounded because the old code started as many threads as were requested
+ // queue size is irrelevant because the max thread num is very large
+ ThreadPoolBuilder("SendReportThreadPool")
+ .set_min_threads(config::fragment_pool_thread_num_min)
+ .set_max_threads(std::numeric_limits<int>::max())
+ .set_max_queue_size(config::fragment_pool_queue_size)
+ .build(&_send_report_thread_pool);
+
+ ThreadPoolBuilder("JoinNodeThreadPool")
+ .set_min_threads(config::fragment_pool_thread_num_min)
+ .set_max_threads(std::numeric_limits<int>::max())
+ .set_max_queue_size(config::fragment_pool_queue_size)
+ .build(&_join_node_thread_pool);
+
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
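The sizing comments above amount to: keep the old admission behavior (any number of concurrent tasks may run, as when each task got its own std::thread) while reusing idle workers instead of destroying them. The cost being avoided is thread churn itself; a rough, hypothetical micro-benchmark of create-per-task versus a reused thread (timings are machine-dependent and indicative only):

    #include <chrono>
    #include <iostream>
    #include <thread>

    int main() {
        constexpr int kTasks = 10000;

        // Old model: create and destroy one thread per trivial task.
        auto start = std::chrono::steady_clock::now();
        for (int i = 0; i < kTasks; ++i) {
            std::thread([] { /* trivial task */ }).join();
        }
        auto churn = std::chrono::steady_clock::now() - start;

        // Pool model (reduced to one worker): the same tasks on a reused thread.
        start = std::chrono::steady_clock::now();
        std::thread worker([] {
            for (int i = 0; i < kTasks; ++i) { /* trivial task */ }
        });
        worker.join();
        auto reused = std::chrono::steady_clock::now() - start;

        using ms = std::chrono::duration<double, std::milli>;
        std::cout << "create-per-task: " << ms(churn).count() << " ms\n"
                  << "reused thread:   " << ms(reused).count() << " ms\n";
    }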
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index f62bd4bae1..94c200a5f2 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -64,7 +64,9 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
_is_report_success(false),
_is_report_on_cancel(true),
_collect_query_statistics_with_every_batch(false),
- _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {}
+ _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
+ _report_thread_future = _report_thread_promise.get_future();
+}
PlanFragmentExecutor::~PlanFragmentExecutor() {
close();
@@ -228,7 +230,10 @@ Status PlanFragmentExecutor::open() {
// at end, otherwise the coordinator hangs in case we finish w/ an error
if (_is_report_success && config::status_report_interval > 0) {
std::unique_lock<std::mutex> l(_report_thread_lock);
- _report_thread = std::thread(&PlanFragmentExecutor::report_profile, this);
+ _exec_env->send_report_thread_pool()->submit_func([this] {
+ Defer defer {[&]() { this->_report_thread_promise.set_value(true); }};
+ this->report_profile();
+ });
// make sure the thread started up, otherwise report_profile() might get into a race
// with stop_report_thread()
_report_thread_started_cv.wait(l);
@@ -456,7 +461,10 @@ void PlanFragmentExecutor::stop_report_thread() {
_report_thread_active = false;
_stop_report_thread_cv.notify_one();
- _report_thread.join();
+ // Wait indefinitely until the report task has stopped and the future is set.
+ // The report task depends on the PlanFragmentExecutor object; without waiting here,
+ // the task may crash because the PlanFragmentExecutor has already been destroyed.
+ _report_thread_future.wait();
}
void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index f4bcdcaa13..352edfccbe 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -22,6 +22,7 @@
#include <condition_variable>
#include <functional>
+#include <future>
#include <vector>
#include "common/object_pool.h"
@@ -135,7 +136,8 @@ private:
// profile reporting-related
report_status_callback _report_status_cb;
- std::thread _report_thread;
+ std::promise<bool> _report_thread_promise;
+ std::future<bool> _report_thread_future;
std::mutex _report_thread_lock;
// Indicates that profile reporting thread should stop.
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp
index e186f15d8c..7daf1fe237 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -180,11 +180,12 @@ Status VJoinNodeBase::open(RuntimeState* state) {
std::promise<Status> thread_status;
try {
- std::thread([this, state, thread_status_p = &thread_status,
- parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
- OpentelemetryScope scope {parent_span};
- this->_probe_side_open_thread(state, thread_status_p);
- }).detach();
+ state->exec_env()->join_node_thread_pool()->submit_func(
+ [this, state, thread_status_p = &thread_status,
+ parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+ OpentelemetryScope scope {parent_span};
+ this->_probe_side_open_thread(state, thread_status_p);
+ });
} catch (const std::system_error& e) {
LOG(WARNING) << "In VJoinNodeBase::open create thread fail, " << e.what();
return Status::InternalError(e.what());
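One caveat on the hunk above: the surrounding try/catch was written for std::thread's constructor, which throws std::system_error on failure, whereas a Kudu-style pool's submit_func typically reports failure through a returned Status (an assumption about Doris's ThreadPool, not shown in this diff). If that holds, checking the return value is the more direct guard, roughly (a fragment against the context above, not standalone code):

    // Hypothetical alternative, assuming submit_func returns a Status:
    RETURN_IF_ERROR(state->exec_env()->join_node_thread_pool()->submit_func(
            [this, state, thread_status_p = &thread_status,
             parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
                OpentelemetryScope scope {parent_span};
                this->_probe_side_open_thread(state, thread_status_p);
            }));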