This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c5a19982a6b [Feature](multi-catalog) Add memory tracker for orc reader/writer and arrow parquet writer。 (#37234) c5a19982a6b is described below commit c5a19982a6bc5c7cffc98c1694d106e9925ecc66 Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Tue Jul 30 09:00:44 2024 +0800 [Feature](multi-catalog) Add memory tracker for orc reader/writer and arrow parquet writer。 (#37234) ## Proposed changes [Feature] (multi-catalog) Add memory tracker for orc reader/writer and arrow parquet writer. ## Future work - Since the parquet reader is written by ourselves and does not use the arrow third-party library, some of its memory usage still needs to be added to the memory tracker. - Add read and write operator-level memory trackers to the profile. --- be/src/runtime/exec_env.h | 13 ++ be/src/runtime/exec_env_init.cpp | 9 + be/src/util/faststring.h | 2 +- be/src/util/slice.h | 2 +- be/src/vec/common/allocator.cpp | 90 ++++++---- be/src/vec/common/allocator.h | 165 +++++++++++++++-- be/src/vec/common/allocator_fwd.h | 4 +- be/src/vec/common/hash_table/phmap_fwd_decl.h | 2 +- be/src/vec/exec/format/orc/orc_memory_pool.h | 53 ++++++ be/src/vec/exec/format/orc/vorc_reader.cpp | 2 + .../vec/exec/format/parquet/arrow_memory_pool.cpp | 74 ++++++++ be/src/vec/exec/format/parquet/arrow_memory_pool.h | 199 +++++++++++++++++++++ be/src/vec/runtime/vorc_transformer.cpp | 22 +-- be/src/vec/runtime/vparquet_transformer.cpp | 15 +- 14 files changed, 591 insertions(+), 61 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 3434d01a59e..89e5593c84b 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -36,6 +36,13 @@ #include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header #include "util/threadpool.h" +namespace orc { +class MemoryPool; +} +namespace arrow { +class MemoryPool; +} + namespace doris { namespace vectorized { class VDataStreamMgr; @@ 
-305,6 +312,9 @@ public: segment_v2::TmpFileDirs* get_tmp_file_dirs() { return _tmp_file_dirs.get(); } io::FDCache* file_cache_open_fd_cache() const { return _file_cache_open_fd_cache.get(); } + orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; } + arrow::MemoryPool* arrow_memory_pool() { return _arrow_memory_pool; } + private: ExecEnv(); @@ -435,6 +445,9 @@ private: std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx; std::unique_ptr<segment_v2::TmpFileDirs> _tmp_file_dirs; doris::vectorized::SpillStreamManager* _spill_stream_mgr = nullptr; + + orc::MemoryPool* _orc_memory_pool = nullptr; + arrow::MemoryPool* _arrow_memory_pool = nullptr; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 32fbc4e0af4..6740f548761 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -100,6 +100,8 @@ #include "util/threadpool.h" #include "util/thrift_rpc_helper.h" #include "util/timezone_utils.h" +#include "vec/exec/format/orc/orc_memory_pool.h" +#include "vec/exec/format/parquet/arrow_memory_pool.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/vdata_stream_mgr.h" #include "vec/sink/delta_writer_v2_pool.h" @@ -573,6 +575,10 @@ Status ExecEnv::_init_mem_env() { << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) << ", origin config value: " << config::inverted_index_query_cache_limit; + // init orc memory pool + _orc_memory_pool = new doris::vectorized::ORCMemoryPool(); + _arrow_memory_pool = new doris::vectorized::ArrowMemoryPool(); + return Status::OK(); } @@ -751,6 +757,9 @@ void ExecEnv::destroy() { // We should free task scheduler finally because task queue / scheduler maybe used by pipelineX. 
SAFE_DELETE(_without_group_task_scheduler); + SAFE_DELETE(_arrow_memory_pool); + SAFE_DELETE(_orc_memory_pool); + // dns cache is a global instance and need to be released at last SAFE_DELETE(_dns_cache); diff --git a/be/src/util/faststring.h b/be/src/util/faststring.h index 8d9fa6d004f..3ec0acbda01 100644 --- a/be/src/util/faststring.h +++ b/be/src/util/faststring.h @@ -35,7 +35,7 @@ namespace doris { // common use cases (in particular, resize() will fill with uninitialized data // instead of memsetting to \0) // only build() can transfer data to the outside. -class faststring : private Allocator<false, false, false> { +class faststring : private Allocator<false, false, false, DefaultMemoryAllocator> { public: enum { kInitialCapacity = 32 }; diff --git a/be/src/util/slice.h b/be/src/util/slice.h index 80f9616f3da..bae33d4ee75 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -340,7 +340,7 @@ struct SliceMap { // // only receive the memory allocated by Allocator and disables mmap, // otherwise the memory may not be freed correctly, currently only be constructed by faststring. 
-class OwnedSlice : private Allocator<false, false, false> { +class OwnedSlice : private Allocator<false, false, false, DefaultMemoryAllocator> { public: OwnedSlice() : _slice((uint8_t*)nullptr, 0) {} diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 2b1c05533cd..879e98d0ca4 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -40,8 +40,12 @@ #include "util/stack_util.h" #include "util/uid_util.h" -template <bool clear_memory_, bool mmap_populate, bool use_mmap> -void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t size) const { +std::unordered_map<void*, size_t> RecordSizeMemoryAllocator::_allocated_sizes; +std::mutex RecordSizeMemoryAllocator::_mutex; + +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_memory_check( + size_t size) const { #ifdef BE_TEST if (!doris::ExecEnv::ready()) { return; @@ -155,8 +159,9 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t } } -template <bool clear_memory_, bool mmap_populate, bool use_mmap> -void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(size_t size) const { +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_tracker_check( + size_t size) const { #ifdef BE_TEST if (!doris::ExecEnv::ready()) { return; @@ -191,24 +196,27 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(siz } } -template <bool clear_memory_, bool mmap_populate, bool use_mmap> -void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_check(size_t size) const { +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_check( 
+ size_t size) const { sys_memory_check(size); memory_tracker_check(size); } -template <bool clear_memory_, bool mmap_populate, bool use_mmap> -void Allocator<clear_memory_, mmap_populate, use_mmap>::consume_memory(size_t size) const { +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::consume_memory( + size_t size) const { CONSUME_THREAD_MEM_TRACKER(size); } -template <bool clear_memory_, bool mmap_populate, bool use_mmap> -void Allocator<clear_memory_, mmap_populate, use_mmap>::release_memory(size_t size) const { +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::release_memory( + size_t size) const { RELEASE_THREAD_MEM_TRACKER(size); } -template <bool clear_memory_, bool mmap_populate, bool use_mmap> -void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc( +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::throw_bad_alloc( const std::string& err) const { LOG(WARNING) << err << fmt::format("{}, Stacktrace: {}", @@ -219,9 +227,9 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc( } #ifndef NDEBUG -template <bool clear_memory_, bool mmap_populate, bool use_mmap> -void Allocator<clear_memory_, mmap_populate, use_mmap>::add_address_sanitizers(void* buf, - size_t size) const { +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::add_address_sanitizers( + void* buf, size_t size) const { #ifdef BE_TEST if (!doris::ExecEnv::ready()) { return; @@ -230,8 +238,8 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::add_address_sanitizers(v 
doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size); } -template <bool clear_memory_, bool mmap_populate, bool use_mmap> -void Allocator<clear_memory_, mmap_populate, use_mmap>::remove_address_sanitizers( +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::remove_address_sanitizers( void* buf, size_t size) const { #ifdef BE_TEST if (!doris::ExecEnv::ready()) { @@ -242,23 +250,43 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::remove_address_sanitizer } #endif -template <bool clear_memory_, bool mmap_populate, bool use_mmap> -void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size, size_t alignment) { +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::alloc(size_t size, + size_t alignment) { return alloc_impl(size, alignment); } -template <bool clear_memory_, bool mmap_populate, bool use_mmap> -void* Allocator<clear_memory_, mmap_populate, use_mmap>::realloc(void* buf, size_t old_size, - size_t new_size, - size_t alignment) { +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::realloc( + void* buf, size_t old_size, size_t new_size, size_t alignment) { return realloc_impl(buf, old_size, new_size, alignment); } -template class Allocator<true, true, true>; -template class Allocator<true, true, false>; -template class Allocator<true, false, true>; -template class Allocator<true, false, false>; -template class Allocator<false, true, true>; -template class Allocator<false, true, false>; -template class Allocator<false, false, true>; -template class Allocator<false, false, false>; +template class Allocator<true, true, true, DefaultMemoryAllocator>; +template class 
Allocator<true, true, false, DefaultMemoryAllocator>; +template class Allocator<true, false, true, DefaultMemoryAllocator>; +template class Allocator<true, false, false, DefaultMemoryAllocator>; +template class Allocator<false, true, true, DefaultMemoryAllocator>; +template class Allocator<false, true, false, DefaultMemoryAllocator>; +template class Allocator<false, false, true, DefaultMemoryAllocator>; +template class Allocator<false, false, false, DefaultMemoryAllocator>; + +/** It would be better to put these Memory Allocators where they are used, such as in the orc memory pool and arrow memory pool. + * But currently allocators use templates in .cpp instead of all in .h, so they can only be placed here. + */ +template class Allocator<true, true, false, ORCMemoryAllocator>; +template class Allocator<true, false, true, ORCMemoryAllocator>; +template class Allocator<true, false, false, ORCMemoryAllocator>; +template class Allocator<false, true, true, ORCMemoryAllocator>; +template class Allocator<false, true, false, ORCMemoryAllocator>; +template class Allocator<false, false, true, ORCMemoryAllocator>; +template class Allocator<false, false, false, ORCMemoryAllocator>; + +template class Allocator<true, true, true, RecordSizeMemoryAllocator>; +template class Allocator<true, true, false, RecordSizeMemoryAllocator>; +template class Allocator<true, false, true, RecordSizeMemoryAllocator>; +template class Allocator<true, false, false, RecordSizeMemoryAllocator>; +template class Allocator<false, true, true, RecordSizeMemoryAllocator>; +template class Allocator<false, true, false, RecordSizeMemoryAllocator>; +template class Allocator<false, false, true, RecordSizeMemoryAllocator>; +template class Allocator<false, false, false, RecordSizeMemoryAllocator>; diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 3c513f270bb..88c85dceeb3 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -23,6 +23,10 @@ // TODO: 
Readable #include <fmt/format.h> +#if defined(USE_JEMALLOC) +#include <jemalloc/jemalloc.h> +#endif // defined(USE_JEMALLOC) +#include <malloc.h> #include <stdint.h> #include <string.h> @@ -68,6 +72,128 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; // is always a multiple of sixteen. (https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html) static constexpr int ALLOCATOR_ALIGNMENT_16 = 16; +class DefaultMemoryAllocator { +public: + static void* malloc(size_t size) __THROW { return std::malloc(size); } + + static void* calloc(size_t n, size_t size) __THROW { return std::calloc(n, size); } + + static constexpr bool need_record_actual_size() { return false; } + + static int posix_memalign(void** ptr, size_t alignment, size_t size) __THROW { + return ::posix_memalign(ptr, alignment, size); + } + + static void* realloc(void* ptr, size_t size) __THROW { return std::realloc(ptr, size); } + + static void free(void* p) __THROW { std::free(p); } + + static void release_unused() { +#if defined(USE_JEMALLOC) + jemallctl(fmt::format("arena.{}.purge", MALLCTL_ARENAS_ALL).c_str(), NULL, NULL, NULL, 0); +#endif // defined(USE_JEMALLOC) + } +}; + +/** It would be better to put these Memory Allocators where they are used, such as in the orc memory pool and arrow memory pool. + * But currently allocators use templates in .cpp instead of all in .h, so they can only be placed here. 
+ */ +class ORCMemoryAllocator { +public: + static void* malloc(size_t size) __THROW { return reinterpret_cast<char*>(std::malloc(size)); } + + static void* calloc(size_t n, size_t size) __THROW { return std::calloc(n, size); } + + static constexpr bool need_record_actual_size() { return true; } + + static size_t allocated_size(void* ptr) { return malloc_usable_size(ptr); } + + static int posix_memalign(void** ptr, size_t alignment, size_t size) __THROW { + return ::posix_memalign(ptr, alignment, size); + } + + static void* realloc(void* ptr, size_t size) __THROW { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + + static void free(void* p) __THROW { std::free(p); } + + static void release_unused() {} +}; + +class RecordSizeMemoryAllocator { +public: + static void* malloc(size_t size) __THROW { + void* p = std::malloc(size); + if (p) { + std::lock_guard<std::mutex> lock(_mutex); + _allocated_sizes[p] = size; + } + return p; + } + + static void* calloc(size_t n, size_t size) __THROW { + void* p = std::calloc(n, size); + if (p) { + std::lock_guard<std::mutex> lock(_mutex); + _allocated_sizes[p] = n * size; + } + return p; + } + + static constexpr bool need_record_actual_size() { return false; } + + static size_t allocated_size(void* ptr) { + std::lock_guard<std::mutex> lock(_mutex); + auto it = _allocated_sizes.find(ptr); + if (it != _allocated_sizes.end()) { + return it->second; + } + return 0; + } + + static int posix_memalign(void** ptr, size_t alignment, size_t size) __THROW { + int ret = ::posix_memalign(ptr, alignment, size); + if (ret == 0 && *ptr) { + std::lock_guard<std::mutex> lock(_mutex); + _allocated_sizes[*ptr] = size; + } + return ret; + } + + static void* realloc(void* ptr, size_t size) __THROW { + std::lock_guard<std::mutex> lock(_mutex); + + auto it = _allocated_sizes.find(ptr); + if (it != _allocated_sizes.end()) { + _allocated_sizes.erase(it); + } + + void* p = std::realloc(ptr, size); + + if (p) { + _allocated_sizes[p] = 
size; + } + + return p; + } + + static void free(void* p) __THROW { + if (p) { + std::lock_guard<std::mutex> lock(_mutex); + _allocated_sizes.erase(p); + std::free(p); + } + } + + static void release_unused() {} + +private: + static std::unordered_map<void*, size_t> _allocated_sizes; + static std::mutex _mutex; +}; + /** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena. * Also used in hash tables. * The interface is different from std::allocator @@ -78,7 +204,7 @@ static constexpr int ALLOCATOR_ALIGNMENT_16 = 16; * - random hint address for mmap * - mmap_threshold for using mmap less or more */ -template <bool clear_memory_, bool mmap_populate, bool use_mmap> +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> class Allocator { public: void sys_memory_check(size_t size) const; @@ -104,6 +230,7 @@ public: // consume memory in tracker before alloc, similar to early declaration. consume_memory(size); void* buf; + size_t record_size = size; if (use_mmap && size >= doris::config::mmap_threshold) { if (alignment > MMAP_MIN_ALIGNMENT) @@ -117,38 +244,51 @@ public: release_memory(size); throw_bad_alloc(fmt::format("Allocator: Cannot mmap {}.", size)); } + if constexpr (MemoryAllocator::need_record_actual_size()) { + record_size = MemoryAllocator::allocated_size(buf); + } /// No need for zero-fill, because mmap guarantees it. 
} else { if (alignment <= MALLOC_MIN_ALIGNMENT) { if constexpr (clear_memory) - buf = ::calloc(size, 1); + buf = MemoryAllocator::calloc(size, 1); else - buf = ::malloc(size); + buf = MemoryAllocator::malloc(size); if (nullptr == buf) { release_memory(size); throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", size)); } + if constexpr (MemoryAllocator::need_record_actual_size()) { + record_size = MemoryAllocator::allocated_size(buf); + } #ifndef NDEBUG - add_address_sanitizers(buf, size); + add_address_sanitizers(buf, record_size); #endif } else { buf = nullptr; - int res = posix_memalign(&buf, alignment, size); + int res = MemoryAllocator::posix_memalign(&buf, alignment, size); if (0 != res) { release_memory(size); throw_bad_alloc( fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); } -#ifndef NDEBUG - add_address_sanitizers(buf, size); -#endif if constexpr (clear_memory) memset(buf, 0, size); + + if constexpr (MemoryAllocator::need_record_actual_size()) { + record_size = MemoryAllocator::allocated_size(buf); + } +#ifndef NDEBUG + add_address_sanitizers(buf, record_size); +#endif } } + if constexpr (MemoryAllocator::need_record_actual_size()) { + consume_memory(record_size - size); + } return buf; } @@ -162,11 +302,13 @@ public: #ifndef NDEBUG remove_address_sanitizers(buf, size); #endif - ::free(buf); + MemoryAllocator::free(buf); } release_memory(size); } + void release_unused() { MemoryAllocator::release_unused(); } + /** Enlarge memory range. * Data from old range is moved to the beginning of new range. * Address of memory range could change. @@ -187,7 +329,7 @@ public: remove_address_sanitizers(buf, old_size); #endif /// Resize malloc'd memory region with no special alignment requirement. 
- void* new_buf = ::realloc(buf, new_size); + void* new_buf = MemoryAllocator::realloc(buf, new_size); if (nullptr == new_buf) { release_memory(new_size - old_size); throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, @@ -195,7 +337,8 @@ public: } #ifndef NDEBUG add_address_sanitizers( - new_buf, new_size); // usually, buf addr = new_buf addr, asan maybe not equal. + new_buf, + new_size); // usually, buf addr = new_buf addr, asan maybe not equal. #endif buf = new_buf; diff --git a/be/src/vec/common/allocator_fwd.h b/be/src/vec/common/allocator_fwd.h index 988f7a5c7af..da43030a133 100644 --- a/be/src/vec/common/allocator_fwd.h +++ b/be/src/vec/common/allocator_fwd.h @@ -24,7 +24,9 @@ #pragma once #include <cstddef> -template <bool clear_memory_, bool mmap_populate = false, bool use_mmap = false> +class DefaultMemoryAllocator; +template <bool clear_memory_, bool mmap_populate = false, bool use_mmap = false, + typename MemoryAllocator = DefaultMemoryAllocator> class Allocator; template <typename Base, size_t N = 64, size_t Alignment = 1> diff --git a/be/src/vec/common/hash_table/phmap_fwd_decl.h b/be/src/vec/common/hash_table/phmap_fwd_decl.h index 62373410968..c6ff77a7e71 100644 --- a/be/src/vec/common/hash_table/phmap_fwd_decl.h +++ b/be/src/vec/common/hash_table/phmap_fwd_decl.h @@ -26,7 +26,7 @@ namespace doris::vectorized { /// `Allocator_` implements several interfaces of `std::allocator` /// which `phmap::flat_hash_map` will use. 
template <typename T> -class Allocator_ : private Allocator<true, false, false> { +class Allocator_ : private Allocator<true, false, false, DefaultMemoryAllocator> { public: using value_type = T; using pointer = T*; diff --git a/be/src/vec/exec/format/orc/orc_memory_pool.h b/be/src/vec/exec/format/orc/orc_memory_pool.h new file mode 100644 index 00000000000..1df3d63f952 --- /dev/null +++ b/be/src/vec/exec/format/orc/orc_memory_pool.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "orc/MemoryPool.hh" +#include "vec/common/allocator.h" + +#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER) +using ORC_MEMORY_ALLOCATOR = RecordSizeMemoryAllocator; +#else +using ORC_MEMORY_ALLOCATOR = ORCMemoryAllocator; +#endif + +namespace doris::vectorized { + +class ORCMemoryPool : public orc::MemoryPool { +public: + char* malloc(uint64_t size) override { + char* p = reinterpret_cast<char*>(_allocator.alloc(size)); + return p; + } + + void free(char* p) override { + if (p == nullptr) { + return; + } + size_t size = ORC_MEMORY_ALLOCATOR::allocated_size(p); + _allocator.free(p, size); + } + + ORCMemoryPool() = default; + ~ORCMemoryPool() override = default; + +private: + Allocator<false, false, false, ORC_MEMORY_ALLOCATOR> _allocator; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index e2ba3a57be8..b70e3496133 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -71,6 +71,7 @@ #include "vec/data_types/data_type_map.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_struct.h" +#include "vec/exec/format/orc/orc_memory_pool.h" #include "vec/exec/format/table/transactional_hive_common.h" #include "vec/exprs/vbloom_predicate.h" #include "vec/exprs/vdirect_in_predicate.h" @@ -252,6 +253,7 @@ Status OrcReader::_create_file_reader() { // create orc reader try { orc::ReaderOptions options; + options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool()); _reader = orc::createReader( std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options); } catch (std::exception& e) { diff --git a/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp b/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp new file mode 100644 index 00000000000..ed06e5c821a --- /dev/null +++ 
b/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/parquet/arrow_memory_pool.h" + +#include "glog/logging.h" + +namespace doris::vectorized { + +// A static piece of memory for 0-size allocations, so as to return +// an aligned non-null pointer. Note the correct value for DebugAllocator +// checks is hardcoded. 
+alignas(kDefaultBufferAlignment) int64_t zero_size_area[1] = {kDebugXorSuffix}; + +arrow::Status ArrowAllocator::allocate_aligned(int64_t size, int64_t alignment, uint8_t** out) { + if (size == 0) { + *out = kZeroSizeArea; + return arrow::Status::OK(); + } + *out = reinterpret_cast<uint8_t*>(_allocator.alloc(size, alignment)); + if (*out == nullptr) { + return arrow::Status::OutOfMemory("malloc of size ", size, " failed"); + } + return arrow::Status::OK(); +} + +arrow::Status ArrowAllocator::reallocate_aligned(int64_t old_size, int64_t new_size, + int64_t alignment, uint8_t** ptr) { + uint8_t* previous_ptr = *ptr; + if (previous_ptr == kZeroSizeArea) { + DCHECK_EQ(old_size, 0); + return allocate_aligned(new_size, alignment, ptr); + } + if (new_size == 0) { + deallocate_aligned(previous_ptr, old_size, alignment); + *ptr = kZeroSizeArea; + return arrow::Status::OK(); + } + *ptr = reinterpret_cast<uint8_t*>(_allocator.realloc(*ptr, static_cast<size_t>(old_size), + static_cast<size_t>(new_size), alignment)); + if (*ptr == nullptr) { + *ptr = previous_ptr; + return arrow::Status::OutOfMemory("realloc of size ", new_size, " failed"); + } + return arrow::Status::OK(); +} + +void ArrowAllocator::deallocate_aligned(uint8_t* ptr, int64_t size, int64_t alignment) { + if (ptr == kZeroSizeArea) { + DCHECK_EQ(size, 0); + } else { + _allocator.free(ptr, static_cast<size_t>(size)); + } +} + +void ArrowAllocator::release_unused() { + _allocator.release_unused(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/arrow_memory_pool.h b/be/src/vec/exec/format/parquet/arrow_memory_pool.h new file mode 100644 index 00000000000..a93e426f374 --- /dev/null +++ b/be/src/vec/exec/format/parquet/arrow_memory_pool.h @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/memory_pool.h" +#include "arrow/status.h" +#include "vec/common/allocator.h" +#include "vec/common/allocator_fwd.h" + +namespace doris::vectorized { + +constexpr int64_t kDefaultBufferAlignment = 64; +static constexpr int64_t kDebugXorSuffix = -0x181fe80e0b464188LL; +#ifndef NDEBUG +static constexpr uint8_t kAllocPoison = 0xBC; +static constexpr uint8_t kReallocPoison = 0xBD; +static constexpr uint8_t kDeallocPoison = 0xBE; +#endif + +// A static piece of memory for 0-size allocations, so as to return +// an aligned non-null pointer. Note the correct value for DebugAllocator +// checks is hardcoded. 
+extern int64_t zero_size_area[1]; +static uint8_t* const kZeroSizeArea = reinterpret_cast<uint8_t*>(&zero_size_area); + +using ARROW_MEMORY_ALLOCATOR = DefaultMemoryAllocator; + +class ArrowAllocator { +public: + arrow::Status allocate_aligned(int64_t size, int64_t alignment, uint8_t** out); + arrow::Status reallocate_aligned(int64_t old_size, int64_t new_size, int64_t alignment, + uint8_t** ptr); + void deallocate_aligned(uint8_t* ptr, int64_t size, int64_t alignment); + void release_unused(); + +private: + Allocator<false, false, false, ARROW_MEMORY_ALLOCATOR> _allocator; +}; + +/////////////////////////////////////////////////////////////////////// +// Helper tracking memory statistics + +/// \brief Memory pool statistics +/// +/// 64-byte aligned so that all atomic values are on the same cache line. +class alignas(64) ArrowMemoryPoolStats { +private: + // All atomics are updated according to Acquire-Release ordering. + // https://en.cppreference.com/w/cpp/atomic/memory_order#Release-Acquire_ordering + // + // max_memory_, total_allocated_bytes_, and num_allocs_ only go up (they are + // monotonically increasing) which can allow some optimizations. + std::atomic<int64_t> max_memory_ {0}; + std::atomic<int64_t> bytes_allocated_ {0}; + std::atomic<int64_t> total_allocated_bytes_ {0}; + std::atomic<int64_t> num_allocs_ {0}; + +public: + int64_t max_memory() const { return max_memory_.load(std::memory_order_acquire); } + + int64_t bytes_allocated() const { return bytes_allocated_.load(std::memory_order_acquire); } + + int64_t total_bytes_allocated() const { + return total_allocated_bytes_.load(std::memory_order_acquire); + } + + int64_t num_allocations() const { return num_allocs_.load(std::memory_order_acquire); } + + inline void did_allocate_bytes(int64_t size) { + // Issue the load before everything else. max_memory_ is monotonically increasing, + // so we can use a relaxed load before the read-modify-write. 
+ auto max_memory = max_memory_.load(std::memory_order_relaxed); + const auto old_bytes_allocated = + bytes_allocated_.fetch_add(size, std::memory_order_acq_rel); + // Issue store operations on values that we don't depend on to proceed + // with execution. When done, max_memory and old_bytes_allocated have + // a higher chance of being available on CPU registers. This also has the + // nice side-effect of putting 3 atomic stores close to each other in the + // instruction stream. + total_allocated_bytes_.fetch_add(size, std::memory_order_acq_rel); + num_allocs_.fetch_add(1, std::memory_order_acq_rel); + + // If other threads are updating max_memory_ concurrently we leave the loop without + // updating knowing that it already reached a value even higher than ours. + const auto allocated = old_bytes_allocated + size; + while (max_memory < allocated && + !max_memory_.compare_exchange_weak( + /*expected=*/max_memory, /*desired=*/allocated, std::memory_order_acq_rel)) { + } + } + + inline void did_reallocate_bytes(int64_t old_size, int64_t new_size) { + if (new_size > old_size) { + did_allocate_bytes(new_size - old_size); + } else { + did_free_bytes(old_size - new_size); + } + } + + inline void did_free_bytes(int64_t size) { + bytes_allocated_.fetch_sub(size, std::memory_order_acq_rel); + } +}; + +template <typename Allocator = ArrowAllocator> +class ArrowMemoryPool : public arrow::MemoryPool { +public: + ~ArrowMemoryPool() override = default; + + arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override { + if (size < 0) { + return arrow::Status::Invalid("negative malloc size"); + } + if (static_cast<uint64_t>(size) >= std::numeric_limits<size_t>::max()) { + return arrow::Status::OutOfMemory("malloc size overflows size_t"); + } + RETURN_NOT_OK(_allocator.allocate_aligned(size, alignment, out)); +#ifndef NDEBUG + // Poison data + if (size > 0) { + DCHECK_NE(*out, nullptr); + (*out)[0] = kAllocPoison; + (*out)[size - 1] = kAllocPoison; + } +#endif + 
+ _stats.did_allocate_bytes(size); + return arrow::Status::OK(); + } + + arrow::Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, + uint8_t** ptr) override { + if (new_size < 0) { + return arrow::Status::Invalid("negative realloc size"); + } + if (static_cast<uint64_t>(new_size) >= std::numeric_limits<size_t>::max()) { + return arrow::Status::OutOfMemory("realloc overflows size_t"); + } + RETURN_NOT_OK(_allocator.reallocate_aligned(old_size, new_size, alignment, ptr)); +#ifndef NDEBUG + // Poison data + if (new_size > old_size) { + DCHECK_NE(*ptr, nullptr); + (*ptr)[old_size] = kReallocPoison; + (*ptr)[new_size - 1] = kReallocPoison; + } +#endif + + _stats.did_reallocate_bytes(old_size, new_size); + return arrow::Status::OK(); + } + + void Free(uint8_t* buffer, int64_t size, int64_t alignment) override { +#ifndef NDEBUG + // Poison data + if (size > 0) { + DCHECK_NE(buffer, nullptr); + buffer[0] = kDeallocPoison; + buffer[size - 1] = kDeallocPoison; + } +#endif + _allocator.deallocate_aligned(buffer, size, alignment); + + _stats.did_free_bytes(size); + } + + void ReleaseUnused() override { _allocator.release_unused(); } + + int64_t bytes_allocated() const override { return _stats.bytes_allocated(); } + + int64_t max_memory() const override { return _stats.max_memory(); } + + int64_t total_bytes_allocated() const override { return _stats.total_bytes_allocated(); } + + int64_t num_allocations() const override { return _stats.num_allocations(); } + + std::string backend_name() const override { return "ArrowMemoryPool"; } + +protected: + ArrowMemoryPoolStats _stats; + Allocator _allocator; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp index 09bae276d65..6c512a94373 100644 --- a/be/src/vec/runtime/vorc_transformer.cpp +++ b/be/src/vec/runtime/vorc_transformer.cpp @@ -31,6 +31,7 @@ #include "orc/OrcFile.hh" #include "orc/Vector.hh" #include 
"runtime/define_primitive_type.h" +#include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/types.h" #include "util/binary_cast.hpp" @@ -151,6 +152,7 @@ Status VOrcTransformer::open() { _output_stream = std::make_unique<VOrcOutputStream>(_file_writer); try { + _write_options->setMemoryPool(ExecEnv::GetInstance()->orc_memory_pool()); _writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options); } catch (const std::exception& e) { return Status::InternalError("failed to create writer: {}", e.what()); @@ -314,15 +316,15 @@ int64_t VOrcTransformer::written_len() { } Status VOrcTransformer::close() { - if (_writer != nullptr) { - try { + try { + if (_writer != nullptr) { _writer->close(); - } catch (const std::exception& e) { - return Status::IOError(e.what()); } - } - if (_output_stream) { - _output_stream->close(); + if (_output_stream) { + _output_stream->close(); + } + } catch (const std::exception& e) { + return Status::IOError(e.what()); } return Status::OK(); } @@ -353,13 +355,13 @@ Status VOrcTransformer::write(const Block& block) { RETURN_IF_ERROR(_serdes[i]->write_column_to_orc( _state->timezone(), *raw_column, nullptr, root->fields[i], 0, sz, buffer_list)); } + root->numElements = sz; + _writer->add(*row_batch); + _cur_written_rows += sz; } catch (const std::exception& e) { LOG(WARNING) << "Orc write error: " << e.what(); return Status::InternalError(e.what()); } - root->numElements = sz; - _writer->add(*row_batch); - _cur_written_rows += sz; return Status::OK(); } diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp index 116a898c4f1..1969858349f 100644 --- a/be/src/vec/runtime/vparquet_transformer.cpp +++ b/be/src/vec/runtime/vparquet_transformer.cpp @@ -42,6 +42,7 @@ #include "olap/olap_common.h" #include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" +#include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/types.h" 
#include "util/arrow/block_convertor.h" @@ -237,6 +238,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWri Status VParquetTransformer::_parse_properties() { try { + arrow::MemoryPool* pool = ExecEnv::GetInstance()->arrow_memory_pool(); parquet::WriterProperties::Builder builder; ParquetBuildHelper::build_compression_type(builder, _compression_type); ParquetBuildHelper::build_version(builder, _parquet_version); @@ -248,6 +250,7 @@ Status VParquetTransformer::_parse_properties() { builder.created_by( fmt::format("{}({})", doris::get_short_version(), parquet::DEFAULT_CREATED_BY)); builder.max_row_group_length(std::numeric_limits<int64_t>::max()); + builder.memory_pool(pool); _parquet_writer_properties = builder.build(); _arrow_properties = parquet::ArrowWriterProperties::Builder() .enable_deprecated_int96_timestamps() @@ -292,8 +295,9 @@ Status VParquetTransformer::write(const Block& block) { // serialize std::shared_ptr<arrow::RecordBatch> result; - RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), - &result, _state->timezone_obj())); + RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, + ExecEnv::GetInstance()->arrow_memory_pool(), &result, + _state->timezone_obj())); RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteRecordBatch(*result)); _write_size += block.bytes(); @@ -305,9 +309,10 @@ Status VParquetTransformer::write(const Block& block) { } arrow::Status VParquetTransformer::_open_file_writer() { - ARROW_ASSIGN_OR_RAISE(_writer, parquet::arrow::FileWriter::Open( - *_arrow_schema, arrow::default_memory_pool(), _outstream, - _parquet_writer_properties, _arrow_properties)); + ARROW_ASSIGN_OR_RAISE(_writer, + parquet::arrow::FileWriter::Open( + *_arrow_schema, ExecEnv::GetInstance()->arrow_memory_pool(), + _outstream, _parquet_writer_properties, _arrow_properties)); return arrow::Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org