This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 84b97860a1 [fix](memory) Fix memory exceed limit and query has been canceled, Allocator will block 100ms (#20959) 84b97860a1 is described below commit 84b97860a103f684cf892c7151fa3879ef9a7fbb Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Jun 21 17:35:19 2023 +0800 [fix](memory) Fix memory exceed limit and query has been canceled, Allocator will block 100ms (#20959) --- be/src/runtime/memory/mem_tracker_limiter.cpp | 4 ++- be/src/vec/common/allocator.cpp | 39 ++++++++++++++++++--------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 24a79bc1b6..ded5b0c0b8 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -437,7 +437,9 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( std::lock_guard<std::mutex> l(tracker_groups[i].group_lock); for (auto tracker : tracker_groups[i].trackers) { if (tracker->type() == type) { - if (tracker->consumption() <= 33554432) { // 32M small query does not cancel + // 32M small query does not cancel + if (tracker->consumption() <= 33554432 || + tracker->consumption() < tracker->limit()) { continue; } if (tracker->is_query_cancelled()) { diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index b74ed398d4..366fc48ce2 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -31,6 +31,7 @@ #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/memory/thread_mem_tracker_mgr.h" #include "runtime/thread_context.h" +#include "util/defer_op.h" #include "util/mem_info.h" #include "util/uid_util.h" @@ -47,12 +48,22 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t size, doris::thread_context()->thread_mem_tracker()->label(), doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); + + // TODO, Save the query context in the thread context, instead of finding whether the query id is canceled in fragment_mgr. + if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( + doris::thread_context()->task_id())) { + if (doris::enable_thread_catch_bad_alloc) { + throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); + } + return; + } if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { - int64_t wait_milliseconds = doris::config::thread_wait_gc_max_milliseconds; - LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum 5s, {}.", - print_id(doris::thread_context()->task_id()), err_msg); - while (wait_milliseconds > 0) { + int64_t wait_milliseconds = 0; + LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum {}ms, {}.", + print_id(doris::thread_context()->task_id()), + doris::config::thread_wait_gc_max_milliseconds, err_msg); + while (wait_milliseconds < doris::config::thread_wait_gc_max_milliseconds) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { doris::MemInfo::refresh_interval_memory_growth += size; @@ -60,25 +71,29 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t } if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( doris::thread_context()->task_id())) { - wait_milliseconds = 0; - break; + if (doris::enable_thread_catch_bad_alloc) { + throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); + } + return; } - wait_milliseconds -= 100; + wait_milliseconds += 100; } - if (wait_milliseconds <= 0) { + if (wait_milliseconds >= doris::config::thread_wait_gc_max_milliseconds) { // Make sure to completely wait thread_wait_gc_max_milliseconds only once. doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); doris::MemTrackerLimiter::print_log_process_usage(err_msg); // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel. if (!doris::enable_thread_catch_bad_alloc) { LOG(INFO) << fmt::format( - "Query:{} canceled asyn, after waiting for memory 5s, {}.", - print_id(doris::thread_context()->task_id()), err_msg); + "Query:{} canceled asyn, after waiting for memory {}ms, {}.", + print_id(doris::thread_context()->task_id()), wait_milliseconds, + err_msg); doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); } else { LOG(INFO) << fmt::format( - "Query:{} throw exception, after waiting for memory 5s, {}.", - print_id(doris::thread_context()->task_id()), err_msg); + "Query:{} throw exception, after waiting for memory {}ms, {}.", + print_id(doris::thread_context()->task_id()), wait_milliseconds, + err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org