This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new a96bd4b0e6f fix load channel may memory leak (#32301) a96bd4b0e6f is described below commit a96bd4b0e6f93a5cd9857eda2638dda6871911ef Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Sun Mar 17 11:05:18 2024 +0800 fix load channel may memory leak (#32301) Co-authored-by: zhengyu <freeman.zhang1...@gmail.com> --- be/src/pipeline/pipeline_fragment_context.cpp | 2 +- be/src/runtime/plan_fragment_executor.cpp | 2 +- be/src/runtime/runtime_state.cpp | 4 ++++ be/src/runtime/runtime_state.h | 22 ++++++++++++++++------ be/src/vec/sink/vtablet_sink.cpp | 4 ++++ 5 files changed, 26 insertions(+), 8 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index f3e1e4ef292..9f860610681 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -157,7 +157,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, if (reason != PPlanFragmentCancelReason::LIMIT_REACH) { _exec_status = Status::Cancelled(msg); } - _runtime_state->set_is_cancelled(true, msg); + _runtime_state->set_is_cancelled(msg); LOG_WARNING("Query {} instance {} cancelled, reason {}, message {}", print_id(_query_id), print_id(_fragment_instance_id), PPlanFragmentCancelReason_Name(reason), msg.substr(0, 50)); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index d3f05b32f7c..9a634096d8c 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -474,7 +474,7 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const DCHECK(_prepared); _cancel_reason = reason; _cancel_msg = msg; - _runtime_state->set_is_cancelled(true, msg); + _runtime_state->set_is_cancelled(msg); // To notify wait_for_start() _runtime_state->get_query_ctx()->set_ready_to_execute(true); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index a4fa27dff72..88a2e630d43 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -274,6 +274,10 @@ void RuntimeState::get_unreported_errors(std::vector<std::string>* new_errors) { } } +std::string RuntimeState::cancel_reason() const { + return _cancel_reason; +} + Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) { { std::lock_guard<std::mutex> l(_process_status_lock); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 02425ed2e5c..1e7375cbb97 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -165,13 +165,22 @@ public: void get_unreported_errors(std::vector<std::string>* new_errors); bool is_cancelled() const { return _is_cancelled.load(); } + std::string cancel_reason() const; int codegen_level() const { return _query_options.codegen_level; } - void set_is_cancelled(bool v, std::string msg) { - _is_cancelled.store(v); - // Create a error status, so that we could print error stack, and - // we could know which path call cancel. - LOG_WARNING("Task {} is cancelled, msg: {}", print_id(_fragment_instance_id), - Status::Error<ErrorCode::CANCELLED>(msg)); + void set_is_cancelled(std::string msg) { + if (!_is_cancelled.exchange(true)) { + _cancel_reason = msg; + // Create a error status, so that we could print error stack, and + // we could know which path call cancel. + LOG(WARNING) << "Task is cancelled, query id: " << print_id(_query_id) + << ", instance id: " << print_id(_fragment_instance_id) + << ", st = " << Status::Error<ErrorCode::CANCELLED>(msg); + } else { + LOG(WARNING) << "Task is cancelled, query id: " << print_id(_query_id) + << ", instance id: " << print_id(_fragment_instance_id) + << ", original cancel msg: " << _cancel_reason + << ", new cancel msg: " << Status::Error<ErrorCode::CANCELLED>(msg); + } } void set_backend_id(int64_t backend_id) { _backend_id = backend_id; } @@ -486,6 +495,7 @@ private: // if true, execution should stop with a CANCELLED status std::atomic<bool> _is_cancelled; + std::string _cancel_reason; int _per_fragment_instance_idx; int _num_per_fragment_instances = 0; diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 56e1829202c..a4e539b4bf6 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -904,6 +904,10 @@ Status VNodeChannel::close_wait(RuntimeState* state) { } _close_time_ms = UnixMillis() - _close_time_ms; + if (_cancelled || state->is_cancelled()) { + cancel(state->cancel_reason()); + } + if (_add_batches_finished) { _close_check(); state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org