This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 5d28575ee5 [enhancement](memtracker) Improve performance of tracking 
real physical memory of PODArray #12168 (#12260)
5d28575ee5 is described below

commit 5d28575ee5f563f0f877a9fe722b7ca84ecd7020
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Thu Sep 1 18:47:10 2022 +0800

    [enhancement](memtracker) Improve performance of tracking real physical 
memory of PODArray #12168 (#12260)
---
 be/src/runtime/exec_env.h                        |  3 +++
 be/src/runtime/exec_env_init.cpp                 |  1 +
 be/src/runtime/mem_pool.cpp                      |  6 +++---
 be/src/runtime/mem_pool.h                        |  4 ++--
 be/src/runtime/memory/mem_tracker.cpp            | 15 ++++++---------
 be/src/runtime/memory/mem_tracker_limiter.cpp    |  2 +-
 be/src/runtime/memory/mem_tracker_limiter.h      |  2 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp |  4 +++-
 be/src/runtime/memory/thread_mem_tracker_mgr.h   | 11 +++++++----
 be/src/runtime/runtime_state.cpp                 |  9 +++++----
 be/src/runtime/thread_context.h                  | 12 ++++++------
 be/src/service/doris_main.cpp                    |  2 +-
 be/src/vec/common/pod_array.h                    | 10 +++++-----
 13 files changed, 44 insertions(+), 37 deletions(-)

diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f9338b448a..e73eaa1c1f 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -121,6 +121,8 @@ public:
     PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; }
 
     std::shared_ptr<MemTrackerLimiter> new_process_mem_tracker() { return 
_process_mem_tracker; }
+    MemTrackerLimiter* process_mem_tracker_raw() { return 
_process_mem_tracker_raw; }
+
     std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return 
_query_pool_mem_tracker; }
     std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return 
_load_pool_mem_tracker; }
     MemTrackerTaskPool* task_pool_mem_tracker_registry() { return 
_task_pool_mem_tracker_registry; }
@@ -197,6 +199,7 @@ private:
     // The ancestor for all trackers. Every tracker is visible from the 
process down.
     // Not limit total memory by process tracker, and it's just used to track 
virtual memory of process.
     std::shared_ptr<MemTrackerLimiter> _process_mem_tracker;
+    MemTrackerLimiter* _process_mem_tracker_raw;
     // The ancestor for all querys tracker.
     std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
     // The ancestor for all load tracker.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 00c453e91c..54640a6e70 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -205,6 +205,7 @@ Status ExecEnv::_init_mem_tracker() {
 
     _process_mem_tracker =
             std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, 
"Process");
+    _process_mem_tracker_raw = _process_mem_tracker.get();
     thread_context()->_thread_mem_tracker_mgr->init();
     thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
 #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && 
!defined(ADDRESS_SANITIZER) && \
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index f108f2266b..96a0daad4d 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -60,7 +60,7 @@ MemPool::~MemPool() {
     }
     mem_tracker_->Release(total_bytes_released);
     THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - 
peak_allocated_bytes_,
-                                     
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+                                     
ExecEnv::GetInstance()->process_mem_tracker_raw());
     
DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
 }
 
@@ -81,7 +81,7 @@ void MemPool::free_all() {
         ChunkAllocator::instance()->free(chunk.chunk);
     }
     THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - 
peak_allocated_bytes_,
-                                     
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+                                     
ExecEnv::GetInstance()->process_mem_tracker_raw());
     chunks_.clear();
     next_chunk_size_ = INITIAL_CHUNK_SIZE;
     current_chunk_idx_ = -1;
@@ -146,7 +146,7 @@ bool MemPool::find_chunk(size_t min_size, bool 
check_limits) {
         mem_tracker_->Release(chunk_size);
         return false;
     }
-    THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, 
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+    THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, 
ExecEnv::GetInstance()->process_mem_tracker_raw());
     ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size);
     // Put it before the first free chunk. If no free chunks, it goes at the 
end.
     if (first_free_idx == static_cast<int>(chunks_.size())) {
diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h
index 128916007b..4c93e8568f 100644
--- a/be/src/runtime/mem_pool.h
+++ b/be/src/runtime/mem_pool.h
@@ -208,9 +208,9 @@ private:
     bool check_integrity(bool check_current_chunk_empty);
 
     void reset_peak() {
-        if (total_allocated_bytes_ - peak_allocated_bytes_ > 4096) {
+        if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) {
             THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - 
peak_allocated_bytes_,
-                                             
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+                                             
ExecEnv::GetInstance()->process_mem_tracker_raw());
             peak_allocated_bytes_ = total_allocated_bytes_;
         }
     }
diff --git a/be/src/runtime/memory/mem_tracker.cpp 
b/be/src/runtime/memory/mem_tracker.cpp
index c4801d6611..8c0ae6ebba 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -58,15 +58,12 @@ NewMemTracker::NewMemTracker(const std::string& label, 
RuntimeProfile* profile)
         _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, 
TUnit::BYTES);
     }
 
-    if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) {
-        _label = fmt::format(
-                "{} | {}", label,
-                
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label());
-    } else {
-        _label = label + " | ";
-    }
-
-    _bind_group_num = 
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num();
+    DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() != 
nullptr);
+    _label = fmt::format(
+            "{} | {}", label,
+            
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label());
+    _bind_group_num =
+            
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->group_num();
     {
         std::lock_guard<std::mutex> 
l(mem_tracker_pool[_bind_group_num].group_lock);
         _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 1eb6166b9f..dccfb9db45 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -301,7 +301,7 @@ Status MemTrackerLimiter::mem_limit_exceeded(const 
std::string& msg,
         // The limit of the current tracker and parents is less than 0, the 
consume will not fail,
         // and the current process memory has no excess limit.
         detail += fmt::format("unknown exceed reason, executing_msg:<{}>", 
msg);
-        print_log_usage_tracker = 
ExecEnv::GetInstance()->new_process_mem_tracker().get();
+        print_log_usage_tracker = 
ExecEnv::GetInstance()->process_mem_tracker_raw();
     }
     auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
     if (print_log_usage_tracker != nullptr)
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 275e52375e..0543a992b1 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -73,7 +73,7 @@ public:
             auto st = Status::MemoryLimitExceeded(
                     fmt::format("process memory used {} exceed limit {}, 
failed_alloc_size={}",
                     PerfCounters::get_vm_rss(), MemInfo::mem_limit(), bytes));
-            
ExecEnv::GetInstance()->new_process_mem_tracker()->print_log_usage(st.get_error_msg());
+            
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(st.get_error_msg());
             return st;
         }
         return Status::OK();
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp 
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 27e66d3fb8..30f7e7f10b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -32,6 +32,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
     _task_id = task_id;
     _fragment_instance_id = fragment_instance_id;
     _limiter_tracker = mem_tracker;
+    _limiter_tracker_raw = mem_tracker.get();
 }
 
 void ThreadMemTrackerMgr::detach_limiter_tracker() {
@@ -39,6 +40,7 @@ void ThreadMemTrackerMgr::detach_limiter_tracker() {
     _task_id = "";
     _fragment_instance_id = TUniqueId();
     _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker();
+    _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw();
 }
 
 void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& 
cancel_details) {
@@ -54,7 +56,7 @@ void ThreadMemTrackerMgr::exceeded(Status 
failed_try_consume_st) {
         _cb_func();
     }
     if (is_attach_query()) {
-        auto st = _limiter_tracker->mem_limit_exceeded(fmt::format("exec 
node:<{}>", ""),
+        auto st = _limiter_tracker_raw->mem_limit_exceeded(fmt::format("exec 
node:<{}>", ""),
                                                        
_limiter_tracker->parent().get(),
                                                        failed_try_consume_st);
         exceeded_cancel_task(st.get_error_msg());
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 0cd371598a..5135d6222b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -50,7 +50,7 @@ public:
 
     // only for tcmalloc hook
     static void consume_no_attach(int64_t size) {
-        ExecEnv::GetInstance()->new_process_mem_tracker()->consume(size);
+        ExecEnv::GetInstance()->process_mem_tracker_raw()->consume(size);
     }
 
     // After thread initialization, calling `init` again must call 
`clear_untracked_mems` first
@@ -81,6 +81,7 @@ public:
     bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
 
     std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return 
_limiter_tracker; }
+    MemTrackerLimiter* limiter_mem_tracker_raw() { return 
_limiter_tracker_raw; }
 
     void set_check_limit(bool check_limit) { _check_limit = check_limit; }
     void set_check_attach(bool check_attach) { _check_attach = check_attach; }
@@ -111,6 +112,7 @@ private:
 
     std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
     std::vector<NewMemTracker*> _consumer_tracker_stack;
+    MemTrackerLimiter* _limiter_tracker_raw;
 
     // If true, call memtracker try_consume, otherwise call consume.
     bool _check_limit = false;
@@ -126,6 +128,7 @@ inline void ThreadMemTrackerMgr::init() {
     DCHECK(_consumer_tracker_stack.empty());
     _task_id = "";
     _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker();
+    _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw();
     _check_limit = true;
 }
 
@@ -176,15 +179,15 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
         // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
         //        _limiter_tracker->label() != "Process");
 #endif
-        Status st = _limiter_tracker->try_consume(_untracked_mem);
+        Status st = _limiter_tracker_raw->try_consume(_untracked_mem);
         if (!st) {
             // The memory has been allocated, so when TryConsume fails, need 
to continue to complete
             // the consume to ensure the accuracy of the statistics.
-            _limiter_tracker->consume(_untracked_mem);
+            _limiter_tracker_raw->consume(_untracked_mem);
             exceeded(st);
         }
     } else {
-        _limiter_tracker->consume(_untracked_mem);
+        _limiter_tracker_raw->consume(_untracked_mem);
     }
     for (auto tracker : _consumer_tracker_stack) {
         tracker->consume(_untracked_mem);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index e98467f95b..556281e993 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -193,13 +193,14 @@ Status RuntimeState::init(const TUniqueId& 
fragment_instance_id, const TQueryOpt
 Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
     bool has_query_mem_tracker = _query_options.__isset.mem_limit && 
(_query_options.mem_limit > 0);
     int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit : 
-1;
-    if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker()->limit()) {
+    if (bytes_limit > 
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit()) {
         VLOG_NOTICE << "Query memory limit " << 
PrettyPrinter::print(bytes_limit, TUnit::BYTES)
                     << " exceeds process memory limit of "
-                    << 
PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->limit(),
-                                            TUnit::BYTES)
+                    << PrettyPrinter::print(
+                               
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(),
+                               TUnit::BYTES)
                     << ". Using process memory limit instead";
-        bytes_limit = ExecEnv::GetInstance()->process_mem_tracker()->limit();
+        bytes_limit = 
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit();
     }
 
     // we do not use global query-map  for now, to avoid mem-exceeded 
different fragments
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 5f524996ae..c5a4566c73 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -136,7 +136,7 @@ public:
                      const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
         DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && 
_task_id == "")
                 << ",new tracker label: " << mem_tracker->label() << ",old 
tracker label: "
-                << _thread_mem_tracker_mgr->limiter_mem_tracker()->label();
+                << _thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label();
         DCHECK(type != TaskType::UNKNOWN);
         _type = type;
         _task_id = task_id;
@@ -256,15 +256,15 @@ public:
     doris::thread_context()->_thread_mem_tracker_mgr->consume(size)
 #define RELEASE_THREAD_MEM_TRACKER(size) \
     doris::thread_context()->_thread_mem_tracker_mgr->consume(-size)
-#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker)                          
                \
-    
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to(size,
 \
-                                                                               
          tracker)
+#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker)                          
               \
+    
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to(
 \
+            size, tracker)
 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
     tracker->transfer_to(                               \
-            size, 
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker().get())
+            size, 
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw())
 #define RETURN_LIMIT_EXCEEDED(state, msg, ...)                                 
     \
     return doris::thread_context()                                             
     \
-            ->_thread_mem_tracker_mgr->limiter_mem_tracker()                   
     \
+            ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()               
     \
             ->mem_limit_exceeded(state, fmt::format("exec node:<{}>, {}", "", 
msg), \
                                  ##__VA_ARGS__);
 } // namespace doris
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index d80b1bc504..37d08a2a38 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -462,7 +462,7 @@ int main(int argc, char** argv) {
 
         // 1s clear the expired task mem tracker, a query mem tracker is about 
57 bytes.
         
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
-        
doris::ExecEnv::GetInstance()->new_process_mem_tracker()->enable_print_log_usage();
+        
doris::ExecEnv::GetInstance()->process_mem_tracker_raw()->enable_print_log_usage();
         sleep(1);
     }
 
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index 5d2961ccea..5deb16d106 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -113,9 +113,9 @@ protected:
     }
 
     inline void reset_peak() {
-        if (UNLIKELY(c_end - c_end_peak > 4096)) {
+        if (UNLIKELY(c_end - c_end_peak > 65536)) {
             THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak,
-                                             
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+                                             
ExecEnv::GetInstance()->process_mem_tracker_raw());
             c_end_peak = c_end;
         }
     }
@@ -127,7 +127,7 @@ protected:
     template <typename... TAllocatorParams>
     void alloc(size_t bytes, TAllocatorParams&&... allocator_params) {
         THREAD_MEM_TRACKER_TRANSFER_TO(bytes - pad_right - pad_left,
-                                       
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+                                       
ExecEnv::GetInstance()->process_mem_tracker_raw());
         c_start = c_end = c_end_peak =
                 reinterpret_cast<char*>(TAllocator::alloc(
                         bytes, 
std::forward<TAllocatorParams>(allocator_params)...)) +
@@ -144,7 +144,7 @@ protected:
 
         TAllocator::free(c_start - pad_left, allocated_bytes());
         THREAD_MEM_TRACKER_TRANSFER_FROM(c_end_of_storage - c_end_peak,
-                                         
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+                                         
ExecEnv::GetInstance()->process_mem_tracker_raw());
     }
 
     template <typename... TAllocatorParams>
@@ -157,7 +157,7 @@ protected:
         unprotect();
 
         THREAD_MEM_TRACKER_TRANSFER_TO(bytes - allocated_bytes(),
-                                       
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+                                       
ExecEnv::GetInstance()->process_mem_tracker_raw());
 
         ptrdiff_t end_diff = c_end - c_start;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to