This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new af589c0b13b [memtracker](accuracy) should not account reusable buffer 
to query memtracker (#33933)
af589c0b13b is described below

commit af589c0b13b0af8c3ea8d17753b820f9ee20aba2
Author: yiguolei <676222...@qq.com>
AuthorDate: Mon Apr 22 00:10:03 2024 +0800

    [memtracker](accuracy) should not account reusable buffer to query 
memtracker (#33933)
    
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/util/block_compression.cpp | 118 +++++++++++++++++++++-----------------
 1 file changed, 65 insertions(+), 53 deletions(-)

diff --git a/be/src/util/block_compression.cpp 
b/be/src/util/block_compression.cpp
index f3b1e781e7e..21f7e72f5d9 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -96,13 +96,20 @@ private:
         ENABLE_FACTORY_CREATOR(Context);
 
     public:
-        Context() : ctx(nullptr) {}
+        Context() : ctx(nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
+            buffer = std::make_unique<faststring>();
+        }
         LZ4_stream_t* ctx;
-        faststring buffer;
+        std::unique_ptr<faststring> buffer;
         ~Context() {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
             if (ctx) {
                 LZ4_freeStream(ctx);
             }
+            buffer.reset();
         }
     };
 
@@ -118,8 +125,6 @@ public:
     }
 
     Status compress(const Slice& input, faststring* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         if (input.size > INT_MAX) {
             return Status::InvalidArgument(
                     "LZ4 not support those case(input.size>INT_MAX), maybe you 
should change "
@@ -144,8 +149,14 @@ public:
             compressed_buf.size = max_len;
         } else {
             // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-            context->buffer.resize(max_len);
-            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer.data());
+            {
+                // context->buffer is reusable between queries, so it should be 
accounted to the
+                // global tracker.
+                SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                        
ExecEnv::GetInstance()->block_compression_mem_tracker());
+                context->buffer->resize(max_len);
+            }
+            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
             compressed_buf.size = max_len;
         }
 
@@ -165,8 +176,6 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         auto decompressed_len =
                 LZ4_decompress_safe(input.data, output->data, input.size, 
output->size);
         if (decompressed_len < 0) {
@@ -218,8 +227,6 @@ public:
         return &s_instance;
     }
     Status decompress(const Slice& input, Slice* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         
RETURN_IF_ERROR(Decompressor::create_decompressor(CompressType::LZ4BLOCK, 
&_decompressor));
         size_t input_bytes_read = 0;
         size_t decompressed_len = 0;
@@ -245,13 +252,20 @@ private:
         ENABLE_FACTORY_CREATOR(CContext);
 
     public:
-        CContext() : ctx(nullptr) {}
+        CContext() : ctx(nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
+            buffer = std::make_unique<faststring>();
+        }
         LZ4F_compressionContext_t ctx;
-        faststring buffer;
+        std::unique_ptr<faststring> buffer;
         ~CContext() {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
             if (ctx) {
                 LZ4F_freeCompressionContext(ctx);
             }
+            buffer.reset();
         }
     };
     class DContext {
@@ -301,8 +315,6 @@ public:
 private:
     Status _compress(const std::vector<Slice>& inputs, size_t 
uncompressed_size,
                      faststring* output) {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         std::unique_ptr<CContext> context;
         RETURN_IF_ERROR(_acquire_compression_ctx(context));
         bool compress_failed = false;
@@ -319,9 +331,13 @@ private:
             compressed_buf.data = reinterpret_cast<char*>(output->data());
             compressed_buf.size = max_len;
         } else {
-            // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-            context->buffer.resize(max_len);
-            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer.data());
+            {
+                SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                        
ExecEnv::GetInstance()->block_compression_mem_tracker());
+                // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
+                context->buffer->resize(max_len);
+            }
+            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
             compressed_buf.size = max_len;
         }
 
@@ -361,8 +377,6 @@ private:
     }
 
     Status _decompress(const Slice& input, Slice* output) {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         bool decompress_failed = false;
         std::unique_ptr<DContext> context;
         RETURN_IF_ERROR(_acquire_decompression_ctx(context));
@@ -472,13 +486,20 @@ private:
         ENABLE_FACTORY_CREATOR(Context);
 
     public:
-        Context() : ctx(nullptr) {}
+        Context() : ctx(nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
+            buffer = std::make_unique<faststring>();
+        }
         LZ4_streamHC_t* ctx;
-        faststring buffer;
+        std::unique_ptr<faststring> buffer;
         ~Context() {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
             if (ctx) {
                 LZ4_freeStreamHC(ctx);
             }
+            buffer.reset();
         }
     };
 
@@ -494,8 +515,6 @@ public:
     }
 
     Status compress(const Slice& input, faststring* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         std::unique_ptr<Context> context;
         RETURN_IF_ERROR(_acquire_compression_ctx(context));
         bool compress_failed = false;
@@ -512,9 +531,13 @@ public:
             compressed_buf.data = reinterpret_cast<char*>(output->data());
             compressed_buf.size = max_len;
         } else {
-            // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-            context->buffer.resize(max_len);
-            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer.data());
+            {
+                SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                        
ExecEnv::GetInstance()->block_compression_mem_tracker());
+                // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
+                context->buffer->resize(max_len);
+            }
+            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
             compressed_buf.size = max_len;
         }
 
@@ -533,8 +556,6 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         auto decompressed_len =
                 LZ4_decompress_safe(input.data, output->data, input.size, 
output->size);
         if (decompressed_len < 0) {
@@ -654,8 +675,6 @@ public:
     ~SnappyBlockCompression() override {}
 
     Status compress(const Slice& input, faststring* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         size_t max_len = max_compressed_len(input.size);
         output->resize(max_len);
         Slice s(*output);
@@ -666,8 +685,6 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         if (!snappy::RawUncompress(input.data, input.size, output->data)) {
             return Status::InvalidArgument("Fail to do Snappy decompress");
         }
@@ -699,8 +716,6 @@ public:
     ~ZlibBlockCompression() {}
 
     Status compress(const Slice& input, faststring* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         size_t max_len = max_compressed_len(input.size);
         output->resize(max_len);
         Slice s(*output);
@@ -715,8 +730,6 @@ public:
 
     Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                     faststring* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         size_t max_len = max_compressed_len(uncompressed_size);
         output->resize(max_len);
 
@@ -757,8 +770,6 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         size_t input_size = input.size;
         auto zres =
                 ::uncompress2((Bytef*)output->data, &output->size, 
(Bytef*)input.data, &input_size);
@@ -781,13 +792,20 @@ private:
         ENABLE_FACTORY_CREATOR(CContext);
 
     public:
-        CContext() : ctx(nullptr) {}
+        CContext() : ctx(nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
+            buffer = std::make_unique<faststring>();
+        }
         ZSTD_CCtx* ctx;
-        faststring buffer;
+        std::unique_ptr<faststring> buffer;
         ~CContext() {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
             if (ctx) {
                 ZSTD_freeCCtx(ctx);
             }
+            buffer.reset();
         }
     };
     class DContext {
@@ -826,8 +844,6 @@ public:
     //  
https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
     Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                     faststring* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         std::unique_ptr<CContext> context;
         RETURN_IF_ERROR(_acquire_compression_ctx(context));
         bool compress_failed = false;
@@ -845,9 +861,13 @@ public:
             compressed_buf.data = reinterpret_cast<char*>(output->data());
             compressed_buf.size = max_len;
         } else {
-            // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-            context->buffer.resize(max_len);
-            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer.data());
+            {
+                SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                        
ExecEnv::GetInstance()->block_compression_mem_tracker());
+                // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
+                context->buffer->resize(max_len);
+            }
+            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
             compressed_buf.size = max_len;
         }
 
@@ -904,8 +924,6 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         std::unique_ptr<DContext> context;
         bool decompress_failed = false;
         RETURN_IF_ERROR(_acquire_decompression_ctx(context));
@@ -1001,8 +1019,6 @@ public:
     ~GzipBlockCompression() override = default;
 
     Status decompress(const Slice& input, Slice* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         z_stream z_strm = {};
         z_strm.zalloc = Z_NULL;
         z_strm.zfree = Z_NULL;
@@ -1084,8 +1100,6 @@ public:
     ~GzipBlockCompressionByLibdeflate() override = default;
 
     Status decompress(const Slice& input, Slice* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         if (input.empty()) {
             output->size = 0;
             return Status::OK();
@@ -1118,8 +1132,6 @@ public:
     }
     size_t max_compressed_len(size_t len) override { return 0; };
     Status decompress(const Slice& input, Slice* output) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->block_compression_mem_tracker());
         auto* input_ptr = input.data;
         auto remain_input_size = input.size;
         auto* output_ptr = output->data;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to