This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3c4c4b1a87 [feature-wip](parquet-reader) add gzip compression codec (#12488) 3c4c4b1a87 is described below commit 3c4c4b1a872eed775166ecd797b3f397cc89160e Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Fri Sep 9 09:10:25 2022 +0800 [feature-wip](parquet-reader) add gzip compression codec (#12488) Query failed when reading parquet data compressed by GZIP: mysql> select * from customer limit 1; ERROR 1105 (HY000): errCode = 2, detailMessage = unknown compression type(GZIP) --- be/src/util/block_compression.cpp | 78 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 1dafd20f54..4c58a7f86a 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -512,6 +512,79 @@ private: ZSTD_DCtx* ctx_d = nullptr; }; +class GzipBlockCompression final : public ZlibBlockCompression { +public: + ~GzipBlockCompression() override = default; + + Status decompress(const Slice& input, Slice* output) const override { + z_stream z_strm = {nullptr}; + z_strm.zalloc = Z_NULL; + z_strm.zfree = Z_NULL; + z_strm.opaque = Z_NULL; + + int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC); + if (ret != Z_OK) { + return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + zError(ret), ret); + } + + // 1. set input and output + z_strm.next_in = reinterpret_cast<Bytef*>(input.data); + z_strm.avail_in = input.size; + z_strm.next_out = reinterpret_cast<Bytef*>(output->data); + z_strm.avail_out = output->size; + + if (z_strm.avail_out > 0) { + // We only support non-streaming use case for block decompressor + ret = inflate(&z_strm, Z_FINISH); + if (ret != Z_OK && ret != Z_STREAM_END) { + (void)inflateEnd(&z_strm); + return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + zError(ret), ret); + } + } + (void)inflateEnd(&z_strm); + + return Status::OK(); + } + + size_t max_compressed_len(size_t len) const override { + z_stream zstrm; + zstrm.zalloc = Z_NULL; + zstrm.zfree = Z_NULL; + zstrm.opaque = Z_NULL; + auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, + MEM_LEVEL, Z_DEFAULT_STRATEGY); + if (zres != Z_OK) { + // Fall back to zlib estimate logic for deflate, notice this may + // cause decompress error + LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres) + << ", res=" << zres; + return ZlibBlockCompression::max_compressed_len(len); + } else { + zres = deflateEnd(&zstrm); + if (zres != Z_OK) { + LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres) + << ", res=" << zres; + } + // Mark, maintainer of zlib, has stated that 12 needs to be added to + // result for gzip + // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854 + // To have a safe upper bound for "wrapper variations", we add 32 to + // estimate + int upper_bound = deflateBound(&zstrm, len) + 32; + return upper_bound; + } + } + +private: + // Magic number for zlib, see https://zlib.net/manual.html for more details. + const static int GZIP_CODEC = 16; // gzip + // The memLevel parameter specifies how much memory should be allocated for + // the internal compression state. + const static int MEM_LEVEL = 8; +}; + Status get_block_compression_codec(segment_v2::CompressionTypePB type, std::unique_ptr<BlockCompressionCodec>& codec) { BlockCompressionCodec* ptr = nullptr; @@ -566,8 +639,11 @@ Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_code case tparquet::CompressionCodec::ZSTD: ptr = new ZstdBlockCompression(); break; + case tparquet::CompressionCodec::GZIP: + ptr = new GzipBlockCompression(); + break; default: - return Status::NotFound("unknown compression type({})", parquet_codec); + return Status::NotFound("unknown compression type({})", tparquet::to_string(parquet_codec)); } Status st = ptr->init(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org