This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 72220440dc [fix](memtracker) Remove mem tracker record mem pool actual memory usage #12954 72220440dc is described below commit 72220440dc364fea4ca7f48ebe8262374d18bd70 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Mon Sep 26 12:54:06 2022 +0800 [fix](memtracker) Remove mem tracker record mem pool actual memory usage #12954 In order to avoid differing mem tracker consumption values across multiple queries/loads, and to avoid the difference between the virtual memory requested by an allocation and the physical memory by which the process actually grows, the memory allocated in PODArray and mempool is not recorded in the query/load mem tracker immediately, but is gradually recorded in the mem tracker as the memory is used. However, mem pool allocates its memory from the chunk allocator. If a chunk is being reused (used for the second time or later), it may already be backed by physical memory. As a result, the above mechanism causes the load channel memory statistics to be lower than the actual value. 
--- be/src/runtime/exec_env_init.cpp | 1 - be/src/runtime/mem_pool.cpp | 11 ----------- be/src/runtime/mem_pool.h | 13 ------------- be/src/runtime/memory/mem_tracker.cpp | 7 ++----- be/src/runtime/memory/mem_tracker_limiter.cpp | 8 ++------ be/src/runtime/memory/mem_tracker_limiter.h | 17 +++++++++++------ 6 files changed, 15 insertions(+), 42 deletions(-) diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index c13c059d5b..1afb147b05 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -216,7 +216,6 @@ Status ExecEnv::_init_mem_tracker() { init_hook(); } #endif - _allocator_cache_mem_tracker = std::make_shared<MemTracker>("Tc/JemallocAllocatorCache"); _query_pool_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "QueryPool", _process_mem_tracker); diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 9064f58818..c2b709162c 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -45,7 +45,6 @@ MemPool::MemPool(MemTracker* mem_tracker) next_chunk_size_(INITIAL_CHUNK_SIZE), total_allocated_bytes_(0), total_reserved_bytes_(0), - peak_allocated_bytes_(0), _mem_tracker(mem_tracker) {} MemPool::MemPool() @@ -53,7 +52,6 @@ MemPool::MemPool() next_chunk_size_(INITIAL_CHUNK_SIZE), total_allocated_bytes_(0), total_reserved_bytes_(0), - peak_allocated_bytes_(0), _mem_tracker(nullptr) {} MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_bytes(0) { @@ -66,8 +64,6 @@ MemPool::~MemPool() { total_bytes_released += chunk.chunk.size; ChunkAllocator::instance()->free(chunk.chunk); } - THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->orphan_mem_tracker_raw()); if (_mem_tracker) _mem_tracker->release(total_bytes_released); DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -88,15 +84,12 @@ void MemPool::free_all() { total_bytes_released += 
chunk.chunk.size; ChunkAllocator::instance()->free(chunk.chunk); } - THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->orphan_mem_tracker_raw()); if (_mem_tracker) _mem_tracker->release(total_bytes_released); chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; current_chunk_idx_ = -1; total_allocated_bytes_ = 0; total_reserved_bytes_ = 0; - peak_allocated_bytes_ = 0; DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -150,7 +143,6 @@ Status MemPool::find_chunk(size_t min_size, bool check_limits) { // Allocate a new chunk. Return early if allocate fails. Chunk chunk; RETURN_IF_ERROR(ChunkAllocator::instance()->allocate(chunk_size, &chunk)); - THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->orphan_mem_tracker_raw()); if (_mem_tracker) _mem_tracker->consume(chunk_size); ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size); // Put it before the first free chunk. If no free chunks, it goes at the end. 
@@ -221,8 +213,6 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) { src->total_allocated_bytes_ = 0; } - reset_peak(); - if (!keep_current) src->free_all(); DCHECK(src->check_integrity(false)); DCHECK(check_integrity(false)); @@ -243,7 +233,6 @@ void MemPool::exchange_data(MemPool* other) { std::swap(next_chunk_size_, other->next_chunk_size_); std::swap(total_allocated_bytes_, other->total_allocated_bytes_); std::swap(total_reserved_bytes_, other->total_reserved_bytes_); - std::swap(peak_allocated_bytes_, other->peak_allocated_bytes_); std::swap(chunks_, other->chunks_); } diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index bf388dde77..41240ab375 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -162,7 +162,6 @@ public: int64_t total_allocated_bytes() const { return total_allocated_bytes_; } int64_t total_reserved_bytes() const { return total_reserved_bytes_; } - int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; } MemTracker* mem_tracker() { return _mem_tracker; } @@ -211,14 +210,6 @@ private: /// data. Otherwise the current chunk can be either empty or full. bool check_integrity(bool check_current_chunk_empty); - void reset_peak() { - if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) { - THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_, - ExecEnv::GetInstance()->orphan_mem_tracker_raw()); - peak_allocated_bytes_ = total_allocated_bytes_; - } - } - /// Return offset to unoccupied space in current chunk. 
int64_t get_free_offset() const { if (current_chunk_idx_ == -1) return 0; @@ -250,7 +241,6 @@ private: DCHECK_LE(info.allocated_bytes + size, info.chunk.size); info.allocated_bytes += padding + size; total_allocated_bytes_ += padding + size; - reset_peak(); DCHECK_LE(current_chunk_idx_, chunks_.size() - 1); return result; } @@ -308,9 +298,6 @@ private: /// sum of all bytes allocated in chunks_ int64_t total_reserved_bytes_; - /// Maximum number of bytes allocated from this pool at one time. - int64_t peak_allocated_bytes_; - std::vector<ChunkInfo> chunks_; /// The current and peak memory footprint of this pool. This is different from diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 5d53382c0a..9c0b006281 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -61,11 +61,8 @@ MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile) { DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() != nullptr); MemTrackerLimiter* parent = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw(); - _label = fmt::format("[Observer] {} | {}", label, - parent->label() == "Orphan" ? "Process" : parent->label()); - _bind_group_num = parent->label() == "Orphan" - ? 
ExecEnv::GetInstance()->process_mem_tracker()->group_num() - : parent->group_num(); + _label = fmt::format("[Observer] {} | {}", label, parent->label()); + _bind_group_num = parent->group_num(); { std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock); _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert( diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 0959725d2b..1ca75b2af4 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -78,7 +78,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { DCHECK(remain_child_count() == 0 || _label == "Process"); // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` // in real time. Merge its consumption into orphan when parent is process, to avoid repetition. - if ((_parent && _parent->label() == "Process")) { + if (_parent && _parent->label() == "Process") { ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( _consumption->current_value()); } @@ -89,11 +89,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { _all_ancestors.clear(); _all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw()); } - for (auto& tracker : _all_ancestors) { - if (tracker->label() != "Process") { - tracker->_consumption->add(_untracked_mem); - } - } + consume_local(_untracked_mem); if (_parent) { std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock); if (_child_tracker_it != _parent->_child_tracker_limiters.end()) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index e68d5b6551..0f882343d3 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -181,6 +181,8 @@ private: WARN_UNUSED_RESULT bool try_consume(int64_t bytes, std::string& failed_msg); + void consume_local(int64_t bytes); 
+ // When the accumulated untracked memory value exceeds the upper limit, // the current value is returned and set to 0. // Thread safety. @@ -273,15 +275,18 @@ inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { return 0; } +inline void MemTrackerLimiter::consume_local(int64_t bytes) { + if (bytes == 0) return; + for (auto& tracker : _all_ancestors) { + if (tracker->label() == "Process") return; + tracker->_consumption->add(bytes); + } +} + inline void MemTrackerLimiter::cache_consume_local(int64_t bytes) { if (bytes == 0) return; int64_t consume_bytes = add_untracked_mem(bytes); - if (consume_bytes != 0) { - for (auto& tracker : _all_ancestors) { - if (tracker->label() == "Process") return; - tracker->_consumption->add(consume_bytes); - } - } + consume_local(consume_bytes); } inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org