This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository

The following commit(s) were added to refs/heads/master by this push:
     new 7682c08af0 [improvement](load) reduce memory in batch for small load 
channels (#14214)
7682c08af0 is described below

commit 7682c08af0b8e8619a099cf8a682238d2cd6568e
Author: zhannngchen <>
AuthorDate: Sat Nov 12 22:14:01 2022 +0800

    [improvement](load) reduce memory in batch for small load channels (#14214)
 be/src/runtime/load_channel_mgr.cpp |   6 +++
 be/src/runtime/load_channel_mgr.h   | 103 ++++++++++++++++++++++++++----------
 2 files changed, 80 insertions(+), 29 deletions(-)

diff --git a/be/src/runtime/load_channel_mgr.cpp 
index 6eef349fe1..c81ace2487 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -70,6 +70,12 @@ LoadChannelMgr::~LoadChannelMgr() {
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
     _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
     _load_soft_mem_limit = _load_hard_mem_limit * 
config::load_process_soft_mem_limit_percent / 100;
+    // If a load channel's memory consumption is no more than 10% of the hard 
limit, it's not
+    // worth reducing memory on it. Since we only reduce 1/3 memory for one 
load channel,
+    // for a channel consuming 10% of hard limit, we can only release about 3% 
memory each time,
+    // which is not very helpful for reducing memory pressure.
+    // In this case we need to pick multiple load channels to reduce memory 
more effectively.
+    _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1;
     _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
     _mem_tracker_set = 
diff --git a/be/src/runtime/load_channel_mgr.h 
index 3f27eafd0e..c1c86a7890 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -87,7 +87,6 @@ protected:
     std::mutex _lock;
     // load id -> load channel
     std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
-    std::shared_ptr<LoadChannel> _reduce_memory_channel = nullptr;
     Cache* _last_success_channel = nullptr;
     // check the total load channel mem consumption of this Backend
@@ -96,6 +95,14 @@ protected:
     std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
     int64_t _load_hard_mem_limit = -1;
     int64_t _load_soft_mem_limit = -1;
+    // By default, we try to reduce memory on the load channel with largest 
mem consumption,
+    // but if there are lots of small load channels, even the largest one 
consumes very
+    // small memory, in this case we need to pick multiple load channels to 
reduce memory
+    // more effectively.
+    // `_load_channel_min_mem_to_reduce` is used to determine whether the 
largest load channel's
+    // memory consumption is big enough.
+    int64_t _load_channel_min_mem_to_reduce = -1;
+    bool _soft_reduce_mem_in_progress = false;
     // If hard limit reached, one thread will trigger load channel flush,
     // other threads should wait on the condition variable.
@@ -171,10 +178,9 @@ Status 
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
         MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
         return Status::OK();
-    // Pick load channel to reduce memory.
-    std::shared_ptr<LoadChannel> channel;
     // Indicate whether current thread is reducing mem on hard limit.
     bool reducing_mem_on_hard_limit = false;
+    std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
         std::unique_lock<std::mutex> l(_lock);
         while (_should_wait_flush) {
@@ -182,41 +188,74 @@ Status 
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
                       << ", waiting for flush";
+        bool hard_limit_reached = _mem_tracker->consumption() >= 
_load_hard_mem_limit ||
+                                  MemInfo::proc_mem_no_allocator_cache() >= 
         // Some other thread is flushing data, and not reached hard limit now,
         // we don't need to handle mem limit in current thread.
-        if (_reduce_memory_channel != nullptr &&
-            _mem_tracker->consumption() < _load_hard_mem_limit &&
-            MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
+        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
             return Status::OK();
-        // We need to pick a LoadChannel to reduce memory usage.
-        // If `_reduce_memory_channel` is not null, it means the hard limit is
-        // exceed now, we still need to pick a load channel again. Because
-        // `_reduce_memory_channel` might not be the largest consumer now.
-        int64_t max_consume = 0;
+        // Pick LoadChannels to reduce memory usage. If some other thread is 
reducing memory
+        // due to soft limit, and we reached hard limit now, current thread 
may pick some
+        // duplicate channels and trigger a duplicate reduce-memory process.
+        // But the load channel's reduce memory process is thread safe, only 1 
thread can
+        // reduce memory at the same time, other threads will wait on a 
condition variable,
+        // after the reduce-memory work has finished, all threads will return.
+        using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, 
+        std::vector<ChannelMemPair> candidate_channels;
+        int64_t total_consume = 0;
         for (auto& kv : _load_channels) {
             if (kv.second->is_high_priority()) {
                 // do not select high priority channel to reduce memory
                 // to avoid blocking them.
-            if (kv.second->mem_consumption() > max_consume) {
-                max_consume = kv.second->mem_consumption();
-                channel = kv.second;
-            }
+            int64_t mem = kv.second->mem_consumption();
+            // save the mem consumption, since the calculation might be 
+            candidate_channels.push_back(std::make_pair(kv.second, mem));
+            total_consume += mem;
-        if (max_consume == 0) {
+        if (candidate_channels.empty()) {
             // should not happen, add log to observe
-            LOG(WARNING) << "failed to find suitable load channel when total 
load mem limit exceed";
+            LOG(WARNING) << "All load channels are high priority, failed to 
find suitable"
+                         << "channels to reduce memory when total load mem 
limit exceed";
             return Status::OK();
-        DCHECK(channel.get() != nullptr);
-        _reduce_memory_channel = channel;
+        // sort all load channels, try to find the largest one.
+        std::sort(candidate_channels.begin(), candidate_channels.end(),
+                  [](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
+                      return lhs.second > rhs.second;
+                  });
+        int64_t mem_consumption_in_picked_channel = 0;
+        auto largest_channel = *candidate_channels.begin();
+        // If some load-channel is big enough, we can reduce it only, try our 
best to avoid
+        // reducing small load channels.
+        if (_load_channel_min_mem_to_reduce > 0 &&
+            largest_channel.second > _load_channel_min_mem_to_reduce) {
+            // Pick 1 load channel to reduce memory.
+            channels_to_reduce_mem.push_back(largest_channel.first);
+            mem_consumption_in_picked_channel = largest_channel.second;
+        } else {
+            // Pick multiple channels to reduce memory.
+            int64_t mem_to_flushed = total_consume / 3;
+            for (auto ch : candidate_channels) {
+                channels_to_reduce_mem.push_back(ch.first);
+                mem_consumption_in_picked_channel += ch.second;
+                if (mem_consumption_in_picked_channel >= mem_to_flushed) {
+                    break;
+                }
+            }
+        }
         std::ostringstream oss;
         if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
-            oss << "reducing memory of " << *channel << " because total load 
mem consumption "
+            oss << "reducing memory of " << channels_to_reduce_mem.size()
+                << " load channels (total mem consumption: " << 
+                << " bytes), because total load mem consumption "
                 << PrettyPrinter::print(_mem_tracker->consumption(), 
                 << " has exceeded";
             if (_mem_tracker->consumption() > _load_hard_mem_limit) {
@@ -224,24 +263,30 @@ Status 
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
                 reducing_mem_on_hard_limit = true;
                 oss << " hard limit: " << 
PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
             } else {
+                _soft_reduce_mem_in_progress = true;
                 oss << " soft limit: " << 
PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
         } else {
             _should_wait_flush = true;
             reducing_mem_on_hard_limit = true;
-            oss << "reducing memory of " << *channel << " because process 
memory used "
-                << PerfCounters::get_vm_rss_str() << " has exceeded limit "
+            oss << "reducing memory of " << channels_to_reduce_mem.size()
+                << " load channels (total mem consumption: " << 
+                << " bytes), because " << PerfCounters::get_vm_rss_str() << " 
has exceeded limit "
                 << PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
                 << " , tc/jemalloc allocator cache " << 
         LOG(INFO) << oss.str();
-    // No matter soft limit or hard limit reached, only 1 thread will wait 
-    // if hard limit reached, other threads will pend at the beginning of this
-    // method.
-    Status st = channel->handle_mem_exceed_limit(response);
-    LOG(INFO) << "reduce memory of " << *channel << " finished";
+    Status st = Status::OK();
+    for (auto ch : channels_to_reduce_mem) {
+        uint64_t begin = GetCurrentTimeMicros();
+        int64_t mem_usage = ch->mem_consumption();
+        st = ch->handle_mem_exceed_limit(response);
+        LOG(INFO) << "reduced memory of " << *ch << ", cost "
+                  << (GetCurrentTimeMicros() - begin) / 1000
+                  << " ms, released memory: " << mem_usage - 
ch->mem_consumption() << " bytes";
+    }
         std::lock_guard<std::mutex> l(_lock);
@@ -251,8 +296,8 @@ Status 
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
             _should_wait_flush = false;
-        if (_reduce_memory_channel == channel) {
-            _reduce_memory_channel = nullptr;
+        if (_soft_reduce_mem_in_progress) {
+            _soft_reduce_mem_in_progress = false;
     return st;

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to