This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a96bd4b0e6f fix load channel may memory leak (#32301)
a96bd4b0e6f is described below

commit a96bd4b0e6f93a5cd9857eda2638dda6871911ef
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Sun Mar 17 11:05:18 2024 +0800

    fix load channel may memory leak (#32301)
    
    Co-authored-by: zhengyu <freeman.zhang1...@gmail.com>
---
 be/src/pipeline/pipeline_fragment_context.cpp |  2 +-
 be/src/runtime/plan_fragment_executor.cpp     |  2 +-
 be/src/runtime/runtime_state.cpp              |  4 ++++
 be/src/runtime/runtime_state.h                | 22 ++++++++++++++++------
 be/src/vec/sink/vtablet_sink.cpp              |  4 ++++
 5 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index f3e1e4ef292..9f860610681 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -157,7 +157,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
         if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
             _exec_status = Status::Cancelled(msg);
         }
-        _runtime_state->set_is_cancelled(true, msg);
+        _runtime_state->set_is_cancelled(msg);
         LOG_WARNING("Query {} instance {} cancelled, reason {}, message {}", print_id(_query_id),
                     print_id(_fragment_instance_id), PPlanFragmentCancelReason_Name(reason),
                     msg.substr(0, 50));
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index d3f05b32f7c..9a634096d8c 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -474,7 +474,7 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const
     DCHECK(_prepared);
     _cancel_reason = reason;
     _cancel_msg = msg;
-    _runtime_state->set_is_cancelled(true, msg);
+    _runtime_state->set_is_cancelled(msg);
     // To notify wait_for_start()
     _runtime_state->get_query_ctx()->set_ready_to_execute(true);
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index a4fa27dff72..88a2e630d43 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -274,6 +274,10 @@ void RuntimeState::get_unreported_errors(std::vector<std::string>* new_errors) {
     }
 }
 
+std::string RuntimeState::cancel_reason() const {
+    return _cancel_reason;
+}
+
 Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
     {
         std::lock_guard<std::mutex> l(_process_status_lock);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 02425ed2e5c..1e7375cbb97 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -165,13 +165,22 @@ public:
     void get_unreported_errors(std::vector<std::string>* new_errors);
 
     bool is_cancelled() const { return _is_cancelled.load(); }
+    std::string cancel_reason() const;
     int codegen_level() const { return _query_options.codegen_level; }
-    void set_is_cancelled(bool v, std::string msg) {
-        _is_cancelled.store(v);
-        // Create a error status, so that we could print error stack, and
-        // we could know which path call cancel.
-        LOG_WARNING("Task {} is cancelled, msg: {}", print_id(_fragment_instance_id),
-                    Status::Error<ErrorCode::CANCELLED>(msg));
+    void set_is_cancelled(std::string msg) {
+        if (!_is_cancelled.exchange(true)) {
+            _cancel_reason = msg;
+            // Create a error status, so that we could print error stack, and
+            // we could know which path call cancel.
+            LOG(WARNING) << "Task is cancelled, query id: " << print_id(_query_id)
+                         << ", instance id: " << print_id(_fragment_instance_id)
+                         << ", st = " << Status::Error<ErrorCode::CANCELLED>(msg);
+        } else {
+            LOG(WARNING) << "Task is cancelled, query id: " << print_id(_query_id)
+                         << ", instance id: " << print_id(_fragment_instance_id)
+                         << ", original cancel msg: " << _cancel_reason
+                         << ", new cancel msg: " << Status::Error<ErrorCode::CANCELLED>(msg);
+        }
     }
 
     void set_backend_id(int64_t backend_id) { _backend_id = backend_id; }
@@ -486,6 +495,7 @@ private:
 
     // if true, execution should stop with a CANCELLED status
     std::atomic<bool> _is_cancelled;
+    std::string _cancel_reason;
 
     int _per_fragment_instance_idx;
     int _num_per_fragment_instances = 0;
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 56e1829202c..a4e539b4bf6 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -904,6 +904,10 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
     }
     _close_time_ms = UnixMillis() - _close_time_ms;
 
+    if (_cancelled || state->is_cancelled()) {
+        cancel(state->cancel_reason());
+    }
+
     if (_add_batches_finished) {
         _close_check();
         state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to