This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e0fac66223e [branch-2.1](fix) fix snappy decompressor bug (#40862)
e0fac66223e is described below

commit e0fac66223eb9e01dfebd7fef76ec72beb8568f9
Author: Socrates <suxiaogang...@icloud.com>
AuthorDate: Fri Sep 20 11:57:14 2024 +0800

    [branch-2.1](fix) fix snappy decompressor bug (#40862)
    
    ## Proposed changes
    Hadoop snappycodec source :
    
    
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
    Example:
    OriginData (the original data will be divided into several large data
    blocks):
         large data block1 | large data block2 | large data block3 | ....
    The large data block will be divided into several small data block.
    Suppose a large data block is divided into three small blocks:
    large data block1: | small block1 | small block2 | small block3 |
    CompressData: <A [B1 compress(small block1)] [B2 compress(small block2)]
    [B3 compress(small block3)]>
    
    A : original length of the current block of large data block.
    sizeof(A) = 4 bytes.
    A = length(small block1) + length(small block2) + length(small block3)
    Bx : length of small data block x.
    sizeof(Bx) = 4 bytes.
    Bx = length(compress(small blockx))
---
 be/src/exec/decompressor.cpp                       | 162 ++++++++++++---------
 .../tvf/compress/test_tvf.csv.snappy               | Bin 107203 -> 100481 bytes
 .../tvf/test_local_tvf_compression.out             |  44 +++---
 3 files changed, 114 insertions(+), 92 deletions(-)

diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp
index d8d02c9cf9e..a830eb3cbe7 100644
--- a/be/src/exec/decompressor.cpp
+++ b/be/src/exec/decompressor.cpp
@@ -468,7 +468,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, 
size_t input_len, size_t
         }
 
         std::size_t decompressed_large_block_len = 0;
-        do {
+        while (remaining_decompressed_large_block_len > 0) {
             // Check that input length should not be negative.
             if (input_len < sizeof(uint32_t)) {
                 *more_input_bytes = sizeof(uint32_t) - input_len;
@@ -505,8 +505,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, 
size_t input_len, size_t
             output_ptr += decompressed_small_block_len;
             remaining_decompressed_large_block_len -= 
decompressed_small_block_len;
             decompressed_large_block_len += decompressed_small_block_len;
-
-        } while (remaining_decompressed_large_block_len > 0);
+        };
 
         if (*more_input_bytes != 0) {
             // Need more input buffer
@@ -535,90 +534,113 @@ Status SnappyBlockDecompressor::init() {
     return Status::OK();
 }
 
+// Hadoop snappycodec source :
+// 
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
+// Example:
+// OriginData(The original data will be divided into several large data 
block.) :
+//      large data block1 | large data block2 | large data block3 | ....
+// The large data block will be divided into several small data block.
+// Suppose a large data block is divided into three small blocks:
+// large data block1:            | small block1 | small block2 | small block3 |
+// CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block1) 
] [B3 compress(small block1)]>
+//
+// A : original length of the current block of large data block.
+// sizeof(A) = 4 bytes.
+// A = length(small block1) + length(small block2) + length(small block3)
+// Bx : length of  small data block bx.
+// sizeof(Bx) = 4 bytes.
+// Bx = length(compress(small blockx))
 Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
                                            size_t* input_bytes_read, uint8_t* 
output,
                                            size_t output_max_len, size_t* 
decompressed_len,
                                            bool* stream_end, size_t* 
more_input_bytes,
                                            size_t* more_output_bytes) {
-    uint8_t* src = input;
-    size_t remaining_input_size = input_len;
-    int64_t uncompressed_total_len = 0;
-    *input_bytes_read = 0;
+    auto* input_ptr = input;
+    auto* output_ptr = output;
 
-    // The hadoop snappy codec is as:
-    // <4 byte big endian uncompressed size>
-    // <4 byte big endian compressed size>
-    // <snappy compressed block>
-    // ....
-    // <4 byte big endian uncompressed size>
-    // <4 byte big endian compressed size>
-    // <snappy compressed block>
-    //
-    // See:
-    // 
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
-    while (remaining_input_size > 0) {
-        if (remaining_input_size < 4) {
-            *more_input_bytes = 4 - remaining_input_size;
-            break;
+    while (input_len > 0) {
+        //if faild ,  fall back to large block begin
+        auto* large_block_input_ptr = input_ptr;
+        auto* large_block_output_ptr = output_ptr;
+
+        if (input_len < sizeof(uint32_t)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "fail to do hadoop-snappy decompress, input_len=$0", 
input_len));
         }
-        // Read uncompressed size
-        uint32_t uncompressed_block_len = Decompressor::_read_int32(src);
-        int64_t remaining_output_len = output_max_len - uncompressed_total_len;
-        if (remaining_output_len < uncompressed_block_len) {
+
+        uint32_t remaining_decompressed_large_block_len = 
BigEndian::Load32(input_ptr);
+
+        input_ptr += sizeof(uint32_t);
+        input_len -= sizeof(uint32_t);
+
+        std::size_t remaining_output_len = output_max_len - *decompressed_len;
+
+        if (remaining_output_len < remaining_decompressed_large_block_len) {
             // Need more output buffer
-            *more_output_bytes = uncompressed_block_len - remaining_output_len;
-            break;
-        }
+            *more_output_bytes = remaining_decompressed_large_block_len - 
remaining_output_len;
+            input_ptr = large_block_input_ptr;
+            output_ptr = large_block_output_ptr;
 
-        if (uncompressed_block_len == 0) {
-            remaining_input_size -= sizeof(uint32_t);
             break;
         }
 
-        if (remaining_input_size <= 2 * sizeof(uint32_t)) {
-            // The remaining input size should be larger then <uncompressed 
size><compressed size><compressed data>
-            // +1 means we need at least 1 bytes of compressed data.
-            *more_input_bytes = 2 * sizeof(uint32_t) + 1 - 
remaining_input_size;
-            break;
-        }
+        std::size_t decompressed_large_block_len = 0;
+        while (remaining_decompressed_large_block_len > 0) {
+            // Check that input length should not be negative.
+            if (input_len < sizeof(uint32_t)) {
+                *more_input_bytes = sizeof(uint32_t) - input_len;
+                break;
+            }
 
-        // Read compressed size
-        size_t tmp_remaining_size = remaining_input_size - 2 * 
sizeof(uint32_t);
-        size_t compressed_len = _read_int32(src + sizeof(uint32_t));
-        if (compressed_len > tmp_remaining_size) {
-            // Need more input data
-            *more_input_bytes = compressed_len - tmp_remaining_size;
-            break;
-        }
+            // Read the length of the next snappy compressed block.
+            size_t compressed_small_block_len = BigEndian::Load32(input_ptr);
 
-        src += 2 * sizeof(uint32_t);
-        remaining_input_size -= 2 * sizeof(uint32_t);
-
-        // ATTN: the uncompressed len from GetUncompressedLength() is same as
-        // uncompressed_block_len, so I think it is unnecessary to get it 
again.
-        // Get uncompressed len from snappy
-        // size_t uncompressed_len;
-        // if (!snappy::GetUncompressedLength(reinterpret_cast<const 
char*>(src),
-        //             compressed_len, &uncompressed_len)) {
-        //     return Status::InternalError("snappy block decompress failed to 
get uncompressed len");
-        // }
-
-        // Decompress
-        if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), 
compressed_len,
-                                   reinterpret_cast<char*>(output))) {
-            return Status::InternalError(
-                    "snappy block decompress failed. uncompressed_len: {}, 
compressed_len: {}",
-                    uncompressed_block_len, compressed_len);
+            input_ptr += sizeof(uint32_t);
+            input_len -= sizeof(uint32_t);
+
+            if (compressed_small_block_len == 0) {
+                continue;
+            }
+
+            if (compressed_small_block_len > input_len) {
+                // Need more input buffer
+                *more_input_bytes = compressed_small_block_len - input_len;
+                break;
+            }
+
+            // Decompress this block.
+            size_t decompressed_small_block_len;
+            if (!snappy::GetUncompressedLength(reinterpret_cast<const 
char*>(input_ptr),
+                                               compressed_small_block_len,
+                                               &decompressed_small_block_len)) 
{
+                return Status::InternalError(
+                        "snappy block decompress failed to get uncompressed 
len");
+            }
+            if (!snappy::RawUncompress(reinterpret_cast<const 
char*>(input_ptr),
+                                       compressed_small_block_len,
+                                       reinterpret_cast<char*>(output_ptr))) {
+                return Status::InternalError(
+                        "snappy block decompress failed. uncompressed_len: {}, 
compressed_len: {}",
+                        decompressed_small_block_len, 
compressed_small_block_len);
+            }
+            input_ptr += compressed_small_block_len;
+            input_len -= compressed_small_block_len;
+
+            output_ptr += decompressed_small_block_len;
+            remaining_decompressed_large_block_len -= 
decompressed_small_block_len;
+            decompressed_large_block_len += decompressed_small_block_len;
+        };
+
+        if (*more_input_bytes != 0) {
+            // Need more input buffer
+            input_ptr = large_block_input_ptr;
+            output_ptr = large_block_output_ptr;
+            break;
         }
 
-        output += uncompressed_block_len;
-        src += compressed_len;
-        remaining_input_size -= compressed_len;
-        uncompressed_total_len += uncompressed_block_len;
+        *decompressed_len += decompressed_large_block_len;
     }
-
-    *input_bytes_read += (input_len - remaining_input_size);
-    *decompressed_len = uncompressed_total_len;
+    *input_bytes_read += (input_ptr - input);
     // If no more input and output need, means this is the end of a compressed 
block
     *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);
 
diff --git 
a/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy 
b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy
index 9ac2b7ae299..054613c5146 100644
Binary files 
a/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy and 
b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy differ
diff --git 
a/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out 
b/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out
index 8120427ea6c..5f1a4f5d463 100644
--- a/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out
+++ b/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out
@@ -123,28 +123,28 @@
 2023-09-18     7
 
 -- !snappy_1 --
-1      694832  buHDwfGeNHfpRFdNaogneddi        2024-02-09      
4.899588807225554
-10     218729  goZsLvvWFOIjlzSAitC     2024-06-10      4.137732740231178
-100    813423  zICskqgcdPc     2024-03-23      8.486529018746493
-1000   612650  RzOXeYpKOmuJOogUyeIEDNDmvq      2023-12-05      
7.8741752707933435
-1001   29486   WoUAFJFuJNnwyqMnoDhX    2024-03-11      9.758244908785949
-1002   445363  OdTEeeWtxfcRwx  2024-08-01      0.3934945460194128
-1003   707035  JAYnKxusVpGzYueACf      2023-11-14      5.377110182643222
-1004   227858  JIFyjKzmbjkt    2024-03-24      5.748037621519263
-1005   539305  PlruLkSUSXZgaHafFriklrhCi       2023-11-08      
4.122635188836725
-1006   145518  KCwqEcSCGuXrHerwn       2024-06-22      8.482290064407216
-1007   939028  KzXhEMelsKVLbDMsEKh     2024-01-01      8.144449761594585
-1008   913569  CHlqPKqkIdqwBCBUHreXbFAkCt      2024-05-25      
1.5683842369495904
-1009   757881  AjcSyYMIMzS     2024-05-04      7.5674012939461255
-101    326164  QWLnalYNmYDt    2024-01-07      3.8159876011523854
-1010   427079  AlRUfmxfAuoLnPqUTvQVMtrS        2024-06-04      
3.8087069699523313
-1011   252076  gHmFDhtytYzWETIxdpkpMUpnLd      2023-09-17      
6.773606843056635
-1012   819615  rFfRHquexplDJvSeUK      2023-11-02      3.220639250504097
-1013   413456  uvNPelHXYjJKiOkwdNbmUkGzxiiqLo  2024-03-15      
8.305048700108081
-1014   308042  vnzcsvHxnWFhvLwJkAtUqe  2024-06-15      1.5668867233009998
-1015   603837  VBEsRVGyhRNWQeKzDaBnJHmFDnXAOU  2024-08-17      
3.8287482122289007
-1016   912679  eEjldPhxojSjTnE 2024-01-09      1.3717891874157961
-1017   630392  TcczYHXbwaCYzFSfXJlhsFjN        2023-10-07      
4.733337480058437
+1      694832  buHDwfGeNHfpRFdNaogneddi        2024-02-09      4.8995886
+10     218729  goZsLvvWFOIjlzSAitC     2024-06-10      4.1377325
+100    813423  zICskqgcdPc     2024-03-23      8.4865294
+1000   612650  RzOXeYpKOmuJOogUyeIEDNDmvq      2023-12-05      7.8741751
+1001   29486   WoUAFJFuJNnwyqMnoDhX    2024-03-11      9.7582445
+1002   445363  OdTEeeWtxfcRwx  2024-08-01      0.39349455
+1003   707035  JAYnKxusVpGzYueACf      2023-11-14      5.37711
+1004   227858  JIFyjKzmbjkt    2024-03-24      5.7480378
+1005   539305  PlruLkSUSXZgaHafFriklrhCi       2023-11-08      4.1226354
+1006   145518  KCwqEcSCGuXrHerwn       2024-06-22      8.48229
+1007   939028  KzXhEMelsKVLbDMsEKh     2024-01-01      8.14445
+1008   913569  CHlqPKqkIdqwBCBUHreXbFAkCt      2024-05-25      1.5683843
+1009   757881  AjcSyYMIMzS     2024-05-04      7.5674014
+101    326164  QWLnalYNmYDt    2024-01-07      3.8159876
+1010   427079  AlRUfmxfAuoLnPqUTvQVMtrS        2024-06-04      3.808707
+1011   252076  gHmFDhtytYzWETIxdpkpMUpnLd      2023-09-17      6.7736068
+1012   819615  rFfRHquexplDJvSeUK      2023-11-02      3.2206392
+1013   413456  uvNPelHXYjJKiOkwdNbmUkGzxiiqLo  2024-03-15      8.3050489
+1014   308042  vnzcsvHxnWFhvLwJkAtUqe  2024-06-15      1.5668868
+1015   603837  VBEsRVGyhRNWQeKzDaBnJHmFDnXAOU  2024-08-17      3.8287482
+1016   912679  eEjldPhxojSjTnE 2024-01-09      1.3717892
+1017   630392  TcczYHXbwaCYzFSfXJlhsFjN        2023-10-07      4.7333374
 
 -- !snappy_2 --
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to