This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7675383c400 [bugfix](deadlock) fix dead lock in cancel fragment (#33181)
7675383c400 is described below

commit 7675383c400a91b1813293306c45c45024854128
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Apr 3 13:40:07 2024 +0800

    [bugfix](deadlock) fix dead lock in cancel fragment (#33181)

    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp    |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp    | 48 +++++++++++-----------
 be/src/pipeline/pipeline_fragment_context.h      |  1 -
 .../pipeline_x/pipeline_x_fragment_context.cpp   | 36 ++++++++--------
 be/src/pipeline/task_scheduler.cpp               |  4 +-
 be/src/runtime/fragment_mgr.cpp                  |  2 +-
 be/src/runtime/query_context.cpp                 | 14 ++++---
 be/src/runtime/query_context.h                   |  2 +-
 8 files changed, 56 insertions(+), 53 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 0eba79b25c5..3b02373ecbd 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -454,7 +454,7 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
 template <typename Parent>
 void ExchangeSinkBuffer<Parent>::_failed(InstanceLoId id, const std::string& err) {
     _is_finishing = true;
-    _context->cancel(true, err, Status::Cancelled(err));
+    _context->cancel(err, Status::Cancelled(err));
     _ended(id);
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 31f8423334f..2c38ef9c890 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -158,9 +158,12 @@ bool PipelineFragmentContext::is_timeout(const VecDateTimeValue& now) const {
     return false;
 }
 
+// Must not add lock in this method. Because it will call query ctx cancel. And
+// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
+// Method like exchange sink buffer will call query ctx cancel. If we add lock here
+// There maybe dead lock.
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
-    std::lock_guard<std::mutex> l(_cancel_lock);
     LOG_INFO("PipelineFragmentContext::cancel")
             .tag("query_id", print_id(_query_ctx->query_id()))
             .tag("fragment_id", _fragment_id)
@@ -172,30 +175,29 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     // can not be cancelled if other fragments set the query_ctx cancelled, this will
     // make result receiver on fe be stocked on rpc forever until timeout...
     // We need a more detail discussion.
-    if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
-            _is_report_on_cancel = false;
-        } else {
-            LOG(WARNING) << "PipelineFragmentContext "
-                         << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
-                         << " is canceled, cancel message: " << msg;
-        }
-
-        _runtime_state->set_process_status(_query_ctx->exec_status());
-        // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
-        // For stream load the fragment's query_id == load id, it is set in FE.
-        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
-        if (stream_load_ctx != nullptr) {
-            stream_load_ctx->pipe->cancel(msg);
-        }
+    _query_ctx->cancel(msg, Status::Cancelled(msg));
+    if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+        _is_report_on_cancel = false;
+    } else {
+        LOG(WARNING) << "PipelineFragmentContext "
+                     << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
+                     << " is canceled, cancel message: " << msg;
+    }
 
-        // must close stream_mgr to avoid dead lock in Exchange Node
-        // TODO bug llj fix this other instance will not cancel
-        _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg));
-        // Cancel the result queue manager used by spark doris connector
-        // TODO pipeline incomp
-        // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
+    _runtime_state->set_process_status(_query_ctx->exec_status());
+    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
+    // For stream load the fragment's query_id == load id, it is set in FE.
+    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
+    if (stream_load_ctx != nullptr) {
+        stream_load_ctx->pipe->cancel(msg);
     }
+
+    // must close stream_mgr to avoid dead lock in Exchange Node
+    // TODO bug llj fix this other instance will not cancel
+    _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg));
+    // Cancel the result queue manager used by spark doris connector
+    // TODO pipeline incomp
+    // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
 }
 
 PipelinePtr PipelineFragmentContext::add_pipeline() {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 8ad36612f4a..96936233b39 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -213,7 +213,6 @@ protected:
 
     VecDateTimeValue _start_time;
     int _timeout = -1;
-    std::mutex _cancel_lock;
 
 private:
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index fa53e6f4b11..4419ecbe7f4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -126,7 +126,6 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
 
 void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                       const std::string& msg) {
-    std::lock_guard<std::mutex> l(_cancel_lock);
     LOG_INFO("PipelineXFragmentContext::cancel")
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id)
@@ -135,25 +134,24 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     if (reason == PPlanFragmentCancelReason::TIMEOUT) {
         LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
     }
-    if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) {
-        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
-            _is_report_on_cancel = false;
-        } else {
-            for (auto& id : _fragment_instance_ids) {
-                LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(id);
-            }
-        }
-        // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
-        // For stream load the fragment's query_id == load id, it is set in FE.
-        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
-        if (stream_load_ctx != nullptr) {
-            stream_load_ctx->pipe->cancel(msg);
+    _query_ctx->cancel(msg, Status::Cancelled(msg), _fragment_id);
+    if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+        _is_report_on_cancel = false;
+    } else {
+        for (auto& id : _fragment_instance_ids) {
+            LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(id);
         }
-
-        // Cancel the result queue manager used by spark doris connector
-        // TODO pipeline incomp
-        // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
     }
+    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
+    // For stream load the fragment's query_id == load id, it is set in FE.
+    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
+    if (stream_load_ctx != nullptr) {
+        stream_load_ctx->pipe->cancel(msg);
+    }
+
+    // Cancel the result queue manager used by spark doris connector
+    // TODO pipeline incomp
+    // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
     for (auto& tasks : _tasks) {
         for (auto& task : tasks) {
             task->clear_blocking_state();
@@ -1326,7 +1324,7 @@ void PipelineXFragmentContext::close_if_prepare_failed(Status st) {
             close_a_pipeline();
         }
     }
-    _query_ctx->cancel(true, st.to_string(), st, _fragment_id);
+    _query_ctx->cancel(st.to_string(), st, _fragment_id);
 }
 
 void PipelineXFragmentContext::_close_fragment_instance() {
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 3e03f3636fc..8819067e597 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -135,7 +135,7 @@ void BlockedTaskScheduler::_schedule() {
                              << ", instance_id=" << print_id(task->instance_id())
                              << ", task info: " << task->debug_string();
 
-                task->query_context()->cancel(true, "", Status::Cancelled(""));
+                task->query_context()->cancel("", Status::Cancelled(""));
                 _make_task_run(local_blocked_tasks, iter);
             } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
                 if (task->has_dependency()) {
@@ -241,7 +241,7 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status
         task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                          std::string(status.msg()));
     } else {
-        task->query_context()->cancel(true, status.to_string(),
+        task->query_context()->cancel(status.to_string(),
                                       Status::Cancelled(status.to_string()));
     }
     state = PipelineTaskState::CANCELED;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d852385d265..68c4afa3821 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1002,7 +1002,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
         }
     }
 
-    query_ctx->cancel(true, msg, Status::Cancelled(msg));
+    query_ctx->cancel(msg, Status::Cancelled(msg));
     {
         std::lock_guard<std::mutex> state_lock(_lock);
         _query_ctx_map.erase(query_id);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 4fb5df7c7dd..681d0e333c7 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -157,12 +157,14 @@ void QueryContext::set_execution_dependency_ready() {
     _execution_dependency->set_ready();
 }
 
-bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragment_id) {
-    if (_is_cancelled) {
-        return false;
+void QueryContext::cancel(std::string msg, Status new_status, int fragment_id) {
+    // Just for CAS need a left value
+    bool false_cancel = false;
+    if (!_is_cancelled.compare_exchange_strong(false_cancel, true)) {
+        return;
     }
+    DCHECK(!false_cancel && _is_cancelled);
     set_exec_status(new_status);
-    _is_cancelled.store(v);
     set_ready_to_execute(true);
 
     std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
@@ -175,12 +177,14 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme
             ctx_to_cancel.push_back(f_context);
         }
     }
+    // Must not add lock here. There maybe dead lock because it will call fragment
+    // ctx cancel and fragment ctx will call query ctx cancel.
     for (auto& f_context : ctx_to_cancel) {
         if (auto pipeline_ctx = f_context.lock()) {
            pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg);
         }
     }
-    return true;
+    return;
 }
 
 void QueryContext::cancel_all_pipeline_context(const PPlanFragmentCancelReason& reason,
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 1551af46c95..5dd0999a63d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -114,7 +114,7 @@ public:
                                       const std::string& msg);
     void set_pipeline_context(const int fragment_id,
                               std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
-    bool cancel(bool v, std::string msg, Status new_status, int fragment_id = -1);
+    void cancel(std::string msg, Status new_status, int fragment_id = -1);
 
     void set_exec_status(Status new_status) {
         if (new_status.ok()) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
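A note on the shape of the fix, for readers skimming the diff: before this patch, PipelineFragmentContext::cancel held _cancel_lock while calling QueryContext::cancel, and QueryContext::cancel in turn calls back into each fragment's cancel (running code such as ExchangeSinkBuffer::_failed also calls QueryContext::cancel), so threads cancelling different fragments of the same query could end up waiting on each other's locks. The patch removes the fragment-level mutex and instead makes QueryContext::cancel a one-shot operation guarded by compare_exchange_strong on the _is_cancelled atomic. Below is a minimal, self-contained sketch of that pattern; QueryCtxSketch, register_fragment, and the callback vector are illustrative stand-ins, not the actual Doris classes.

```cpp
#include <atomic>
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for QueryContext: cancel() can be reached concurrently
// from several places (a fragment, the fragment manager, an exchange sink
// reporting an RPC error), but only the first caller performs the cancel work.
class QueryCtxSketch {
public:
    void register_fragment(std::function<void(const std::string&)> cb) {
        _fragment_cancel_callbacks.push_back(std::move(cb));
    }

    void cancel(const std::string& msg) {
        // One-shot guard: compare_exchange_strong needs an lvalue for the
        // expected value, mirroring the `false_cancel` local in the patch.
        bool expected = false;
        if (!_is_cancelled.compare_exchange_strong(expected, true)) {
            return; // already cancelled by another caller; nothing to do
        }
        // No mutex is held across this fan-out, so a fragment that calls back
        // into cancel() hits the CAS guard instead of a held lock.
        for (auto& cb : _fragment_cancel_callbacks) {
            cb(msg);
        }
    }

private:
    std::atomic<bool> _is_cancelled {false};
    std::vector<std::function<void(const std::string&)>> _fragment_cancel_callbacks;
};

int main() {
    QueryCtxSketch ctx;
    ctx.register_fragment([&ctx](const std::string& msg) {
        std::cout << "fragment cancelled: " << msg << std::endl;
        // Re-entrant call, like a sink failure triggering query-level cancel:
        // with the CAS guard it becomes a no-op rather than a deadlock.
        ctx.cancel(msg);
    });
    ctx.cancel("limit reached");          // performs the cancellation once
    ctx.cancel("ignored: already done");  // second call returns immediately
    return 0;
}
```

The key property is that _is_cancelled flips exactly once and no lock is held while cancellation fans out to the fragment contexts, which is what allows _cancel_lock to be dropped from PipelineFragmentContext and the bool return value to be dropped from QueryContext::cancel.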