This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c5a19982a6b [Feature](multi-catalog) Add memory tracker for ORC reader/writer and Arrow Parquet writer. (#37234)
c5a19982a6b is described below

commit c5a19982a6bc5c7cffc98c1694d106e9925ecc66
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Tue Jul 30 09:00:44 2024 +0800

    [Feature](multi-catalog) Add memory tracker for ORC reader/writer and Arrow Parquet writer. (#37234)
    
    ## Proposed changes
    
    [Feature] (multi-catalog) Add memory tracker for ORC reader/writer and
    Arrow Parquet writer.
    
    ## Future work
    
    - Since the Parquet reader is implemented in-house and does not use the
    Arrow third-party library, its memory usage still needs to be added to
    the memory tracker.
    - Add read and write operator-level memory trackers to the profile.
---
 be/src/runtime/exec_env.h                          |  13 ++
 be/src/runtime/exec_env_init.cpp                   |   9 +
 be/src/util/faststring.h                           |   2 +-
 be/src/util/slice.h                                |   2 +-
 be/src/vec/common/allocator.cpp                    |  90 ++++++----
 be/src/vec/common/allocator.h                      | 165 +++++++++++++++--
 be/src/vec/common/allocator_fwd.h                  |   4 +-
 be/src/vec/common/hash_table/phmap_fwd_decl.h      |   2 +-
 be/src/vec/exec/format/orc/orc_memory_pool.h       |  53 ++++++
 be/src/vec/exec/format/orc/vorc_reader.cpp         |   2 +
 .../vec/exec/format/parquet/arrow_memory_pool.cpp  |  74 ++++++++
 be/src/vec/exec/format/parquet/arrow_memory_pool.h | 199 +++++++++++++++++++++
 be/src/vec/runtime/vorc_transformer.cpp            |  22 +--
 be/src/vec/runtime/vparquet_transformer.cpp        |  15 +-
 14 files changed, 591 insertions(+), 61 deletions(-)

diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 3434d01a59e..89e5593c84b 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -36,6 +36,13 @@
 #include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove 
this include header
 #include "util/threadpool.h"
 
+namespace orc {
+class MemoryPool;
+}
+namespace arrow {
+class MemoryPool;
+}
+
 namespace doris {
 namespace vectorized {
 class VDataStreamMgr;
@@ -305,6 +312,9 @@ public:
     segment_v2::TmpFileDirs* get_tmp_file_dirs() { return 
_tmp_file_dirs.get(); }
     io::FDCache* file_cache_open_fd_cache() const { return 
_file_cache_open_fd_cache.get(); }
 
+    orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; }
+    arrow::MemoryPool* arrow_memory_pool() { return _arrow_memory_pool; }
+
 private:
     ExecEnv();
 
@@ -435,6 +445,9 @@ private:
     std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx;
     std::unique_ptr<segment_v2::TmpFileDirs> _tmp_file_dirs;
     doris::vectorized::SpillStreamManager* _spill_stream_mgr = nullptr;
+
+    orc::MemoryPool* _orc_memory_pool = nullptr;
+    arrow::MemoryPool* _arrow_memory_pool = nullptr;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 32fbc4e0af4..6740f548761 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -100,6 +100,8 @@
 #include "util/threadpool.h"
 #include "util/thrift_rpc_helper.h"
 #include "util/timezone_utils.h"
+#include "vec/exec/format/orc/orc_memory_pool.h"
+#include "vec/exec/format/parquet/arrow_memory_pool.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 #include "vec/sink/delta_writer_v2_pool.h"
@@ -573,6 +575,10 @@ Status ExecEnv::_init_mem_env() {
               << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
               << ", origin config value: " << 
config::inverted_index_query_cache_limit;
 
+    // init orc memory pool
+    _orc_memory_pool = new doris::vectorized::ORCMemoryPool();
+    _arrow_memory_pool = new doris::vectorized::ArrowMemoryPool();
+
     return Status::OK();
 }
 
@@ -751,6 +757,9 @@ void ExecEnv::destroy() {
     // We should free task scheduler finally because task queue / scheduler 
maybe used by pipelineX.
     SAFE_DELETE(_without_group_task_scheduler);
 
+    SAFE_DELETE(_arrow_memory_pool);
+    SAFE_DELETE(_orc_memory_pool);
+
     // dns cache is a global instance and need to be released at last
     SAFE_DELETE(_dns_cache);
 
diff --git a/be/src/util/faststring.h b/be/src/util/faststring.h
index 8d9fa6d004f..3ec0acbda01 100644
--- a/be/src/util/faststring.h
+++ b/be/src/util/faststring.h
@@ -35,7 +35,7 @@ namespace doris {
 // common use cases (in particular, resize() will fill with uninitialized data
 // instead of memsetting to \0)
 // only build() can transfer data to the outside.
-class faststring : private Allocator<false, false, false> {
+class faststring : private Allocator<false, false, false, 
DefaultMemoryAllocator> {
 public:
     enum { kInitialCapacity = 32 };
 
diff --git a/be/src/util/slice.h b/be/src/util/slice.h
index 80f9616f3da..bae33d4ee75 100644
--- a/be/src/util/slice.h
+++ b/be/src/util/slice.h
@@ -340,7 +340,7 @@ struct SliceMap {
 //
 // only receive the memory allocated by Allocator and disables mmap,
 // otherwise the memory may not be freed correctly, currently only be 
constructed by faststring.
-class OwnedSlice : private Allocator<false, false, false> {
+class OwnedSlice : private Allocator<false, false, false, 
DefaultMemoryAllocator> {
 public:
     OwnedSlice() : _slice((uint8_t*)nullptr, 0) {}
 
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 2b1c05533cd..879e98d0ca4 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -40,8 +40,12 @@
 #include "util/stack_util.h"
 #include "util/uid_util.h"
 
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, 
use_mmap>::sys_memory_check(size_t size) const {
+std::unordered_map<void*, size_t> RecordSizeMemoryAllocator::_allocated_sizes;
+std::mutex RecordSizeMemoryAllocator::_mutex;
+
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::sys_memory_check(
+        size_t size) const {
 #ifdef BE_TEST
     if (!doris::ExecEnv::ready()) {
         return;
@@ -155,8 +159,9 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::sys_memory_check(size_t
     }
 }
 
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, 
use_mmap>::memory_tracker_check(size_t size) const {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::memory_tracker_check(
+        size_t size) const {
 #ifdef BE_TEST
     if (!doris::ExecEnv::ready()) {
         return;
@@ -191,24 +196,27 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::memory_tracker_check(siz
     }
 }
 
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_check(size_t 
size) const {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::memory_check(
+        size_t size) const {
     sys_memory_check(size);
     memory_tracker_check(size);
 }
 
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, use_mmap>::consume_memory(size_t 
size) const {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::consume_memory(
+        size_t size) const {
     CONSUME_THREAD_MEM_TRACKER(size);
 }
 
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, use_mmap>::release_memory(size_t 
size) const {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::release_memory(
+        size_t size) const {
     RELEASE_THREAD_MEM_TRACKER(size);
 }
 
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::throw_bad_alloc(
         const std::string& err) const {
     LOG(WARNING) << err
                  << fmt::format("{}, Stacktrace: {}",
@@ -219,9 +227,9 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::throw_bad_alloc(
 }
 
 #ifndef NDEBUG
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, 
use_mmap>::add_address_sanitizers(void* buf,
-                                                                               
size_t size) const {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::add_address_sanitizers(
+        void* buf, size_t size) const {
 #ifdef BE_TEST
     if (!doris::ExecEnv::ready()) {
         return;
@@ -230,8 +238,8 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::add_address_sanitizers(v
     doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, 
size);
 }
 
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, 
use_mmap>::remove_address_sanitizers(
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::remove_address_sanitizers(
         void* buf, size_t size) const {
 #ifdef BE_TEST
     if (!doris::ExecEnv::ready()) {
@@ -242,23 +250,43 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::remove_address_sanitizer
 }
 #endif
 
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size, 
size_t alignment) {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void* Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::alloc(size_t size,
+                                                                               
 size_t alignment) {
     return alloc_impl(size, alignment);
 }
 
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void* Allocator<clear_memory_, mmap_populate, use_mmap>::realloc(void* buf, 
size_t old_size,
-                                                                 size_t 
new_size,
-                                                                 size_t 
alignment) {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void* Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::realloc(
+        void* buf, size_t old_size, size_t new_size, size_t alignment) {
     return realloc_impl(buf, old_size, new_size, alignment);
 }
 
-template class Allocator<true, true, true>;
-template class Allocator<true, true, false>;
-template class Allocator<true, false, true>;
-template class Allocator<true, false, false>;
-template class Allocator<false, true, true>;
-template class Allocator<false, true, false>;
-template class Allocator<false, false, true>;
-template class Allocator<false, false, false>;
+template class Allocator<true, true, true, DefaultMemoryAllocator>;
+template class Allocator<true, true, false, DefaultMemoryAllocator>;
+template class Allocator<true, false, true, DefaultMemoryAllocator>;
+template class Allocator<true, false, false, DefaultMemoryAllocator>;
+template class Allocator<false, true, true, DefaultMemoryAllocator>;
+template class Allocator<false, true, false, DefaultMemoryAllocator>;
+template class Allocator<false, false, true, DefaultMemoryAllocator>;
+template class Allocator<false, false, false, DefaultMemoryAllocator>;
+
+/** It would be better to put these Memory Allocators where they are used, 
such as in the orc memory pool and arrow memory pool.
+  * But currently allocators use templates in .cpp instead of all in .h, so 
they can only be placed here.
+  */
+template class Allocator<true, true, false, ORCMemoryAllocator>;
+template class Allocator<true, false, true, ORCMemoryAllocator>;
+template class Allocator<true, false, false, ORCMemoryAllocator>;
+template class Allocator<false, true, true, ORCMemoryAllocator>;
+template class Allocator<false, true, false, ORCMemoryAllocator>;
+template class Allocator<false, false, true, ORCMemoryAllocator>;
+template class Allocator<false, false, false, ORCMemoryAllocator>;
+
+template class Allocator<true, true, true, RecordSizeMemoryAllocator>;
+template class Allocator<true, true, false, RecordSizeMemoryAllocator>;
+template class Allocator<true, false, true, RecordSizeMemoryAllocator>;
+template class Allocator<true, false, false, RecordSizeMemoryAllocator>;
+template class Allocator<false, true, true, RecordSizeMemoryAllocator>;
+template class Allocator<false, true, false, RecordSizeMemoryAllocator>;
+template class Allocator<false, false, true, RecordSizeMemoryAllocator>;
+template class Allocator<false, false, false, RecordSizeMemoryAllocator>;
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 3c513f270bb..88c85dceeb3 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -23,6 +23,10 @@
 // TODO: Readable
 
 #include <fmt/format.h>
+#if defined(USE_JEMALLOC)
+#include <jemalloc/jemalloc.h>
+#endif // defined(USE_JEMALLOC)
+#include <malloc.h>
 #include <stdint.h>
 #include <string.h>
 
@@ -68,6 +72,128 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
 // is always a multiple of sixteen. 
(https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html)
 static constexpr int ALLOCATOR_ALIGNMENT_16 = 16;
 
+class DefaultMemoryAllocator {
+public:
+    static void* malloc(size_t size) __THROW { return std::malloc(size); }
+
+    static void* calloc(size_t n, size_t size) __THROW { return std::calloc(n, 
size); }
+
+    static constexpr bool need_record_actual_size() { return false; }
+
+    static int posix_memalign(void** ptr, size_t alignment, size_t size) 
__THROW {
+        return ::posix_memalign(ptr, alignment, size);
+    }
+
+    static void* realloc(void* ptr, size_t size) __THROW { return 
std::realloc(ptr, size); }
+
+    static void free(void* p) __THROW { std::free(p); }
+
+    static void release_unused() {
+#if defined(USE_JEMALLOC)
+        jemallctl(fmt::format("arena.{}.purge", MALLCTL_ARENAS_ALL).c_str(), 
NULL, NULL, NULL, 0);
+#endif // defined(USE_JEMALLOC)
+    }
+};
+
+/** It would be better to put these Memory Allocators where they are used, 
such as in the orc memory pool and arrow memory pool.
+  * But currently allocators use templates in .cpp instead of all in .h, so 
they can only be placed here.
+  */
+class ORCMemoryAllocator {
+public:
+    static void* malloc(size_t size) __THROW { return 
reinterpret_cast<char*>(std::malloc(size)); }
+
+    static void* calloc(size_t n, size_t size) __THROW { return std::calloc(n, 
size); }
+
+    static constexpr bool need_record_actual_size() { return true; }
+
+    static size_t allocated_size(void* ptr) { return malloc_usable_size(ptr); }
+
+    static int posix_memalign(void** ptr, size_t alignment, size_t size) 
__THROW {
+        return ::posix_memalign(ptr, alignment, size);
+    }
+
+    static void* realloc(void* ptr, size_t size) __THROW {
+        LOG(FATAL) << "__builtin_unreachable";
+        __builtin_unreachable();
+    }
+
+    static void free(void* p) __THROW { std::free(p); }
+
+    static void release_unused() {}
+};
+
+class RecordSizeMemoryAllocator {
+public:
+    static void* malloc(size_t size) __THROW {
+        void* p = std::malloc(size);
+        if (p) {
+            std::lock_guard<std::mutex> lock(_mutex);
+            _allocated_sizes[p] = size;
+        }
+        return p;
+    }
+
+    static void* calloc(size_t n, size_t size) __THROW {
+        void* p = std::calloc(n, size);
+        if (p) {
+            std::lock_guard<std::mutex> lock(_mutex);
+            _allocated_sizes[p] = n * size;
+        }
+        return p;
+    }
+
+    static constexpr bool need_record_actual_size() { return false; }
+
+    static size_t allocated_size(void* ptr) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        auto it = _allocated_sizes.find(ptr);
+        if (it != _allocated_sizes.end()) {
+            return it->second;
+        }
+        return 0;
+    }
+
+    static int posix_memalign(void** ptr, size_t alignment, size_t size) 
__THROW {
+        int ret = ::posix_memalign(ptr, alignment, size);
+        if (ret == 0 && *ptr) {
+            std::lock_guard<std::mutex> lock(_mutex);
+            _allocated_sizes[*ptr] = size;
+        }
+        return ret;
+    }
+
+    static void* realloc(void* ptr, size_t size) __THROW {
+        // Keep the lock held across std::realloc: once a moving realloc frees
+        // `ptr`, another thread may obtain that address from malloc, and its
+        // record must not be inserted before the stale entry is dropped here.
+        std::lock_guard<std::mutex> lock(_mutex);
+
+        void* p = std::realloc(ptr, size);
+
+        // On failure std::realloc leaves `ptr` allocated and unchanged, so its
+        // size record must survive; only re-key the record on success.
+        if (p) {
+            _allocated_sizes.erase(ptr);
+            _allocated_sizes[p] = size;
+        }
+        return p;
+    }
+
+    static void free(void* p) __THROW {
+        if (p) {
+            std::lock_guard<std::mutex> lock(_mutex);
+            _allocated_sizes.erase(p);
+            std::free(p);
+        }
+    }
+
+    static void release_unused() {}
+
+private:
+    static std::unordered_map<void*, size_t> _allocated_sizes;
+    static std::mutex _mutex;
+};
+
 /** Responsible for allocating / freeing memory. Used, for example, in 
PODArray, Arena.
   * Also used in hash tables.
   * The interface is different from std::allocator
@@ -78,7 +204,7 @@ static constexpr int ALLOCATOR_ALIGNMENT_16 = 16;
   * - random hint address for mmap
   * - mmap_threshold for using mmap less or more
   */
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 class Allocator {
 public:
     void sys_memory_check(size_t size) const;
@@ -104,6 +230,7 @@ public:
         // consume memory in tracker before alloc, similar to early 
declaration.
         consume_memory(size);
         void* buf;
+        size_t record_size = size;
 
         if (use_mmap && size >= doris::config::mmap_threshold) {
             if (alignment > MMAP_MIN_ALIGNMENT)
@@ -117,38 +244,51 @@ public:
                 release_memory(size);
                 throw_bad_alloc(fmt::format("Allocator: Cannot mmap {}.", 
size));
             }
+            // The mmap'd region is not owned by MemoryAllocator, so record_size
+            // stays equal to `size`; MemoryAllocator::allocated_size() (e.g.
+            // malloc_usable_size) must not be called on an mmap'd pointer.
 
             /// No need for zero-fill, because mmap guarantees it.
         } else {
             if (alignment <= MALLOC_MIN_ALIGNMENT) {
                 if constexpr (clear_memory)
-                    buf = ::calloc(size, 1);
+                    buf = MemoryAllocator::calloc(size, 1);
                 else
-                    buf = ::malloc(size);
+                    buf = MemoryAllocator::malloc(size);
 
                 if (nullptr == buf) {
                     release_memory(size);
                     throw_bad_alloc(fmt::format("Allocator: Cannot malloc 
{}.", size));
                 }
+                if constexpr (MemoryAllocator::need_record_actual_size()) {
+                    record_size = MemoryAllocator::allocated_size(buf);
+                }
 #ifndef NDEBUG
-                add_address_sanitizers(buf, size);
+                add_address_sanitizers(buf, record_size);
 #endif
             } else {
                 buf = nullptr;
-                int res = posix_memalign(&buf, alignment, size);
+                int res = MemoryAllocator::posix_memalign(&buf, alignment, 
size);
 
                 if (0 != res) {
                     release_memory(size);
                     throw_bad_alloc(
                             fmt::format("Cannot allocate memory 
(posix_memalign) {}.", size));
                 }
-#ifndef NDEBUG
-                add_address_sanitizers(buf, size);
-#endif
 
                 if constexpr (clear_memory) memset(buf, 0, size);
+
+                if constexpr (MemoryAllocator::need_record_actual_size()) {
+                    record_size = MemoryAllocator::allocated_size(buf);
+                }
+#ifndef NDEBUG
+                add_address_sanitizers(buf, record_size);
+#endif
             }
         }
+        if constexpr (MemoryAllocator::need_record_actual_size()) {
+            consume_memory(record_size - size);
+        }
         return buf;
     }
 
@@ -162,11 +302,13 @@ public:
 #ifndef NDEBUG
             remove_address_sanitizers(buf, size);
 #endif
-            ::free(buf);
+            MemoryAllocator::free(buf);
         }
         release_memory(size);
     }
 
+    void release_unused() { MemoryAllocator::release_unused(); }
+
     /** Enlarge memory range.
       * Data from old range is moved to the beginning of new range.
       * Address of memory range could change.
@@ -187,7 +329,7 @@ public:
             remove_address_sanitizers(buf, old_size);
 #endif
             /// Resize malloc'd memory region with no special alignment 
requirement.
-            void* new_buf = ::realloc(buf, new_size);
+            void* new_buf = MemoryAllocator::realloc(buf, new_size);
             if (nullptr == new_buf) {
                 release_memory(new_size - old_size);
                 throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} 
to {}.", old_size,
@@ -195,7 +337,8 @@ public:
             }
 #ifndef NDEBUG
             add_address_sanitizers(
-                    new_buf, new_size); // usually, buf addr = new_buf addr, 
asan maybe not equal.
+                    new_buf,
+                    new_size); // usually, buf addr = new_buf addr, asan maybe 
not equal.
 #endif
 
             buf = new_buf;
diff --git a/be/src/vec/common/allocator_fwd.h 
b/be/src/vec/common/allocator_fwd.h
index 988f7a5c7af..da43030a133 100644
--- a/be/src/vec/common/allocator_fwd.h
+++ b/be/src/vec/common/allocator_fwd.h
@@ -24,7 +24,9 @@
 #pragma once
 
 #include <cstddef>
-template <bool clear_memory_, bool mmap_populate = false, bool use_mmap = 
false>
+class DefaultMemoryAllocator;
+template <bool clear_memory_, bool mmap_populate = false, bool use_mmap = 
false,
+          typename MemoryAllocator = DefaultMemoryAllocator>
 class Allocator;
 
 template <typename Base, size_t N = 64, size_t Alignment = 1>
diff --git a/be/src/vec/common/hash_table/phmap_fwd_decl.h 
b/be/src/vec/common/hash_table/phmap_fwd_decl.h
index 62373410968..c6ff77a7e71 100644
--- a/be/src/vec/common/hash_table/phmap_fwd_decl.h
+++ b/be/src/vec/common/hash_table/phmap_fwd_decl.h
@@ -26,7 +26,7 @@ namespace doris::vectorized {
 /// `Allocator_` implements several interfaces of `std::allocator`
 /// which `phmap::flat_hash_map` will use.
 template <typename T>
-class Allocator_ : private Allocator<true, false, false> {
+class Allocator_ : private Allocator<true, false, false, 
DefaultMemoryAllocator> {
 public:
     using value_type = T;
     using pointer = T*;
diff --git a/be/src/vec/exec/format/orc/orc_memory_pool.h 
b/be/src/vec/exec/format/orc/orc_memory_pool.h
new file mode 100644
index 00000000000..1df3d63f952
--- /dev/null
+++ b/be/src/vec/exec/format/orc/orc_memory_pool.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "orc/MemoryPool.hh"
+#include "vec/common/allocator.h"
+
+#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || 
defined(THREAD_SANITIZER)
+using ORC_MEMORY_ALLOCATOR = RecordSizeMemoryAllocator;
+#else
+using ORC_MEMORY_ALLOCATOR = ORCMemoryAllocator;
+#endif
+
+namespace doris::vectorized {
+
+class ORCMemoryPool : public orc::MemoryPool {
+public:
+    char* malloc(uint64_t size) override {
+        char* p = reinterpret_cast<char*>(_allocator.alloc(size));
+        return p;
+    }
+
+    void free(char* p) override {
+        if (p == nullptr) {
+            return;
+        }
+        size_t size = ORC_MEMORY_ALLOCATOR::allocated_size(p);
+        _allocator.free(p, size);
+    }
+
+    ORCMemoryPool() = default;
+    ~ORCMemoryPool() override = default;
+
+private:
+    Allocator<false, false, false, ORC_MEMORY_ALLOCATOR> _allocator;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index e2ba3a57be8..b70e3496133 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -71,6 +71,7 @@
 #include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_struct.h"
+#include "vec/exec/format/orc/orc_memory_pool.h"
 #include "vec/exec/format/table/transactional_hive_common.h"
 #include "vec/exprs/vbloom_predicate.h"
 #include "vec/exprs/vdirect_in_predicate.h"
@@ -252,6 +253,7 @@ Status OrcReader::_create_file_reader() {
     // create orc reader
     try {
         orc::ReaderOptions options;
+        options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
         _reader = orc::createReader(
                 
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
     } catch (std::exception& e) {
diff --git a/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp 
b/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp
new file mode 100644
index 00000000000..ed06e5c821a
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/arrow_memory_pool.h"
+
+#include "glog/logging.h"
+
+namespace doris::vectorized {
+
+// A static piece of memory for 0-size allocations, so as to return
+// an aligned non-null pointer.  Note the correct value for DebugAllocator
+// checks is hardcoded.
+alignas(kDefaultBufferAlignment) int64_t zero_size_area[1] = {kDebugXorSuffix};
+
+arrow::Status ArrowAllocator::allocate_aligned(int64_t size, int64_t 
alignment, uint8_t** out) {
+    if (size == 0) {
+        *out = kZeroSizeArea;
+        return arrow::Status::OK();
+    }
+    *out = reinterpret_cast<uint8_t*>(_allocator.alloc(size, alignment));
+    if (*out == nullptr) {
+        return arrow::Status::OutOfMemory("malloc of size ", size, " failed");
+    }
+    return arrow::Status::OK();
+}
+
+arrow::Status ArrowAllocator::reallocate_aligned(int64_t old_size, int64_t 
new_size,
+                                                 int64_t alignment, uint8_t** 
ptr) {
+    uint8_t* previous_ptr = *ptr;
+    if (previous_ptr == kZeroSizeArea) {
+        DCHECK_EQ(old_size, 0);
+        return allocate_aligned(new_size, alignment, ptr);
+    }
+    if (new_size == 0) {
+        deallocate_aligned(previous_ptr, old_size, alignment);
+        *ptr = kZeroSizeArea;
+        return arrow::Status::OK();
+    }
+    *ptr = reinterpret_cast<uint8_t*>(_allocator.realloc(*ptr, 
static_cast<size_t>(old_size),
+                                                         
static_cast<size_t>(new_size), alignment));
+    if (*ptr == nullptr) {
+        *ptr = previous_ptr;
+        return arrow::Status::OutOfMemory("realloc of size ", new_size, " 
failed");
+    }
+    return arrow::Status::OK();
+}
+
+void ArrowAllocator::deallocate_aligned(uint8_t* ptr, int64_t size, int64_t 
alignment) {
+    if (ptr == kZeroSizeArea) {
+        DCHECK_EQ(size, 0);
+    } else {
+        _allocator.free(ptr, static_cast<size_t>(size));
+    }
+}
+
+void ArrowAllocator::release_unused() {
+    _allocator.release_unused();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/arrow_memory_pool.h 
b/be/src/vec/exec/format/parquet/arrow_memory_pool.h
new file mode 100644
index 00000000000..a93e426f374
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/arrow_memory_pool.h
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "vec/common/allocator.h"
+#include "vec/common/allocator_fwd.h"
+
+namespace doris::vectorized {
+
+constexpr int64_t kDefaultBufferAlignment = 64;
+static constexpr int64_t kDebugXorSuffix = -0x181fe80e0b464188LL;
+#ifndef NDEBUG
+static constexpr uint8_t kAllocPoison = 0xBC;
+static constexpr uint8_t kReallocPoison = 0xBD;
+static constexpr uint8_t kDeallocPoison = 0xBE;
+#endif
+
+// A static piece of memory for 0-size allocations, so as to return
+// an aligned non-null pointer.  Note the correct value for DebugAllocator
+// checks is hardcoded.
+extern int64_t zero_size_area[1];
+static uint8_t* const kZeroSizeArea = 
reinterpret_cast<uint8_t*>(&zero_size_area);
+
+using ARROW_MEMORY_ALLOCATOR = DefaultMemoryAllocator;
+
+class ArrowAllocator {
+public:
+    arrow::Status allocate_aligned(int64_t size, int64_t alignment, uint8_t** 
out);
+    arrow::Status reallocate_aligned(int64_t old_size, int64_t new_size, 
int64_t alignment,
+                                     uint8_t** ptr);
+    void deallocate_aligned(uint8_t* ptr, int64_t size, int64_t alignment);
+    void release_unused();
+
+private:
+    Allocator<false, false, false, ARROW_MEMORY_ALLOCATOR> _allocator;
+};
+
///////////////////////////////////////////////////////////////////////
// Helper tracking memory statistics

/// \brief Running allocation counters for an ArrowMemoryPool.
///
/// Aligned to 64 bytes so that the four 8-byte atomic counters share a single cache line.
/// Every update is an atomic read-modify-write, so the recording methods may run
/// concurrently. Each getter is an independent acquire load, so values read through
/// different getters do not form a consistent snapshot.
class alignas(64) ArrowMemoryPoolStats {
public:
    /// Highest value of bytes_allocated() seen by did_allocate_bytes().
    int64_t max_memory() const { return max_memory_.load(std::memory_order_acquire); }

    /// Bytes currently outstanding (allocated minus freed).
    int64_t bytes_allocated() const { return bytes_allocated_.load(std::memory_order_acquire); }

    /// Sum of every size passed to did_allocate_bytes().
    int64_t total_bytes_allocated() const {
        return total_allocated_bytes_.load(std::memory_order_acquire);
    }

    /// Number of did_allocate_bytes() calls, including those made for reallocation growth.
    int64_t num_allocations() const { return num_allocs_.load(std::memory_order_acquire); }

    /// Records an allocation of `size` bytes and raises the peak if it was exceeded.
    inline void did_allocate_bytes(int64_t size) {
        // The peak only ever grows, so a relaxed load is good enough to seed the CAS loop.
        // It is issued before the read-modify-writes below.
        int64_t peak_seen = max_memory_.load(std::memory_order_relaxed);
        const int64_t live_after =
                bytes_allocated_.fetch_add(size, std::memory_order_acq_rel) + size;
        // The peak update does not depend on these two counters, so bump them now.
        total_allocated_bytes_.fetch_add(size, std::memory_order_acq_rel);
        num_allocs_.fetch_add(1, std::memory_order_acq_rel);

        // Raise the peak to live_after unless some thread already pushed it at least that
        // high. A failed compare_exchange_weak reloads peak_seen with the current value.
        while (peak_seen < live_after) {
            if (max_memory_.compare_exchange_weak(peak_seen, live_after,
                                                  std::memory_order_acq_rel)) {
                break;
            }
        }
    }

    /// Records a resize from `old_size` to `new_size` bytes. Growth counts as a new
    /// allocation of the difference; shrinking, or no change, counts as a free of it.
    inline void did_reallocate_bytes(int64_t old_size, int64_t new_size) {
        if (new_size <= old_size) {
            did_free_bytes(old_size - new_size);
            return;
        }
        did_allocate_bytes(new_size - old_size);
    }

    /// Records that `size` bytes were released.
    inline void did_free_bytes(int64_t size) {
        bytes_allocated_.fetch_sub(size, std::memory_order_acq_rel);
    }

private:
    // Updated with acquire/release ordering, see
    // https://en.cppreference.com/w/cpp/atomic/memory_order#Release-Acquire_ordering
    // max_memory_, total_allocated_bytes_ and num_allocs_ only go up.
    std::atomic<int64_t> max_memory_ {0};
    std::atomic<int64_t> bytes_allocated_ {0};
    std::atomic<int64_t> total_allocated_bytes_ {0};
    std::atomic<int64_t> num_allocs_ {0};
};
+
+template <typename Allocator = ArrowAllocator>
+class ArrowMemoryPool : public arrow::MemoryPool {
+public:
+    ~ArrowMemoryPool() override = default;
+
+    arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) 
override {
+        if (size < 0) {
+            return arrow::Status::Invalid("negative malloc size");
+        }
+        if (static_cast<uint64_t>(size) >= std::numeric_limits<size_t>::max()) 
{
+            return arrow::Status::OutOfMemory("malloc size overflows size_t");
+        }
+        RETURN_NOT_OK(_allocator.allocate_aligned(size, alignment, out));
+#ifndef NDEBUG
+        // Poison data
+        if (size > 0) {
+            DCHECK_NE(*out, nullptr);
+            (*out)[0] = kAllocPoison;
+            (*out)[size - 1] = kAllocPoison;
+        }
+#endif
+
+        _stats.did_allocate_bytes(size);
+        return arrow::Status::OK();
+    }
+
+    arrow::Status Reallocate(int64_t old_size, int64_t new_size, int64_t 
alignment,
+                             uint8_t** ptr) override {
+        if (new_size < 0) {
+            return arrow::Status::Invalid("negative realloc size");
+        }
+        if (static_cast<uint64_t>(new_size) >= 
std::numeric_limits<size_t>::max()) {
+            return arrow::Status::OutOfMemory("realloc overflows size_t");
+        }
+        RETURN_NOT_OK(_allocator.reallocate_aligned(old_size, new_size, 
alignment, ptr));
+#ifndef NDEBUG
+        // Poison data
+        if (new_size > old_size) {
+            DCHECK_NE(*ptr, nullptr);
+            (*ptr)[old_size] = kReallocPoison;
+            (*ptr)[new_size - 1] = kReallocPoison;
+        }
+#endif
+
+        _stats.did_reallocate_bytes(old_size, new_size);
+        return arrow::Status::OK();
+    }
+
+    void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
+#ifndef NDEBUG
+        // Poison data
+        if (size > 0) {
+            DCHECK_NE(buffer, nullptr);
+            buffer[0] = kDeallocPoison;
+            buffer[size - 1] = kDeallocPoison;
+        }
+#endif
+        _allocator.deallocate_aligned(buffer, size, alignment);
+
+        _stats.did_free_bytes(size);
+    }
+
+    void ReleaseUnused() override { _allocator.release_unused(); }
+
+    int64_t bytes_allocated() const override { return 
_stats.bytes_allocated(); }
+
+    int64_t max_memory() const override { return _stats.max_memory(); }
+
+    int64_t total_bytes_allocated() const override { return 
_stats.total_bytes_allocated(); }
+
+    int64_t num_allocations() const override { return 
_stats.num_allocations(); }
+
+    std::string backend_name() const override { return "ArrowMemoryPool"; }
+
+protected:
+    ArrowMemoryPoolStats _stats;
+    Allocator _allocator;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vorc_transformer.cpp 
b/be/src/vec/runtime/vorc_transformer.cpp
index 09bae276d65..6c512a94373 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -31,6 +31,7 @@
 #include "orc/OrcFile.hh"
 #include "orc/Vector.hh"
 #include "runtime/define_primitive_type.h"
+#include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "util/binary_cast.hpp"
@@ -151,6 +152,7 @@ Status VOrcTransformer::open() {
 
     _output_stream = std::make_unique<VOrcOutputStream>(_file_writer);
     try {
+        
_write_options->setMemoryPool(ExecEnv::GetInstance()->orc_memory_pool());
         _writer = orc::createWriter(*_schema, _output_stream.get(), 
*_write_options);
     } catch (const std::exception& e) {
         return Status::InternalError("failed to create writer: {}", e.what());
@@ -314,15 +316,15 @@ int64_t VOrcTransformer::written_len() {
 }
 
 Status VOrcTransformer::close() {
-    if (_writer != nullptr) {
-        try {
+    try {
+        if (_writer != nullptr) {
             _writer->close();
-        } catch (const std::exception& e) {
-            return Status::IOError(e.what());
         }
-    }
-    if (_output_stream) {
-        _output_stream->close();
+        if (_output_stream) {
+            _output_stream->close();
+        }
+    } catch (const std::exception& e) {
+        return Status::IOError(e.what());
     }
     return Status::OK();
 }
@@ -353,13 +355,13 @@ Status VOrcTransformer::write(const Block& block) {
             RETURN_IF_ERROR(_serdes[i]->write_column_to_orc(
                     _state->timezone(), *raw_column, nullptr, root->fields[i], 
0, sz, buffer_list));
         }
+        root->numElements = sz;
+        _writer->add(*row_batch);
+        _cur_written_rows += sz;
     } catch (const std::exception& e) {
         LOG(WARNING) << "Orc write error: " << e.what();
         return Status::InternalError(e.what());
     }
-    root->numElements = sz;
-    _writer->add(*row_batch);
-    _cur_written_rows += sz;
 
     return Status::OK();
 }
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp 
b/be/src/vec/runtime/vparquet_transformer.cpp
index 116a898c4f1..1969858349f 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -42,6 +42,7 @@
 #include "olap/olap_common.h"
 #include "runtime/decimalv2_value.h"
 #include "runtime/define_primitive_type.h"
+#include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "util/arrow/block_convertor.h"
@@ -237,6 +238,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState* 
state, doris::io::FileWri
 
 Status VParquetTransformer::_parse_properties() {
     try {
+        arrow::MemoryPool* pool = ExecEnv::GetInstance()->arrow_memory_pool();
         parquet::WriterProperties::Builder builder;
         ParquetBuildHelper::build_compression_type(builder, _compression_type);
         ParquetBuildHelper::build_version(builder, _parquet_version);
@@ -248,6 +250,7 @@ Status VParquetTransformer::_parse_properties() {
         builder.created_by(
                 fmt::format("{}({})", doris::get_short_version(), 
parquet::DEFAULT_CREATED_BY));
         builder.max_row_group_length(std::numeric_limits<int64_t>::max());
+        builder.memory_pool(pool);
         _parquet_writer_properties = builder.build();
         _arrow_properties = parquet::ArrowWriterProperties::Builder()
                                     .enable_deprecated_int96_timestamps()
@@ -292,8 +295,9 @@ Status VParquetTransformer::write(const Block& block) {
 
     // serialize
     std::shared_ptr<arrow::RecordBatch> result;
-    RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, 
arrow::default_memory_pool(),
-                                           &result, _state->timezone_obj()));
+    RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema,
+                                           
ExecEnv::GetInstance()->arrow_memory_pool(), &result,
+                                           _state->timezone_obj()));
 
     RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteRecordBatch(*result));
     _write_size += block.bytes();
@@ -305,9 +309,10 @@ Status VParquetTransformer::write(const Block& block) {
 }
 
 arrow::Status VParquetTransformer::_open_file_writer() {
-    ARROW_ASSIGN_OR_RAISE(_writer, parquet::arrow::FileWriter::Open(
-                                           *_arrow_schema, 
arrow::default_memory_pool(), _outstream,
-                                           _parquet_writer_properties, 
_arrow_properties));
+    ARROW_ASSIGN_OR_RAISE(_writer,
+                          parquet::arrow::FileWriter::Open(
+                                  *_arrow_schema, 
ExecEnv::GetInstance()->arrow_memory_pool(),
+                                  _outstream, _parquet_writer_properties, 
_arrow_properties));
     return arrow::Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to