This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-1.1-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push: new efe29795d5 [branch-1.1-lts](cherry-pick) Remove mem tracker record mem pool actual memory usage (#12974) efe29795d5 is described below commit efe29795d53968e5bc219e5f647279a95a4dc72e Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Mon Sep 26 17:14:05 2022 +0800 [branch-1.1-lts](cherry-pick) Remove mem tracker record mem pool actual memory usage (#12974) --- be/src/runtime/mem_pool.cpp | 15 +-------------- be/src/runtime/mem_pool.h | 14 -------------- be/src/runtime/memory/mem_tracker.cpp | 7 ++----- be/src/runtime/memory/mem_tracker_limiter.cpp | 8 ++------ be/src/runtime/memory/mem_tracker_limiter.h | 17 +++++++++++------ 5 files changed, 16 insertions(+), 45 deletions(-) diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 21b475400e..050614eb5f 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -41,13 +41,11 @@ MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_by DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size); } - MemPool::MemPool(const std::string& label) : current_chunk_idx_(-1), next_chunk_size_(INITIAL_CHUNK_SIZE), total_allocated_bytes_(0), - total_reserved_bytes_(0), - peak_allocated_bytes_(0) { + total_reserved_bytes_(0) { mem_tracker_own_ = MemTracker::CreateTracker(-1, label + ":MemPool"); mem_tracker_ = mem_tracker_own_.get(); } @@ -59,9 +57,6 @@ MemPool::~MemPool() { ChunkAllocator::instance()->free(chunk.chunk); } mem_tracker_->Release(total_bytes_released); - THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->orphan_mem_tracker_raw()); - DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -81,15 +76,11 @@ void MemPool::free_all() { total_bytes_released += chunk.chunk.size; ChunkAllocator::instance()->free(chunk.chunk); } - THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->orphan_mem_tracker_raw()); - chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; current_chunk_idx_ = -1; total_allocated_bytes_ = 0; total_reserved_bytes_ = 0; - peak_allocated_bytes_ = 0; mem_tracker_->Release(total_bytes_released); DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); @@ -148,7 +139,6 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) { mem_tracker_->Release(chunk_size); return false; } - THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->orphan_mem_tracker_raw()); ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size); // Put it before the first free chunk. If no free chunks, it goes at the end. if (first_free_idx == static_cast<int>(chunks_.size())) { @@ -214,8 +204,6 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) { src->total_allocated_bytes_ = 0; } - reset_peak(); - if (!keep_current) src->free_all(); DCHECK(src->check_integrity(false)); DCHECK(check_integrity(false)); @@ -228,7 +216,6 @@ void MemPool::exchange_data(MemPool* other) { std::swap(next_chunk_size_, other->next_chunk_size_); std::swap(total_allocated_bytes_, other->total_allocated_bytes_); std::swap(total_reserved_bytes_, other->total_reserved_bytes_); - std::swap(peak_allocated_bytes_, other->peak_allocated_bytes_); std::swap(chunks_, other->chunks_); // update MemTracker diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 8eee3334ce..11853ebfc5 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -96,7 +96,6 @@ public: next_chunk_size_(INITIAL_CHUNK_SIZE), total_allocated_bytes_(0), total_reserved_bytes_(0), - peak_allocated_bytes_(0), mem_tracker_(mem_tracker) { DCHECK(mem_tracker != nullptr); } @@ -161,7 +160,6 @@ public: int64_t total_allocated_bytes() const { return total_allocated_bytes_; } int64_t total_reserved_bytes() const { return total_reserved_bytes_; } - int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; } MemTracker* mem_tracker() { return mem_tracker_; } @@ -207,14 +205,6 @@ private: /// data. Otherwise the current chunk can be either empty or full. bool check_integrity(bool check_current_chunk_empty); - void reset_peak() { - if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) { - THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_, - ExecEnv::GetInstance()->orphan_mem_tracker_raw()); - peak_allocated_bytes_ = total_allocated_bytes_; - } - } - /// Return offset to unoccupied space in current chunk. int64_t get_free_offset() const { if (current_chunk_idx_ == -1) return 0; @@ -246,7 +236,6 @@ private: DCHECK_LE(info.allocated_bytes + size, info.chunk.size); info.allocated_bytes += padding + size; total_allocated_bytes_ += padding + size; - reset_peak(); DCHECK_LE(current_chunk_idx_, chunks_.size() - 1); return result; } @@ -306,9 +295,6 @@ private: /// sum of all bytes allocated in chunks_ int64_t total_reserved_bytes_; - /// Maximum number of bytes allocated from this pool at one time. - int64_t peak_allocated_bytes_; - std::vector<ChunkInfo> chunks_; /// The current and peak memory footprint of this pool. This is different from diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index bae8474cf4..31d40563d4 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -61,11 +61,8 @@ NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile) DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() != nullptr); MemTrackerLimiter* parent = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw(); - _label = fmt::format("[Observer] {} | {}", label, - parent->label() == "Orphan" ? "Process" : parent->label()); - _bind_group_num = parent->label() == "Orphan" - ? ExecEnv::GetInstance()->new_process_mem_tracker()->group_num() - : parent->group_num(); + _label = fmt::format("[Observer] {} | {}", label, parent->label()); + _bind_group_num = parent->group_num(); { std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock); _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert( diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index aa2f207b16..a8e6315f74 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -77,7 +77,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { DCHECK(remain_child_count() == 0 || _label == "Process"); // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` // in real time. Merge its consumption into orphan when parent is process, to avoid repetition. - if ((_parent && _parent->label() == "Process")) { + if (_parent && _parent->label() == "Process") { ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( _consumption->current_value()); } @@ -88,11 +88,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { _all_ancestors.clear(); _all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw()); } - for (auto& tracker : _all_ancestors) { - if (tracker->label() != "Process") { - tracker->_consumption->add(_untracked_mem); - } - } + consume_local(_untracked_mem); if (_parent) { std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock); if (_child_tracker_it != _parent->_child_tracker_limiters.end()) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 09b42cb7e6..447dfe4e35 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -182,6 +182,8 @@ private: WARN_UNUSED_RESULT bool try_consume(int64_t bytes, std::string& failed_msg); + void consume_local(int64_t bytes); + // When the accumulated untracked memory value exceeds the upper limit, // the current value is returned and set to 0. // Thread safety. @@ -274,15 +276,18 @@ inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { return 0; } +inline void MemTrackerLimiter::consume_local(int64_t bytes) { + if (bytes == 0) return; + for (auto& tracker : _all_ancestors) { + if (tracker->label() == "Process") return; + tracker->_consumption->add(bytes); + } +} + inline void MemTrackerLimiter::cache_consume_local(int64_t bytes) { if (bytes == 0) return; int64_t consume_bytes = add_untracked_mem(bytes); - if (consume_bytes != 0) { - for (auto& tracker : _all_ancestors) { - if (tracker->label() == "Process") return; - tracker->_consumption->add(consume_bytes); - } - } + consume_local(consume_bytes); } inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org