This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5cc358d7ca9d746c8cf063e442b42d7d94bc0e1e
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed Apr 19 15:25:07 2023 -0700

    IMPALA-12076: Use ZSTD interfaces with reusable context
    
    For repeated compression/decompression, ZSTD recommends
    constructing a context once via ZSTD_createCCtx()/ZSTD_createDCtx()
    and using the set of interfaces that passes in the context explicitly
    to avoid constructing the context on each call.
    
    This follows the recommendation and allocates the ZSTD context once for
    each compressor / decompressor and reuses it for the lifetime of the
    compressor / decompressor.
    
    This gets a minor speedup for small-scale ZSTD TPC-H:
    
    +----------+------------------------+---------+------------+------------+----------------+
    | Workload | File Format            | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +----------+------------------------+---------+------------+------------+----------------+
    | TPCH(42) | parquet / zstd / block | 3.55    | -1.40%     | 2.52       | -1.63%         |
    +----------+------------------------+---------+------------+------------+----------------+
    
    Testing:
     - Ran core job
     - Ran a perf-AB-test job
    
    Change-Id: I5010a56bf8202ccb3f1710425002f81587fd412b
    Reviewed-on: http://gerrit.cloudera.org:8080/19773
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/util/compress.cc   | 15 ++++++++++++++-
 be/src/util/compress.h    |  3 ++-
 be/src/util/decompress.cc | 13 ++++++++++---
 3 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index 2e5128e71..88c6e96fc 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -330,6 +330,12 @@ Status Lz4Compressor::ProcessBlock(bool 
output_preallocated, int64_t input_lengt
 ZstandardCompressor::ZstandardCompressor(MemPool* mem_pool, bool reuse_buffer, 
int clevel)
   : Codec(mem_pool, reuse_buffer), clevel_(clevel) {}
 
+ZstandardCompressor::~ZstandardCompressor() {
+  if (stream_ != nullptr) {
+    static_cast<void>(ZSTD_freeCCtx(stream_));
+  }
+}
+
 int64_t ZstandardCompressor::MaxOutputLen(int64_t input_len, const uint8_t* 
input) {
   return ZSTD_compressBound(input_len);
 }
@@ -339,7 +345,14 @@ Status ZstandardCompressor::ProcessBlock(bool 
output_preallocated, int64_t input
   DCHECK_GE(input_length, 0);
   DCHECK(output_preallocated) << "Output was not allocated for Zstd Codec";
   if (input_length == 0) return Status::OK();
-  *output_length = ZSTD_compress(*output, *output_length, input, input_length, 
clevel_);
+  if (stream_ == nullptr) {
+    stream_ = ZSTD_createCCtx();
+    if (stream_ == nullptr) {
+      return Status(TErrorCode::ZSTD_ERROR, "ZSTD_createCCtx", "nullptr");
+    }
+  }
+  *output_length = ZSTD_compressCCtx(stream_, *output, *output_length, input,
+      input_length, clevel_);
   if (ZSTD_isError(*output_length)) {
     return Status(TErrorCode::ZSTD_ERROR, "ZSTD_compress",
         ZSTD_getErrorString(ZSTD_getErrorCode(*output_length)));
diff --git a/be/src/util/compress.h b/be/src/util/compress.h
index a9d3bbde9..d51e797f0 100644
--- a/be/src/util/compress.h
+++ b/be/src/util/compress.h
@@ -139,7 +139,7 @@ class ZstandardCompressor : public Codec {
  public:
   ZstandardCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false,
       int clevel = ZSTD_CLEVEL_DEFAULT);
-  virtual ~ZstandardCompressor() {}
+  virtual ~ZstandardCompressor();
 
   virtual int64_t MaxOutputLen(
       int64_t input_len, const uint8_t* input = nullptr) override;
@@ -150,6 +150,7 @@ class ZstandardCompressor : public Codec {
 
  private:
   int clevel_;
+  ZSTD_CCtx* stream_ = nullptr;
 };
 
 /// Hadoop's block compression scheme on top of LZ4.
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index fbfcba513..99c02a521 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -630,7 +630,14 @@ Status ZstandardDecompressor::ProcessBlock(bool 
output_preallocated, int64_t inp
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
   DCHECK(output_preallocated) << "Output was not allocated for Zstd Codec";
   if (*output_length == 0) return Status::OK();
-  size_t ret = ZSTD_decompress(*output, *output_length, input, input_length);
+  if (stream_ == NULL) {
+    stream_ = ZSTD_createDCtx();
+    if (stream_ == NULL) {
+      return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Zstd",
+          "ZSTD_createDCtx()", 0);
+    }
+  }
+  size_t ret = ZSTD_decompressDCtx(stream_, *output, *output_length, input, 
input_length);
   if (ZSTD_isError(ret)) {
     *output_length = 0;
     return Status(TErrorCode::ZSTD_ERROR, "ZSTD_decompress",
@@ -656,8 +663,8 @@ Status ZstandardDecompressor::ProcessBlockStreaming(int64_t 
input_length,
   if (stream_ == NULL) {
     stream_ = ZSTD_createDCtx();
     if (stream_ == NULL) {
-      return Status(
-          TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Zstd", 
"ZSTD_createDCtx()");
+      return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Zstd",
+          "ZSTD_createDCtx()", 0);
     }
   }
   *input_bytes_read = 0;

Reply via email to