This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new 5d28575ee5 [enhancement](memtracker) Improve performance of tracking real physical memory of PODArray #12168 (#12260) 5d28575ee5 is described below commit 5d28575ee5f563f0f877a9fe722b7ca84ecd7020 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Sep 1 18:47:10 2022 +0800 [enhancement](memtracker) Improve performance of tracking real physical memory of PODArray #12168 (#12260) --- be/src/runtime/exec_env.h | 3 +++ be/src/runtime/exec_env_init.cpp | 1 + be/src/runtime/mem_pool.cpp | 6 +++--- be/src/runtime/mem_pool.h | 4 ++-- be/src/runtime/memory/mem_tracker.cpp | 15 ++++++--------- be/src/runtime/memory/mem_tracker_limiter.cpp | 2 +- be/src/runtime/memory/mem_tracker_limiter.h | 2 +- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 4 +++- be/src/runtime/memory/thread_mem_tracker_mgr.h | 11 +++++++---- be/src/runtime/runtime_state.cpp | 9 +++++---- be/src/runtime/thread_context.h | 12 ++++++------ be/src/service/doris_main.cpp | 2 +- be/src/vec/common/pod_array.h | 10 +++++----- 13 files changed, 44 insertions(+), 37 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index f9338b448a..e73eaa1c1f 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -121,6 +121,8 @@ public: PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; } std::shared_ptr<MemTrackerLimiter> new_process_mem_tracker() { return _process_mem_tracker; } + MemTrackerLimiter* process_mem_tracker_raw() { return _process_mem_tracker_raw; } + std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return _query_pool_mem_tracker; } std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return _load_pool_mem_tracker; } MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; } @@ -197,6 +199,7 @@ private: // The ancestor for all trackers. Every tracker is visible from the process down. // Not limit total memory by process tracker, and it's just used to track virtual memory of process. std::shared_ptr<MemTrackerLimiter> _process_mem_tracker; + MemTrackerLimiter* _process_mem_tracker_raw; // The ancestor for all querys tracker. std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker; // The ancestor for all load tracker. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 00c453e91c..54640a6e70 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -205,6 +205,7 @@ Status ExecEnv::_init_mem_tracker() { _process_mem_tracker = std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process"); + _process_mem_tracker_raw = _process_mem_tracker.get(); thread_context()->_thread_mem_tracker_mgr->init(); thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index f108f2266b..96a0daad4d 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -60,7 +60,7 @@ MemPool::~MemPool() { } mem_tracker_->Release(total_bytes_released); THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->new_process_mem_tracker().get()); + ExecEnv::GetInstance()->process_mem_tracker_raw()); DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -81,7 +81,7 @@ void MemPool::free_all() { ChunkAllocator::instance()->free(chunk.chunk); } THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->new_process_mem_tracker().get()); + ExecEnv::GetInstance()->process_mem_tracker_raw()); chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; current_chunk_idx_ = -1; @@ -146,7 +146,7 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) { mem_tracker_->Release(chunk_size); return false; } - THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->new_process_mem_tracker().get()); + THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->process_mem_tracker_raw()); ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size); // Put it before the first free chunk. If no free chunks, it goes at the end. if (first_free_idx == static_cast<int>(chunks_.size())) { diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 128916007b..4c93e8568f 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -208,9 +208,9 @@ private: bool check_integrity(bool check_current_chunk_empty); void reset_peak() { - if (total_allocated_bytes_ - peak_allocated_bytes_ > 4096) { + if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) { THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_, - ExecEnv::GetInstance()->new_process_mem_tracker().get()); + ExecEnv::GetInstance()->process_mem_tracker_raw()); peak_allocated_bytes_ = total_allocated_bytes_; } } diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index c4801d6611..8c0ae6ebba 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -58,15 +58,12 @@ NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile) _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); } - if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) { - _label = fmt::format( - "{} | {}", label, - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label()); - } else { - _label = label + " | "; - } - - _bind_group_num = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num(); + DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() != nullptr); + _label = fmt::format( + "{} | {}", label, + thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label()); + _bind_group_num = + thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->group_num(); { std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock); _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert( diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 1eb6166b9f..dccfb9db45 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -301,7 +301,7 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, // The limit of the current tracker and parents is less than 0, the consume will not fail, // and the current process memory has no excess limit. detail += fmt::format("unknown exceed reason, executing_msg:<{}>", msg); - print_log_usage_tracker = ExecEnv::GetInstance()->new_process_mem_tracker().get(); + print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker_raw(); } auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail); if (print_log_usage_tracker != nullptr) diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 275e52375e..0543a992b1 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -73,7 +73,7 @@ public: auto st = Status::MemoryLimitExceeded( fmt::format("process memory used {} exceed limit {}, failed_alloc_size={}", PerfCounters::get_vm_rss(), MemInfo::mem_limit(), bytes)); - ExecEnv::GetInstance()->new_process_mem_tracker()->print_log_usage(st.get_error_msg()); + ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(st.get_error_msg()); return st; } return Status::OK(); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 27e66d3fb8..30f7e7f10b 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -32,6 +32,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( _task_id = task_id; _fragment_instance_id = fragment_instance_id; _limiter_tracker = mem_tracker; + _limiter_tracker_raw = mem_tracker.get(); } void ThreadMemTrackerMgr::detach_limiter_tracker() { @@ -39,6 +40,7 @@ void ThreadMemTrackerMgr::detach_limiter_tracker() { _task_id = ""; _fragment_instance_id = TUniqueId(); _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker(); + _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw(); } void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) { @@ -54,7 +56,7 @@ void ThreadMemTrackerMgr::exceeded(Status failed_try_consume_st) { _cb_func(); } if (is_attach_query()) { - auto st = _limiter_tracker->mem_limit_exceeded(fmt::format("exec node:<{}>", ""), + auto st = _limiter_tracker_raw->mem_limit_exceeded(fmt::format("exec node:<{}>", ""), _limiter_tracker->parent().get(), failed_try_consume_st); exceeded_cancel_task(st.get_error_msg()); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 0cd371598a..5135d6222b 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -50,7 +50,7 @@ public: // only for tcmalloc hook static void consume_no_attach(int64_t size) { - ExecEnv::GetInstance()->new_process_mem_tracker()->consume(size); + ExecEnv::GetInstance()->process_mem_tracker_raw()->consume(size); } // After thread initialization, calling `init` again must call `clear_untracked_mems` first @@ -81,6 +81,7 @@ public: bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return _limiter_tracker; } + MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; } void set_check_limit(bool check_limit) { _check_limit = check_limit; } void set_check_attach(bool check_attach) { _check_attach = check_attach; } @@ -111,6 +112,7 @@ private: std::shared_ptr<MemTrackerLimiter> _limiter_tracker; std::vector<NewMemTracker*> _consumer_tracker_stack; + MemTrackerLimiter* _limiter_tracker_raw; // If true, call memtracker try_consume, otherwise call consume. bool _check_limit = false; @@ -126,6 +128,7 @@ inline void ThreadMemTrackerMgr::init() { DCHECK(_consumer_tracker_stack.empty()); _task_id = ""; _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker(); + _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw(); _check_limit = true; } @@ -176,15 +179,15 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY || // _limiter_tracker->label() != "Process"); #endif - Status st = _limiter_tracker->try_consume(_untracked_mem); + Status st = _limiter_tracker_raw->try_consume(_untracked_mem); if (!st) { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. - _limiter_tracker->consume(_untracked_mem); + _limiter_tracker_raw->consume(_untracked_mem); exceeded(st); } } else { - _limiter_tracker->consume(_untracked_mem); + _limiter_tracker_raw->consume(_untracked_mem); } for (auto tracker : _consumer_tracker_stack) { tracker->consume(_untracked_mem); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index e98467f95b..556281e993 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -193,13 +193,14 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { bool has_query_mem_tracker = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0); int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit : -1; - if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker()->limit()) { + if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker_raw()->limit()) { VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES) << " exceeds process memory limit of " - << PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->limit(), - TUnit::BYTES) + << PrettyPrinter::print( + ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(), + TUnit::BYTES) << ". Using process memory limit instead"; - bytes_limit = ExecEnv::GetInstance()->process_mem_tracker()->limit(); + bytes_limit = ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(); } // we do not use global query-map for now, to avoid mem-exceeded different fragments diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 5f524996ae..c5a4566c73 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -136,7 +136,7 @@ public: const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && _task_id == "") << ",new tracker label: " << mem_tracker->label() << ",old tracker label: " - << _thread_mem_tracker_mgr->limiter_mem_tracker()->label(); + << _thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label(); DCHECK(type != TaskType::UNKNOWN); _type = type; _task_id = task_id; @@ -256,15 +256,15 @@ public: doris::thread_context()->_thread_mem_tracker_mgr->consume(size) #define RELEASE_THREAD_MEM_TRACKER(size) \ doris::thread_context()->_thread_mem_tracker_mgr->consume(-size) -#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ - doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to(size, \ - tracker) +#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ + doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \ + size, tracker) #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ tracker->transfer_to( \ - size, doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker().get()) + size, doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()) #define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ return doris::thread_context() \ - ->_thread_mem_tracker_mgr->limiter_mem_tracker() \ + ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() \ ->mem_limit_exceeded(state, fmt::format("exec node:<{}>, {}", "", msg), \ ##__VA_ARGS__); } // namespace doris diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index d80b1bc504..37d08a2a38 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -462,7 +462,7 @@ int main(int argc, char** argv) { // 1s clear the expired task mem tracker, a query mem tracker is about 57 bytes. doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); - doris::ExecEnv::GetInstance()->new_process_mem_tracker()->enable_print_log_usage(); + doris::ExecEnv::GetInstance()->process_mem_tracker_raw()->enable_print_log_usage(); sleep(1); } diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h index 5d2961ccea..5deb16d106 100644 --- a/be/src/vec/common/pod_array.h +++ b/be/src/vec/common/pod_array.h @@ -113,9 +113,9 @@ protected: } inline void reset_peak() { - if (UNLIKELY(c_end - c_end_peak > 4096)) { + if (UNLIKELY(c_end - c_end_peak > 65536)) { THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak, - ExecEnv::GetInstance()->new_process_mem_tracker().get()); + ExecEnv::GetInstance()->process_mem_tracker_raw()); c_end_peak = c_end; } } @@ -127,7 +127,7 @@ protected: template <typename... TAllocatorParams> void alloc(size_t bytes, TAllocatorParams&&... allocator_params) { THREAD_MEM_TRACKER_TRANSFER_TO(bytes - pad_right - pad_left, - ExecEnv::GetInstance()->new_process_mem_tracker().get()); + ExecEnv::GetInstance()->process_mem_tracker_raw()); c_start = c_end = c_end_peak = reinterpret_cast<char*>(TAllocator::alloc( bytes, std::forward<TAllocatorParams>(allocator_params)...)) + @@ -144,7 +144,7 @@ protected: TAllocator::free(c_start - pad_left, allocated_bytes()); THREAD_MEM_TRACKER_TRANSFER_FROM(c_end_of_storage - c_end_peak, - ExecEnv::GetInstance()->new_process_mem_tracker().get()); + ExecEnv::GetInstance()->process_mem_tracker_raw()); } template <typename... TAllocatorParams> @@ -157,7 +157,7 @@ protected: unprotect(); THREAD_MEM_TRACKER_TRANSFER_TO(bytes - allocated_bytes(), - ExecEnv::GetInstance()->new_process_mem_tracker().get()); + ExecEnv::GetInstance()->process_mem_tracker_raw()); ptrdiff_t end_diff = c_end - c_start; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org