This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c4c4b1a87 [feature-wip](parquet-reader) add gzip compression codec (#12488)
3c4c4b1a87 is described below

commit 3c4c4b1a872eed775166ecd797b3f397cc89160e
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Fri Sep 9 09:10:25 2022 +0800

    [feature-wip](parquet-reader) add gzip compression codec (#12488)
    
    Query failed when reading parquet data compressed by GZIP:
    
    mysql> select * from customer limit 1;
    ERROR 1105 (HY000): errCode = 2, detailMessage = unknown compression type(GZIP)
---
 be/src/util/block_compression.cpp | 78 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 77 insertions(+), 1 deletion(-)

diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index 1dafd20f54..4c58a7f86a 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -512,6 +512,79 @@ private:
     ZSTD_DCtx* ctx_d = nullptr;
 };
 
+// Block codec for gzip-wrapped deflate data. Only decompress() and
+// max_compressed_len() are overridden here; everything else is inherited
+// from ZlibBlockCompression.
+class GzipBlockCompression final : public ZlibBlockCompression {
+public:
+    ~GzipBlockCompression() override = default;
+
+    // Inflates the whole gzip stream held in `input` into the caller-provided
+    // buffer `output` with a single inflate() call (no streaming).
+    //
+    // On success `output->size` is updated to the number of bytes actually
+    // written. Returns Status::InternalError if zlib cannot be initialised or
+    // inflate() reports an error.
+    Status decompress(const Slice& input, Slice* output) const override {
+        z_stream z_strm = {};
+        z_strm.zalloc = Z_NULL;
+        z_strm.zfree = Z_NULL;
+        z_strm.opaque = Z_NULL;
+
+        // Adding GZIP_CODEC (16) to windowBits makes zlib expect a gzip wrapper.
+        int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
+        if (ret != Z_OK) {
+            return Status::InternalError("Fail to do ZLib stream decompress, error={}, res={}",
+                                         zError(ret), ret);
+        }
+
+        // 1. set input and output
+        z_strm.next_in = reinterpret_cast<Bytef*>(input.data);
+        z_strm.avail_in = input.size;
+        z_strm.next_out = reinterpret_cast<Bytef*>(output->data);
+        z_strm.avail_out = output->size;
+
+        // 2. inflate everything in one shot; an empty output buffer skips the
+        // inflate() call and is reported as success.
+        ret = Z_STREAM_END;
+        if (z_strm.avail_out > 0) {
+            // We only support non-streaming use case for block decompressor
+            ret = inflate(&z_strm, Z_FINISH);
+        }
+
+        // 3. capture the produced byte count, then release the zlib state
+        // exactly once regardless of the inflate() outcome.
+        const size_t written = z_strm.total_out;
+        (void)inflateEnd(&z_strm);
+
+        // NOTE(review): Z_OK is still accepted for compatibility, although with
+        // Z_FINISH it means the end of the stream was not reached; callers can
+        // compare output->size with the size they expected.
+        if (ret != Z_OK && ret != Z_STREAM_END) {
+            return Status::InternalError("Fail to do ZLib stream decompress, error={}, res={}",
+                                         zError(ret), ret);
+        }
+
+        output->size = written;
+        return Status::OK();
+    }
+
+    // Returns an upper bound for the size of a gzip stream produced from `len`
+    // input bytes. Falls back to ZlibBlockCompression::max_compressed_len()
+    // when a deflate stream cannot be initialised.
+    size_t max_compressed_len(size_t len) const override {
+        z_stream zstrm = {};
+        zstrm.zalloc = Z_NULL;
+        zstrm.zfree = Z_NULL;
+        zstrm.opaque = Z_NULL;
+        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
+                                 MEM_LEVEL, Z_DEFAULT_STRATEGY);
+        if (zres != Z_OK) {
+            // Fall back to zlib estimate logic for deflate, notice this may
+            // cause decompress error
+            LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres)
+                         << ", res=" << zres;
+            return ZlibBlockCompression::max_compressed_len(len);
+        }
+
+        // deflateBound() has to be queried while the stream is still alive so
+        // that it accounts for the gzip wrapper; calling it after deflateEnd()
+        // only yields zlib's generic fallback estimate.
+        //
+        // Mark, maintainer of zlib, has stated that 12 needs to be added to
+        // result for gzip
+        // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
+        // To have a safe upper bound for "wrapper variations", we add 32 to
+        // estimate. The value is kept in size_t so large inputs are not
+        // truncated.
+        const size_t upper_bound = static_cast<size_t>(deflateBound(&zstrm, len)) + 32;
+
+        zres = deflateEnd(&zstrm);
+        if (zres != Z_OK) {
+            LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres)
+                         << ", res=" << zres;
+        }
+        return upper_bound;
+    }
+
+private:
+    // Magic number for zlib, see https://zlib.net/manual.html for more details.
+    const static int GZIP_CODEC = 16; // gzip
+    // The memLevel parameter specifies how much memory should be allocated for
+    // the internal compression state.
+    const static int MEM_LEVEL = 8;
+};
+
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                    std::unique_ptr<BlockCompressionCodec>& 
codec) {
     BlockCompressionCodec* ptr = nullptr;
@@ -566,8 +639,11 @@ Status 
get_block_compression_codec(tparquet::CompressionCodec::type parquet_code
     case tparquet::CompressionCodec::ZSTD:
         ptr = new ZstdBlockCompression();
         break;
+    case tparquet::CompressionCodec::GZIP:
+        ptr = new GzipBlockCompression();
+        break;
     default:
-        return Status::NotFound("unknown compression type({})", parquet_codec);
+        return Status::NotFound("unknown compression type({})", 
tparquet::to_string(parquet_codec));
     }
 
     Status st = ptr->init();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to