This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 16bb5cb430 [enhancement](memory) Jemalloc performance optimization and compatibility with MemTracker #12496 16bb5cb430 is described below commit 16bb5cb4306836e52acc2b609586e67324acc2ef Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Sep 28 12:04:29 2022 +0800 [enhancement](memory) Jemalloc performance optimization and compatibility with MemTracker #12496 --- be/CMakeLists.txt | 6 +- be/src/runtime/CMakeLists.txt | 6 ++ be/src/runtime/exec_env_init.cpp | 2 +- be/src/runtime/memory/jemalloc_hook.cpp | 141 +++++++++++++++++++++++++ be/src/runtime/memory/tcmalloc_hook.h | 21 +--- be/src/runtime/memory/thread_mem_tracker_mgr.h | 35 ++++-- be/src/runtime/thread_context.h | 26 +++++ bin/start_be.sh | 2 + 8 files changed, 208 insertions(+), 31 deletions(-) diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index edcbebdc0e..0674bc7922 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -223,7 +223,10 @@ add_library(leveldb STATIC IMPORTED) set_target_properties(leveldb PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libleveldb.a) add_library(jemalloc STATIC IMPORTED) -set_target_properties(jemalloc PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libjemalloc.a) +set_target_properties(jemalloc PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libjemalloc_doris.a) + +add_library(jemalloc_arrow STATIC IMPORTED) +set_target_properties(jemalloc_arrow PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libjemalloc.a) add_library(brotlicommon STATIC IMPORTED) set_target_properties(brotlicommon PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libbrotlicommon.a) @@ -681,6 +684,7 @@ set(COMMON_THIRDPARTY roaring fmt jemalloc + jemalloc_arrow brotlicommon brotlidec brotlienc diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 210fb844de..cdd77455b4 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -115,6 +115,12 @@ if 
(WITH_MYSQL) ) endif() +if (USE_JEMALLOC) + set(RUNTIME_FILES ${RUNTIME_FILES} + memory/jemalloc_hook.cpp + ) +endif() + add_library(Runtime STATIC ${RUNTIME_FILES} ) diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 1afb147b05..828830ebeb 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -208,7 +208,7 @@ Status ExecEnv::_init_mem_tracker() { std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process"); _orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Orphan", _process_mem_tracker); _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); - thread_context()->_thread_mem_tracker_mgr->init(); + thread_context()->_thread_mem_tracker_mgr->init_impl(); thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) diff --git a/be/src/runtime/memory/jemalloc_hook.cpp b/be/src/runtime/memory/jemalloc_hook.cpp new file mode 100644 index 0000000000..11dee8eab6 --- /dev/null +++ b/be/src/runtime/memory/jemalloc_hook.cpp @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "jemalloc/jemalloc.h" +#include "runtime/thread_context.h" + +extern "C" { +void* doris_malloc(size_t size) __THROW { + MEM_MALLOC_HOOK(je_nallocx(size, 0)); + void* ptr = je_malloc(size); + if (UNLIKELY(ptr == nullptr)) { + MEM_FREE_HOOK(je_nallocx(size, 0)); + } + return ptr; +} + +void doris_free(void* p) __THROW { + MEM_FREE_HOOK(je_malloc_usable_size(p)); + je_free(p); +} + +void* doris_realloc(void* p, size_t size) __THROW { + if (UNLIKELY(size == 0)) { + return nullptr; + } + int64_t old_size = je_malloc_usable_size(p); + MEM_MALLOC_HOOK(je_nallocx(size, 0) - old_size); + void* ptr = je_realloc(p, size); + if (UNLIKELY(ptr == nullptr)) { + MEM_FREE_HOOK(je_nallocx(size, 0) - old_size); + } + return ptr; +} + +void* doris_calloc(size_t n, size_t size) __THROW { + if (UNLIKELY(size == 0)) { + return nullptr; + } + + MEM_MALLOC_HOOK(n * size); + void* ptr = je_calloc(n, size); + if (UNLIKELY(ptr == nullptr)) { + MEM_FREE_HOOK(n * size); + } else { + MEM_FREE_HOOK(je_malloc_usable_size(ptr) - n * size); + } + return ptr; +} + +void doris_cfree(void* ptr) __THROW { + MEM_FREE_HOOK(je_malloc_usable_size(ptr)); + je_free(ptr); +} + +void* doris_memalign(size_t align, size_t size) __THROW { + MEM_MALLOC_HOOK(size); + void* ptr = je_aligned_alloc(align, size); + if (UNLIKELY(ptr == nullptr)) { + MEM_FREE_HOOK(size); + } else { + MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size); + } + return ptr; +} + +void* doris_aligned_alloc(size_t align, size_t size) __THROW { + MEM_MALLOC_HOOK(size); + void* ptr = je_aligned_alloc(align, size); + if (UNLIKELY(ptr == nullptr)) { + MEM_FREE_HOOK(size); + } else { + MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size); + } + return ptr; +} + +void* doris_valloc(size_t size) __THROW { + MEM_MALLOC_HOOK(size); + void* ptr = je_valloc(size); + if (UNLIKELY(ptr == nullptr)) { + MEM_FREE_HOOK(size); + } else { + 
MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size); + } + return ptr; +} + +void* doris_pvalloc(size_t size) __THROW { + MEM_MALLOC_HOOK(size); + void* ptr = je_valloc(size); + if (UNLIKELY(ptr == nullptr)) { + MEM_FREE_HOOK(size); + } else { + MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size); + } + return ptr; +} + +int doris_posix_memalign(void** r, size_t align, size_t size) __THROW { + MEM_MALLOC_HOOK(size); + int ret = je_posix_memalign(r, align, size); + if (UNLIKELY(ret != 0)) { + MEM_FREE_HOOK(size); + } else { + MEM_MALLOC_HOOK(je_malloc_usable_size(*r) - size); + } + return ret; +} + +size_t doris_malloc_usable_size(void* ptr) __THROW { + size_t ret = je_malloc_usable_size(ptr); + return ret; +} + +#define ALIAS(doris_fn) __attribute__((alias(#doris_fn), used)) +void* malloc(size_t size) __THROW ALIAS(doris_malloc); +void free(void* p) __THROW ALIAS(doris_free); +void* realloc(void* p, size_t size) __THROW ALIAS(doris_realloc); +void* calloc(size_t n, size_t size) __THROW ALIAS(doris_calloc); +void cfree(void* ptr) __THROW ALIAS(doris_cfree); +void* memalign(size_t align, size_t size) __THROW ALIAS(doris_memalign); +void* aligned_alloc(size_t align, size_t size) __THROW ALIAS(doris_aligned_alloc); +void* valloc(size_t size) __THROW ALIAS(doris_valloc); +void* pvalloc(size_t size) __THROW ALIAS(doris_pvalloc); +int posix_memalign(void** r, size_t a, size_t s) __THROW ALIAS(doris_posix_memalign); +size_t malloc_usable_size(void* ptr) __THROW ALIAS(doris_malloc_usable_size); +} diff --git a/be/src/runtime/memory/tcmalloc_hook.h b/be/src/runtime/memory/tcmalloc_hook.h index 627f42795d..6ec9352ad3 100644 --- a/be/src/runtime/memory/tcmalloc_hook.h +++ b/be/src/runtime/memory/tcmalloc_hook.h @@ -36,28 +36,11 @@ // destructor to control the behavior of consume can lead to unexpected behavior, // like this: if (LIKELY(doris::start_thread_mem_tracker)) { void new_hook(const void* ptr, size_t size) { - if (doris::btls_key != doris::EMPTY_BTLS_KEY && 
doris::bthread_context != nullptr) { - // Currently in bthread, consume thread context mem tracker in bthread tls. - doris::update_bthread_context(); - doris::bthread_context->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0)); - } else if (doris::thread_context_ptr._init) { - doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0)); - } else { - doris::ThreadMemTrackerMgr::consume_no_attach(tc_nallocx(size, 0)); - } + MEM_MALLOC_HOOK(tc_nallocx(size, 0)); } void delete_hook(const void* ptr) { - if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { - doris::update_bthread_context(); - doris::bthread_context->_thread_mem_tracker_mgr->consume( - -tc_malloc_size(const_cast<void*>(ptr))); - } else if (doris::thread_context_ptr._init) { - doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume( - -tc_malloc_size(const_cast<void*>(ptr))); - } else { - doris::ThreadMemTrackerMgr::consume_no_attach(-tc_malloc_size(const_cast<void*>(ptr))); - } + MEM_FREE_HOOK(tc_malloc_size(const_cast<void*>(ptr))); } void init_hook() { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 399265bc51..4c9099528d 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -51,12 +51,15 @@ public: // only for tcmalloc hook static void consume_no_attach(int64_t size) { - ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size); + if (ExecEnv::GetInstance()->initialized()) { + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size); + } } // After thread initialization, calling `init` again must call `clear_untracked_mems` first // to avoid memory tracking loss. 
void init(); + void init_impl(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id, @@ -85,9 +88,13 @@ public: bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); } std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { + if (_limiter_tracker_raw == nullptr) init_impl(); return _limiter_tracker_stack.back(); } - MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; } + MemTrackerLimiter* limiter_mem_tracker_raw() { + if (_limiter_tracker_raw == nullptr) init_impl(); + return _limiter_tracker_raw; + } void set_check_limit(bool check_limit) { _check_limit = check_limit; } void set_check_attach(bool check_attach) { _check_attach = check_attach; } @@ -120,7 +127,7 @@ private: // _limiter_tracker_stack[0] = orphan_mem_tracker std::vector<std::shared_ptr<MemTrackerLimiter>> _limiter_tracker_stack; - MemTrackerLimiter* _limiter_tracker_raw; + MemTrackerLimiter* _limiter_tracker_raw = nullptr; std::vector<MemTracker*> _consumer_tracker_stack; // If true, call memtracker try_consume, otherwise call consume. 
@@ -138,12 +145,18 @@ inline void ThreadMemTrackerMgr::init() { // _limiter_tracker_stack[0] = orphan_mem_tracker DCHECK(_limiter_tracker_stack.size() <= 1) << "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); - if (_limiter_tracker_stack.size() == 0) { - _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); - _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); - _task_id_stack.push_back(""); - _fragment_instance_id_stack.push_back(TUniqueId()); + if (_limiter_tracker_raw == nullptr && ExecEnv::GetInstance()->initialized()) { + init_impl(); } +} + +inline void ThreadMemTrackerMgr::init_impl() { + DCHECK(_limiter_tracker_stack.size() == 0); + DCHECK(_limiter_tracker_raw == nullptr); + _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); + _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); + _task_id_stack.push_back(""); + _fragment_instance_id_stack.push_back(TUniqueId()); _check_limit = true; } @@ -166,9 +179,10 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) { // When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes` // and some threads `_untracked_mem <= -config::mem_tracker_consume_min_size_bytes` trigger consumption(), // it will cause tracker->consumption to be temporarily less than 0. + // After the jemalloc hook is loaded, before ExecEnv init, _limiter_tracker=nullptr. if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes || _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) && - !_stop_consume) { + !_stop_consume && ExecEnv::GetInstance()->initialized()) { if (_check_limit) { flush_untracked_mem<true>(); } else { @@ -182,8 +196,9 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop. 
_stop_consume = true; - old_untracked_mem = _untracked_mem; + if (_limiter_tracker_raw == nullptr) init_impl(); DCHECK(_limiter_tracker_raw); + old_untracked_mem = _untracked_mem; if (CheckLimit) { #ifndef BE_TEST // When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker. diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 670b3ff613..fbfac1be59 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -268,4 +268,30 @@ public: ->_thread_mem_tracker_mgr->last_consumer_tracker(), \ msg), \ ##__VA_ARGS__); + +// Mem Hook to consume thread mem tracker +#define MEM_MALLOC_HOOK(size) \ + do { \ + if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { \ + doris::update_bthread_context(); \ + doris::bthread_context->_thread_mem_tracker_mgr->consume(size); \ + } else if (LIKELY(doris::thread_context_ptr._init)) { \ + doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(size); \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(size); \ + } \ + } while (0) + +#define MEM_FREE_HOOK(size) \ + do { \ + if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { \ + doris::update_bthread_context(); \ + doris::bthread_context->_thread_mem_tracker_mgr->consume(-size); \ + } else if (doris::thread_context_ptr._init) { \ + doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(-size); \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ + } \ + } while (0) + } // namespace doris diff --git a/bin/start_be.sh b/bin/start_be.sh index f60795d756..69d36cfaa3 100755 --- a/bin/start_be.sh +++ b/bin/start_be.sh @@ -207,6 +207,8 @@ export UBSAN_OPTIONS=print_stacktrace=1 ## set hdfs conf export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml" +export 
MALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16" + if [[ "${RUN_DAEMON}" -eq 1 ]]; then nohup ${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/doris_be" "$@" >>"${LOG_DIR}/be.out" 2>&1 </dev/null & else --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org