This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 80c65f2ce0c [fix](memory) When Load ends, check memory tracker value 
returns is equal to 0 (#40016)
80c65f2ce0c is described below

commit 80c65f2ce0cfacf82f5b9715f8ac8ececcdbacb3
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Tue Sep 3 12:43:46 2024 +0800

    [fix](memory) When Load ends, check memory tracker value returns is equal 
to 0 (#40016)
    
    Check that all memory is freed when Load is finished.
---
 be/src/pipeline/pipeline_fragment_context.cpp |  8 +++
 be/src/runtime/memory/mem_tracker_limiter.cpp | 71 ++++++++++++++++-----------
 be/src/runtime/memory/mem_tracker_limiter.h   |  2 +
 3 files changed, 52 insertions(+), 29 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 51cfd73e549..3bf54ed7ece 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1031,6 +1031,10 @@ Status 
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
     }
     case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
         DCHECK(thrift_sink.__isset.olap_table_sink);
+#ifndef NDEBUG
+        DCHECK(state->get_query_ctx() != nullptr);
+        state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
+#endif
         _sink.reset(
                 new GroupCommitBlockSinkOperatorX(next_sink_operator_id(), 
row_desc, output_exprs));
         break;
@@ -1177,6 +1181,10 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         break;
     }
     case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
+#ifndef NDEBUG
+        DCHECK(_query_ctx != nullptr);
+        _query_ctx->query_mem_tracker->is_group_commit_load = true;
+#endif
         op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), 
descs, _num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
         if (request.__isset.parallel_instances) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index cc695a6fdd5..6df577f8a50 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -114,6 +114,8 @@ MemTrackerLimiter::~MemTrackerLimiter() {
             "mem tracker not equal to 0 when mem tracker destruct, this 
usually means that "
             "memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
             "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
+            "If the log is truncated, search for `Address Sanitizer` in the 
be.INFO log to see "
+            "more information."
             "1. For query and load, memory leaks may have occurred, it is 
expected that the query "
             "mem tracker will be bound to the thread context using 
SCOPED_ATTACH_TASK and "
             "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc 
and free. "
@@ -127,7 +129,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
     if (_consumption->current_value() != 0) {
         // TODO, expect mem tracker equal to 0 at the load/compaction/etc. 
task end.
 #ifndef NDEBUG
-        if (_type == Type::QUERY) {
+        if (_type == Type::QUERY || (_type == Type::LOAD && 
!is_group_commit_load)) {
             std::string err_msg =
                     fmt::format("mem tracker label: {}, consumption: {}, peak 
consumption: {}, {}.",
                                 label(), _consumption->current_value(), 
_consumption->peak_value(),
@@ -140,11 +142,11 @@ MemTrackerLimiter::~MemTrackerLimiter() {
         }
         _consumption->set(0);
 #ifndef NDEBUG
-    } else if (!_address_sanitizers.empty()) {
-        LOG(INFO) << "[Address Sanitizer] consumption is 0, but address 
sanitizers not empty. "
-                  << ", mem tracker label: " << _label
-                  << ", peak consumption: " << _consumption->peak_value()
-                  << print_address_sanitizers();
+    } else if (!_address_sanitizers.empty() && !is_group_commit_load) {
+        LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address 
sanitizers not empty. "
+                   << ", mem tracker label: " << _label
+                   << ", peak consumption: " << _consumption->peak_value()
+                   << print_address_sanitizers();
 #endif
     }
     memory_memtrackerlimiter_cnt << -1;
@@ -152,17 +154,17 @@ MemTrackerLimiter::~MemTrackerLimiter() {
 
 #ifndef NDEBUG
 void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
-    if (_type == Type::QUERY) {
+    if (_type == Type::QUERY || (_type == Type::LOAD && 
!is_group_commit_load)) {
         std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
         auto it = _address_sanitizers.find(buf);
         if (it != _address_sanitizers.end()) {
-            LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem 
tracker label: " << _label
-                      << ", consumption: " << _consumption->current_value()
-                      << ", peak consumption: " << _consumption->peak_value() 
<< ", buf: " << buf
-                      << ", size: " << size << ", old buf: " << it->first
-                      << ", old size: " << it->second.size
-                      << ", new stack_trace: " << get_stack_trace(1, 
"DISABLED")
-                      << ", old stack_trace: " << it->second.stack_trace;
+            _error_address_sanitizers.emplace_back(
+                    fmt::format("[Address Sanitizer] memory buf repeat add, 
mem tracker label: {}, "
+                                "consumption: {}, peak consumption: {}, buf: 
{}, size: {}, old "
+                                "buf: {}, old size: {}, new stack_trace: {}, 
old stack_trace: {}.",
+                                _label, _consumption->current_value(), 
_consumption->peak_value(),
+                                buf, size, it->first, it->second.size,
+                                get_stack_trace(1, "FULL_WITH_INLINE"), 
it->second.stack_trace));
         }
 
         // if alignment not equal to 0, maybe usable_size > size.
@@ -174,26 +176,26 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, 
size_t size) {
 }
 
 void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
-    if (_type == Type::QUERY) {
+    if (_type == Type::QUERY || (_type == Type::LOAD && 
!is_group_commit_load)) {
         std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
         auto it = _address_sanitizers.find(buf);
         if (it != _address_sanitizers.end()) {
             if (it->second.size != size) {
-                LOG(INFO) << "[Address Sanitizer] free memory buf size 
inaccurate, mem tracker "
-                             "label: "
-                          << _label << ", consumption: " << 
_consumption->current_value()
-                          << ", peak consumption: " << 
_consumption->peak_value()
-                          << ", buf: " << buf << ", size: " << size << ", old 
buf: " << it->first
-                          << ", old size: " << it->second.size
-                          << ", new stack_trace: " << get_stack_trace(1, 
"DISABLED")
-                          << ", old stack_trace: " << it->second.stack_trace;
+                _error_address_sanitizers.emplace_back(fmt::format(
+                        "[Address Sanitizer] free memory buf size inaccurate, 
mem tracker label: "
+                        "{}, consumption: {}, peak consumption: {}, buf: {}, 
size: {}, old buf: "
+                        "{}, old size: {}, new stack_trace: {}, old 
stack_trace: {}.",
+                        _label, _consumption->current_value(), 
_consumption->peak_value(), buf,
+                        size, it->first, it->second.size, get_stack_trace(1, 
"FULL_WITH_INLINE"),
+                        it->second.stack_trace));
             }
             _address_sanitizers.erase(buf);
         } else {
-            LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem 
tracker label: " << _label
-                      << ", consumption: " << _consumption->current_value()
-                      << ", peak consumption: " << _consumption->peak_value() 
<< ", buf: " << buf
-                      << ", size: " << size << ", stack_trace: " << 
get_stack_trace(1, "DISABLED");
+            _error_address_sanitizers.emplace_back(fmt::format(
+                    "[Address Sanitizer] memory buf not exist, mem tracker 
label: {}, consumption: "
+                    "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: 
{}.",
+                    _label, _consumption->current_value(), 
_consumption->peak_value(), buf, size,
+                    get_stack_trace(1, "FULL_WITH_INLINE")));
         }
     }
 }
@@ -201,9 +203,20 @@ void MemTrackerLimiter::remove_address_sanitizers(void* 
buf, size_t size) {
 std::string MemTrackerLimiter::print_address_sanitizers() {
     std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
     std::string detail = "[Address Sanitizer]:";
+    detail += "\n memory not be freed:";
     for (const auto& it : _address_sanitizers) {
-        detail += fmt::format("\n    {}, size {}, strack trace: {}", it.first, 
it.second.size,
-                              it.second.stack_trace);
+        auto msg = fmt::format(
+                "\n    [Address Sanitizer] buf not be freed, mem tracker 
label: {}, consumption: "
+                "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}",
+                _label, _consumption->current_value(), 
_consumption->peak_value(), it.first,
+                it.second.size, it.second.stack_trace);
+        LOG(INFO) << msg;
+        detail += msg;
+    }
+    detail += "\n incorrect memory alloc and free:";
+    for (const auto& err_msg : _error_address_sanitizers) {
+        LOG(INFO) << err_msg;
+        detail += fmt::format("\n    {}", err_msg);
     }
     return detail;
 }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index e5c5cb1bc03..344f3dc92b6 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -209,6 +209,7 @@ public:
     void add_address_sanitizers(void* buf, size_t size);
     void remove_address_sanitizers(void* buf, size_t size);
     std::string print_address_sanitizers();
+    bool is_group_commit_load {false};
 #endif
 
     std::string debug_string() override {
@@ -260,6 +261,7 @@ private:
 
     std::mutex _address_sanitizers_mtx;
     std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
+    std::vector<std::string> _error_address_sanitizers;
 #endif
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to