This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7675383c400 [bugfix](deadlock) fix dead lock in cancel fragment 
(#33181)
7675383c400 is described below

commit 7675383c400a91b1813293306c45c45024854128
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Apr 3 13:40:07 2024 +0800

    [bugfix](deadlock) fix dead lock in cancel fragment (#33181)
    
    
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 48 +++++++++++-----------
 be/src/pipeline/pipeline_fragment_context.h        |  1 -
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 36 ++++++++--------
 be/src/pipeline/task_scheduler.cpp                 |  4 +-
 be/src/runtime/fragment_mgr.cpp                    |  2 +-
 be/src/runtime/query_context.cpp                   | 14 ++++---
 be/src/runtime/query_context.h                     |  2 +-
 8 files changed, 56 insertions(+), 53 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 0eba79b25c5..3b02373ecbd 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -454,7 +454,7 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
 template <typename Parent>
 void ExchangeSinkBuffer<Parent>::_failed(InstanceLoId id, const std::string& 
err) {
     _is_finishing = true;
-    _context->cancel(true, err, Status::Cancelled(err));
+    _context->cancel(err, Status::Cancelled(err));
     _ended(id);
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 31f8423334f..2c38ef9c890 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -158,9 +158,12 @@ bool PipelineFragmentContext::is_timeout(const 
VecDateTimeValue& now) const {
     return false;
 }
 
+// Must not add a lock in this method, because it calls query ctx cancel, and
+// query ctx cancel in turn calls fragment ctx cancel. Also, the fragment ctx's
+// running code (e.g. the exchange sink buffer) calls query ctx cancel, so
+// adding a lock here could cause a deadlock.
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
-    std::lock_guard<std::mutex> l(_cancel_lock);
     LOG_INFO("PipelineFragmentContext::cancel")
             .tag("query_id", print_id(_query_ctx->query_id()))
             .tag("fragment_id", _fragment_id)
@@ -172,30 +175,29 @@ void PipelineFragmentContext::cancel(const 
PPlanFragmentCancelReason& reason,
     // can not be cancelled if other fragments set the query_ctx cancelled, 
this will
     // make result receiver on fe be stocked on rpc forever until timeout...
     // We need a more detail discussion.
-    if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
-            _is_report_on_cancel = false;
-        } else {
-            LOG(WARNING) << "PipelineFragmentContext "
-                         << PrintInstanceStandardInfo(_query_id, 
_fragment_instance_id)
-                         << " is canceled, cancel message: " << msg;
-        }
-
-        _runtime_state->set_process_status(_query_ctx->exec_status());
-        // Get pipe from new load stream manager and send cancel to it or the 
fragment may hang to wait read from pipe
-        // For stream load the fragment's query_id == load id, it is set in FE.
-        auto stream_load_ctx = 
_exec_env->new_load_stream_mgr()->get(_query_id);
-        if (stream_load_ctx != nullptr) {
-            stream_load_ctx->pipe->cancel(msg);
-        }
+    _query_ctx->cancel(msg, Status::Cancelled(msg));
+    if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+        _is_report_on_cancel = false;
+    } else {
+        LOG(WARNING) << "PipelineFragmentContext "
+                     << PrintInstanceStandardInfo(_query_id, 
_fragment_instance_id)
+                     << " is canceled, cancel message: " << msg;
+    }
 
-        // must close stream_mgr to avoid dead lock in Exchange Node
-        // TODO bug llj  fix this other instance will not cancel
-        _exec_env->vstream_mgr()->cancel(_fragment_instance_id, 
Status::Cancelled(msg));
-        // Cancel the result queue manager used by spark doris connector
-        // TODO pipeline incomp
-        // _exec_env->result_queue_mgr()->update_queue_status(id, 
Status::Aborted(msg));
+    _runtime_state->set_process_status(_query_ctx->exec_status());
+    // Get pipe from new load stream manager and send cancel to it or the 
fragment may hang to wait read from pipe
+    // For stream load the fragment's query_id == load id, it is set in FE.
+    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
+    if (stream_load_ctx != nullptr) {
+        stream_load_ctx->pipe->cancel(msg);
     }
+
+    // must close stream_mgr to avoid dead lock in Exchange Node
+    // TODO bug llj  fix this other instance will not cancel
+    _exec_env->vstream_mgr()->cancel(_fragment_instance_id, 
Status::Cancelled(msg));
+    // Cancel the result queue manager used by spark doris connector
+    // TODO pipeline incomp
+    // _exec_env->result_queue_mgr()->update_queue_status(id, 
Status::Aborted(msg));
 }
 
 PipelinePtr PipelineFragmentContext::add_pipeline() {
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 8ad36612f4a..96936233b39 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -213,7 +213,6 @@ protected:
 
     VecDateTimeValue _start_time;
     int _timeout = -1;
-    std::mutex _cancel_lock;
 
 private:
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index fa53e6f4b11..4419ecbe7f4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -126,7 +126,6 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
 
 void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                       const std::string& msg) {
-    std::lock_guard<std::mutex> l(_cancel_lock);
     LOG_INFO("PipelineXFragmentContext::cancel")
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id)
@@ -135,25 +134,24 @@ void PipelineXFragmentContext::cancel(const 
PPlanFragmentCancelReason& reason,
     if (reason == PPlanFragmentCancelReason::TIMEOUT) {
         LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout 
: " << debug_string();
     }
-    if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) {
-        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
-            _is_report_on_cancel = false;
-        } else {
-            for (auto& id : _fragment_instance_ids) {
-                LOG(WARNING) << "PipelineXFragmentContext cancel instance: " 
<< print_id(id);
-            }
-        }
-        // Get pipe from new load stream manager and send cancel to it or the 
fragment may hang to wait read from pipe
-        // For stream load the fragment's query_id == load id, it is set in FE.
-        auto stream_load_ctx = 
_exec_env->new_load_stream_mgr()->get(_query_id);
-        if (stream_load_ctx != nullptr) {
-            stream_load_ctx->pipe->cancel(msg);
+    _query_ctx->cancel(msg, Status::Cancelled(msg), _fragment_id);
+    if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+        _is_report_on_cancel = false;
+    } else {
+        for (auto& id : _fragment_instance_ids) {
+            LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << 
print_id(id);
         }
-
-        // Cancel the result queue manager used by spark doris connector
-        // TODO pipeline incomp
-        // _exec_env->result_queue_mgr()->update_queue_status(id, 
Status::Aborted(msg));
     }
+    // Get pipe from new load stream manager and send cancel to it or the 
fragment may hang to wait read from pipe
+    // For stream load the fragment's query_id == load id, it is set in FE.
+    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
+    if (stream_load_ctx != nullptr) {
+        stream_load_ctx->pipe->cancel(msg);
+    }
+
+    // Cancel the result queue manager used by spark doris connector
+    // TODO pipeline incomp
+    // _exec_env->result_queue_mgr()->update_queue_status(id, 
Status::Aborted(msg));
     for (auto& tasks : _tasks) {
         for (auto& task : tasks) {
             task->clear_blocking_state();
@@ -1326,7 +1324,7 @@ void 
PipelineXFragmentContext::close_if_prepare_failed(Status st) {
             close_a_pipeline();
         }
     }
-    _query_ctx->cancel(true, st.to_string(), st, _fragment_id);
+    _query_ctx->cancel(st.to_string(), st, _fragment_id);
 }
 
 void PipelineXFragmentContext::_close_fragment_instance() {
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 3e03f3636fc..8819067e597 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -135,7 +135,7 @@ void BlockedTaskScheduler::_schedule() {
                              << ", instance_id=" << 
print_id(task->instance_id())
                              << ", task info: " << task->debug_string();
 
-                task->query_context()->cancel(true, "", Status::Cancelled(""));
+                task->query_context()->cancel("", Status::Cancelled(""));
                 _make_task_run(local_blocked_tasks, iter);
             } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
                 if (task->has_dependency()) {
@@ -241,7 +241,7 @@ void _close_task(PipelineTask* task, PipelineTaskState 
state, Status exec_status
             
task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                              std::string(status.msg()));
         } else {
-            task->query_context()->cancel(true, status.to_string(),
+            task->query_context()->cancel(status.to_string(),
                                           
Status::Cancelled(status.to_string()));
         }
         state = PipelineTaskState::CANCELED;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d852385d265..68c4afa3821 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1002,7 +1002,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, 
const PPlanFragmentCan
         }
     }
 
-    query_ctx->cancel(true, msg, Status::Cancelled(msg));
+    query_ctx->cancel(msg, Status::Cancelled(msg));
     {
         std::lock_guard<std::mutex> state_lock(_lock);
         _query_ctx_map.erase(query_id);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 4fb5df7c7dd..681d0e333c7 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -157,12 +157,14 @@ void QueryContext::set_execution_dependency_ready() {
     _execution_dependency->set_ready();
 }
 
-bool QueryContext::cancel(bool v, std::string msg, Status new_status, int 
fragment_id) {
-    if (_is_cancelled) {
-        return false;
+void QueryContext::cancel(std::string msg, Status new_status, int fragment_id) 
{
+    // Local variable only because compare_exchange_strong (CAS) requires an
+    // lvalue for its expected argument.
+    if (!_is_cancelled.compare_exchange_strong(false_cancel, true)) {
+        return;
     }
+    DCHECK(!false_cancel && _is_cancelled);
     set_exec_status(new_status);
-    _is_cancelled.store(v);
 
     set_ready_to_execute(true);
     std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> 
ctx_to_cancel;
@@ -175,12 +177,14 @@ bool QueryContext::cancel(bool v, std::string msg, Status 
new_status, int fragme
             ctx_to_cancel.push_back(f_context);
         }
     }
+    // Must not add a lock here. There may be a deadlock, because this calls
+    // fragment ctx cancel and fragment ctx cancel calls query ctx cancel.
     for (auto& f_context : ctx_to_cancel) {
         if (auto pipeline_ctx = f_context.lock()) {
             pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, 
msg);
         }
     }
-    return true;
+    return;
 }
 
 void QueryContext::cancel_all_pipeline_context(const 
PPlanFragmentCancelReason& reason,
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 1551af46c95..5dd0999a63d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -114,7 +114,7 @@ public:
                                    const std::string& msg);
     void set_pipeline_context(const int fragment_id,
                               
std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
-    bool cancel(bool v, std::string msg, Status new_status, int fragment_id = 
-1);
+    void cancel(std::string msg, Status new_status, int fragment_id = -1);
 
     void set_exec_status(Status new_status) {
         if (new_status.ok()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to