This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7682c08af0 [improvement](load) reduce memory in batch for small load channels (#14214) 7682c08af0 is described below commit 7682c08af0b8e8619a099cf8a682238d2cd6568e Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com> AuthorDate: Sat Nov 12 22:14:01 2022 +0800 [improvement](load) reduce memory in batch for small load channels (#14214) --- be/src/runtime/load_channel_mgr.cpp | 6 +++ be/src/runtime/load_channel_mgr.h | 103 ++++++++++++++++++++++++++---------- 2 files changed, 80 insertions(+), 29 deletions(-) diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 6eef349fe1..c81ace2487 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -70,6 +70,12 @@ LoadChannelMgr::~LoadChannelMgr() { Status LoadChannelMgr::init(int64_t process_mem_limit) { _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit); _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100; + // If a load channel's memory consumption is no more than 10% of the hard limit, it's not + // worth reducing memory on it. Since we only reduce 1/3 of the memory of one load channel, + // for a channel consuming 10% of the hard limit we can only release about 3% memory each time, + // which is not very helpful for relieving memory pressure. + // In this case we need to pick multiple load channels to reduce memory more effectively.
+ _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1; _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr"); _mem_tracker_set = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD, "LoadChannelMgrTrackerSet"); diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 3f27eafd0e..c1c86a7890 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -87,7 +87,6 @@ protected: std::mutex _lock; // load id -> load channel std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels; - std::shared_ptr<LoadChannel> _reduce_memory_channel = nullptr; Cache* _last_success_channel = nullptr; // check the total load channel mem consumption of this Backend @@ -96,6 +95,14 @@ protected: std::unique_ptr<MemTrackerLimiter> _mem_tracker_set; int64_t _load_hard_mem_limit = -1; int64_t _load_soft_mem_limit = -1; + // By default, we try to reduce memory on the load channel with largest mem consumption, + // but if there are lots of small load channel, even the largest one consumes very + // small memory, in this case we need to pick multiple load channels to reduce memory + // more effectively. + // `_load_channel_min_mem_to_reduce` is used to determine whether the largest load channel's + // memory consumption is big enough. + int64_t _load_channel_min_mem_to_reduce = -1; + bool _soft_reduce_mem_in_progress = false; // If hard limit reached, one thread will trigger load channel flush, // other threads should wait on the condition variable. @@ -171,10 +178,9 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) { return Status::OK(); } - // Pick load channel to reduce memory. - std::shared_ptr<LoadChannel> channel; // Indicate whether current thread is reducing mem on hard limit. 
bool reducing_mem_on_hard_limit = false; + std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem; { std::unique_lock<std::mutex> l(_lock); while (_should_wait_flush) { @@ -182,41 +188,74 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) << ", waiting for flush"; _wait_flush_cond.wait(l); } + bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit || + MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit; // Some other thread is flushing data, and not reached hard limit now, // we don't need to handle mem limit in current thread. - if (_reduce_memory_channel != nullptr && - _mem_tracker->consumption() < _load_hard_mem_limit && - MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) { + if (_soft_reduce_mem_in_progress && !hard_limit_reached) { return Status::OK(); } - // We need to pick a LoadChannel to reduce memory usage. - // If `_reduce_memory_channel` is not null, it means the hard limit is - // exceed now, we still need to pick a load channel again. Because - // `_reduce_memory_channel` might not be the largest consumer now. - int64_t max_consume = 0; + // Pick LoadChannels to reduce memory usage, if some other thread is reducing memory + // due to soft limit, and we reached hard limit now, current thread may pick some + // duplicate channels and trigger duplicate reducing memory process. + // But the load channel's reduce memory process is thread safe, only 1 thread can + // reduce memory at the same time, other threads will wait on a condition variable, + // after the reduce-memory work finished, all threads will return. + using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>; + std::vector<ChannelMemPair> candidate_channels; + int64_t total_consume = 0; for (auto& kv : _load_channels) { if (kv.second->is_high_priority()) { // do not select high priority channel to reduce memory // to avoid blocking them. 
continue; } - if (kv.second->mem_consumption() > max_consume) { - max_consume = kv.second->mem_consumption(); - channel = kv.second; - } + int64_t mem = kv.second->mem_consumption(); + // save the mem consumption, since the calculation might be expensive. + candidate_channels.push_back(std::make_pair(kv.second, mem)); + total_consume += mem; } - if (max_consume == 0) { + + if (candidate_channels.empty()) { // should not happen, add log to observe - LOG(WARNING) << "failed to find suitable load channel when total load mem limit exceed"; + LOG(WARNING) << "All load channels are high priority, failed to find suitable" + << "channels to reduce memory when total load mem limit exceed"; return Status::OK(); } - DCHECK(channel.get() != nullptr); - _reduce_memory_channel = channel; + + // sort all load channels, try to find the largest one. + std::sort(candidate_channels.begin(), candidate_channels.end(), + [](const ChannelMemPair& lhs, const ChannelMemPair& rhs) { + return lhs.second > rhs.second; + }); + + int64_t mem_consumption_in_picked_channel = 0; + auto largest_channel = *candidate_channels.begin(); + // If some load-channel is big enough, we can reduce it only, try our best to avoid + // reducing small load channels. + if (_load_channel_min_mem_to_reduce > 0 && + largest_channel.second > _load_channel_min_mem_to_reduce) { + // Pick 1 load channel to reduce memory. + channels_to_reduce_mem.push_back(largest_channel.first); + mem_consumption_in_picked_channel = largest_channel.second; + } else { + // Pick multiple channels to reduce memory. 
+ int64_t mem_to_flushed = total_consume / 3; + for (auto ch : candidate_channels) { + channels_to_reduce_mem.push_back(ch.first); + mem_consumption_in_picked_channel += ch.second; + if (mem_consumption_in_picked_channel >= mem_to_flushed) { + break; + } + } + } std::ostringstream oss; if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) { - oss << "reducing memory of " << *channel << " because total load mem consumption " + oss << "reducing memory of " << channels_to_reduce_mem.size() + << " load channels (total mem consumption: " << mem_consumption_in_picked_channel + << " bytes), because total load mem consumption " << PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES) << " has exceeded"; if (_mem_tracker->consumption() > _load_hard_mem_limit) { @@ -224,24 +263,30 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) reducing_mem_on_hard_limit = true; oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES); } else { + _soft_reduce_mem_in_progress = true; oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES); } } else { _should_wait_flush = true; reducing_mem_on_hard_limit = true; - oss << "reducing memory of " << *channel << " because process memory used " - << PerfCounters::get_vm_rss_str() << " has exceeded limit " + oss << "reducing memory of " << channels_to_reduce_mem.size() + << " load channels (total mem consumption: " << mem_consumption_in_picked_channel + << " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit " << PrettyPrinter::print(process_mem_limit, TUnit::BYTES) << " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str(); } LOG(INFO) << oss.str(); } - // No matter soft limit or hard limit reached, only 1 thread will wait here, - // if hard limit reached, other threads will pend at the beginning of this - // method. 
- Status st = channel->handle_mem_exceed_limit(response); - LOG(INFO) << "reduce memory of " << *channel << " finished"; + Status st = Status::OK(); + for (auto ch : channels_to_reduce_mem) { + uint64_t begin = GetCurrentTimeMicros(); + int64_t mem_usage = ch->mem_consumption(); + st = ch->handle_mem_exceed_limit(response); + LOG(INFO) << "reduced memory of " << *ch << ", cost " + << (GetCurrentTimeMicros() - begin) / 1000 + << " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes"; + } { std::lock_guard<std::mutex> l(_lock); @@ -251,8 +296,8 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) _should_wait_flush = false; _wait_flush_cond.notify_all(); } - if (_reduce_memory_channel == channel) { - _reduce_memory_channel = nullptr; + if (_soft_reduce_mem_in_progress) { + _soft_reduce_mem_in_progress = false; } } return st; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org