This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 16bb5cb430 [enhancement](memory) Jemalloc performance optimization and 
compatibility with MemTracker #12496
16bb5cb430 is described below

commit 16bb5cb4306836e52acc2b609586e67324acc2ef
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed Sep 28 12:04:29 2022 +0800

    [enhancement](memory) Jemalloc performance optimization and compatibility 
with MemTracker #12496
---
 be/CMakeLists.txt                              |   6 +-
 be/src/runtime/CMakeLists.txt                  |   6 ++
 be/src/runtime/exec_env_init.cpp               |   2 +-
 be/src/runtime/memory/jemalloc_hook.cpp        | 141 +++++++++++++++++++++++++
 be/src/runtime/memory/tcmalloc_hook.h          |  21 +---
 be/src/runtime/memory/thread_mem_tracker_mgr.h |  35 ++++--
 be/src/runtime/thread_context.h                |  26 +++++
 bin/start_be.sh                                |   2 +
 8 files changed, 208 insertions(+), 31 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index edcbebdc0e..0674bc7922 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -223,7 +223,10 @@ add_library(leveldb STATIC IMPORTED)
 set_target_properties(leveldb PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/lib/libleveldb.a)
 
 add_library(jemalloc STATIC IMPORTED)
-set_target_properties(jemalloc PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/lib64/libjemalloc.a)
+set_target_properties(jemalloc PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/lib/libjemalloc_doris.a)
+
+add_library(jemalloc_arrow STATIC IMPORTED)
+set_target_properties(jemalloc_arrow PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/lib64/libjemalloc.a)
 
 add_library(brotlicommon STATIC IMPORTED)
 set_target_properties(brotlicommon PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/lib64/libbrotlicommon.a)
@@ -681,6 +684,7 @@ set(COMMON_THIRDPARTY
     roaring
     fmt
     jemalloc
+    jemalloc_arrow
     brotlicommon
     brotlidec
     brotlienc
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 210fb844de..cdd77455b4 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -115,6 +115,12 @@ if (WITH_MYSQL)
         )
 endif()
 
+if (USE_JEMALLOC)
+    set(RUNTIME_FILES ${RUNTIME_FILES}
+        memory/jemalloc_hook.cpp
+        )
+endif()
+
 add_library(Runtime STATIC
     ${RUNTIME_FILES}
     )
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 1afb147b05..828830ebeb 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -208,7 +208,7 @@ Status ExecEnv::_init_mem_tracker() {
             std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, 
"Process");
     _orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Orphan", 
_process_mem_tracker);
     _orphan_mem_tracker_raw = _orphan_mem_tracker.get();
-    thread_context()->_thread_mem_tracker_mgr->init();
+    thread_context()->_thread_mem_tracker_mgr->init_impl();
     thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
 #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && 
!defined(ADDRESS_SANITIZER) && \
         !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && 
!defined(USE_JEMALLOC)
diff --git a/be/src/runtime/memory/jemalloc_hook.cpp 
b/be/src/runtime/memory/jemalloc_hook.cpp
new file mode 100644
index 0000000000..11dee8eab6
--- /dev/null
+++ b/be/src/runtime/memory/jemalloc_hook.cpp
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "jemalloc/jemalloc.h"
+#include "runtime/thread_context.h"
+
+extern "C" {
+// Tracked malloc(): charge the thread mem tracker with the usable size jemalloc will hand out
+// for `size` before allocating, and refund that charge if the allocation fails.
+void* doris_malloc(size_t size) __THROW {
+    // jemalloc documents nallocx() as undefined for size 0, yet malloc(0) is a common call and
+    // jemalloc serves it as a minimum-sized allocation; size the charge as a 1-byte request so
+    // the computation stays well-defined. Computed once and reused for the refund.
+    const int64_t usable = static_cast<int64_t>(je_nallocx(size == 0 ? 1 : size, 0));
+    MEM_MALLOC_HOOK(usable);
+    void* ptr = je_malloc(size);
+    if (UNLIKELY(ptr == nullptr)) {
+        MEM_FREE_HOOK(usable);
+    }
+    return ptr;
+}
+
+// Tracked free(): release the block's usable size from the thread mem tracker, then free it.
+void doris_free(void* p) __THROW {
+    const size_t released = je_malloc_usable_size(p);
+    MEM_FREE_HOOK(released);
+    je_free(p);
+}
+
+// Tracked realloc(): charge the mem tracker with the change in usable size before resizing,
+// and refund that change if jemalloc cannot satisfy the request.
+void* doris_realloc(void* p, size_t size) __THROW {
+    // realloc(p, 0) is documented by glibc as equivalent to free(p). The previous early return
+    // leaked `p` together with its tracked bytes.
+    if (UNLIKELY(size == 0)) {
+        if (p != nullptr) {
+            MEM_FREE_HOOK(je_malloc_usable_size(p));
+            je_free(p);
+        }
+        return nullptr;
+    }
+    // Compute the signed delta once in int64_t. Passing `new - old` straight into
+    // MEM_FREE_HOOK would be negated as `-new - old`, because the hook macro substitutes its
+    // argument unparenthesized, and size_t/int64_t mixing would wrap in unsigned arithmetic.
+    const int64_t delta = static_cast<int64_t>(je_nallocx(size, 0)) -
+                          static_cast<int64_t>(je_malloc_usable_size(p));
+    MEM_MALLOC_HOOK(delta);
+    void* ptr = je_realloc(p, size);
+    if (UNLIKELY(ptr == nullptr)) {
+        // On failure the original block is untouched, so undo exactly what was charged.
+        MEM_FREE_HOOK(delta);
+    }
+    return ptr;
+}
+
+// Tracked calloc(): charge the requested n * size bytes up front, refund them on failure, and on
+// success additionally charge the slack jemalloc rounded the block up to, so the total charge
+// equals the usable size that doris_free() later releases.
+void* doris_calloc(size_t n, size_t size) __THROW {
+    // Existing contract: a zero element size yields nullptr without allocating.
+    if (UNLIKELY(size == 0)) {
+        return nullptr;
+    }
+
+    const size_t requested = n * size;
+    MEM_MALLOC_HOOK(requested);
+    void* ptr = je_calloc(n, size);
+    if (UNLIKELY(ptr == nullptr)) {
+        MEM_FREE_HOOK(requested);
+    } else {
+        // The slack must be *consumed* (as in the other allocation wrappers). Previously it was
+        // passed to MEM_FREE_HOOK, and the macro expanded `-(usable) - requested`, so each
+        // successful calloc was recorded as a net release of its whole usable size.
+        const int64_t slack = static_cast<int64_t>(je_malloc_usable_size(ptr)) -
+                              static_cast<int64_t>(requested);
+        MEM_MALLOC_HOOK(slack);
+    }
+    return ptr;
+}
+
+// Tracked cfree(): same accounting as doris_free(), release the usable size, then free.
+void doris_cfree(void* ptr) __THROW {
+    const size_t released_bytes = je_malloc_usable_size(ptr);
+    MEM_FREE_HOOK(released_bytes);
+    je_free(ptr);
+}
+
+// Tracked memalign(): charge the requested size first; afterwards either charge the extra bytes
+// jemalloc rounded the block up to, or refund the charge if allocation failed.
+void* doris_memalign(size_t align, size_t size) __THROW {
+    MEM_MALLOC_HOOK(size);
+    void* const block = je_aligned_alloc(align, size);
+    if (LIKELY(block != nullptr)) {
+        MEM_MALLOC_HOOK(je_malloc_usable_size(block) - size);
+    } else {
+        MEM_FREE_HOOK(size);
+    }
+    return block;
+}
+
+// Tracked aligned_alloc(): pre-charge `size`, then settle against the real usable size
+// (or refund everything when jemalloc returns nullptr).
+void* doris_aligned_alloc(size_t align, size_t size) __THROW {
+    MEM_MALLOC_HOOK(size);
+    void* const mem = je_aligned_alloc(align, size);
+    if (mem == nullptr) {
+        MEM_FREE_HOOK(size);
+        return mem;
+    }
+    MEM_MALLOC_HOOK(je_malloc_usable_size(mem) - size);
+    return mem;
+}
+
+// Tracked valloc(): pre-charge `size`; on success also charge the rounding slack reported by
+// je_malloc_usable_size(), on failure refund the pre-charge.
+void* doris_valloc(size_t size) __THROW {
+    MEM_MALLOC_HOOK(size);
+    void* const page_block = je_valloc(size);
+    if (LIKELY(page_block != nullptr)) {
+        MEM_MALLOC_HOOK(je_malloc_usable_size(page_block) - size);
+    } else {
+        MEM_FREE_HOOK(size);
+    }
+    return page_block;
+}
+
+// Tracked pvalloc(): accounting identical to doris_valloc().
+// NOTE(review): glibc's pvalloc rounds `size` up to a page multiple; this delegates to
+// je_valloc(size) without explicit rounding — confirm jemalloc's page-aligned size classes
+// give the expected usable size.
+void* doris_pvalloc(size_t size) __THROW {
+    MEM_MALLOC_HOOK(size);
+    void* const result = je_valloc(size);
+    if (result == nullptr) {
+        MEM_FREE_HOOK(size);
+        return result;
+    }
+    MEM_MALLOC_HOOK(je_malloc_usable_size(result) - size);
+    return result;
+}
+
+// Tracked posix_memalign(): pre-charge `size`; a zero return code means *r holds the new block,
+// so charge its rounding slack, otherwise refund the pre-charge. The error code is passed through.
+int doris_posix_memalign(void** r, size_t align, size_t size) __THROW {
+    MEM_MALLOC_HOOK(size);
+    const int rc = je_posix_memalign(r, align, size);
+    if (LIKELY(rc == 0)) {
+        MEM_MALLOC_HOOK(je_malloc_usable_size(*r) - size);
+    } else {
+        MEM_FREE_HOOK(size);
+    }
+    return rc;
+}
+
+// malloc_usable_size(): forwarded straight to jemalloc; no tracker accounting involved.
+size_t doris_malloc_usable_size(void* ptr) __THROW {
+    return je_malloc_usable_size(ptr);
+}
+
+// Export the standard allocator symbols as aliases of the tracked doris_* wrappers above, so
+// calls that resolve to these symbols go through the mem tracker hooks. `used` asks the
+// compiler to emit each alias even if it looks unreferenced in this translation unit.
+#define ALIAS(target) __attribute__((alias(#target), used))
+void* malloc(size_t size) __THROW ALIAS(doris_malloc);
+void free(void* p) __THROW ALIAS(doris_free);
+void* realloc(void* p, size_t size) __THROW ALIAS(doris_realloc);
+void* calloc(size_t n, size_t size) __THROW ALIAS(doris_calloc);
+void cfree(void* ptr) __THROW ALIAS(doris_cfree);
+void* memalign(size_t align, size_t size) __THROW ALIAS(doris_memalign);
+void* aligned_alloc(size_t align, size_t size) __THROW ALIAS(doris_aligned_alloc);
+void* valloc(size_t size) __THROW ALIAS(doris_valloc);
+void* pvalloc(size_t size) __THROW ALIAS(doris_pvalloc);
+int posix_memalign(void** r, size_t a, size_t s) __THROW ALIAS(doris_posix_memalign);
+size_t malloc_usable_size(void* ptr) __THROW ALIAS(doris_malloc_usable_size);
+#undef ALIAS
+}
diff --git a/be/src/runtime/memory/tcmalloc_hook.h 
b/be/src/runtime/memory/tcmalloc_hook.h
index 627f42795d..6ec9352ad3 100644
--- a/be/src/runtime/memory/tcmalloc_hook.h
+++ b/be/src/runtime/memory/tcmalloc_hook.h
@@ -36,28 +36,11 @@
 //  destructor to control the behavior of consume can lead to unexpected 
behavior,
 //  like this: if (LIKELY(doris::start_thread_mem_tracker)) {
 void new_hook(const void* ptr, size_t size) {
-    if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != 
nullptr) {
-        // Currently in bthread, consume thread context mem tracker in bthread 
tls.
-        doris::update_bthread_context();
-        
doris::bthread_context->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0));
-    } else if (doris::thread_context_ptr._init) {
-        
doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(tc_nallocx(size,
 0));
-    } else {
-        doris::ThreadMemTrackerMgr::consume_no_attach(tc_nallocx(size, 0));
-    }
+    MEM_MALLOC_HOOK(tc_nallocx(size, 0));
 }
 
 void delete_hook(const void* ptr) {
-    if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != 
nullptr) {
-        doris::update_bthread_context();
-        doris::bthread_context->_thread_mem_tracker_mgr->consume(
-                -tc_malloc_size(const_cast<void*>(ptr)));
-    } else if (doris::thread_context_ptr._init) {
-        doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(
-                -tc_malloc_size(const_cast<void*>(ptr)));
-    } else {
-        
doris::ThreadMemTrackerMgr::consume_no_attach(-tc_malloc_size(const_cast<void*>(ptr)));
-    }
+    MEM_FREE_HOOK(tc_malloc_size(const_cast<void*>(ptr)));
 }
 
 void init_hook() {
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 399265bc51..4c9099528d 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -51,12 +51,15 @@ public:
 
     // only for tcmalloc hook
     static void consume_no_attach(int64_t size) {
-        ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
+        if (ExecEnv::GetInstance()->initialized()) {
+            ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
+        }
     }
 
     // After thread initialization, calling `init` again must call 
`clear_untracked_mems` first
     // to avoid memory tracking loss.
     void init();
+    void init_impl();
 
     // After attach, the current thread TCMalloc Hook starts to 
consume/release task mem_tracker
     void attach_limiter_tracker(const std::string& task_id, const TUniqueId& 
fragment_instance_id,
@@ -85,9 +88,13 @@ public:
     bool is_attach_query() { return _fragment_instance_id_stack.back() != 
TUniqueId(); }
 
     std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
+        if (_limiter_tracker_raw == nullptr) init_impl();
         return _limiter_tracker_stack.back();
     }
-    MemTrackerLimiter* limiter_mem_tracker_raw() { return 
_limiter_tracker_raw; }
+    MemTrackerLimiter* limiter_mem_tracker_raw() {
+        if (_limiter_tracker_raw == nullptr) init_impl();
+        return _limiter_tracker_raw;
+    }
 
     void set_check_limit(bool check_limit) { _check_limit = check_limit; }
     void set_check_attach(bool check_attach) { _check_attach = check_attach; }
@@ -120,7 +127,7 @@ private:
 
     // _limiter_tracker_stack[0] = orphan_mem_tracker
     std::vector<std::shared_ptr<MemTrackerLimiter>> _limiter_tracker_stack;
-    MemTrackerLimiter* _limiter_tracker_raw;
+    MemTrackerLimiter* _limiter_tracker_raw = nullptr;
     std::vector<MemTracker*> _consumer_tracker_stack;
 
     // If true, call memtracker try_consume, otherwise call consume.
@@ -138,12 +145,18 @@ inline void ThreadMemTrackerMgr::init() {
     // _limiter_tracker_stack[0] = orphan_mem_tracker
     DCHECK(_limiter_tracker_stack.size() <= 1)
             << "limiter_tracker_stack.size(): " << 
_limiter_tracker_stack.size();
-    if (_limiter_tracker_stack.size() == 0) {
-        
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
-        _limiter_tracker_raw = 
ExecEnv::GetInstance()->orphan_mem_tracker_raw();
-        _task_id_stack.push_back("");
-        _fragment_instance_id_stack.push_back(TUniqueId());
+    if (_limiter_tracker_raw == nullptr && 
ExecEnv::GetInstance()->initialized()) {
+        init_impl();
     }
+}
+
+inline void ThreadMemTrackerMgr::init_impl() {
+    DCHECK(_limiter_tracker_stack.size() == 0);
+    DCHECK(_limiter_tracker_raw == nullptr);
+    
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
+    _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
+    _task_id_stack.push_back("");
+    _fragment_instance_id_stack.push_back(TUniqueId());
     _check_limit = true;
 }
 
@@ -166,9 +179,10 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) {
     // When some threads `0 < _untracked_mem < 
config::mem_tracker_consume_min_size_bytes`
     // and some threads `_untracked_mem <= 
-config::mem_tracker_consume_min_size_bytes` trigger consumption(),
     // it will cause tracker->consumption to be temporarily less than 0.
+    // After the jemalloc hook is loaded, before ExecEnv init, 
_limiter_tracker=nullptr.
     if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
          _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
-        !_stop_consume) {
+        !_stop_consume && ExecEnv::GetInstance()->initialized()) {
         if (_check_limit) {
             flush_untracked_mem<true>();
         } else {
@@ -182,8 +196,9 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
     // Temporary memory may be allocated during the consumption of the mem 
tracker, which will lead to entering
     // the TCMalloc Hook again, so suspend consumption to avoid falling into 
an infinite loop.
     _stop_consume = true;
-    old_untracked_mem = _untracked_mem;
+    if (_limiter_tracker_raw == nullptr) init_impl();
     DCHECK(_limiter_tracker_raw);
+    old_untracked_mem = _untracked_mem;
     if (CheckLimit) {
 #ifndef BE_TEST
         // When all threads are started, `attach_limiter_tracker` is expected 
to be called to bind the limiter tracker.
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 670b3ff613..fbfac1be59 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -268,4 +268,30 @@ public:
                                         
->_thread_mem_tracker_mgr->last_consumer_tracker(), \
                                 msg),                                          
             \
                     ##__VA_ARGS__);
+
+// Mem hooks that charge the calling thread's mem tracker from allocator hooks.
+// MEM_MALLOC_HOOK(size) consumes `size` bytes; MEM_FREE_HOOK(size) releases them. The target is:
+// the bthread context's tracker manager when btls_key is set and bthread_context is non-null,
+// otherwise the thread_context_ptr manager once `_init` is set, otherwise the orphan tracker via
+// ThreadMemTrackerMgr::consume_no_attach().
+// The argument is converted to int64_t and evaluated once BEFORE negation, so compound or unsigned
+// expressions are safe: MEM_FREE_HOOK(a - b) releases (a - b), not -a - b, and a size_t argument
+// is not negated in unsigned arithmetic.
+#define DORIS_MEM_HOOK_CONSUME(bytes)                                                          \
+    do {                                                                                       \
+        const int64_t doris_mem_hook_bytes = (bytes);                                          \
+        if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) {   \
+            doris::update_bthread_context();                                                   \
+            doris::bthread_context->_thread_mem_tracker_mgr->consume(doris_mem_hook_bytes);    \
+        } else if (LIKELY(doris::thread_context_ptr._init)) {                                  \
+            doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(                  \
+                    doris_mem_hook_bytes);                                                     \
+        } else {                                                                               \
+            doris::ThreadMemTrackerMgr::consume_no_attach(doris_mem_hook_bytes);               \
+        }                                                                                      \
+    } while (0)
+
+#define MEM_MALLOC_HOOK(size) DORIS_MEM_HOOK_CONSUME(static_cast<int64_t>(size))
+
+#define MEM_FREE_HOOK(size) DORIS_MEM_HOOK_CONSUME(-static_cast<int64_t>(size))
+
 } // namespace doris
diff --git a/bin/start_be.sh b/bin/start_be.sh
index f60795d756..69d36cfaa3 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -207,6 +207,8 @@ export UBSAN_OPTIONS=print_stacktrace=1
 ## set hdfs conf
 export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml"
 
+export 
MALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16"
+
 if [[ "${RUN_DAEMON}" -eq 1 ]]; then
     nohup ${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/doris_be" "$@" 
>>"${LOG_DIR}/be.out" 2>&1 </dev/null &
 else


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to