This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new e33f4f90ae [fix](exec) Avoid query thread block on wait_for_start (#12411)
e33f4f90ae is described below

commit e33f4f90aed728c343757fcba3cc99aa4459185e
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Tue Sep 13 08:57:37 2022 +0800

    [fix](exec) Avoid query thread block on wait_for_start (#12411)

    When the FE sends a cancel RPC to the BE, it does not notify the thread
    blocked in wait_for_start(), so the fragment stays blocked and occupies
    an execution thread indefinitely. Add a max wait time to wait_for_start()
    so that it cannot block forever.

    (A minimal standalone sketch of this timed-wait pattern is appended
    after the diff.)
---
 be/src/common/config.h                             |  8 +++-
 be/src/runtime/fragment_mgr.cpp                    | 48 ++++++++++------------
 be/src/runtime/plan_fragment_executor.cpp          |  8 ++--
 be/src/runtime/plan_fragment_executor.h            |  5 ---
 be/src/runtime/query_fragments_ctx.h               | 13 ++++--
 be/src/util/doris_metrics.h                        |  1 +
 be/src/vec/exec/volap_scan_node.cpp                |  5 +--
 be/test/runtime/fragment_mgr_test.cpp              |  8 ----
 docs/sidebars.json                                 |  1 -
 .../maint-monitor/monitor-metrics/metrics.md       |  1 +
 10 files changed, 46 insertions(+), 52 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index a099de3c4f..cf24e4de8b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -859,6 +859,13 @@ CONF_Bool(enable_new_scan_node, "true");
 // limit the queue of pending batches which will be sent by a single nodechannel
 CONF_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
 
+// Max waiting time to wait the "plan fragment start" rpc.
+// If timeout, the fragment will be cancelled.
+// This parameter is usually only used when the FE loses connection,
+// and the BE can automatically cancel the relevant fragment after the timeout,
+// so as to avoid occupying the execution thread for a long time.
+CONF_mInt32(max_fragment_start_wait_time_seconds, "30");
+
 #ifdef BE_TEST
 // test s3
 CONF_String(test_s3_resource, "resource");
@@ -869,7 +876,6 @@ CONF_String(test_s3_region, "region");
 CONF_String(test_s3_bucket, "bucket");
 CONF_String(test_s3_prefix, "prefix");
 #endif
-
 } // namespace config
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c5b3cbf818..1b86f57a94 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -60,6 +60,7 @@ namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(plan_fragment_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
 
 std::string to_load_error_http_path(const std::string& file_name) {
     if (file_name.empty()) {
@@ -95,8 +96,6 @@ public:
 
     Status execute();
 
-    Status cancel_before_execute();
-
     Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = "");
 
     TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
@@ -236,13 +235,20 @@ Status FragmentExecState::execute() {
     if (_need_wait_execution_trigger) {
         // if _need_wait_execution_trigger is true, which means this instance
         // is prepared but need to wait for the signal to do the rest execution.
-        _fragments_ctx->wait_for_start();
-        opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
+        if (!_fragments_ctx->wait_for_start()) {
+            return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout");
+        }
+    }
+#ifndef BE_TEST
+    if (_executor.runtime_state()->is_cancelled()) {
+        return Status::Cancelled("cancelled before execution");
     }
+#endif
     int64_t duration_ns = 0;
     {
         SCOPED_RAW_TIMER(&duration_ns);
         CgroupsMgr::apply_system_cgroup();
+        opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
         WARN_IF_ERROR(_executor.open(),
                       strings::Substitute("Got error while opening fragment $0",
                                           print_id(_fragment_instance_id)));
@@ -253,24 +259,9 @@ Status FragmentExecState::execute() {
     return Status::OK();
 }
 
-Status FragmentExecState::cancel_before_execute() {
-    // set status as 'abort', cuz cancel() won't effect the status arg of DataSink::close().
-#ifndef BE_TEST
-    SCOPED_ATTACH_TASK(executor()->runtime_state());
-#endif
-    _executor.set_abort();
-    _executor.cancel();
-    if (_pipe != nullptr) {
-        _pipe->cancel("Execution aborted before start");
-    }
-    return Status::OK();
-}
-
 Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
     if (!_cancelled) {
-        _cancelled = true;
         std::lock_guard<std::mutex> l(_status_lock);
-        RETURN_IF_ERROR(_exec_status);
         if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
             _executor.set_is_report_on_cancel(false);
         }
@@ -278,6 +269,7 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const
         if (_pipe != nullptr) {
             _pipe->cancel(PPlanFragmentCancelReason_Name(reason));
         }
+        _cancelled = true;
     }
     return Status::OK();
 }
@@ -452,11 +444,15 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
                             .set_max_threads(config::fragment_pool_thread_num_max)
                             .set_max_queue_size(config::fragment_pool_queue_size)
                             .build(&_thread_pool);
+
+    REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
+                         [this]() { return _thread_pool->get_queue_size(); });
     CHECK(s.ok()) << s.to_string();
 }
 
 FragmentMgr::~FragmentMgr() {
     DEREGISTER_HOOK_METRIC(plan_fragment_count);
+    DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
     _stop_background_threads_latch.count_down();
     if (_cancel_thread) {
         _cancel_thread->join();
@@ -565,7 +561,7 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest*
                 "timeout or be cancelled. host: {}",
                 BackendOptions::get_localhost());
     }
-    search->second->set_ready_to_execute();
+    search->second->set_ready_to_execute(false);
     return Status::OK();
 }
 
@@ -712,9 +708,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             std::lock_guard<std::mutex> lock(_lock);
             _fragment_map.erase(params.params.fragment_instance_id);
         }
-        exec_state->cancel_before_execute();
-        return Status::InternalError("Put planfragment to thread pool failed. err = {}, BE: {}",
-                                     st.get_error_msg(), BackendOptions::get_localhost());
+        exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
+                           "push plan fragment to thread pool failed");
+        return Status::InternalError(
+                strings::Substitute("push plan fragment $0 to thread pool failed. err = $1, BE: $2",
+                                    print_id(params.params.fragment_instance_id),
+                                    st.get_error_msg(), BackendOptions::get_localhost()));
     }
 
     return Status::OK();
@@ -761,9 +760,6 @@ void FragmentMgr::cancel_worker() {
         }
         for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) {
             if (it->second->is_timeout(now)) {
-                // The execution logic of the instance needs to be notified.
-                // The execution logic of the instance will eventually cancel the execution plan.
-                it->second->set_ready_to_execute();
                 it = _fragments_ctx_map.erase(it);
             } else {
                 ++it;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index aab0b44379..ae5ee8f75a 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -628,6 +628,8 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const
         _cancel_reason = reason;
         _cancel_msg = msg;
         _runtime_state->set_is_cancelled(true);
+        // To notify wait_for_start()
+        _runtime_state->get_query_fragments_ctx()->set_ready_to_execute(true);
 
         // must close stream_mgr to avoid dead lock in Exchange Node
         auto env = _runtime_state->exec_env();
@@ -638,10 +640,8 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const
         env->stream_mgr()->cancel(id);
         env->result_mgr()->cancel(id);
     }
-}
-
-void PlanFragmentExecutor::set_abort() {
-    update_status(Status::Aborted("Execution aborted before start"));
+    // Cancel the result queue manager used by spark doris connector
+    _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
 }
 
 const RowDescriptor& PlanFragmentExecutor::row_desc() {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index d2558deedb..215ff0973b 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -123,11 +123,6 @@ public:
     // in open()/get_next().
     void close();
 
-    // Abort this execution. Must be called if we skip running open().
-    // It will let DataSink node closed with error status, to avoid use resources which created in open() phase.
-    // DataSink node should distinguish Aborted status from other error status.
-    void set_abort();
-
     // Initiate cancellation. Must not be called until after prepare() returned.
     void cancel(const PPlanFragmentCancelReason& reason = PPlanFragmentCancelReason::INTERNAL_ERROR,
                 const std::string& msg = "");
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index 80a07f47e4..8f9ceb38d6 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -20,6 +20,7 @@
 #include <atomic>
 #include <string>
 
+#include "common/config.h"
 #include "common/object_pool.h"
 #include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions
 #include "gen_cpp/Types_types.h"               // for TUniqueId
@@ -61,19 +62,22 @@ public:
 
     ThreadPoolToken* get_token() { return _thread_token.get(); }
 
-    void set_ready_to_execute() {
+    void set_ready_to_execute(bool is_cancelled) {
         {
             std::lock_guard<std::mutex> l(_start_lock);
+            _is_cancelled = is_cancelled;
             _ready_to_execute = true;
         }
         _start_cond.notify_all();
     }
 
-    void wait_for_start() {
+    bool wait_for_start() {
+        int wait_time = config::max_fragment_start_wait_time_seconds;
         std::unique_lock<std::mutex> l(_start_lock);
-        while (!_ready_to_execute.load()) {
-            _start_cond.wait(l);
+        while (!_ready_to_execute.load() && !_is_cancelled.load() && --wait_time > 0) {
+            _start_cond.wait_for(l, std::chrono::seconds(1));
         }
+        return _ready_to_execute.load() && !_is_cancelled.load();
     }
 
 public:
@@ -112,6 +116,7 @@ private:
     // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState.
     // And all fragments of this query will start execution when this is set to true.
     std::atomic<bool> _ready_to_execute {false};
+    std::atomic<bool> _is_cancelled {false};
 };
 
 } // namespace doris
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 57585720c0..da9085671e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -200,6 +200,7 @@ public:
     UIntGauge* send_batch_thread_pool_queue_size;
     UIntGauge* download_cache_thread_pool_thread_num;
     UIntGauge* download_cache_thread_pool_queue_size;
+    UIntGauge* fragment_thread_pool_queue_size;
 
     // Upload metrics
     UIntGauge* upload_total_byte;
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 0f79b7426e..8197c88dbd 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -474,8 +474,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
                 num_rows_in_block < _runtime_state->batch_size())) {
             if (UNLIKELY(_transfer_done)) {
                 eos = true;
-                status = Status::Cancelled("Cancelled");
-                LOG(INFO) << "Scan thread cancelled, cause query done, maybe reach limit.";
+                status = Status::Cancelled(
+                        "Scan thread cancelled, cause query done, maybe reach limit.");
                 break;
             }
@@ -1082,7 +1082,6 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
         _block_consumed_cv.notify_all();
 
         *eos = true;
-        LOG(INFO) << "VOlapScanNode ReachedLimit.";
     } else {
         *eos = false;
     }
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
index bcf1b9f250..060b3a501a 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -28,7 +28,6 @@ namespace doris {
 
 static Status s_prepare_status;
 static Status s_open_status;
-static int s_abort_cnt;
 
 // Mock used for this unittest
 PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
                                            const report_status_callback& report_status_cb)
@@ -49,11 +48,6 @@ Status PlanFragmentExecutor::open() {
 void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
 }
 
-void PlanFragmentExecutor::set_abort() {
-    LOG(INFO) << "Plan Aborted";
-    s_abort_cnt++;
-}
-
 void PlanFragmentExecutor::close() {}
 
 class FragmentMgrTest : public testing::Test {
@@ -128,7 +122,6 @@ TEST_F(FragmentMgrTest, OfferPoolFailed) {
     config::fragment_pool_thread_num_min = 1;
     config::fragment_pool_thread_num_max = 1;
     config::fragment_pool_queue_size = 0;
-    s_abort_cnt = 0;
 
     FragmentMgr mgr(nullptr);
 
     TExecPlanFragmentParams params;
@@ -145,7 +138,6 @@ TEST_F(FragmentMgrTest, OfferPoolFailed) {
         params.params.fragment_instance_id.__set_lo(200);
         EXPECT_FALSE(mgr.exec_plan_fragment(params).ok());
     }
-    EXPECT_EQ(3, s_abort_cnt);
 }
 
 } // namespace doris
diff --git a/docs/sidebars.json b/docs/sidebars.json
index 2d8cdb36a0..1b1151e342 100644
--- a/docs/sidebars.json
+++ b/docs/sidebars.json
@@ -860,7 +860,6 @@
                 "admin-manual/maint-monitor/multi-tenant",
                 "admin-manual/maint-monitor/tablet-local-debug",
                 "admin-manual/maint-monitor/tablet-restore-tool",
-                "admin-manual/maint-monitor/monitor-metrics/metrics",
                 "admin-manual/maint-monitor/metadata-operation"
             ]
         },
diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index d5789c05f5..12996ef240 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -275,6 +275,7 @@ curl http://be_host:webserver_port/metrics?type=json
 |`doris_be_upload_total_byte`| | | 字节 | 冷热分离功能,上传到远端存储成功的rowset数据量累计值| |
 |`doris_be_load_bytes`| | 字节|通过 tablet sink 发送的数量累计 | 可观测导入数据量 | P0 |
 |`doris_be_load_rows`| | Num | 通过 tablet sink 发送的行数累计| 可观测导入数据量 | P0 |
+|`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
 
 ### 机器监控
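
For readers who want the core mechanism in isolation: below is a minimal, self-contained sketch of the timed-wait pattern this patch introduces in QueryFragmentsCtx. The class, member names, and main() driver are illustrative stand-ins that mirror the diff above, not the actual Doris sources.

// Sketch: timed wait for the "plan fragment start" RPC, mirroring the
// patched QueryFragmentsCtx above. Build with: g++ -std=c++11 -pthread
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class QueryCtxSketch {
public:
    // Called either by the start-RPC handler (is_cancelled = false) or by
    // the cancel path (is_cancelled = true); both wake the waiting fragment.
    void set_ready_to_execute(bool is_cancelled) {
        {
            std::lock_guard<std::mutex> l(_start_lock);
            _is_cancelled = is_cancelled;
            _ready_to_execute = true;
        }
        _start_cond.notify_all();
    }

    // Returns true only if the fragment should really run. The one-second
    // wait_for slices let the loop re-check the flags and enforce the cap,
    // so a lost FE can no longer park this thread forever.
    bool wait_for_start(int max_wait_seconds) {
        std::unique_lock<std::mutex> l(_start_lock);
        while (!_ready_to_execute && !_is_cancelled && --max_wait_seconds > 0) {
            _start_cond.wait_for(l, std::chrono::seconds(1));
        }
        return _ready_to_execute && !_is_cancelled;
    }

private:
    std::mutex _start_lock;
    std::condition_variable _start_cond;
    bool _ready_to_execute = false;
    bool _is_cancelled = false;
};

int main() {
    QueryCtxSketch ctx;
    // Simulate an FE that cancels the query instead of starting it.
    std::thread fe([&ctx] {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        ctx.set_ready_to_execute(/*is_cancelled=*/true);
    });
    // Before the patch, the equivalent wait had no timeout and no cancel
    // check, so this call could block an execution thread indefinitely.
    bool started = ctx.wait_for_start(/*max_wait_seconds=*/30);
    std::cout << (started ? "start executing fragment"
                          : "fragment cancelled or timed out") << std::endl;
    fe.join();
    return 0;
}

Note the design choice visible both here and in the patch: cancellation reuses the existing start signal (set_ready_to_execute(true) plus an _is_cancelled flag) instead of adding a second wake-up path, and the wait is additionally capped at config::max_fragment_start_wait_time_seconds (30 seconds by default) so that a silent FE disconnect cannot pin an execution thread.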
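Operationally, the new fragment_thread_pool_queue_size gauge added by this patch is the signal to watch for the symptom being fixed: it is exposed on the BE metrics endpoint shown in the doc hunk above (curl http://be_host:webserver_port/metrics?type=json), and per the added doc row, a value persistently greater than zero means the fragment execution threads are exhausted and queries are queuing.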