This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8abd136ba2b [Improvement](executor)Refactor Workload group memory GC 
(#33797)
8abd136ba2b is described below

commit 8abd136ba2b64f3188f92536e02e3de9ae720d33
Author: wangbo <wan...@apache.org>
AuthorDate: Tue Apr 30 19:33:41 2024 +0800

    [Improvement](executor)Refactor Workload group memory GC (#33797)
    
    * just gc group's overcommit query when minor gc
    
    * add process usage
---
 be/src/common/config.cpp                         |  2 +
 be/src/common/config.h                           |  2 +
 be/src/runtime/workload_group/workload_group.cpp | 52 ++++++++++++++++--------
 be/src/runtime/workload_group/workload_group.h   |  2 +-
 be/src/util/mem_info.cpp                         | 32 +++++++++------
 be/src/util/mem_info.h                           |  2 +-
 6 files changed, 60 insertions(+), 32 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a14ad0e7700..2e0818c6f51 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1124,6 +1124,8 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
 DEFINE_mString(doris_cgroup_cpu_path, "");
 DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");
 
+DEFINE_mBool(enable_workload_group_memory_gc, "true");
+
 DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
 
 // Dir of default timezone files
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e94934ae728..02d55ed81e2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1196,6 +1196,8 @@ DECLARE_mBool(exit_on_exception);
 DECLARE_mString(doris_cgroup_cpu_path);
 DECLARE_mBool(enable_cgroup_cpu_soft_limit);
 
+DECLARE_mBool(enable_workload_group_memory_gc);
+
 // This config controls whether the s3 file writer would flush cache 
asynchronously
 DECLARE_Bool(enable_flush_file_cache_async);
 
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index c82346f040e..01622204fd3 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -142,30 +142,47 @@ void 
WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter
     }
 }
 
-int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* 
profile) {
+int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* 
profile, bool is_minor_gc) {
     if (need_free_mem <= 0) {
         return 0;
     }
     int64_t used_memory = memory_used();
     int64_t freed_mem = 0;
 
-    std::string cancel_str = fmt::format(
-            "work load group memory exceeded limit, group id:{}, name:{}, 
used:{}, limit:{}, "
-            "backend:{}.",
-            _id, _name, MemTracker::print_bytes(used_memory),
-            MemTracker::print_bytes(_memory_limit), 
BackendOptions::get_localhost());
-    auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
-                                                  const std::string& label) {
+    std::string cancel_str = "";
+    if (is_minor_gc) {
+        cancel_str = fmt::format(
+                "MinorGC kill overcommit query, wg id:{}, name:{}, used:{}, 
limit:{}, "
+                "backend:{}.",
+                _id, _name, MemTracker::print_bytes(used_memory),
+                MemTracker::print_bytes(_memory_limit), 
BackendOptions::get_localhost());
+    } else {
+        if (_enable_memory_overcommit) {
+            cancel_str = fmt::format(
+                    "FullGC release wg overcommit mem, wg id:{}, name:{}, "
+                    "used:{},limit:{},backend:{}.",
+                    _id, _name, MemTracker::print_bytes(used_memory),
+                    MemTracker::print_bytes(_memory_limit), 
BackendOptions::get_localhost());
+        } else {
+            cancel_str = fmt::format(
+                    "GC wg for hard limit, wg id:{}, name:{}, used:{}, 
limit:{}, "
+                    "backend:{}.",
+                    _id, _name, MemTracker::print_bytes(used_memory),
+                    MemTracker::print_bytes(_memory_limit), 
BackendOptions::get_localhost());
+        }
+    }
+    std::string process_mem_usage_str = 
MemTrackerLimiter::process_mem_log_str();
+    auto cancel_top_overcommit_str = [cancel_str, 
process_mem_usage_str](int64_t mem_consumption,
+                                                                         const 
std::string& label) {
         return fmt::format(
-                "{} cancel top memory overcommit tracker <{}> consumption {}. 
execute again after "
-                "enough memory, details see be.INFO.",
-                cancel_str, label, MemTracker::print_bytes(mem_consumption));
+                "{} cancel top memory overcommit tracker <{}> consumption {}. 
details:{}",
+                cancel_str, label, MemTracker::print_bytes(mem_consumption), 
process_mem_usage_str);
     };
-    auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const 
std::string& label) {
-        return fmt::format(
-                "{} cancel top memory used tracker <{}> consumption {}. 
execute again after "
-                "enough memory, details see be.INFO.",
-                cancel_str, label, MemTracker::print_bytes(mem_consumption));
+    auto cancel_top_usage_str = [cancel_str, process_mem_usage_str](int64_t 
mem_consumption,
+                                                                    const 
std::string& label) {
+        return fmt::format("{} cancel top memory used tracker <{}> consumption 
{}. details:{}",
+                           cancel_str, label, 
MemTracker::print_bytes(mem_consumption),
+                           process_mem_usage_str);
     };
 
     LOG(INFO) << fmt::format(
@@ -188,7 +205,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, 
RuntimeProfile* profile)
                 _mem_tracker_limiter_pool, cancel_top_overcommit_str, 
tmq_profile,
                 MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
     }
-    if (freed_mem >= need_free_mem) {
+    // To be compatible with the non-group's gc logic, minorGC just gc 
overcommit query
+    if (is_minor_gc || freed_mem >= need_free_mem) {
         return freed_mem;
     }
 
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index d4ef689766a..cee35c66af8 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -146,7 +146,7 @@ public:
         return _query_ctxs.size();
     }
 
-    int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile);
+    int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool 
is_minor_gc);
 
     void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
 
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index a36acddc139..75c68cce4d1 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -147,10 +147,13 @@ bool MemInfo::process_minor_gc() {
         return true;
     }
 
-    RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, 
true);
-    freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size - 
freed_mem, tg_profile);
-    if (freed_mem > _s_process_minor_gc_size) {
-        return true;
+    if (config::enable_workload_group_memory_gc) {
+        RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", 
true, true);
+        freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size - 
freed_mem, tg_profile,
+                                                   true);
+        if (freed_mem > _s_process_minor_gc_size) {
+            return true;
+        }
     }
 
     if (config::enable_query_memory_overcommit) {
@@ -198,10 +201,13 @@ bool MemInfo::process_full_gc() {
         return true;
     }
 
-    RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, 
true);
-    freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size - 
freed_mem, tg_profile);
-    if (freed_mem > _s_process_full_gc_size) {
-        return true;
+    if (config::enable_workload_group_memory_gc) {
+        RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", 
true, true);
+        freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size - 
freed_mem, tg_profile,
+                                                   false);
+        if (freed_mem > _s_process_full_gc_size) {
+            return true;
+        }
     }
 
     VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
@@ -286,14 +292,14 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
 
     for (const auto& workload_group : task_groups_overcommit) {
         auto used = workload_group->memory_used();
-        total_free_memory +=
-                workload_group->gc_memory(used - 
workload_group->memory_limit(), tg_profile.get());
+        total_free_memory += workload_group->gc_memory(used - 
workload_group->memory_limit(),
+                                                       tg_profile.get(), 
false);
     }
     return total_free_memory;
 }
 
-int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
-                                               RuntimeProfile* profile) {
+int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, 
RuntimeProfile* profile,
+                                               bool is_minor_gc) {
     MonotonicStopWatch watch;
     watch.start();
     std::vector<WorkloadGroupPtr> task_groups;
@@ -359,7 +365,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t 
request_free_memory,
                                 : static_cast<double>(exceeded_memorys[i]) / 
total_exceeded_memory *
                                           request_free_memory); // exceeded 
memory as a weight
         auto workload_group = task_groups[i];
-        total_free_memory += workload_group->gc_memory(tg_need_free_memory, 
profile);
+        total_free_memory += workload_group->gc_memory(tg_need_free_memory, 
profile, is_minor_gc);
     }
     return total_free_memory;
 }
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 8d702ddf065..c2bb8b209be 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -195,7 +195,7 @@ public:
 
     static int64_t tg_not_enable_overcommit_group_gc();
     static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory,
-                                                 RuntimeProfile* profile);
+                                                 RuntimeProfile* profile, bool 
is_minor_gc);
 
     // It is only used after the memory limit is exceeded. When multiple 
threads are waiting for the available memory of the process,
     // avoid multiple threads starting at the same time and causing OOM.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to