github-actions[bot] commented on code in PR #27759:
URL: https://github.com/apache/doris/pull/27759#discussion_r1411734996


##########
be/src/olap/memtable_memory_limiter.cpp:
##########
@@ -59,174 +59,129 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer
     _writers.push_back(writer);
 }
 
+int64_t MemTableMemoryLimiter::_avail_mem_lack() {
+    // reserve a small amount of memory so we do not trigger MinorGC
+    auto reserved_mem = doris::MemInfo::sys_mem_available_low_water_mark();
+    auto avail_mem_lack =
+            doris::MemInfo::sys_mem_available_warning_water_mark() - MemInfo::sys_mem_available();
+    return avail_mem_lack + reserved_mem;
+}
+
+int64_t MemTableMemoryLimiter::_proc_mem_extra() {
+    // reserve a small amount of memory so we do not trigger MinorGC
+    auto reserved_mem = doris::MemInfo::sys_mem_available_low_water_mark();
+    auto proc_mem_extra = MemInfo::proc_mem_no_allocator_cache() - MemInfo::soft_mem_limit();
+    return proc_mem_extra + reserved_mem;
+}
+
+bool MemTableMemoryLimiter::_soft_limit_reached() {
+    return _mem_tracker->consumption() >= _load_soft_mem_limit || _hard_limit_reached();
+}
+
+bool MemTableMemoryLimiter::_hard_limit_reached() {
+    return _mem_tracker->consumption() >= _load_hard_mem_limit || _avail_mem_lack() >= 0 ||
+           _proc_mem_extra() >= 0;
+}
+
 void MemTableMemoryLimiter::handle_memtable_flush() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
-    // Record current memory status.
-    int64_t process_soft_mem_limit = MemInfo::soft_mem_limit();
-    int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache();
-#ifndef BE_TEST
-    // If process memory is almost full but data load don't consume more than 5% (50% * 10%) of
-    // total memory, we don't need to flush memtable.
-    bool reduce_on_process_soft_mem_limit =
-            proc_mem_no_allocator_cache >= process_soft_mem_limit &&
-            _mem_tracker->consumption() >= _load_hard_mem_limit / 10;
-    if (_mem_tracker->consumption() < _load_soft_mem_limit && !reduce_on_process_soft_mem_limit) {
+    if (!_soft_limit_reached()) {
         return;
     }
-#endif
-    // Indicate whether current thread is reducing mem on hard limit.
-    bool reducing_mem_on_hard_limit = false;
-    Status st;
-    std::vector<WriterMemItem> writers_to_reduce_mem;
     {
         MonotonicStopWatch timer;
         timer.start();
         std::unique_lock<std::mutex> l(_lock);
-        while (_should_wait_flush) {
-            _wait_flush_cond.wait(l);
-        }
-        LOG(INFO) << "Reached the one tenth of load hard limit " << _load_hard_mem_limit / 10
-                  << "and process remaining allocator cache " << proc_mem_no_allocator_cache
-                  << "reached process soft memory limit " << process_soft_mem_limit
-                  << ", waited for flush, time_ns:" << timer.elapsed_time();
-#ifndef BE_TEST
-        bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
-                                  proc_mem_no_allocator_cache >= process_soft_mem_limit;
-        // Some other thread is flushing data, and not reached hard limit now,
-        // we don't need to handle mem limit in current thread.
-        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
-            return;
-        }
-#endif
-
-        auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
-            return lhs.mem_size < rhs.mem_size;
-        };
-        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> mem_heap(cmp);
-
-        for (auto it = _writers.begin(); it != _writers.end();) {
-            if (auto writer = it->lock()) {
-                int64_t active_memtable_mem = writer->active_memtable_mem_consumption();
-                mem_heap.emplace(writer, active_memtable_mem);
-                ++it;
-            } else {
-                *it = std::move(_writers.back());
-                _writers.pop_back();
-            }
-        }
-        int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
-        int64_t mem_consumption_in_picked_writer = 0;
-        while (!mem_heap.empty()) {
-            WriterMemItem mem_item = mem_heap.top();
-            mem_heap.pop();
-            auto writer = mem_item.writer.lock();
-            if (!writer) {
-                continue;
+        while (_hard_limit_reached()) {
+            LOG(INFO) << "reached memtable memory hard limit"
+                      << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
+                      << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
+                      << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
+            if (_active_mem_usage >= _write_mem_usage * 0.99) {
+                _flush_active_memtables();
             }
-            int64_t mem_size = mem_item.mem_size;
-            writers_to_reduce_mem.emplace_back(writer, mem_size);
-            st = writer->flush_async();
-            if (!st.ok()) {
-                auto err_msg = fmt::format(
-                        "tablet writer failed to reduce mem consumption by flushing memtable, "
-                        "tablet_id={}, err={}",
-                        writer->tablet_id(), st.to_string());
-                LOG(WARNING) << err_msg;
-                static_cast<void>(writer->cancel_with_status(st));
+            auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000));
+            if (st == std::cv_status::timeout) {
+                LOG(INFO) << "timeout when waiting for memory hard limit end, try again";
             }
-            mem_consumption_in_picked_writer += mem_size;
-            if (mem_consumption_in_picked_writer > mem_to_flushed) {
-                break;
-            }
-        }
-        if (writers_to_reduce_mem.empty()) {
-            // should not happen, add log to observe
-            LOG(WARNING) << "failed to find suitable writers to reduce memory"
-                         << " when total load mem limit exceed";
-            return;
         }
-
-        std::ostringstream oss;
-        oss << "reducing memory of " << writers_to_reduce_mem.size()
-            << " memtable writers (total mem: "
-            << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer)
-            << ", max mem: " << PrettyPrinter::print_bytes(writers_to_reduce_mem.front().mem_size)
-            << ", min mem:" << PrettyPrinter::print_bytes(writers_to_reduce_mem.back().mem_size)
-            << "), ";
-        if (proc_mem_no_allocator_cache < process_soft_mem_limit) {
-            oss << "because total load mem consumption "
-                << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded";
-            if (_mem_tracker->consumption() > _load_hard_mem_limit) {
-                _should_wait_flush = true;
-                reducing_mem_on_hard_limit = true;
-                oss << " hard limit: " << PrettyPrinter::print_bytes(_load_hard_mem_limit);
-            } else {
-                _soft_reduce_mem_in_progress = true;
-                oss << " soft limit: " << PrettyPrinter::print_bytes(_load_soft_mem_limit);
+        if (_soft_limit_reached()) {
+            LOG(INFO) << "reached memtable memory soft limit"
+                      << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
+                      << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
+                      << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
+            if (_active_mem_usage >= _write_mem_usage * 0.99) {
+                _flush_active_memtables();
             }
-        } else {
-            _should_wait_flush = true;
-            reducing_mem_on_hard_limit = true;
-            oss << "because proc_mem_no_allocator_cache consumption "
-                << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache)
-                << ", has exceeded process soft limit "
-                << PrettyPrinter::print_bytes(process_soft_mem_limit)
-                << ", total load mem consumption: "
-                << PrettyPrinter::print_bytes(_mem_tracker->consumption())
-                << ", vm_rss: " << PerfCounters::get_vm_rss_str();
         }
-        LOG(INFO) << oss.str();
+        timer.stop();
+        int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
+        LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
     }
+}
 
-    // wait all writers flush without lock
-    for (auto item : writers_to_reduce_mem) {
-        VLOG_NOTICE << "reducing memory, wait flush mem_size: "
-                    << PrettyPrinter::print_bytes(item.mem_size);
-        auto writer = item.writer.lock();
-        if (!writer) {
-            continue;
-        }
-        st = writer->wait_flush();
-        if (!st.ok()) {
-            auto err_msg = fmt::format(
-                    "tablet writer failed to reduce mem consumption by flushing memtable, "
-                    "tablet_id={}, err={}",
-                    writer->tablet_id(), st.to_string());
-            LOG(WARNING) << err_msg;
-            static_cast<void>(writer->cancel_with_status(st));
-        }
+void MemTableMemoryLimiter::_flush_active_memtables() {

Review Comment:
   warning: method '_flush_active_memtables' can be made const [readability-make-member-function-const]
   
   be/src/olap/memtable_memory_limiter.h:56:
   ```diff
   -     void _flush_active_memtables();
   +     void _flush_active_memtables() const;
   ```
   
   ```suggestion
   void MemTableMemoryLimiter::_flush_active_memtables() const {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to