This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e33f4f90ae [fix](exec) Avoid query thread block on wait_for_start 
(#12411)
e33f4f90ae is described below

commit e33f4f90aed728c343757fcba3cc99aa4459185e
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Tue Sep 13 08:57:37 2022 +0800

    [fix](exec) Avoid query thread block on wait_for_start (#12411)
    
    When the FE sends a cancel RPC to the BE, the thread waiting in
    wait_for_start() is not notified, so the fragment stays blocked and keeps
    occupying an execution thread.
    Add a maximum wait time to wait_for_start() so that it cannot block forever.
---
 be/src/common/config.h                             |  8 +++-
 be/src/runtime/fragment_mgr.cpp                    | 48 ++++++++++------------
 be/src/runtime/plan_fragment_executor.cpp          |  8 ++--
 be/src/runtime/plan_fragment_executor.h            |  5 ---
 be/src/runtime/query_fragments_ctx.h               | 13 ++++--
 be/src/util/doris_metrics.h                        |  1 +
 be/src/vec/exec/volap_scan_node.cpp                |  5 +--
 be/test/runtime/fragment_mgr_test.cpp              |  8 ----
 docs/sidebars.json                                 |  1 -
 .../maint-monitor/monitor-metrics/metrics.md       |  1 +
 10 files changed, 46 insertions(+), 52 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index a099de3c4f..cf24e4de8b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -859,6 +859,13 @@ CONF_Bool(enable_new_scan_node, "true");
 // limit the queue of pending batches which will be sent by a single 
nodechannel
 CONF_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
 
+// Max time to wait for the "plan fragment start" rpc.
+// If the wait times out, the fragment will be cancelled.
+// This parameter is usually only used when the FE loses connection,
+// and the BE can automatically cancel the relevant fragment after the timeout,
+// so as to avoid occupying the execution thread for a long time.
+CONF_mInt32(max_fragment_start_wait_time_seconds, "30");
+
 #ifdef BE_TEST
 // test s3
 CONF_String(test_s3_resource, "resource");
@@ -869,7 +876,6 @@ CONF_String(test_s3_region, "region");
 CONF_String(test_s3_bucket, "bucket");
 CONF_String(test_s3_prefix, "prefix");
 #endif
-
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c5b3cbf818..1b86f57a94 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -60,6 +60,7 @@ namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(plan_fragment_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, 
MetricUnit::NOUNIT);
 
 std::string to_load_error_http_path(const std::string& file_name) {
     if (file_name.empty()) {
@@ -95,8 +96,6 @@ public:
 
     Status execute();
 
-    Status cancel_before_execute();
-
     Status cancel(const PPlanFragmentCancelReason& reason, const std::string& 
msg = "");
     TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
 
@@ -236,13 +235,20 @@ Status FragmentExecState::execute() {
     if (_need_wait_execution_trigger) {
         // if _need_wait_execution_trigger is true, which means this instance
         // is prepared but need to wait for the signal to do the rest 
execution.
-        _fragments_ctx->wait_for_start();
-        opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start 
executing Fragment");
+        if (!_fragments_ctx->wait_for_start()) {
+            return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait 
fragment start timeout");
+        }
+    }
+#ifndef BE_TEST
+    if (_executor.runtime_state()->is_cancelled()) {
+        return Status::Cancelled("cancelled before execution");
     }
+#endif
     int64_t duration_ns = 0;
     {
         SCOPED_RAW_TIMER(&duration_ns);
         CgroupsMgr::apply_system_cgroup();
+        opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start 
executing Fragment");
         WARN_IF_ERROR(_executor.open(), strings::Substitute("Got error while 
opening fragment $0",
                                                             
print_id(_fragment_instance_id)));
 
@@ -253,24 +259,9 @@ Status FragmentExecState::execute() {
     return Status::OK();
 }
 
-Status FragmentExecState::cancel_before_execute() {
-    // set status as 'abort', cuz cancel() won't effect the status arg of 
DataSink::close().
-#ifndef BE_TEST
-    SCOPED_ATTACH_TASK(executor()->runtime_state());
-#endif
-    _executor.set_abort();
-    _executor.cancel();
-    if (_pipe != nullptr) {
-        _pipe->cancel("Execution aborted before start");
-    }
-    return Status::OK();
-}
-
 Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, 
const std::string& msg) {
     if (!_cancelled) {
-        _cancelled = true;
         std::lock_guard<std::mutex> l(_status_lock);
-        RETURN_IF_ERROR(_exec_status);
         if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
             _executor.set_is_report_on_cancel(false);
         }
@@ -278,6 +269,7 @@ Status FragmentExecState::cancel(const 
PPlanFragmentCancelReason& reason, const
         if (_pipe != nullptr) {
             _pipe->cancel(PPlanFragmentCancelReason_Name(reason));
         }
+        _cancelled = true;
     }
     return Status::OK();
 }
@@ -452,11 +444,15 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
                 .set_max_threads(config::fragment_pool_thread_num_max)
                 .set_max_queue_size(config::fragment_pool_queue_size)
                 .build(&_thread_pool);
+
+    REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
+                         [this]() { return _thread_pool->get_queue_size(); });
     CHECK(s.ok()) << s.to_string();
 }
 
 FragmentMgr::~FragmentMgr() {
     DEREGISTER_HOOK_METRIC(plan_fragment_count);
+    DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
     _stop_background_threads_latch.count_down();
     if (_cancel_thread) {
         _cancel_thread->join();
@@ -565,7 +561,7 @@ Status FragmentMgr::start_query_execution(const 
PExecPlanFragmentStartRequest* r
                 "timeout or be cancelled. host: {}",
                 BackendOptions::get_localhost());
     }
-    search->second->set_ready_to_execute();
+    search->second->set_ready_to_execute(false);
     return Status::OK();
 }
 
@@ -712,9 +708,12 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
             std::lock_guard<std::mutex> lock(_lock);
             _fragment_map.erase(params.params.fragment_instance_id);
         }
-        exec_state->cancel_before_execute();
-        return Status::InternalError("Put planfragment to thread pool failed. 
err = {}, BE: {}",
-                                     st.get_error_msg(), 
BackendOptions::get_localhost());
+        exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
+                           "push plan fragment to thread pool failed");
+        return Status::InternalError(
+                strings::Substitute("push plan fragment $0 to thread pool 
failed. err = $1, BE: $2",
+                                    
print_id(params.params.fragment_instance_id),
+                                    st.get_error_msg(), 
BackendOptions::get_localhost()));
     }
 
     return Status::OK();
@@ -761,9 +760,6 @@ void FragmentMgr::cancel_worker() {
             }
             for (auto it = _fragments_ctx_map.begin(); it != 
_fragments_ctx_map.end();) {
                 if (it->second->is_timeout(now)) {
-                    // The execution logic of the instance needs to be 
notified.
-                    // The execution logic of the instance will eventually 
cancel the execution plan.
-                    it->second->set_ready_to_execute();
                     it = _fragments_ctx_map.erase(it);
                 } else {
                     ++it;
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index aab0b44379..ae5ee8f75a 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -628,6 +628,8 @@ void PlanFragmentExecutor::cancel(const 
PPlanFragmentCancelReason& reason, const
     _cancel_reason = reason;
     _cancel_msg = msg;
     _runtime_state->set_is_cancelled(true);
+    // To notify wait_for_start()
+    _runtime_state->get_query_fragments_ctx()->set_ready_to_execute(true);
 
     // must close stream_mgr to avoid dead lock in Exchange Node
     auto env = _runtime_state->exec_env();
@@ -638,10 +640,8 @@ void PlanFragmentExecutor::cancel(const 
PPlanFragmentCancelReason& reason, const
         env->stream_mgr()->cancel(id);
         env->result_mgr()->cancel(id);
     }
-}
-
-void PlanFragmentExecutor::set_abort() {
-    update_status(Status::Aborted("Execution aborted before start"));
+    // Cancel the result queue manager used by spark doris connector
+    _exec_env->result_queue_mgr()->update_queue_status(id, 
Status::Aborted(msg));
 }
 
 const RowDescriptor& PlanFragmentExecutor::row_desc() {
diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h
index d2558deedb..215ff0973b 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -123,11 +123,6 @@ public:
     // in open()/get_next().
     void close();
 
-    // Abort this execution. Must be called if we skip running open().
-    // It will let DataSink node closed with error status, to avoid use 
resources which created in open() phase.
-    // DataSink node should distinguish Aborted status from other error status.
-    void set_abort();
-
     // Initiate cancellation. Must not be called until after prepare() 
returned.
     void cancel(const PPlanFragmentCancelReason& reason = 
PPlanFragmentCancelReason::INTERNAL_ERROR,
                 const std::string& msg = "");
diff --git a/be/src/runtime/query_fragments_ctx.h 
b/be/src/runtime/query_fragments_ctx.h
index 80a07f47e4..8f9ceb38d6 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -20,6 +20,7 @@
 #include <atomic>
 #include <string>
 
+#include "common/config.h"
 #include "common/object_pool.h"
 #include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions
 #include "gen_cpp/Types_types.h"               // for TUniqueId
@@ -61,19 +62,22 @@ public:
 
     ThreadPoolToken* get_token() { return _thread_token.get(); }
 
-    void set_ready_to_execute() {
+    void set_ready_to_execute(bool is_cancelled) {
         {
             std::lock_guard<std::mutex> l(_start_lock);
+            _is_cancelled = is_cancelled;
             _ready_to_execute = true;
         }
         _start_cond.notify_all();
     }
 
-    void wait_for_start() {
+    bool wait_for_start() {
+        int wait_time = config::max_fragment_start_wait_time_seconds;
         std::unique_lock<std::mutex> l(_start_lock);
-        while (!_ready_to_execute.load()) {
-            _start_cond.wait(l);
+        while (!_ready_to_execute.load() && !_is_cancelled.load() && 
--wait_time > 0) {
+            _start_cond.wait_for(l, std::chrono::seconds(1));
         }
+        return _ready_to_execute.load() && !_is_cancelled.load();
     }
 
 public:
@@ -112,6 +116,7 @@ private:
     // Only valid when _need_wait_execution_trigger is set to true in 
FragmentExecState.
     // And all fragments of this query will start execution when this is set 
to true.
     std::atomic<bool> _ready_to_execute {false};
+    std::atomic<bool> _is_cancelled {false};
 };
 
 } // namespace doris
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 57585720c0..da9085671e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -200,6 +200,7 @@ public:
     UIntGauge* send_batch_thread_pool_queue_size;
     UIntGauge* download_cache_thread_pool_thread_num;
     UIntGauge* download_cache_thread_pool_queue_size;
+    UIntGauge* fragment_thread_pool_queue_size;
 
     // Upload metrics
     UIntGauge* upload_total_byte;
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index 0f79b7426e..8197c88dbd 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -474,8 +474,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
             num_rows_in_block < _runtime_state->batch_size())) {
         if (UNLIKELY(_transfer_done)) {
             eos = true;
-            status = Status::Cancelled("Cancelled");
-            LOG(INFO) << "Scan thread cancelled, cause query done, maybe reach 
limit.";
+            status = Status::Cancelled(
+                    "Scan thread cancelled, cause query done, maybe reach 
limit.");
             break;
         }
 
@@ -1082,7 +1082,6 @@ Status VOlapScanNode::get_next(RuntimeState* state, 
Block* block, bool* eos) {
 
             _block_consumed_cv.notify_all();
             *eos = true;
-            LOG(INFO) << "VOlapScanNode ReachedLimit.";
         } else {
             *eos = false;
         }
diff --git a/be/test/runtime/fragment_mgr_test.cpp 
b/be/test/runtime/fragment_mgr_test.cpp
index bcf1b9f250..060b3a501a 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -28,7 +28,6 @@ namespace doris {
 
 static Status s_prepare_status;
 static Status s_open_status;
-static int s_abort_cnt;
 // Mock used for this unittest
 PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
                                            const report_status_callback& 
report_status_cb)
@@ -49,11 +48,6 @@ Status PlanFragmentExecutor::open() {
 void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, 
const std::string& msg) {
 }
 
-void PlanFragmentExecutor::set_abort() {
-    LOG(INFO) << "Plan Aborted";
-    s_abort_cnt++;
-}
-
 void PlanFragmentExecutor::close() {}
 
 class FragmentMgrTest : public testing::Test {
@@ -128,7 +122,6 @@ TEST_F(FragmentMgrTest, OfferPoolFailed) {
     config::fragment_pool_thread_num_min = 1;
     config::fragment_pool_thread_num_max = 1;
     config::fragment_pool_queue_size = 0;
-    s_abort_cnt = 0;
     FragmentMgr mgr(nullptr);
 
     TExecPlanFragmentParams params;
@@ -145,7 +138,6 @@ TEST_F(FragmentMgrTest, OfferPoolFailed) {
         params.params.fragment_instance_id.__set_lo(200);
         EXPECT_FALSE(mgr.exec_plan_fragment(params).ok());
     }
-    EXPECT_EQ(3, s_abort_cnt);
 }
 
 } // namespace doris
diff --git a/docs/sidebars.json b/docs/sidebars.json
index 2d8cdb36a0..1b1151e342 100644
--- a/docs/sidebars.json
+++ b/docs/sidebars.json
@@ -860,7 +860,6 @@
                        "admin-manual/maint-monitor/multi-tenant",
                        "admin-manual/maint-monitor/tablet-local-debug",
                        "admin-manual/maint-monitor/tablet-restore-tool",
-                       "admin-manual/maint-monitor/monitor-metrics/metrics",
                        "admin-manual/maint-monitor/metadata-operation"
                     ]
                 },
diff --git 
a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md 
b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index d5789c05f5..12996ef240 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -275,6 +275,7 @@ curl http://be_host:webserver_port/metrics?type=json
 |`doris_be_upload_total_byte`| | | 字节 | 冷热分离功能,上传到远端存储成功的rowset数据量累计值| |
 |`doris_be_load_bytes`| | 字节|通过 tablet sink 发送的数量累计 | 可观测导入数据量 | P0 |
 |`doris_be_load_rows`| | Num | 通过 tablet sink 发送的行数累计| 可观测导入数据量 | P0 |
+|`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 
如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
 
 ### 机器监控
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to