This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 80c65f2ce0c [fix](memory) When Load ends, check memory tracker value returns is equal to 0 (#40016) 80c65f2ce0c is described below commit 80c65f2ce0cfacf82f5b9715f8ac8ececcdbacb3 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Tue Sep 3 12:43:46 2024 +0800 [fix](memory) When Load ends, check memory tracker value returns is equal to 0 (#40016) Check all memory is freed when Load is finished. --- be/src/pipeline/pipeline_fragment_context.cpp | 8 +++ be/src/runtime/memory/mem_tracker_limiter.cpp | 71 ++++++++++++++++----------- be/src/runtime/memory/mem_tracker_limiter.h | 2 + 3 files changed, 52 insertions(+), 29 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 51cfd73e549..3bf54ed7ece 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1031,6 +1031,10 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS } case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { DCHECK(thrift_sink.__isset.olap_table_sink); +#ifndef NDEBUG + DCHECK(state->get_query_ctx() != nullptr); + state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; +#endif _sink.reset( new GroupCommitBlockSinkOperatorX(next_sink_operator_id(), row_desc, output_exprs)); break; @@ -1177,6 +1181,10 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo break; } case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: { +#ifndef NDEBUG + DCHECK(_query_ctx != nullptr); + _query_ctx->query_mem_tracker->is_group_commit_load = true; +#endif op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); if (request.__isset.parallel_instances) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index cc695a6fdd5..6df577f8a50 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -114,6 +114,8 @@ MemTrackerLimiter::~MemTrackerLimiter() { "mem tracker not equal to 0 when mem tracker destruct, this usually means that " "memory tracking is inaccurate and SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. " + "If the log is truncated, search for `Address Sanitizer` in the be.INFO log to see " + "more information." "1. For query and load, memory leaks may have occurred, it is expected that the query " "mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. " @@ -127,7 +129,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { if (_consumption->current_value() != 0) { // TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end. #ifndef NDEBUG - if (_type == Type::QUERY) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { std::string err_msg = fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", label(), _consumption->current_value(), _consumption->peak_value(), @@ -140,11 +142,11 @@ MemTrackerLimiter::~MemTrackerLimiter() { } _consumption->set(0); #ifndef NDEBUG - } else if (!_address_sanitizers.empty()) { - LOG(INFO) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " - << ", mem tracker label: " << _label - << ", peak consumption: " << _consumption->peak_value() - << print_address_sanitizers(); + } else if (!_address_sanitizers.empty() && !is_group_commit_load) { + LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " + << ", mem tracker label: " << _label + << ", peak consumption: " << _consumption->peak_value() + << print_address_sanitizers(); #endif } memory_memtrackerlimiter_cnt << -1; @@ -152,17 +154,17 @@ MemTrackerLimiter::~MemTrackerLimiter() { #ifndef NDEBUG void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { - if (_type == Type::QUERY) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { std::lock_guard<std::mutex> l(_address_sanitizers_mtx); auto it = _address_sanitizers.find(buf); if (it != _address_sanitizers.end()) { - LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf - << ", size: " << size << ", old buf: " << it->first - << ", old size: " << it->second.size - << ", new stack_trace: " << get_stack_trace(1, "DISABLED") - << ", old stack_trace: " << it->second.stack_trace; + _error_address_sanitizers.emplace_back( + fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, " + "consumption: {}, peak consumption: {}, buf: {}, size: {}, old " + "buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), + buf, size, it->first, it->second.size, + get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace)); } // if alignment not equal to 0, maybe usable_size > size. @@ -174,26 +176,26 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { } void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { - if (_type == Type::QUERY) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { std::lock_guard<std::mutex> l(_address_sanitizers_mtx); auto it = _address_sanitizers.find(buf); if (it != _address_sanitizers.end()) { if (it->second.size != size) { - LOG(INFO) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker " - "label: " - << _label << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() - << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first - << ", old size: " << it->second.size - << ", new stack_trace: " << get_stack_trace(1, "DISABLED") - << ", old stack_trace: " << it->second.stack_trace; + _error_address_sanitizers.emplace_back(fmt::format( + "[Address Sanitizer] free memory buf size inaccurate, mem tracker label: " + "{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: " + "{}, old size: {}, new stack_trace: {}, old stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), buf, + size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), + it->second.stack_trace)); } _address_sanitizers.erase(buf); } else { - LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf - << ", size: " << size << ", stack_trace: " << get_stack_trace(1, "DISABLED"); + _error_address_sanitizers.emplace_back(fmt::format( + "[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: " + "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), buf, size, + get_stack_trace(1, "FULL_WITH_INLINE"))); } } } @@ -201,9 +203,20 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { std::string MemTrackerLimiter::print_address_sanitizers() { std::lock_guard<std::mutex> l(_address_sanitizers_mtx); std::string detail = "[Address Sanitizer]:"; + detail += "\n memory not be freed:"; for (const auto& it : _address_sanitizers) { - detail += fmt::format("\n {}, size {}, strack trace: {}", it.first, it.second.size, - it.second.stack_trace); + auto msg = fmt::format( + "\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: " + "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}", + _label, _consumption->current_value(), _consumption->peak_value(), it.first, + it.second.size, it.second.stack_trace); + LOG(INFO) << msg; + detail += msg; + } + detail += "\n incorrect memory alloc and free:"; + for (const auto& err_msg : _error_address_sanitizers) { + LOG(INFO) << err_msg; + detail += fmt::format("\n {}", err_msg); } return detail; } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index e5c5cb1bc03..344f3dc92b6 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -209,6 +209,7 @@ public: void add_address_sanitizers(void* buf, size_t size); void remove_address_sanitizers(void* buf, size_t size); std::string print_address_sanitizers(); + bool is_group_commit_load {false}; #endif std::string debug_string() override { @@ -260,6 +261,7 @@ private: std::mutex _address_sanitizers_mtx; std::unordered_map<void*, AddressSanitizer> _address_sanitizers; + std::vector<std::string> _error_address_sanitizers; #endif }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org