This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e9afd3210c [improvement](memory) Optimize the log of process memory insufficient and support regular GC cache (#16084) e9afd3210c is described below commit e9afd3210cd111fdff078778f8f82d9a13ae767f Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Sun Jan 29 10:02:04 2023 +0800 [improvement](memory) Optimize the log of process memory insufficient and support regular GC cache (#16084) 1. When the process memory is insufficient, print the process memory statistics in a more timely and detailed manner. 2. Support regular GC cache, currently only page cache and chunk allocator are included, because many people reported that the memory does not drop after the query ends. 3. Reduce system available memory warning water mark to reduce memory waste 4. Optimize soft mem limit logging --- be/src/common/config.h | 6 ++- be/src/common/daemon.cpp | 42 ++++++++++++++++----- be/src/runtime/memory/mem_tracker_limiter.cpp | 54 ++++++++++++++++++--------- be/src/runtime/memory/mem_tracker_limiter.h | 12 ++++-- be/src/util/mem_info.cpp | 50 ++++++++++++++----------- be/src/util/mem_info.h | 10 ++++- 6 files changed, 119 insertions(+), 55 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 1cf3807958..142731b537 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -74,6 +74,10 @@ CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918"); // The size of the memory that gc wants to release each time, as a percentage of the mem limit. CONF_mString(process_minor_gc_size, "10%"); CONF_mString(process_full_gc_size, "20%"); +// Some caches have their own gc threads, such as segment cache. +// For caches that do not have a separate gc thread, perform regular gc in the memory maintenance thread. +// Currently only storage page cache, chunk allocator, more in the future. +CONF_mInt32(cache_gc_interval_s, "60"); // If true, when the process does not exceed the soft mem limit, the query memory will not be limited; // when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently @@ -477,7 +481,7 @@ CONF_Bool(madvise_huge_pages, "false"); CONF_Bool(mmap_buffers, "false"); // Sleep time in milliseconds between memory maintenance iterations -CONF_mInt64(memory_maintenance_sleep_time_ms, "500"); +CONF_mInt32(memory_maintenance_sleep_time_ms, "500"); // Sleep time in milliseconds between load channel memory refresh iterations CONF_mInt64(load_channel_memory_refresh_sleep_time_ms, "100"); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 00dd7263c9..d6b6568e3b 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -165,7 +165,9 @@ void Daemon::tcmalloc_gc_thread() { } void Daemon::memory_maintenance_thread() { - int64_t interval_milliseconds = config::memory_maintenance_sleep_time_ms; + int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms; + int32_t cache_gc_interval_ms = config::cache_gc_interval_s * 1000; + int64_t cache_gc_freed_mem = 0; while (!_stop_background_threads_latch.wait_for( std::chrono::milliseconds(interval_milliseconds))) { if (!MemInfo::initialized()) { @@ -180,29 +182,49 @@ void Daemon::memory_maintenance_thread() { doris::MemInfo::refresh_allocator_mem(); #endif doris::MemInfo::refresh_proc_mem_no_allocator_cache(); - LOG_EVERY_N(INFO, 10) << MemTrackerLimiter::process_mem_log_str(); // Refresh mem tracker each type metrics. doris::MemTrackerLimiter::refresh_global_counter(); - if (doris::config::memory_debug) { - doris::MemTrackerLimiter::print_log_process_usage("memory_debug", false); - } - doris::MemTrackerLimiter::enable_print_log_process_usage(); // If system available memory is not enough, or the process memory exceeds the limit, reduce refresh interval. if (doris::MemInfo::sys_mem_available() < doris::MemInfo::sys_mem_available_low_water_mark() || doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit()) { - interval_milliseconds = 100; - doris::MemInfo::process_full_gc(); + doris::MemTrackerLimiter::print_log_process_usage("process full gc", false); + interval_milliseconds = std::min(100, config::memory_maintenance_sleep_time_ms); + if (doris::MemInfo::process_full_gc()) { + // If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc. + doris::MemTrackerLimiter::enable_print_log_process_usage(); + } + cache_gc_interval_ms = config::cache_gc_interval_s * 1000; } else if (doris::MemInfo::sys_mem_available() < doris::MemInfo::sys_mem_available_warning_water_mark() || doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::soft_mem_limit()) { - interval_milliseconds = 200; - doris::MemInfo::process_minor_gc(); + doris::MemTrackerLimiter::print_log_process_usage("process minor gc", false); + interval_milliseconds = std::min(200, config::memory_maintenance_sleep_time_ms); + if (doris::MemInfo::process_minor_gc()) { + doris::MemTrackerLimiter::enable_print_log_process_usage(); + } + cache_gc_interval_ms = config::cache_gc_interval_s * 1000; } else { + doris::MemTrackerLimiter::enable_print_log_process_usage(); interval_milliseconds = config::memory_maintenance_sleep_time_ms; + if (doris::config::memory_debug) { + LOG_EVERY_N(WARNING, 20) << doris::MemTrackerLimiter::log_process_usage_str( + "memory debug", false); // default 10s print once + } else { + LOG_EVERY_N(INFO, 10) + << MemTrackerLimiter::process_mem_log_str(); // default 5s print once + } + cache_gc_interval_ms -= interval_milliseconds; + if (cache_gc_interval_ms < 0) { + cache_gc_freed_mem = 0; + doris::MemInfo::process_cache_gc(cache_gc_freed_mem); + LOG(INFO) << fmt::format("Process regular GC Cache, Free Memory {} Bytes", + cache_gc_freed_mem); // default 6s print once + cache_gc_interval_ms = config::cache_gc_interval_s * 1000; + } } } } diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 77126eba2d..938f72d2a0 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -23,6 +23,7 @@ #include <queue> #include "runtime/fragment_mgr.h" +#include "runtime/load_channel_mgr.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "util/pretty_printer.h" @@ -195,26 +196,35 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { } } +std::string MemTrackerLimiter::log_process_usage_str(const std::string& msg, bool with_stacktrace) { + std::string detail = msg; + detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str(); + if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + get_stack_trace(); + std::vector<MemTracker::Snapshot> snapshots; + MemTrackerLimiter::make_process_snapshots(&snapshots); + MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); + + // Add additional tracker printed when memory exceeds limit. + snapshots.emplace_back( + ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()->make_snapshot()); + + detail += "\nMemory Tracker Summary:"; + for (const auto& snapshot : snapshots) { + if (snapshot.label == "" && snapshot.parent_label == "") { + detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot); + } else if (snapshot.parent_label == "") { + detail += "\n " + MemTrackerLimiter::log_usage(snapshot); + } else { + detail += "\n " + MemTracker::log_usage(snapshot); + } + } + return detail; +} + void MemTrackerLimiter::print_log_process_usage(const std::string& msg, bool with_stacktrace) { if (MemTrackerLimiter::_enable_print_log_process_usage) { MemTrackerLimiter::_enable_print_log_process_usage = false; - std::string detail = msg; - detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str(); - if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + get_stack_trace(); - std::vector<MemTracker::Snapshot> snapshots; - MemTrackerLimiter::make_process_snapshots(&snapshots); - MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); - detail += "\nMemory Tracker Summary:"; - for (const auto& snapshot : snapshots) { - if (snapshot.label == "" && snapshot.parent_label == "") { - detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot); - } else if (snapshot.parent_label == "") { - detail += "\n " + MemTrackerLimiter::log_usage(snapshot); - } else { - detail += "\n " + MemTracker::log_usage(snapshot); - } - } - LOG(WARNING) << detail; + LOG(WARNING) << log_process_usage_str(msg, with_stacktrace); } } @@ -252,6 +262,10 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, Type type int64_t freed_mem = 0; while (!min_pq.empty()) { TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second); + if (cancelled_queryid == TUniqueId()) { + min_pq.pop(); + continue; + } ExecEnv::GetInstance()->fragment_mgr()->cancel_query( cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, fmt::format("Process has no memory available, cancel top memory usage {}: " @@ -344,6 +358,10 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, Type int64_t freed_mem = 0; while (!max_pq.empty()) { TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second); + if (cancelled_queryid == TUniqueId()) { + max_pq.pop(); + continue; + } int64_t query_mem = query_consumption[max_pq.top().second]; ExecEnv::GetInstance()->fragment_mgr()->cancel_query( cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, @@ -354,7 +372,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, Type "details see be.INFO.", TypeString[type], TypeString[type], max_pq.top().second, print_bytes(query_mem), BackendOptions::get_localhost(), - PerfCounters::get_vm_rss_str(), print_bytes(MemInfo::soft_mem_limit()), + PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(), print_bytes(MemInfo::sys_mem_available_warning_water_mark()))); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 9523dd12e8..2e059f8770 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -136,6 +136,7 @@ public: void print_log_usage(const std::string& msg); void enable_print_log_usage() { _enable_print_log_usage = true; } static void enable_print_log_process_usage() { _enable_print_log_process_usage = true; } + static std::string log_process_usage_str(const std::string& msg, bool with_stacktrace = true); static void print_log_process_usage(const std::string& msg, bool with_stacktrace = true); // Log the memory usage when memory limit is exceeded. @@ -157,6 +158,9 @@ public: } // only for Type::QUERY or Type::LOAD. static TUniqueId label_to_queryid(const std::string& label) { + if (label.rfind("Query#Id=", 0) != 0 && label.rfind("Load#Id=", 0) != 0) { + return TUniqueId(); + } auto queryid = split(label, "#Id=")[1]; TUniqueId querytid; parse_id(queryid, &querytid); @@ -165,12 +169,14 @@ public: static std::string process_mem_log_str() { return fmt::format( - "OS physical memory {}, process memory used {} limit {}, sys mem available {} low " - "water mark {}, refresh interval memory growth {} B", + "OS physical memory {}. Process memory usage {}, limit {}, soft limit {}. Sys " + "available memory {}, low water mark {}, warning water mark {}. Refresh interval " + "memory growth {} B", PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - MemInfo::sys_mem_available_str(), + MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(), PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), + PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES), MemInfo::refresh_interval_memory_growth); } diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 82e4f125db..b7e68674e8 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -47,6 +47,7 @@ int64_t MemInfo::_s_physical_mem = -1; int64_t MemInfo::_s_mem_limit = -1; std::string MemInfo::_s_mem_limit_str = ""; int64_t MemInfo::_s_soft_mem_limit = -1; +std::string MemInfo::_s_soft_mem_limit_str = ""; int64_t MemInfo::_s_allocator_cache_mem = 0; std::string MemInfo::_s_allocator_cache_mem_str = ""; @@ -89,61 +90,66 @@ void MemInfo::refresh_allocator_mem() { #endif } +void MemInfo::process_cache_gc(int64_t& freed_mem) { + // TODO, free more cache, and should free a certain percentage of capacity, not all. + freed_mem += ChunkAllocator::instance()->mem_consumption(); + ChunkAllocator::instance()->clear(); + freed_mem += + StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); + StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); +} + // step1: free all cache // step2: free top overcommit query, if enable query memroy overcommit -void MemInfo::process_minor_gc() { - // TODO, free more cache, and should free a certain percentage of capacity, not all. +bool MemInfo::process_minor_gc() { int64_t freed_mem = 0; Defer defer {[&]() { LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes", freed_mem); }}; - freed_mem += ChunkAllocator::instance()->mem_consumption(); - ChunkAllocator::instance()->clear(); + MemInfo::process_cache_gc(freed_mem); if (freed_mem > _s_process_minor_gc_size) { - return; + return true; } - freed_mem += - StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); - StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); if (config::enable_query_memroy_overcommit) { freed_mem += MemTrackerLimiter::free_top_overcommit_query(_s_process_minor_gc_size - freed_mem); } + if (freed_mem > _s_process_minor_gc_size) { + return true; + } + return false; } // step1: free all cache // step2: free top memory query // step3: free top overcommit load, load retries are more expensive, So cancel at the end. // step4: free top memory load -void MemInfo::process_full_gc() { +bool MemInfo::process_full_gc() { int64_t freed_mem = 0; Defer defer { [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes", freed_mem); }}; - freed_mem += - StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); - StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); + MemInfo::process_cache_gc(freed_mem); if (freed_mem > _s_process_full_gc_size) { - return; - } - freed_mem += ChunkAllocator::instance()->mem_consumption(); - ChunkAllocator::instance()->clear(); - if (freed_mem > _s_process_full_gc_size) { - return; + return true; } freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem); if (freed_mem > _s_process_full_gc_size) { - return; + return true; } if (config::enable_query_memroy_overcommit) { freed_mem += MemTrackerLimiter::free_top_overcommit_load(_s_process_full_gc_size - freed_mem); if (freed_mem > _s_process_full_gc_size) { - return; + return true; } } freed_mem += MemTrackerLimiter::free_top_memory_load(_s_process_full_gc_size - freed_mem); + if (freed_mem > _s_process_full_gc_size) { + return true; + } + return false; } #ifndef __APPLE__ @@ -203,6 +209,7 @@ void MemInfo::init() { } _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac; + _s_soft_mem_limit_str = PrettyPrinter::print(_s_soft_mem_limit, TUnit::BYTES); _s_process_minor_gc_size = ParseUtil::parse_mem_spec(config::process_minor_gc_size, -1, _s_mem_limit, &is_percent); @@ -239,7 +246,7 @@ void MemInfo::init() { config::max_sys_mem_available_low_water_mark_bytes); int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem * 0.01, 0); _s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0); - _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1 * 2; + _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1; LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", Mem Limit: " << _s_mem_limit_str @@ -264,6 +271,7 @@ void MemInfo::init() { _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent); _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac; + _s_soft_mem_limit_str = PrettyPrinter::print(_s_soft_mem_limit, TUnit::BYTES); LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES); _s_initialized = true; diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index bd76c6124c..2cb17eb6b8 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -111,11 +111,16 @@ public: DCHECK(_s_initialized); return _s_soft_mem_limit; } + static inline std::string soft_mem_limit_str() { + DCHECK(_s_initialized); + return _s_soft_mem_limit_str; + } static std::string debug_string(); - static void process_minor_gc(); - static void process_full_gc(); + static void process_cache_gc(int64_t& freed_mem); + static bool process_minor_gc(); + static bool process_full_gc(); // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process, // avoid multiple threads starting at the same time and causing OOM. @@ -127,6 +132,7 @@ private: static int64_t _s_mem_limit; static std::string _s_mem_limit_str; static int64_t _s_soft_mem_limit; + static std::string _s_soft_mem_limit_str; static int64_t _s_allocator_cache_mem; static std::string _s_allocator_cache_mem_str; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org