This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 0d873883ba [dev-1.1.2][fix] avoid thread blocks on wait_for_start() 
(#12392)
0d873883ba is described below

commit 0d873883ba76260d6d2a011a227f6b2de8696eb0
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Sep 7 21:34:02 2022 +0800

    [dev-1.1.2][fix] avoid thread blocks on wait_for_start() (#12392)
    
    When cancelling a fragment, we should notify the thread waiting in
    "wait_for_start()" so that it does not stay blocked.
    Only for 1.1.2; #12411 is for the master branch.
---
 be/src/common/config.h                    |  3 +++
 be/src/runtime/fragment_mgr.cpp           | 43 +++++++++++++------------------
 be/src/runtime/plan_fragment_executor.cpp |  8 +++---
 be/src/runtime/plan_fragment_executor.h   |  5 ----
 be/src/runtime/query_fragments_ctx.h      | 13 +++++++---
 be/src/util/doris_metrics.h               |  1 +
 be/src/vec/exec/volap_scan_node.cpp       |  4 +--
 be/src/vec/sink/vdata_stream_sender.h     |  3 ---
 8 files changed, 36 insertions(+), 44 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index aa0b55bc0b..945e9631cd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -746,6 +746,9 @@ CONF_Int32(quick_compaction_max_rows, "1000");
 CONF_Int32(quick_compaction_batch_size, "10");
 // do compaction min rowsets
 CONF_Int32(quick_compaction_min_rowsets, "10");
+// Max waiting time to wait the "plan fragment start" rpc.
+// If timeout, the fragment will be cancelled.
+CONF_mInt32(max_fragment_start_wait_time_seconds, "30");
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a70d744447..3612ce2479 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -57,6 +57,7 @@ namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(plan_fragment_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, 
MetricUnit::NOUNIT);
 
 std::string to_load_error_http_path(const std::string& file_name) {
     if (file_name.empty()) {
@@ -92,8 +93,6 @@ public:
 
     Status execute();
 
-    Status cancel_before_execute();
-
     Status cancel(const PPlanFragmentCancelReason& reason, const std::string& 
msg = "");
     TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
 
@@ -233,7 +232,12 @@ Status FragmentExecState::execute() {
     if (_need_wait_execution_trigger) {
         // if _need_wait_execution_trigger is true, which means this instance
         // is prepared but need to wait for the signal to do the rest 
execution.
-        _fragments_ctx->wait_for_start();
+        if (!_fragments_ctx->wait_for_start()) {
+            return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait 
fragment start timeout");
+        }
+    }
+    if (_executor.runtime_state()->is_cancelled()) {
+        return Status::Cancelled("cancelled before execution");
     }
     int64_t duration_ns = 0;
     {
@@ -248,24 +252,9 @@ Status FragmentExecState::execute() {
     return Status::OK();
 }
 
-Status FragmentExecState::cancel_before_execute() {
-    // set status as 'abort', cuz cancel() won't effect the status arg of 
DataSink::close().
-#ifndef BE_TEST
-    SCOPED_ATTACH_TASK(executor()->runtime_state());
-#endif
-    _executor.set_abort();
-    _executor.cancel();
-    if (_pipe != nullptr) {
-        _pipe->cancel("Execution aborted before start");
-    }
-    return Status::OK();
-}
-
 Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, 
const std::string& msg) {
     if (!_cancelled) {
-        _cancelled = true;
         std::lock_guard<std::mutex> l(_status_lock);
-        RETURN_IF_ERROR(_exec_status);
         if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
             _executor.set_is_report_on_cancel(false);
         }
@@ -273,6 +262,7 @@ Status FragmentExecState::cancel(const 
PPlanFragmentCancelReason& reason, const
         if (_pipe != nullptr) {
             _pipe->cancel(PPlanFragmentCancelReason_Name(reason));
         }
+        _cancelled = true;
     }
     return Status::OK();
 }
@@ -447,11 +437,15 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
                 .set_max_threads(config::fragment_pool_thread_num_max)
                 .set_max_queue_size(config::fragment_pool_queue_size)
                 .build(&_thread_pool);
+
+    REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
+                         [this]() { return _thread_pool->get_queue_size(); });
     CHECK(s.ok()) << s.to_string();
 }
 
 FragmentMgr::~FragmentMgr() {
     DEREGISTER_HOOK_METRIC(plan_fragment_count);
+    DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
     _stop_background_threads_latch.count_down();
     if (_cancel_thread) {
         _cancel_thread->join();
@@ -548,7 +542,7 @@ Status FragmentMgr::start_query_execution(const 
PExecPlanFragmentStartRequest* r
                                     "timeout or be cancelled. host: ",
                                     BackendOptions::get_localhost()));
     }
-    search->second->set_ready_to_execute();
+    search->second->set_ready_to_execute(false);
     return Status::OK();
 }
 
@@ -673,10 +667,12 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
             std::lock_guard<std::mutex> lock(_lock);
             _fragment_map.erase(params.params.fragment_instance_id);
         }
-        exec_state->cancel_before_execute();
+        exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
+                           "push plan fragment to thread pool failed");
         return Status::InternalError(
-                strings::Substitute("Put planfragment to thread pool failed. 
err = $0, BE: $1",
-                                    st.get_error_msg(), 
BackendOptions::get_localhost()));
+                strings::Substitute("push plan fragment $0 to thread pool 
failed. err = $1, BE: $2",
+                                    
print_id(params.params.fragment_instance_id), st.get_error_msg(),
+                                    BackendOptions::get_localhost()));
     }
 
     return Status::OK();
@@ -714,9 +710,6 @@ void FragmentMgr::cancel_worker() {
             }
             for (auto it = _fragments_ctx_map.begin(); it != 
_fragments_ctx_map.end();) {
                 if (it->second->is_timeout(now)) {
-                    // The execution logic of the instance needs to be 
notified.
-                    // The execution logic of the instance will eventually 
cancel the execution plan.
-                    it->second->set_ready_to_execute();
                     it = _fragments_ctx_map.erase(it);
                 } else {
                     ++it;
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 8f8cdd62b7..6923059553 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -636,6 +636,8 @@ void PlanFragmentExecutor::cancel(const 
PPlanFragmentCancelReason& reason, const
     _cancel_reason = reason;
     _cancel_msg = msg;
     _runtime_state->set_is_cancelled(true);
+    // To notify wait_for_start()
+    _runtime_state->get_query_fragments_ctx()->set_ready_to_execute(true);
 
     // must close stream_mgr to avoid dead lock in Exchange Node
     auto env = _runtime_state->exec_env();
@@ -646,10 +648,8 @@ void PlanFragmentExecutor::cancel(const 
PPlanFragmentCancelReason& reason, const
         env->stream_mgr()->cancel(id);
         env->result_mgr()->cancel(id);
     }
-}
-
-void PlanFragmentExecutor::set_abort() {
-    update_status(Status::Aborted("Execution aborted before start"));
+    // Cancel the result queue manager used by spark doris connector
+    _exec_env->result_queue_mgr()->update_queue_status(id, 
Status::Aborted(msg));
 }
 
 const RowDescriptor& PlanFragmentExecutor::row_desc() {
diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h
index de11ca87d9..72a5d714ea 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -121,11 +121,6 @@ public:
     // in open()/get_next().
     void close();
 
-    // Abort this execution. Must be called if we skip running open().
-    // It will let DataSink node closed with error status, to avoid use 
resources which created in open() phase.
-    // DataSink node should distinguish Aborted status from other error status.
-    void set_abort();
-
     // Initiate cancellation. Must not be called until after prepare() 
returned.
     void cancel(const PPlanFragmentCancelReason& reason = 
PPlanFragmentCancelReason::INTERNAL_ERROR,
                 const std::string& msg = "");
diff --git a/be/src/runtime/query_fragments_ctx.h 
b/be/src/runtime/query_fragments_ctx.h
index 1e7b3874c6..7d88861031 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -21,6 +21,7 @@
 #include <condition_variable>
 #include <string>
 
+#include "common/config.h"
 #include "common/object_pool.h"
 #include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions
 #include "gen_cpp/Types_types.h"               // for TUniqueId
@@ -72,19 +73,22 @@ public:
 
     ThreadPoolToken* get_serial_token() { return _serial_thread_token.get(); }
 
-    void set_ready_to_execute() {
+    void set_ready_to_execute(bool is_cancelled) {
         {
             std::lock_guard<std::mutex> l(_start_lock);
+            _is_cancelled = is_cancelled;
             _ready_to_execute = true;
         }
         _start_cond.notify_all();
     }
 
-    void wait_for_start() {
+    bool wait_for_start() {
+        int wait_time = config::max_fragment_start_wait_time_seconds;
         std::unique_lock<std::mutex> l(_start_lock);
-        while (!_ready_to_execute.load()) {
-            _start_cond.wait(l);
+        while (!_ready_to_execute.load() && !_is_cancelled.load() && 
--wait_time > 0) {
+            _start_cond.wait_for(l, std::chrono::seconds(1));
         }
+        return _ready_to_execute.load() && !_is_cancelled.load();
     }
 
 public:
@@ -127,6 +131,7 @@ private:
     // Only valid when _need_wait_execution_trigger is set to true in 
FragmentExecState.
     // And all fragments of this query will start execution when this is set 
to true.
     std::atomic<bool> _ready_to_execute {false};
+    std::atomic<bool> _is_cancelled {false};
 };
 
 } // end of namespace
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 8015dcaefa..9353760eaf 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -197,6 +197,7 @@ public:
     UIntGauge* add_batch_task_queue_size;
     UIntGauge* send_batch_thread_pool_thread_num;
     UIntGauge* send_batch_thread_pool_queue_size;
+    UIntGauge* fragment_thread_pool_queue_size;
 
     static DorisMetrics* instance() {
         static DorisMetrics instance;
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index 2a06138767..999c7716f1 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -216,8 +216,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
             num_rows_in_block < _runtime_state->batch_size())) {
         if (UNLIKELY(_transfer_done)) {
             eos = true;
-            status = Status::Cancelled("Cancelled");
-            LOG(INFO) << "Scan thread cancelled, cause query done, maybe reach 
limit.";
+            status = Status::Cancelled("Scan thread cancelled, cause query 
done, maybe reach limit.");
             break;
         }
 
@@ -542,7 +541,6 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* 
block, bool* eos) {
 
             _block_consumed_cv.notify_all();
             *eos = true;
-            LOG(INFO) << "VOlapScanNode ReachedLimit.";
         } else {
             *eos = false;
         }
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 09c9549a5b..94aefeda99 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -175,9 +175,6 @@ public:
         std::string localhost = BackendOptions::get_localhost();
         _is_local = (_brpc_dest_addr.hostname == localhost) &&
                     (_brpc_dest_addr.port == config::brpc_port);
-        if (_is_local) {
-            LOG(INFO) << "will use local Exchange, dest_node_id is : " << 
_dest_node_id;
-        }
     }
 
     virtual ~Channel() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to