This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new 3e2ae923f7 [dev-1.1.2](cherry-pick) Optimize readability of mem exceed limit error message #11943 3e2ae923f7 is described below commit 3e2ae923f7aa5f4d3f3e4dc9c64a9982a15c764d Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Mon Aug 22 08:46:34 2022 +0800 [dev-1.1.2](cherry-pick) Optimize readability of mem exceed limit error message #11943 --- be/src/runtime/memory/mem_tracker.cpp | 39 ++++++------- be/src/runtime/memory/mem_tracker.h | 8 +-- be/src/runtime/memory/mem_tracker_limiter.cpp | 70 +++++++++++++++++++----- be/src/runtime/memory/mem_tracker_limiter.h | 29 +++++++--- be/src/runtime/memory/mem_tracker_task_pool.cpp | 4 +- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 16 ++---- be/src/runtime/memory/thread_mem_tracker_mgr.h | 4 +- be/src/runtime/runtime_state.cpp | 4 +- be/src/runtime/thread_context.h | 9 +-- be/src/service/doris_main.cpp | 1 + 10 files changed, 116 insertions(+), 68 deletions(-) diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index dda2c43102..c4801d6611 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -42,7 +42,7 @@ struct TrackerGroup { // Multiple groups are used to reduce the impact of locks. static std::vector<TrackerGroup> mem_tracker_pool(1000); -NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile, bool is_limiter) { +NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile) { if (profile == nullptr) { _consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES); } else { @@ -58,30 +58,24 @@ NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile, _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); } - _is_limiter = is_limiter; - if (!_is_limiter) { - if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) { - _label = fmt::format( - "{} | {}", label, - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label()); - } else { - _label = label + " | "; - } - - _bind_group_num = - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num(); - { - std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock); - _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert( - mem_tracker_pool[_bind_group_num].trackers.end(), this); - } + if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) { + _label = fmt::format( + "{} | {}", label, + thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label()); } else { - _label = label; + _label = label + " | "; + } + + _bind_group_num = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num(); + { + std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock); + _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert( + mem_tracker_pool[_bind_group_num].trackers.end(), this); } } NewMemTracker::~NewMemTracker() { - if (!_is_limiter) { + if (_bind_group_num != -1) { std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock); if (_tracker_group_it != mem_tracker_pool[_bind_group_num].trackers.end()) { mem_tracker_pool[_bind_group_num].trackers.erase(_tracker_group_it); @@ -102,8 +96,9 @@ NewMemTracker::Snapshot NewMemTracker::make_snapshot(size_t level) const { return snapshot; } -void NewMemTracker::make_group_snapshot(std::vector<NewMemTracker::Snapshot>* snapshots, size_t level, - int64_t group_num, std::string related_label) { +void NewMemTracker::make_group_snapshot(std::vector<NewMemTracker::Snapshot>* snapshots, + size_t level, int64_t group_num, + std::string related_label) { std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock); for (auto tracker : mem_tracker_pool[group_num].trackers) { if (split(tracker->label(), " | ")[1] == related_label) { diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 41d2fe4d1c..258f244aae 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -44,8 +44,9 @@ public: }; // Creates and adds the tracker to the mem_tracker_pool. - NewMemTracker(const std::string& label = std::string(), RuntimeProfile* profile = nullptr, - bool is_limiter = false); + NewMemTracker(const std::string& label, RuntimeProfile* profile = nullptr); + // For MemTrackerLimiter + NewMemTracker() { _bind_group_num = -1; } ~NewMemTracker(); @@ -93,9 +94,6 @@ protected: // Tracker is located in group num in mem_tracker_pool int64_t _bind_group_num; - // Whether is a MemTrackerLimiter - bool _is_limiter; - // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. std::list<NewMemTracker*>::iterator _tracker_group_it; }; diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index dc7789bfe1..3f9a1786d4 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -32,9 +32,14 @@ namespace doris { MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& label, const std::shared_ptr<MemTrackerLimiter>& parent, - RuntimeProfile* profile) - : NewMemTracker(label, profile, true) { + RuntimeProfile* profile) { DCHECK_GE(byte_limit, -1); + if (profile == nullptr) { + _consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES); + } else { + _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); + } + _label = label; _limit = byte_limit; _group_num = GetCurrentTimeMicros() % 1000; _parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(); @@ -143,9 +148,9 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) { Status MemTrackerLimiter::try_gc_memory(int64_t bytes) { if (UNLIKELY(gc_memory(_limit - bytes))) { - return Status::MemoryLimitExceeded( - fmt::format("label={}, limit={}, used={}, failed consume size={}", label(), _limit, - _consumption->current_value(), bytes)); + return Status::MemoryLimitExceeded(fmt::format( + "need_size={}, exceeded_tracker={}, limit={}, peak_used={}, current_used={}", bytes, + label(), _limit, _consumption->value(), _consumption->current_value())); } VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes << " consumption=" << _consumption->current_value() << " limit=" << _limit; @@ -218,20 +223,19 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, return join(usage_strings, "\n"); } -Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size) { - STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); +Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) { + DCHECK(_limit != -1); std::string detail = fmt::format( - "{}, failed mem consume:<consume_size={}, mem_limit={}, mem_used={}, tracker_label={}, " - "in backend={} free memory left={}. details mem usage see be.INFO.", - msg, PrettyPrinter::print(failed_consume_size, TUnit::BYTES), _limit, - _consumption->current_value(), _label, BackendOptions::get_localhost(), - PrettyPrinter::print(ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity(), - TUnit::BYTES)); + "{}, backend={} memory used={}, free memory left={}. If is query, can change the limit " + "by `set exec_mem_limit=xxx`, details mem usage see be.INFO.", + msg, BackendOptions::get_localhost(), + PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES), + PrettyPrinter::print(MemInfo::mem_limit() - PerfCounters::get_vm_rss(), TUnit::BYTES)); Status status = Status::MemoryLimitExceeded(detail); // only print the tracker log_usage in be log. if (_print_log_usage) { - if (ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity() < failed_consume_size) { + if (_label == "Process") { // Dumping the process MemTracker is expensive. Limiting the recursive depth to two // levels limits the level of detail to a one-line summary for each query MemTracker. detail += "\n" + ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2); @@ -245,6 +249,44 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t fai return status; } +Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size) { + STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); + DCHECK(!_limited_ancestors.empty()); + for (const auto& tracker : _limited_ancestors) { + if (tracker->has_limit() && + tracker->limit() < tracker->peak_consumption() + failed_consume_size) { + std::string detail; + if (failed_consume_size != 0) { + detail = fmt::format( + "memory limit exceeded:<consumed_tracker={}, need_size={}, " + "exceeded_tracker={}, limit={}, peak_used={}, current_used={}>, " + "executing:<{}>", + _label, PrettyPrinter::print(failed_consume_size, TUnit::BYTES), + tracker->label(), tracker->limit(), tracker->peak_consumption(), + tracker->consumption(), msg); + } else { + detail = fmt::format( + "memory limit exceeded:<exceeded_tracker={}, limit={}, peak_used={}, " + "current_used={}>, executing:<{}>", + tracker->label(), tracker->limit(), tracker->peak_consumption(), + tracker->consumption(), msg); + } + return tracker->mem_limit_exceeded_log(detail); + } + } + return Status::MemoryLimitExceeded("no mem tracker exceed limit"); +} + +Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, + MemTrackerLimiter* failed_tracker, + Status failed_try_consume_st) { + STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); + std::string detail = + fmt::format("memory limit exceeded:<consumed_tracker={}, {}>, executing:<{}>", _label, + failed_try_consume_st.get_error_msg(), msg); + return failed_tracker->mem_limit_exceeded_log(detail); +} + Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& msg, int64_t failed_alloc_size) { Status rt = mem_limit_exceeded(msg, failed_alloc_size); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 1dcd655ea4..767e4fdafb 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -122,13 +122,15 @@ public: Status try_gc_memory(int64_t bytes); public: + // up to (but not including) end_tracker. + // This happens when we want to update tracking on a particular mem tracker but the consumption + // against the limit recorded in one of its ancestors already happened. // It is used for revise mem tracker consumption. // If the location of memory alloc and free is different, the consumption value of mem tracker will be inaccurate. // But the consumption value of the process mem tracker is not affecte - void consumption_revise(int64_t bytes) { - DCHECK(_label != "Process"); - _consumption->add(bytes); - } + void consume_local(int64_t bytes, MemTrackerLimiter* end_tracker); + + void enable_print_log_usage() { _print_log_usage = true; } // Logs the usage of this tracker limiter and optionally its children (recursively). // If 'logged_consumption' is non-nullptr, sets the consumption value logged. @@ -143,9 +145,11 @@ public: // If 'failed_allocation_size' is greater than zero, logs the allocation size. If // 'failed_allocation_size' is zero, nothing about the allocation size is logged. // If 'state' is non-nullptr, logs the error to 'state'. - Status mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size); - Status mem_limit_exceeded(RuntimeState* state, const std::string& msg = std::string(), - int64_t failed_consume_size = -1); + Status mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size = 0); + Status mem_limit_exceeded(const std::string& msg, MemTrackerLimiter* failed_tracker, + Status failed_try_consume_st); + Status mem_limit_exceeded(RuntimeState* state, const std::string& msg, + int64_t failed_consume_size = 0); std::string debug_string() { std::stringstream msg; @@ -186,6 +190,8 @@ private: const std::list<MemTrackerLimiter*>& trackers, int64_t* logged_consumption); + Status mem_limit_exceeded_log(const std::string& msg); + private: // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. Used in log_usage。 int64_t _limit; @@ -253,6 +259,15 @@ inline void MemTrackerLimiter::consume_cache(int64_t bytes) { } } +inline void MemTrackerLimiter::consume_local(int64_t bytes, MemTrackerLimiter* end_tracker) { + DCHECK(end_tracker); + if (bytes == 0) return; + for (auto& tracker : _all_ancestors) { + if (tracker->label() == end_tracker->label()) return; + tracker->_consumption->add(bytes); + } +} + inline Status MemTrackerLimiter::try_consume(int64_t bytes) { if (bytes <= 0) { release(-bytes); diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index 064fba6930..c080261056 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -93,7 +93,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { // In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers, // the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is, // the negative number of the current value of consume. - it->second->parent()->consumption_revise(-it->second->consumption()); + it->second->parent()->consume_local( + -it->second->consumption(), + ExecEnv::GetInstance()->new_process_mem_tracker().get()); LOG(INFO) << fmt::format( "Deregister query/load memory tracker, queryId={}, Limit={}, PeakUsed={}", it->first, it->second->limit(), it->second->peak_consumption()); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index aea89acb3b..27e66d3fb8 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -49,21 +49,15 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details } } -void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size) { +void ThreadMemTrackerMgr::exceeded(Status failed_try_consume_st) { if (_cb_func != nullptr) { _cb_func(); } if (is_attach_query()) { - std::string cancel_msg; - if (!_consumer_tracker_stack.empty()) { - cancel_msg = fmt::format( - "exec node:<name={}>, can change the limit by `set exec_mem_limit=xxx`", - _consumer_tracker_stack[-1]->label()); - } else { - cancel_msg = "exec node:unknown, can change the limit by `set exec_mem_limit=xxx`"; - } - auto st = _limiter_tracker->mem_limit_exceeded(cancel_msg, failed_consume_size); - exceeded_cancel_task(st.to_string()); + auto st = _limiter_tracker->mem_limit_exceeded(fmt::format("exec node:<{}>", ""), + _limiter_tracker->parent().get(), + failed_try_consume_st); + exceeded_cancel_task(st.get_error_msg()); _check_limit = false; // Make sure it will only be canceled once } } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 0ef8eaf646..449a1e070c 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -112,7 +112,7 @@ private: // If tryConsume fails due to task mem tracker exceeding the limit, the task must be canceled void exceeded_cancel_task(const std::string& cancel_details); - void exceeded(int64_t failed_consume_size); + void exceeded(Status failed_try_consume_st); private: // Cache untracked mem, only update to _untracked_mems when switching mem tracker. @@ -191,7 +191,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. _limiter_tracker->consume(_untracked_mem); - exceeded(_untracked_mem); + exceeded(st); } } else { _limiter_tracker->consume(_untracked_mem); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 2107c74d65..791cdd213e 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -246,8 +246,8 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { } _new_instance_mem_tracker = std::make_shared<MemTrackerLimiter>( - bytes_limit, "RuntimeState:instance:" + print_id(_fragment_instance_id), - _new_query_mem_tracker); + -1, "RuntimeState:instance:" + print_id(_fragment_instance_id), + _new_query_mem_tracker, &_profile); /* // TODO: this is a stopgap until we implement ExprContext diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index e45fe0da78..2f752516c1 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -260,8 +260,9 @@ public: doris::thread_context()->_thread_mem_tracker_mgr->transfer_to(size, tracker) #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ doris::thread_context()->_thread_mem_tracker_mgr->transfer_from(size, tracker) -#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ - return doris::thread_context() \ - ->_thread_mem_tracker_mgr->limiter_mem_tracker() \ - ->mem_limit_exceeded(state, msg, ##__VA_ARGS__); +#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ + return doris::thread_context() \ + ->_thread_mem_tracker_mgr->limiter_mem_tracker() \ + ->mem_limit_exceeded(state, fmt::format("exec node:<{}>, {}", "", msg), \ + ##__VA_ARGS__); } // namespace doris diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 571911b45d..d80b1bc504 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -462,6 +462,7 @@ int main(int argc, char** argv) { // 1s clear the expired task mem tracker, a query mem tracker is about 57 bytes. doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); + doris::ExecEnv::GetInstance()->new_process_mem_tracker()->enable_print_log_usage(); sleep(1); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org