This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5cc358d7ca9d746c8cf063e442b42d7d94bc0e1e Author: Joe McDonnell <[email protected]> AuthorDate: Wed Apr 19 15:25:07 2023 -0700 IMPALA-12076: Use ZSTD interfaces with reusable context For repeated compression/decompression, ZSTD recommends constructing a context once via ZSTD_createCCtx()/ZSTD_createDCtx() and using the set of interfaces that passes in the context explicitly to avoid constructing the context on each call. This follows the recommendation and allocates the ZSTD context once for each compressor / decompressor and reuses it for the lifetime of the compressor / decompressor. This gets a minor speedup for small-scale ZSTD TPC-H: +----------+------------------------+---------+------------+------------+----------------+ | Workload | File Format | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) | +----------+------------------------+---------+------------+------------+----------------+ | TPCH(42) | parquet / zstd / block | 3.55 | -1.40% | 2.52 | -1.63% | +----------+------------------------+---------+------------+------------+----------------+ Testing: - Ran core job - Ran a perf-AB-test job Change-Id: I5010a56bf8202ccb3f1710425002f81587fd412b Reviewed-on: http://gerrit.cloudera.org:8080/19773 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/util/compress.cc | 15 ++++++++++++++- be/src/util/compress.h | 3 ++- be/src/util/decompress.cc | 13 ++++++++++--- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc index 2e5128e71..88c6e96fc 100644 --- a/be/src/util/compress.cc +++ b/be/src/util/compress.cc @@ -330,6 +330,12 @@ Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_lengt ZstandardCompressor::ZstandardCompressor(MemPool* mem_pool, bool reuse_buffer, int clevel) : Codec(mem_pool, reuse_buffer), clevel_(clevel) {} +ZstandardCompressor::~ZstandardCompressor() { + if (stream_ != nullptr) { + static_cast<void>(ZSTD_freeCCtx(stream_)); + } +} + int64_t ZstandardCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) { return ZSTD_compressBound(input_len); } @@ -339,7 +345,14 @@ Status ZstandardCompressor::ProcessBlock(bool output_preallocated, int64_t input DCHECK_GE(input_length, 0); DCHECK(output_preallocated) << "Output was not allocated for Zstd Codec"; if (input_length == 0) return Status::OK(); - *output_length = ZSTD_compress(*output, *output_length, input, input_length, clevel_); + if (stream_ == nullptr) { + stream_ = ZSTD_createCCtx(); + if (stream_ == nullptr) { + return Status(TErrorCode::ZSTD_ERROR, "ZSTD_createCCtx", "nullptr"); + } + } + *output_length = ZSTD_compressCCtx(stream_, *output, *output_length, input, + input_length, clevel_); if (ZSTD_isError(*output_length)) { return Status(TErrorCode::ZSTD_ERROR, "ZSTD_compress", ZSTD_getErrorString(ZSTD_getErrorCode(*output_length))); diff --git a/be/src/util/compress.h b/be/src/util/compress.h index a9d3bbde9..d51e797f0 100644 --- a/be/src/util/compress.h +++ b/be/src/util/compress.h @@ -139,7 +139,7 @@ class ZstandardCompressor : public Codec { public: ZstandardCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false, int clevel = ZSTD_CLEVEL_DEFAULT); - virtual ~ZstandardCompressor() {} + virtual ~ZstandardCompressor(); virtual int64_t MaxOutputLen( int64_t input_len, const uint8_t* input = nullptr) override; @@ -150,6 +150,7 @@ class ZstandardCompressor : public Codec { private: int clevel_; + ZSTD_CCtx* stream_ = nullptr; }; /// Hadoop's block compression scheme on top of LZ4. diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc index fbfcba513..99c02a521 100644 --- a/be/src/util/decompress.cc +++ b/be/src/util/decompress.cc @@ -630,7 +630,14 @@ Status ZstandardDecompressor::ProcessBlock(bool output_preallocated, int64_t inp const uint8_t* input, int64_t* output_length, uint8_t** output) { DCHECK(output_preallocated) << "Output was not allocated for Zstd Codec"; if (*output_length == 0) return Status::OK(); - size_t ret = ZSTD_decompress(*output, *output_length, input, input_length); + if (stream_ == NULL) { + stream_ = ZSTD_createDCtx(); + if (stream_ == NULL) { + return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Zstd", + "ZSTD_createDCtx()", 0); + } + } + size_t ret = ZSTD_decompressDCtx(stream_, *output, *output_length, input, input_length); if (ZSTD_isError(ret)) { *output_length = 0; return Status(TErrorCode::ZSTD_ERROR, "ZSTD_decompress", @@ -656,8 +663,8 @@ Status ZstandardDecompressor::ProcessBlockStreaming(int64_t input_length, if (stream_ == NULL) { stream_ = ZSTD_createDCtx(); if (stream_ == NULL) { - return Status( - TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Zstd", "ZSTD_createDCtx()"); + return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_ERROR, "Zstd", + "ZSTD_createDCtx()", 0); } } *input_bytes_read = 0;
