This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 72220440dc [fix](memtracker) Remove mem tracker record mem pool actual 
memory usage #12954
72220440dc is described below

commit 72220440dc364fea4ca7f48ebe8262374d18bd70
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Mon Sep 26 12:54:06 2022 +0800

    [fix](memtracker) Remove mem tracker record mem pool actual memory usage 
#12954
    
    The mechanism described below exists to avoid mem tracker consumption
    values that differ across multiple queries/loads, and to avoid counting
    the difference between the virtual memory requested by an allocation and
    the physical memory by which the process actually grows.
    
    Memory allocated in PODArray and mempool is not recorded in the
    query/load mem tracker immediately; instead, it is recorded in the mem
    tracker gradually, as the memory is actually used.
    
    However, mem pool allocates its memory from the chunk allocator. If a
    chunk is being reused (handed out for the second time or later), it may
    already be backed by physical memory. The mechanism above therefore
    causes the load channel memory statistics to be lower than the actual
    value.
---
 be/src/runtime/exec_env_init.cpp              |  1 -
 be/src/runtime/mem_pool.cpp                   | 11 -----------
 be/src/runtime/mem_pool.h                     | 13 -------------
 be/src/runtime/memory/mem_tracker.cpp         |  7 ++-----
 be/src/runtime/memory/mem_tracker_limiter.cpp |  8 ++------
 be/src/runtime/memory/mem_tracker_limiter.h   | 17 +++++++++++------
 6 files changed, 15 insertions(+), 42 deletions(-)

diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index c13c059d5b..1afb147b05 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -216,7 +216,6 @@ Status ExecEnv::_init_mem_tracker() {
         init_hook();
     }
 #endif
-
     _allocator_cache_mem_tracker = 
std::make_shared<MemTracker>("Tc/JemallocAllocatorCache");
     _query_pool_mem_tracker =
             std::make_shared<MemTrackerLimiter>(-1, "QueryPool", 
_process_mem_tracker);
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index 9064f58818..c2b709162c 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -45,7 +45,6 @@ MemPool::MemPool(MemTracker* mem_tracker)
           next_chunk_size_(INITIAL_CHUNK_SIZE),
           total_allocated_bytes_(0),
           total_reserved_bytes_(0),
-          peak_allocated_bytes_(0),
           _mem_tracker(mem_tracker) {}
 
 MemPool::MemPool()
@@ -53,7 +52,6 @@ MemPool::MemPool()
           next_chunk_size_(INITIAL_CHUNK_SIZE),
           total_allocated_bytes_(0),
           total_reserved_bytes_(0),
-          peak_allocated_bytes_(0),
           _mem_tracker(nullptr) {}
 
 MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), 
allocated_bytes(0) {
@@ -66,8 +64,6 @@ MemPool::~MemPool() {
         total_bytes_released += chunk.chunk.size;
         ChunkAllocator::instance()->free(chunk.chunk);
     }
-    THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - 
peak_allocated_bytes_,
-                                     
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
     if (_mem_tracker) _mem_tracker->release(total_bytes_released);
     
DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
 }
@@ -88,15 +84,12 @@ void MemPool::free_all() {
         total_bytes_released += chunk.chunk.size;
         ChunkAllocator::instance()->free(chunk.chunk);
     }
-    THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - 
peak_allocated_bytes_,
-                                     
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
     if (_mem_tracker) _mem_tracker->release(total_bytes_released);
     chunks_.clear();
     next_chunk_size_ = INITIAL_CHUNK_SIZE;
     current_chunk_idx_ = -1;
     total_allocated_bytes_ = 0;
     total_reserved_bytes_ = 0;
-    peak_allocated_bytes_ = 0;
 
     
DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
 }
@@ -150,7 +143,6 @@ Status MemPool::find_chunk(size_t min_size, bool 
check_limits) {
     // Allocate a new chunk. Return early if allocate fails.
     Chunk chunk;
     RETURN_IF_ERROR(ChunkAllocator::instance()->allocate(chunk_size, &chunk));
-    THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, 
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
     if (_mem_tracker) _mem_tracker->consume(chunk_size);
     ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size);
     // Put it before the first free chunk. If no free chunks, it goes at the 
end.
@@ -221,8 +213,6 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) 
{
         src->total_allocated_bytes_ = 0;
     }
 
-    reset_peak();
-
     if (!keep_current) src->free_all();
     DCHECK(src->check_integrity(false));
     DCHECK(check_integrity(false));
@@ -243,7 +233,6 @@ void MemPool::exchange_data(MemPool* other) {
     std::swap(next_chunk_size_, other->next_chunk_size_);
     std::swap(total_allocated_bytes_, other->total_allocated_bytes_);
     std::swap(total_reserved_bytes_, other->total_reserved_bytes_);
-    std::swap(peak_allocated_bytes_, other->peak_allocated_bytes_);
     std::swap(chunks_, other->chunks_);
 }
 
diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h
index bf388dde77..41240ab375 100644
--- a/be/src/runtime/mem_pool.h
+++ b/be/src/runtime/mem_pool.h
@@ -162,7 +162,6 @@ public:
 
     int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
     int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
-    int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
 
     MemTracker* mem_tracker() { return _mem_tracker; }
 
@@ -211,14 +210,6 @@ private:
     /// data. Otherwise the current chunk can be either empty or full.
     bool check_integrity(bool check_current_chunk_empty);
 
-    void reset_peak() {
-        if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) {
-            THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - 
peak_allocated_bytes_,
-                                             
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
-            peak_allocated_bytes_ = total_allocated_bytes_;
-        }
-    }
-
     /// Return offset to unoccupied space in current chunk.
     int64_t get_free_offset() const {
         if (current_chunk_idx_ == -1) return 0;
@@ -250,7 +241,6 @@ private:
             DCHECK_LE(info.allocated_bytes + size, info.chunk.size);
             info.allocated_bytes += padding + size;
             total_allocated_bytes_ += padding + size;
-            reset_peak();
             DCHECK_LE(current_chunk_idx_, chunks_.size() - 1);
             return result;
         }
@@ -308,9 +298,6 @@ private:
     /// sum of all bytes allocated in chunks_
     int64_t total_reserved_bytes_;
 
-    /// Maximum number of bytes allocated from this pool at one time.
-    int64_t peak_allocated_bytes_;
-
     std::vector<ChunkInfo> chunks_;
 
     /// The current and peak memory footprint of this pool. This is different 
from
diff --git a/be/src/runtime/memory/mem_tracker.cpp 
b/be/src/runtime/memory/mem_tracker.cpp
index 5d53382c0a..9c0b006281 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -61,11 +61,8 @@ MemTracker::MemTracker(const std::string& label, 
RuntimeProfile* profile) {
     
DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() != 
nullptr);
     MemTrackerLimiter* parent =
             
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw();
-    _label = fmt::format("[Observer] {} | {}", label,
-                         parent->label() == "Orphan" ? "Process" : 
parent->label());
-    _bind_group_num = parent->label() == "Orphan"
-                              ? 
ExecEnv::GetInstance()->process_mem_tracker()->group_num()
-                              : parent->group_num();
+    _label = fmt::format("[Observer] {} | {}", label, parent->label());
+    _bind_group_num = parent->group_num();
     {
         std::lock_guard<std::mutex> 
l(mem_tracker_pool[_bind_group_num].group_lock);
         _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 0959725d2b..1ca75b2af4 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -78,7 +78,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
     DCHECK(remain_child_count() == 0 || _label == "Process");
     // In order to ensure `consumption of all limiter trackers` + `orphan 
tracker consumption` = `process tracker consumption`
     // in real time. Merge its consumption into orphan when parent is process, 
to avoid repetition.
-    if ((_parent && _parent->label() == "Process")) {
+    if (_parent && _parent->label() == "Process") {
         ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
                 _consumption->current_value());
     }
@@ -89,11 +89,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
         _all_ancestors.clear();
         
_all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw());
     }
-    for (auto& tracker : _all_ancestors) {
-        if (tracker->label() != "Process") {
-            tracker->_consumption->add(_untracked_mem);
-        }
-    }
+    consume_local(_untracked_mem);
     if (_parent) {
         std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
         if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index e68d5b6551..0f882343d3 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -181,6 +181,8 @@ private:
     WARN_UNUSED_RESULT
     bool try_consume(int64_t bytes, std::string& failed_msg);
 
+    void consume_local(int64_t bytes);
+
     // When the accumulated untracked memory value exceeds the upper limit,
     // the current value is returned and set to 0.
     // Thread safety.
@@ -273,15 +275,18 @@ inline int64_t 
MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
     return 0;
 }
 
+inline void MemTrackerLimiter::consume_local(int64_t bytes) {
+    if (bytes == 0) return;
+    for (auto& tracker : _all_ancestors) {
+        if (tracker->label() == "Process") return;
+        tracker->_consumption->add(bytes);
+    }
+}
+
 inline void MemTrackerLimiter::cache_consume_local(int64_t bytes) {
     if (bytes == 0) return;
     int64_t consume_bytes = add_untracked_mem(bytes);
-    if (consume_bytes != 0) {
-        for (auto& tracker : _all_ancestors) {
-            if (tracker->label() == "Process") return;
-            tracker->_consumption->add(consume_bytes);
-        }
-    }
+    consume_local(consume_bytes);
 }
 
 inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& 
failed_msg) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to