This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit b9419ee32c98e95b5f1ea378624562673ead35be Author: Surya Hebbar <[email protected]> AuthorDate: Tue Apr 1 13:57:27 2025 +0530 IMPALA-13923: Support more compression levels for ZSTD and ZLIB This patch adds support for more compression levels for ZLIB, ZSTD and BZIP2. The following additional compression levels are now supported. For ZSTD, ZSTD_minCLevel(-ZSTD_TARGETLENGTH_MAX) to ZSTD_maxCLevel(22) For ZLIB i.e. ZLIB, GZIP and DEFLATE, Z_BEST_SPEED(1) to Z_BEST_COMPRESSION(9) For BZIP2, BlockSize100k * (1) to BlockSize100k * (9) Note: Currently, BZIP2 is only used by TmpFileMgr. It is not supported by Parquet (i.e. for writing tables). These are now supported with the "compression_codec" query option. This has been implemented by refactoring compression levels as an optional parameter in CodecInfo. For ZSTD, negative compression levels are now supported (IMPALA-10630). Usage of compression level has been refactored with std::optional in - exec/parquet/hdfs-parquet-table-writer - runtime/tmp-file-mgr - service/query-options - util/codec - util/compress To validate compression levels externally, the following method has been added - Status Codec::ValidateCompressionLevel Added new tests for - * Additional compression levels for ZLIB, ZSTD and BZIP2 * Query option - "compression_codec" for the newly added formats and compression levels The following tests were executed to verify codecs and compression levels. - DecompressorTest.ZSTD* - DecompressorTest.Gzip - DecompressorTest.Bzip - QueryOptions.CompressionCodec - TestComputeStats::test_compute_stats_compression_codec For the stored Parquet, manually verified the compression codec used for ZSTD and ZLIB. 
Change-Id: I5b98c735246f08e04598a4e752c8cca04e31a88a Reviewed-on: http://gerrit.cloudera.org:8080/22718 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Michael Smith <[email protected]> Reviewed-by: Joe McDonnell <[email protected]> --- be/src/exec/parquet/hdfs-parquet-table-writer.cc | 12 ++-- be/src/runtime/tmp-file-mgr.cc | 1 - be/src/runtime/tmp-file-mgr.h | 8 +-- be/src/service/query-options-test.cc | 70 +++++++++++++++++++---- be/src/service/query-options.cc | 14 ++--- be/src/util/codec.cc | 45 +++++++++------ be/src/util/codec.h | 16 +++--- be/src/util/compress.cc | 71 ++++++++++++++++++++---- be/src/util/compress.h | 27 +++++++-- be/src/util/decompress-test.cc | 30 +++++++--- be/src/util/parse-util.cc | 32 ++++++----- be/src/util/parse-util.h | 3 +- 12 files changed, 239 insertions(+), 90 deletions(-) diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc index 11d066432..e104eef47 100644 --- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc +++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc @@ -1325,12 +1325,14 @@ Status HdfsParquetTableWriter::Init() { // Default to snappy compressed THdfsCompression::type codec = THdfsCompression::SNAPPY; - // Compression level only supported for zstd. - int clevel = ZSTD_CLEVEL_DEFAULT; + // Compression level only supported for zstd and zlib. 
+ std::optional<int> clevel; const TQueryOptions& query_options = state_->query_options(); if (query_options.__isset.compression_codec) { codec = query_options.compression_codec.codec; - clevel = query_options.compression_codec.compression_level; + if (query_options.compression_codec.__isset.compression_level) { + clevel = query_options.compression_codec.compression_level; + } } else if (table_desc_->IsIcebergTable()) { TCompressionCodec compression_codec = table_desc_->IcebergParquetCompressionCodec(); codec = compression_codec.codec; @@ -1362,8 +1364,8 @@ Status HdfsParquetTableWriter::Init() { codec = ConvertParquetToImpalaCodec(parquet_codec); VLOG_FILE << "Using compression codec: " << codec; - if (codec == THdfsCompression::ZSTD) { - VLOG_FILE << "Using compression level: " << clevel; + if (clevel.has_value()) { + VLOG_FILE << "Using compression level: " << clevel.value(); } if (query_options.__isset.parquet_page_row_count_limit) { diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 393f74699..494012bde 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -21,7 +21,6 @@ #include <mutex> #include <linux/falloc.h> -#include <zstd.h> // for ZSTD_CLEVEL_DEFAULT #include <boost/algorithm/string.hpp> #include <boost/filesystem.hpp> #include <boost/lexical_cast.hpp> diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index e9d929d81..23637d501 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -324,7 +324,7 @@ class TmpFileMgr { bool compression_enabled() const { return compression_codec_ != THdfsCompression::NONE; } - int compression_level() const { return compression_level_; } + std::optional<int> compression_level() const { return compression_level_; } bool punch_holes() const { return punch_holes_; } /// The minimum size of hole that we will try to punch in a scratch file. @@ -364,9 +364,9 @@ class TmpFileMgr { /// compression is used. 
THdfsCompression::type compression_codec_ = THdfsCompression::NONE; - /// The compression level, which is used for certain compression codecs like ZSTD - /// and ignored otherwise. -1 means not set/invalid. - int compression_level_ = -1; + /// The compression level used for certain compression codecs(i.e. ZSTD, ZLIB, BZIP2) + /// and ignored otherwise. + std::optional<int> compression_level_; /// Whether hole punching is enabled. bool punch_holes_ = false; diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index ba70a9cf5..c16e86fec 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -18,6 +18,7 @@ #include "service/query-options.h" #include <zstd.h> +#include <zlib.h> #include <boost/preprocessor/seq/for_each.hpp> #include <boost/preprocessor/tuple/to_seq.hpp> @@ -620,16 +621,21 @@ TEST(QueryOptions, CompressionCodec) { DEFLATE, BZIP2, SNAPPY, SNAPPY_BLOCKED, LZO, LZ4, ZLIB, ZSTD, BROTLI, LZ4_BLOCKED)); // Test valid values for compression_codec. for (auto& codec : codecs) { - EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0",codec), &options, + EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0", codec), &options, nullptr).ok()); - // Test that compression level is only supported for ZSTD. - if (codec != THdfsCompression::ZSTD) { - EXPECT_FALSE(SetQueryOption("compression_codec", Substitute("$0:1",codec), + // Test with compression levels for supported codecs i.e. 
ZLIB and ZSTD + switch(codec) { + case THdfsCompression::ZSTD: + case THdfsCompression::GZIP: + case THdfsCompression::ZLIB: + case THdfsCompression::BZIP2: + case THdfsCompression::DEFLATE: + EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0:1", codec), &options, nullptr).ok()); - } - else { - EXPECT_TRUE(SetQueryOption("compression_codec", - Substitute("zstd:$0",ZSTD_CLEVEL_DEFAULT), &options, nullptr).ok()); + break; + default: + EXPECT_FALSE(SetQueryOption("compression_codec", Substitute("$0:1", codec), + &options, nullptr).ok()); } } @@ -642,18 +648,58 @@ TEST(QueryOptions, CompressionCodec) { EXPECT_FALSE(SetQueryOption("compression_codec", ":", &options, nullptr).ok()); EXPECT_FALSE(SetQueryOption("compression_codec", ":1", &options, nullptr).ok()); - // Test compression levels for ZSTD. - const int zstd_min_clevel = 1; + + // Test compression levels for ZSTD + // Test minimum negative compression level + EXPECT_TRUE(SetQueryOption("compression_codec", + Substitute("ZSTD:$0", ZSTD_minCLevel()), &options, nullptr).ok()); + + // ZSTD_minCLevel() could be very low (i.e. 
-ZSTD_TARGETLENGTH_MAX) + // The values could begin from -11000 or even less + // Unnecessary to begin the test range from such low values + const int zstd_min_clevel = -10; const int zstd_max_clevel = ZSTD_maxCLevel(); for (int i = zstd_min_clevel; i <= zstd_max_clevel; i++) { - EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("ZSTD:$0",i), &options, + EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("ZSTD:$0", i), &options, nullptr).ok()); } + EXPECT_TRUE(SetQueryOption("compression_codec", + Substitute("ZSTD:$0", ZSTD_minCLevel()), &options, nullptr).ok()); EXPECT_FALSE(SetQueryOption("compression_codec", - Substitute("ZSTD:$0", zstd_min_clevel - 1), &options, nullptr).ok()); + Substitute("ZSTD:$0", ZSTD_minCLevel() - 1), &options, nullptr).ok()); EXPECT_FALSE(SetQueryOption("compression_codec", Substitute("ZSTD:$0", zstd_max_clevel + 1), &options, nullptr).ok()); + + + // Test compression levels for ZLIB + const int gzip_min_clevel = Z_BEST_SPEED; + const int gzip_max_clevel = Z_BEST_COMPRESSION; + for (int i = gzip_min_clevel; i <= gzip_max_clevel; i++) + { + EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("GZIP:$0", i), &options, + nullptr).ok()); + } + EXPECT_FALSE(SetQueryOption("compression_codec", + Substitute("GZIP:$0", gzip_min_clevel - 1), &options, nullptr).ok()); + EXPECT_FALSE(SetQueryOption("compression_codec", + Substitute("GZIP:$0", gzip_max_clevel + 1), &options, nullptr).ok()); + + + // Test compression levels for BZIP2 + // Note: BZIP2 is not supported by parquet + + const int bzip_min_clevel = 1; + const int bzip_max_clevel = 9; + for (int i = bzip_min_clevel; i <= bzip_max_clevel; i++) + { + EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("BZIP2:$0", i), &options, + nullptr).ok()); + } + EXPECT_FALSE(SetQueryOption("compression_codec", + Substitute("BZIP2:$0", bzip_min_clevel - 1), &options, nullptr).ok()); + EXPECT_FALSE(SetQueryOption("compression_codec", + Substitute("BZIP2:$0", bzip_max_clevel + 
1), &options, nullptr).ok()); #undef CASE #undef ENTRIES #undef ENTRY diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index a409cd0e2..3717994d9 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -94,11 +94,11 @@ static const string& PrintQueryOptionValue(const string& option) { } static string PrintQueryOptionValue(const impala::TCompressionCodec& compression_codec) { - if (compression_codec.codec != THdfsCompression::ZSTD) { - return Substitute("$0", PrintValue(compression_codec.codec)); - } else { + if (compression_codec.__isset.compression_level) { return Substitute("$0:$1", PrintValue(compression_codec.codec), compression_codec.compression_level); + } else { + return Substitute("$0", PrintValue(compression_codec.codec)); } } @@ -327,13 +327,13 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va }; case TImpalaQueryOptions::COMPRESSION_CODEC: { THdfsCompression::type enum_type; - int compression_level; + std::optional<int> compression_level; RETURN_IF_ERROR( ParseUtil::ParseCompressionCodec(value, &enum_type, &compression_level)); TCompressionCodec compression_codec; compression_codec.__set_codec(enum_type); - if (enum_type == THdfsCompression::ZSTD) { - compression_codec.__set_compression_level(compression_level); + if (compression_level.has_value()) { + compression_codec.__set_compression_level(compression_level.value()); } query_options->__set_compression_codec(compression_codec); break; @@ -1489,7 +1489,7 @@ static void HashQueryOptionValue(const string& option, HashState& hash) { static void HashQueryOptionValue( const TCompressionCodec& compression_codec, HashState& hash) { HashQueryOptionValue(compression_codec.codec, hash); - if (compression_codec.codec == THdfsCompression::ZSTD) { + if (compression_codec.__isset.compression_level) { HashQueryOptionValue(compression_codec.compression_level, hash); } } diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc 
index 4b9811c25..37533c064 100644 --- a/be/src/util/codec.cc +++ b/be/src/util/codec.cc @@ -81,19 +81,6 @@ Status Codec::GetHadoopCodecClassName(THdfsCompression::type type, string* out_n Codec::~Codec() {} -Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const string& codec, - scoped_ptr<Codec>* compressor) { - CodecMap::const_iterator type = CODEC_MAP.find(codec); - if (type == CODEC_MAP.end()) { - return Status(Substitute("$0$1", UNKNOWN_CODEC_ERROR, codec)); - } - - CodecInfo codec_info( - type->second, (type->second == THdfsCompression::ZSTD) ? ZSTD_CLEVEL_DEFAULT : 0); - RETURN_IF_ERROR(CreateCompressor(mem_pool, reuse, codec_info, compressor)); - return Status::OK(); -} - Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const CodecInfo& codec_info, scoped_ptr<Codec>* compressor) { THdfsCompression::type format = codec_info.format_; @@ -102,16 +89,21 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const CodecInfo& c compressor->reset(nullptr); return Status::OK(); case THdfsCompression::GZIP: - compressor->reset(new GzipCompressor(GzipCompressor::GZIP, mem_pool, reuse)); + compressor->reset(new GzipCompressor(GzipCompressor::GZIP, mem_pool, reuse, + codec_info.compression_level_)); break; case THdfsCompression::DEFAULT: - compressor->reset(new GzipCompressor(GzipCompressor::ZLIB, mem_pool, reuse)); + case THdfsCompression::ZLIB: + compressor->reset(new GzipCompressor(GzipCompressor::ZLIB, mem_pool, reuse, + codec_info.compression_level_)); break; case THdfsCompression::DEFLATE: - compressor->reset(new GzipCompressor(GzipCompressor::DEFLATE, mem_pool, reuse)); + compressor->reset(new GzipCompressor(GzipCompressor::DEFLATE, mem_pool, reuse, + codec_info.compression_level_)); break; case THdfsCompression::BZIP2: - compressor->reset(new BzipCompressor(mem_pool, reuse)); + compressor->reset(new BzipCompressor(mem_pool, reuse, + codec_info.compression_level_)); break; case THdfsCompression::SNAPPY_BLOCKED: 
compressor->reset(new SnappyBlockCompressor(mem_pool, reuse)); @@ -138,6 +130,25 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const CodecInfo& c return (*compressor)->Init(); } +Status Codec::ValidateCompressionLevel(THdfsCompression::type format, + int compression_level) { + switch(format) { + case THdfsCompression::GZIP: + case THdfsCompression::ZLIB: + case THdfsCompression::DEFLATE: + return GzipCompressor::ValidateCompressionLevel(compression_level); + case THdfsCompression::ZSTD: + return ZstandardCompressor::ValidateCompressionLevel(compression_level); + case THdfsCompression::BZIP2: + return BzipCompressor::ValidateCompressionLevel(compression_level); + default: + // Note: BZIP2 compression levels are supported for disk-spill + // Parquet or ORC does not support BZIP compression + return Status("Compression level only supported for ZSTD, ZLIB(GZIP, DEFLATE)" + " and BZIP2. Note: BZIP2 is not supported by Parquet(i.e. to write tables)"); + } +} + Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse, const string& codec, scoped_ptr<Codec>* decompressor) { CodecMap::const_iterator type = CODEC_MAP.find(codec); diff --git a/be/src/util/codec.h b/be/src/util/codec.h index 589746365..a887d62a6 100644 --- a/be/src/util/codec.h +++ b/be/src/util/codec.h @@ -61,12 +61,14 @@ class Codec { struct CodecInfo { public: - CodecInfo(THdfsCompression::type format, int compression_level = 0) + CodecInfo(THdfsCompression::type format, + std::optional<int> compression_level = std::nullopt) : format_(format), compression_level_(compression_level) {} THdfsCompression::type format_; - // Currently only ZSTD uses compression level. - int compression_level_; + + // Currently GZIP, ZSTD and BZIP2 use compression levels. + std::optional<int> compression_level_; }; /// Create a decompressor. 
@@ -98,16 +100,16 @@ class Codec { const CodecInfo& codec_info, boost::scoped_ptr<Codec>* compressor) WARN_UNUSED_RESULT; - /// Alternate factory method: takes a codec string and populates a scoped pointer. - static Status CreateCompressor(MemPool* mem_pool, bool reuse, const std::string& codec, - boost::scoped_ptr<Codec>* compressor) WARN_UNUSED_RESULT; - /// Return the name of a compression algorithm. static std::string GetCodecName(THdfsCompression::type); /// Returns the java class name for the given compression type static Status GetHadoopCodecClassName( THdfsCompression::type, std::string* out_name) WARN_UNUSED_RESULT; + /// Validate compression levels and return status objects with approprate messages + static Status ValidateCompressionLevel(THdfsCompression::type, + int compression_level); + virtual ~Codec(); /// Initialize the codec. This should only be called once. diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc index 88c6e96fc..9c28f6c59 100644 --- a/be/src/util/compress.cc +++ b/be/src/util/compress.cc @@ -41,10 +41,14 @@ using boost::crc_32_type; using namespace impala; using namespace strings; +GzipCompressor::GzipCompressor(Format format, MemPool* mem_pool, bool reuse_buffer, + std::optional<int> compression_level) : Codec(mem_pool, reuse_buffer), + format_(format), compression_level_(compression_level.value_or(6)) { + // Within ZLIB, Z_DEFAULT_COMPRESSION(-1) is mapped to a value of 6 + + // To limit compression levels in range [Z_BEST_SPEED, Z_BEST_COMPRESSION] + // Default compression level is set to 6 -GzipCompressor::GzipCompressor(Format format, MemPool* mem_pool, bool reuse_buffer) - : Codec(mem_pool, reuse_buffer), - format_(format) { bzero(&stream_, sizeof(stream_)); } @@ -61,7 +65,10 @@ Status GzipCompressor::Init() { } else if (format_ == GZIP) { window_bits += GZIP_CODEC; } - if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, + Status clevel_status = ValidateCompressionLevel(compression_level_); + if 
(!clevel_status.ok()) { return clevel_status; } + + if ((ret = deflateInit2(&stream_, compression_level_, Z_DEFLATED, window_bits, 9, Z_DEFAULT_STRATEGY )) != Z_OK) { return Status("zlib deflateInit failed: " + string(stream_.msg)); } @@ -140,8 +147,23 @@ Status GzipCompressor::ProcessBlock(bool output_preallocated, return Status::OK(); } -BzipCompressor::BzipCompressor(MemPool* mem_pool, bool reuse_buffer) - : Codec(mem_pool, reuse_buffer) { +Status GzipCompressor::ValidateCompressionLevel(int compression_level) { + if (compression_level >= Z_BEST_SPEED && compression_level <= Z_BEST_COMPRESSION) { + return Status::OK(); + } else { + return Status(Substitute("GZIP compression levels should be in range - [$0, $1]." + " Current compression level - $2.", Z_BEST_SPEED, Z_BEST_COMPRESSION, + compression_level)); + } +} + +BzipCompressor::BzipCompressor(MemPool* mem_pool, bool reuse_buffer, + std::optional<int> compression_level) + : Codec(mem_pool, reuse_buffer), compression_level_(compression_level.value_or(5)) { +} + +Status BzipCompressor::Init() { + return ValidateCompressionLevel(compression_level_); } int64_t BzipCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) { @@ -174,10 +196,12 @@ Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_leng buffer_length_ = buffer_length_ * 2; out_buffer_ = temp_memory_pool_->Allocate(buffer_length_); } + // In BZ2 library, the compression level is directly mapped to blockSize100k outlen = static_cast<unsigned int>(buffer_length_); if ((ret = BZ2_bzBuffToBuffCompress(reinterpret_cast<char*>(out_buffer_), &outlen, const_cast<char*>(reinterpret_cast<const char*>(input)), - static_cast<unsigned int>(input_length), 5, 2, 0)) == BZ_OUTBUFF_FULL) { + static_cast<unsigned int>(input_length), compression_level_, 2, 0)) + == BZ_OUTBUFF_FULL) { if (output_preallocated) { return Status("Too small buffer passed to BzipCompressor"); } @@ -197,6 +221,17 @@ Status BzipCompressor::ProcessBlock(bool 
output_preallocated, int64_t input_leng return Status::OK(); } +Status BzipCompressor::ValidateCompressionLevel(int compression_level) { + if (compression_level >= BZ_MIN_COMPRESSION_LEVEL && + compression_level <= BZ_MAX_COMPRESSION_LEVEL) { + return Status::OK(); + } else { + return Status(Substitute("BZIP2 compression levels should be in range - [$0, $1]." + " Current compression level - $2", BZ_MIN_COMPRESSION_LEVEL, + BZ_MAX_COMPRESSION_LEVEL, compression_level)); + } +} + // Currently this is only use for testing of the decompressor. SnappyBlockCompressor::SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer) : Codec(mem_pool, reuse_buffer) { @@ -327,8 +362,14 @@ Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_lengt return Status::OK(); } -ZstandardCompressor::ZstandardCompressor(MemPool* mem_pool, bool reuse_buffer, int clevel) - : Codec(mem_pool, reuse_buffer), clevel_(clevel) {} +ZstandardCompressor::ZstandardCompressor(MemPool* mem_pool, bool reuse_buffer, + std::optional<int> compression_level) : Codec(mem_pool, reuse_buffer), + compression_level_(compression_level.value_or(ZSTD_defaultCLevel())) { +} + +Status ZstandardCompressor::Init() { + return ValidateCompressionLevel(compression_level_); +} ZstandardCompressor::~ZstandardCompressor() { if (stream_ != nullptr) { @@ -352,7 +393,7 @@ Status ZstandardCompressor::ProcessBlock(bool output_preallocated, int64_t input } } *output_length = ZSTD_compressCCtx(stream_, *output, *output_length, input, - input_length, clevel_); + input_length, compression_level_); if (ZSTD_isError(*output_length)) { return Status(TErrorCode::ZSTD_ERROR, "ZSTD_compress", ZSTD_getErrorString(ZSTD_getErrorCode(*output_length))); @@ -360,6 +401,16 @@ Status ZstandardCompressor::ProcessBlock(bool output_preallocated, int64_t input return Status::OK(); } +Status ZstandardCompressor::ValidateCompressionLevel(int compression_level) { + if (compression_level >= ZSTD_minCLevel() && compression_level <= 
ZSTD_maxCLevel()) { + return Status::OK(); + } else { + return Status(Substitute("ZSTD compression levels should be in range - [$0, $1]." + " Current compression level - $2", ZSTD_minCLevel(), ZSTD_maxCLevel(), + compression_level)); + } +} + Lz4BlockCompressor::Lz4BlockCompressor(MemPool* mem_pool, bool reuse_buffer) : Codec(mem_pool, reuse_buffer) { } diff --git a/be/src/util/compress.h b/be/src/util/compress.h index d51e797f0..7a4139c2c 100644 --- a/be/src/util/compress.h +++ b/be/src/util/compress.h @@ -27,6 +27,9 @@ #include "common/status.h" #include "util/codec.h" +#define BZ_MIN_COMPRESSION_LEVEL 1 +#define BZ_MAX_COMPRESSION_LEVEL 9 + namespace impala { class MemPool; @@ -44,7 +47,8 @@ class GzipCompressor : public Codec { GZIP, }; - GzipCompressor(Format format, MemPool* mem_pool = nullptr, bool reuse_buffer = false); + GzipCompressor(Format format, MemPool* mem_pool = nullptr, bool reuse_buffer = false, + std::optional<int> compression_level = std::nullopt); virtual ~GzipCompressor(); virtual Status Init() override WARN_UNUSED_RESULT; @@ -56,12 +60,16 @@ class GzipCompressor : public Codec { virtual std::string file_extension() const override { return "gz"; } + static Status ValidateCompressionLevel(int compression_level); + private: Format format_; /// Structure used to communicate with the library. z_stream stream_; + int compression_level_; + /// These are magic numbers from zlib.h. Not clear why they are not defined there. const static int WINDOW_BITS = 15; // Maximum window size const static int GZIP_CODEC = 16; // Output Gzip. 
@@ -76,15 +84,22 @@ class GzipCompressor : public Codec { class BzipCompressor : public Codec { public: - BzipCompressor(MemPool* mem_pool, bool reuse_buffer); + BzipCompressor(MemPool* mem_pool, bool reuse_buffer, + std::optional<int> compression_level = std::nullopt); virtual ~BzipCompressor() { } + virtual Status Init() override WARN_UNUSED_RESULT; virtual int64_t MaxOutputLen( int64_t input_len, const uint8_t* input = nullptr) override; virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t* input, int64_t* output_length, uint8_t** output) override WARN_UNUSED_RESULT; virtual std::string file_extension() const override { return "bz2"; } + + static Status ValidateCompressionLevel(int compression_level); + + private: + int compression_level_; }; class SnappyBlockCompressor : public Codec { @@ -138,9 +153,11 @@ class Lz4Compressor : public Codec { class ZstandardCompressor : public Codec { public: ZstandardCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false, - int clevel = ZSTD_CLEVEL_DEFAULT); + std::optional<int> compression_level = std::nullopt); virtual ~ZstandardCompressor(); + virtual Status Init() override WARN_UNUSED_RESULT; + virtual int64_t MaxOutputLen( int64_t input_len, const uint8_t* input = nullptr) override; virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, @@ -148,8 +165,10 @@ class ZstandardCompressor : public Codec { uint8_t** output) override WARN_UNUSED_RESULT; virtual std::string file_extension() const override { return "zst"; } + static Status ValidateCompressionLevel(int compression_level); + private: - int clevel_; + int compression_level_; ZSTD_CCtx* stream_ = nullptr; }; diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc index f24eb7c65..98f6128a6 100644 --- a/be/src/util/decompress-test.cc +++ b/be/src/util/decompress-test.cc @@ -64,11 +64,12 @@ class DecompressorTest : public ::testing::Test { mem_pool_.FreeAll(); } - void 
RunTest(THdfsCompression::type format, int clevel = 0) { + void RunTest(THdfsCompression::type format, std::optional<int> clevel = std::nullopt) { scoped_ptr<Codec> compressor; scoped_ptr<Codec> decompressor; - Codec::CodecInfo codec_info(format, clevel); + Codec::CodecInfo codec_info = Codec::CodecInfo(format, clevel); + EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, codec_info, &compressor)); EXPECT_OK(Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor)); @@ -102,7 +103,8 @@ class DecompressorTest : public ::testing::Test { decompressor->Close(); } - void RunTestStreaming(THdfsCompression::type format, int compression_level = 0) { + void RunTestStreaming(THdfsCompression::type format, + std::optional<int> compression_level = std::nullopt) { scoped_ptr<Codec> compressor; scoped_ptr<Codec> decompressor; Codec::CodecInfo codec_info(format, compression_level); @@ -409,6 +411,9 @@ TEST_F(DecompressorTest, Gzip) { RunTest(THdfsCompression::GZIP); RunTestStreaming(THdfsCompression::GZIP); RunTestMultiStreamDecompressing(THdfsCompression::GZIP); + + RunTest(THdfsCompression::GZIP, Z_BEST_SPEED); + RunTestStreaming(THdfsCompression::GZIP, Z_BEST_COMPRESSION); } TEST_F(DecompressorTest, Deflate) { @@ -421,6 +426,9 @@ TEST_F(DecompressorTest, Bzip) { RunTest(THdfsCompression::BZIP2); RunTestStreaming(THdfsCompression::BZIP2); RunTestMultiStreamDecompressing(THdfsCompression::BZIP2); + + RunTest(THdfsCompression::BZIP2, BZ_MIN_COMPRESSION_LEVEL); + RunTestStreaming(THdfsCompression::BZIP2, BZ_MAX_COMPRESSION_LEVEL); } TEST_F(DecompressorTest, SnappyBlocked) { @@ -496,10 +504,18 @@ TEST_F(DecompressorTest, ZSTD) { RunTest(THdfsCompression::ZSTD, ZSTD_CLEVEL_DEFAULT); mt19937 rng; RandTestUtil::SeedRng("ZSTD_COMPRESSION_LEVEL_SEED", &rng); - // zstd supports compression levels from 1 up to ZSTD_maxCLevel() - const int clevel = uniform_int_distribution<int>(1, ZSTD_maxCLevel())(rng); - RunTest(THdfsCompression::ZSTD, clevel); - 
RunTestStreaming(THdfsCompression::ZSTD, clevel); + + // Test negative compression level + const int clevel_n = + uniform_int_distribution<int>(ZSTD_minCLevel(), -1)(rng); + RunTest(THdfsCompression::ZSTD, clevel_n); + RunTestStreaming(THdfsCompression::ZSTD, clevel_n); + + // Test positive compression level + const int clevel_p = + uniform_int_distribution<int>(0, ZSTD_maxCLevel())(rng); + RunTest(THdfsCompression::ZSTD, clevel_p); + RunTestStreaming(THdfsCompression::ZSTD, clevel_p); } TEST_F(DecompressorTest, ZSTDHuge) { diff --git a/be/src/util/parse-util.cc b/be/src/util/parse-util.cc index 9ff31c55b..241b29657 100644 --- a/be/src/util/parse-util.cc +++ b/be/src/util/parse-util.cc @@ -19,11 +19,11 @@ #include <sstream> -#include <zstd.h> #include <boost/algorithm/string/classification.hpp> #include <boost/algorithm/string/split.hpp> #include <boost/algorithm/string/trim.hpp> +#include "util/codec.h" #include "util/mem-info.h" #include "util/string-parser.h" @@ -115,9 +115,10 @@ int64_t ParseUtil::ParseMemSpec(const string& mem_spec_str, bool* is_percent, } Status ParseUtil::ParseCompressionCodec( - const string& compression_codec, THdfsCompression::type* type, int* level) { + const string& compression_codec, THdfsCompression::type* type, + std::optional<int>* level) { // Acceptable values are: - // - zstd:compression_level + // - zstd, gzip :compression_level // - codec vector<string> tokens; split(tokens, compression_codec, is_any_of(":"), token_compress_on); @@ -125,29 +126,30 @@ Status ParseUtil::ParseCompressionCodec( string& codec_name = tokens[0]; trim(codec_name); - int compression_level = ZSTD_CLEVEL_DEFAULT; THdfsCompression::type enum_type; RETURN_IF_ERROR(GetThriftEnum( codec_name, "compression codec", _THdfsCompression_VALUES_TO_NAMES, &enum_type)); + *type = enum_type; + if (tokens.size() == 2) { - if (enum_type != THdfsCompression::ZSTD) { - return Status("Compression level only supported for ZSTD"); - } StringParser::ParseResult status; string& 
clevel = tokens[1]; trim(clevel); - compression_level = StringParser::StringToInt<int>( + int compression_level = StringParser::StringToInt<int>( clevel.c_str(), static_cast<int>(clevel.size()), &status); - if (status != StringParser::PARSE_SUCCESS || compression_level < 1 - || compression_level > ZSTD_maxCLevel()) { - return Status(Substitute("Invalid ZSTD compression level '$0'." - " Valid values are in [1,$1]", - clevel, ZSTD_maxCLevel())); + + if (status == StringParser::PARSE_SUCCESS) { + Status res = Codec::ValidateCompressionLevel(enum_type, compression_level); + if (res.ok()) { + level->emplace(compression_level); + } + return res; + } else { + return Status(Substitute("Invalid compression level value - $0" + ", should be an integer", clevel)); } } - *type = enum_type; - *level = compression_level; return Status::OK(); } diff --git a/be/src/util/parse-util.h b/be/src/util/parse-util.h index 2ef4f5135..cfb5edd7c 100644 --- a/be/src/util/parse-util.h +++ b/be/src/util/parse-util.h @@ -70,7 +70,8 @@ class ParseUtil { bool* is_percent, int64_t relative_reference); static Status ParseCompressionCodec( - const std::string& compression_codec, THdfsCompression::type* type, int* level); + const std::string& compression_codec, THdfsCompression::type* type, + std::optional<int>* level); }; std::string GetThriftEnumValues(const std::map<int, const char*>& enum_values_to_names);
