This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new 0d873883ba [dev-1.1.2][fix] avoid thread blocks on wait_for_start() (#12392) 0d873883ba is described below commit 0d873883ba76260d6d2a011a227f6b2de8696eb0 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Wed Sep 7 21:34:02 2022 +0800 [dev-1.1.2][fix] avoid thread blocks on wait_for_start() (#12392) When cancel a fragment, we should notify the "wait_for_start()" thread to avoid thread blocking. Only for 1.1.2, #12411 is for master branch --- be/src/common/config.h | 3 +++ be/src/runtime/fragment_mgr.cpp | 43 +++++++++++++------------------ be/src/runtime/plan_fragment_executor.cpp | 8 +++--- be/src/runtime/plan_fragment_executor.h | 5 ---- be/src/runtime/query_fragments_ctx.h | 13 +++++++--- be/src/util/doris_metrics.h | 1 + be/src/vec/exec/volap_scan_node.cpp | 4 +-- be/src/vec/sink/vdata_stream_sender.h | 3 --- 8 files changed, 36 insertions(+), 44 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index aa0b55bc0b..945e9631cd 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -746,6 +746,9 @@ CONF_Int32(quick_compaction_max_rows, "1000"); CONF_Int32(quick_compaction_batch_size, "10"); // do compaction min rowsets CONF_Int32(quick_compaction_min_rowsets, "10"); +// Max waiting time to wait the "plan fragment start" rpc. +// If timeout, the fragment will be cancelled. +CONF_mInt32(max_fragment_start_wait_time_seconds, "30"); } // namespace config } // namespace doris diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index a70d744447..3612ce2479 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -57,6 +57,7 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(plan_fragment_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT); std::string to_load_error_http_path(const std::string& file_name) { if (file_name.empty()) { @@ -92,8 +93,6 @@ public: Status execute(); - Status cancel_before_execute(); - Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = ""); TUniqueId fragment_instance_id() const { return _fragment_instance_id; } @@ -233,7 +232,12 @@ Status FragmentExecState::execute() { if (_need_wait_execution_trigger) { // if _need_wait_execution_trigger is true, which means this instance // is prepared but need to wait for the signal to do the rest execution. - _fragments_ctx->wait_for_start(); + if (!_fragments_ctx->wait_for_start()) { + return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout"); + } + } + if (_executor.runtime_state()->is_cancelled()) { + return Status::Cancelled("cancelled before execution"); } int64_t duration_ns = 0; { @@ -248,24 +252,9 @@ Status FragmentExecState::execute() { return Status::OK(); } -Status FragmentExecState::cancel_before_execute() { - // set status as 'abort', cuz cancel() won't effect the status arg of DataSink::close(). -#ifndef BE_TEST - SCOPED_ATTACH_TASK(executor()->runtime_state()); -#endif - _executor.set_abort(); - _executor.cancel(); - if (_pipe != nullptr) { - _pipe->cancel("Execution aborted before start"); - } - return Status::OK(); -} - Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { if (!_cancelled) { - _cancelled = true; std::lock_guard<std::mutex> l(_status_lock); - RETURN_IF_ERROR(_exec_status); if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { _executor.set_is_report_on_cancel(false); } @@ -273,6 +262,7 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const if (_pipe != nullptr) { _pipe->cancel(PPlanFragmentCancelReason_Name(reason)); } + _cancelled = true; } return Status::OK(); } @@ -447,11 +437,15 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) .set_max_threads(config::fragment_pool_thread_num_max) .set_max_queue_size(config::fragment_pool_queue_size) .build(&_thread_pool); + + REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size, + [this]() { return _thread_pool->get_queue_size(); }); CHECK(s.ok()) << s.to_string(); } FragmentMgr::~FragmentMgr() { DEREGISTER_HOOK_METRIC(plan_fragment_count); + DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size); _stop_background_threads_latch.count_down(); if (_cancel_thread) { _cancel_thread->join(); @@ -548,7 +542,7 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r "timeout or be cancelled. host: ", BackendOptions::get_localhost())); } - search->second->set_ready_to_execute(); + search->second->set_ready_to_execute(false); return Status::OK(); } @@ -673,10 +667,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi std::lock_guard<std::mutex> lock(_lock); _fragment_map.erase(params.params.fragment_instance_id); } - exec_state->cancel_before_execute(); + exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, + "push plan fragment to thread pool failed"); return Status::InternalError( - strings::Substitute("Put planfragment to thread pool failed. err = $0, BE: $1", - st.get_error_msg(), BackendOptions::get_localhost())); + strings::Substitute("push plan fragment $0 to thread pool failed. err = $1, BE: $2", + print_id(params.params.fragment_instance_id), st.get_error_msg(), + BackendOptions::get_localhost())); } return Status::OK(); @@ -714,9 +710,6 @@ void FragmentMgr::cancel_worker() { } for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) { if (it->second->is_timeout(now)) { - // The execution logic of the instance needs to be notified. - // The execution logic of the instance will eventually cancel the execution plan. - it->second->set_ready_to_execute(); it = _fragments_ctx_map.erase(it); } else { ++it; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 8f8cdd62b7..6923059553 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -636,6 +636,8 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const _cancel_reason = reason; _cancel_msg = msg; _runtime_state->set_is_cancelled(true); + // To notify wait_for_start() + _runtime_state->get_query_fragments_ctx()->set_ready_to_execute(true); // must close stream_mgr to avoid dead lock in Exchange Node auto env = _runtime_state->exec_env(); @@ -646,10 +648,8 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const env->stream_mgr()->cancel(id); env->result_mgr()->cancel(id); } -} - -void PlanFragmentExecutor::set_abort() { - update_status(Status::Aborted("Execution aborted before start")); + // Cancel the result queue manager used by spark doris connector + _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); } const RowDescriptor& PlanFragmentExecutor::row_desc() { diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index de11ca87d9..72a5d714ea 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -121,11 +121,6 @@ public: // in open()/get_next(). void close(); - // Abort this execution. Must be called if we skip running open(). - // It will let DataSink node closed with error status, to avoid use resources which created in open() phase. - // DataSink node should distinguish Aborted status from other error status. - void set_abort(); - // Initiate cancellation. Must not be called until after prepare() returned. void cancel(const PPlanFragmentCancelReason& reason = PPlanFragmentCancelReason::INTERNAL_ERROR, const std::string& msg = ""); diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index 1e7b3874c6..7d88861031 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -21,6 +21,7 @@ #include <condition_variable> #include <string> +#include "common/config.h" #include "common/object_pool.h" #include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions #include "gen_cpp/Types_types.h" // for TUniqueId @@ -72,19 +73,22 @@ public: ThreadPoolToken* get_serial_token() { return _serial_thread_token.get(); } - void set_ready_to_execute() { + void set_ready_to_execute(bool is_cancelled) { { std::lock_guard<std::mutex> l(_start_lock); + _is_cancelled = is_cancelled; _ready_to_execute = true; } _start_cond.notify_all(); } - void wait_for_start() { + bool wait_for_start() { + int wait_time = config::max_fragment_start_wait_time_seconds; std::unique_lock<std::mutex> l(_start_lock); - while (!_ready_to_execute.load()) { - _start_cond.wait(l); + while (!_ready_to_execute.load() && !_is_cancelled.load() && --wait_time > 0) { + _start_cond.wait_for(l, std::chrono::seconds(1)); } + return _ready_to_execute.load() && !_is_cancelled.load(); } public: @@ -127,6 +131,7 @@ private: // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState. // And all fragments of this query will start execution when this is set to true. std::atomic<bool> _ready_to_execute {false}; + std::atomic<bool> _is_cancelled {false}; }; } // end of namespace diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 8015dcaefa..9353760eaf 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -197,6 +197,7 @@ public: UIntGauge* add_batch_task_queue_size; UIntGauge* send_batch_thread_pool_thread_num; UIntGauge* send_batch_thread_pool_queue_size; + UIntGauge* fragment_thread_pool_queue_size; static DorisMetrics* instance() { static DorisMetrics instance; diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 2a06138767..999c7716f1 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -216,8 +216,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { num_rows_in_block < _runtime_state->batch_size())) { if (UNLIKELY(_transfer_done)) { eos = true; - status = Status::Cancelled("Cancelled"); - LOG(INFO) << "Scan thread cancelled, cause query done, maybe reach limit."; + status = Status::Cancelled("Scan thread cancelled, cause query done, maybe reach limit."); break; } @@ -542,7 +541,6 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { _block_consumed_cv.notify_all(); *eos = true; - LOG(INFO) << "VOlapScanNode ReachedLimit."; } else { *eos = false; } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 09c9549a5b..94aefeda99 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -175,9 +175,6 @@ public: std::string localhost = BackendOptions::get_localhost(); _is_local = (_brpc_dest_addr.hostname == localhost) && (_brpc_dest_addr.port == config::brpc_port); - if (_is_local) { - LOG(INFO) << "will use local Exchange, dest_node_id is : " << _dest_node_id; - } } virtual ~Channel() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org