This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9ca2b836bfe [Improvement](executor)Refactor Workload group memory GC (#33797) 9ca2b836bfe is described below commit 9ca2b836bfe7bbadb8ab4a942bf0b45d18401bdf Author: wangbo <wan...@apache.org> AuthorDate: Tue Apr 30 19:33:41 2024 +0800 [Improvement](executor)Refactor Workload group memory GC (#33797) * just gc group's overcommit query when minor gc * add process usage --- be/src/common/config.cpp | 2 + be/src/common/config.h | 2 + be/src/runtime/workload_group/workload_group.cpp | 52 ++++++++++++++++-------- be/src/runtime/workload_group/workload_group.h | 2 +- be/src/util/mem_info.cpp | 32 +++++++++------ be/src/util/mem_info.h | 2 +- 6 files changed, 60 insertions(+), 32 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 28d0e7e6f7d..e5686adf76c 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1135,6 +1135,8 @@ DEFINE_Bool(enable_flush_file_cache_async, "true"); DEFINE_mString(doris_cgroup_cpu_path, ""); DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true"); +DEFINE_mBool(enable_workload_group_memory_gc, "true"); + DEFINE_Bool(ignore_always_true_predicate_for_segment, "true"); // Dir of default timezone files diff --git a/be/src/common/config.h b/be/src/common/config.h index 3ab7ea16cd7..0e4300c3c97 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1214,6 +1214,8 @@ DECLARE_mBool(exit_on_exception); DECLARE_mString(doris_cgroup_cpu_path); DECLARE_mBool(enable_cgroup_cpu_soft_limit); +DECLARE_mBool(enable_workload_group_memory_gc); + // This config controls whether the s3 file writer would flush cache asynchronously DECLARE_Bool(enable_flush_file_cache_async); diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 05e38b973c6..88c8880b060 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -142,30 +142,47 @@ void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter } } -int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) { +int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) { if (need_free_mem <= 0) { return 0; } int64_t used_memory = memory_used(); int64_t freed_mem = 0; - std::string cancel_str = fmt::format( - "work load group memory exceeded limit, group id:{}, name:{}, used:{}, limit:{}, " - "backend:{}.", - _id, _name, MemTracker::print_bytes(used_memory), - MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); - auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption, - const std::string& label) { + std::string cancel_str = ""; + if (is_minor_gc) { + cancel_str = fmt::format( + "MinorGC kill overcommit query, wg id:{}, name:{}, used:{}, limit:{}, " + "backend:{}.", + _id, _name, MemTracker::print_bytes(used_memory), + MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); + } else { + if (_enable_memory_overcommit) { + cancel_str = fmt::format( + "FullGC release wg overcommit mem, wg id:{}, name:{}, " + "used:{},limit:{},backend:{}.", + _id, _name, MemTracker::print_bytes(used_memory), + MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); + } else { + cancel_str = fmt::format( + "GC wg for hard limit, wg id:{}, name:{}, used:{}, limit:{}, " + "backend:{}.", + _id, _name, MemTracker::print_bytes(used_memory), + MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); + } + } + std::string process_mem_usage_str = MemTrackerLimiter::process_mem_log_str(); + auto cancel_top_overcommit_str = [cancel_str, process_mem_usage_str](int64_t mem_consumption, + const std::string& label) { return fmt::format( - "{} cancel top memory overcommit tracker <{}> consumption {}. execute again after " - "enough memory, details see be.INFO.", - cancel_str, label, MemTracker::print_bytes(mem_consumption)); + "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}", + cancel_str, label, MemTracker::print_bytes(mem_consumption), process_mem_usage_str); }; - auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) { - return fmt::format( - "{} cancel top memory used tracker <{}> consumption {}. execute again after " - "enough memory, details see be.INFO.", - cancel_str, label, MemTracker::print_bytes(mem_consumption)); + auto cancel_top_usage_str = [cancel_str, process_mem_usage_str](int64_t mem_consumption, + const std::string& label) { + return fmt::format("{} cancel top memory used tracker <{}> consumption {}. details:{}", + cancel_str, label, MemTracker::print_bytes(mem_consumption), + process_mem_usage_str); }; LOG(INFO) << fmt::format( @@ -188,7 +205,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) _mem_tracker_limiter_pool, cancel_top_overcommit_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP); } - if (freed_mem >= need_free_mem) { + // To be compatible with the non-group's gc logic, minorGC just gc overcommit query + if (is_minor_gc || freed_mem >= need_free_mem) { return freed_mem; } diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 54fe3bd3157..8386d778aec 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -145,7 +145,7 @@ public: return _query_ctxs.size(); } - int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile); + int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc); void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env); diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index a36acddc139..75c68cce4d1 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -147,10 +147,13 @@ bool MemInfo::process_minor_gc() { return true; } - RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); - freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size - freed_mem, tg_profile); - if (freed_mem > _s_process_minor_gc_size) { - return true; + if (config::enable_workload_group_memory_gc) { + RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); + freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size - freed_mem, tg_profile, + true); + if (freed_mem > _s_process_minor_gc_size) { + return true; + } } if (config::enable_query_memory_overcommit) { @@ -198,10 +201,13 @@ bool MemInfo::process_full_gc() { return true; } - RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); - freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size - freed_mem, tg_profile); - if (freed_mem > _s_process_full_gc_size) { - return true; + if (config::enable_workload_group_memory_gc) { + RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); + freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size - freed_mem, tg_profile, + false); + if (freed_mem > _s_process_full_gc_size) { + return true; + } } VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( @@ -286,14 +292,14 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() { for (const auto& workload_group : task_groups_overcommit) { auto used = workload_group->memory_used(); - total_free_memory += - workload_group->gc_memory(used - workload_group->memory_limit(), tg_profile.get()); + total_free_memory += workload_group->gc_memory(used - workload_group->memory_limit(), + tg_profile.get(), false); } return total_free_memory; } -int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, - RuntimeProfile* profile) { +int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, RuntimeProfile* profile, + bool is_minor_gc) { MonotonicStopWatch watch; watch.start(); std::vector<WorkloadGroupPtr> task_groups; @@ -359,7 +365,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, : static_cast<double>(exceeded_memorys[i]) / total_exceeded_memory * request_free_memory); // exceeded memory as a weight auto workload_group = task_groups[i]; - total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile); + total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile, is_minor_gc); } return total_free_memory; } diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 8d702ddf065..c2bb8b209be 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -195,7 +195,7 @@ public: static int64_t tg_not_enable_overcommit_group_gc(); static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory, - RuntimeProfile* profile); + RuntimeProfile* profile, bool is_minor_gc); // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process, // avoid multiple threads starting at the same time and causing OOM. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org