This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e0fac66223e [branch-2.1](fix) fix snappy decompressor bug (#40862) e0fac66223e is described below commit e0fac66223eb9e01dfebd7fef76ec72beb8568f9 Author: Socrates <suxiaogang...@icloud.com> AuthorDate: Fri Sep 20 11:57:14 2024 +0800 [branch-2.1](fix) fix snappy decompressor bug (#40862) ## Proposed changes Hadoop SnappyCodec source: https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc Example: OriginData (the original data is divided into several large data blocks): large data block1 | large data block2 | large data block3 | .... Each large data block is divided into several small data blocks. Suppose a large data block is divided into three small blocks: large data block1: | small block1 | small block2 | small block3 | CompressData: <A [B1 compress(small block1) ] [B2 compress(small block2) ] [B3 compress(small block3)]> A : original (uncompressed) length of the current large data block. sizeof(A) = 4 bytes. A = length(small block1) + length(small block2) + length(small block3) Bx : compressed length of small data block x. sizeof(Bx) = 4 bytes. Bx = length(compress(small blockx)) --- be/src/exec/decompressor.cpp | 162 ++++++++++++--------- .../tvf/compress/test_tvf.csv.snappy | Bin 107203 -> 100481 bytes .../tvf/test_local_tvf_compression.out | 44 +++--- 3 files changed, 114 insertions(+), 92 deletions(-) diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index d8d02c9cf9e..a830eb3cbe7 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -468,7 +468,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t } std::size_t decompressed_large_block_len = 0; - do { + while (remaining_decompressed_large_block_len > 0) { // Check that input length should not be negative.
if (input_len < sizeof(uint32_t)) { *more_input_bytes = sizeof(uint32_t) - input_len; @@ -505,8 +505,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t output_ptr += decompressed_small_block_len; remaining_decompressed_large_block_len -= decompressed_small_block_len; decompressed_large_block_len += decompressed_small_block_len; - - } while (remaining_decompressed_large_block_len > 0); + }; if (*more_input_bytes != 0) { // Need more input buffer @@ -535,90 +534,113 @@ Status SnappyBlockDecompressor::init() { return Status::OK(); } +// Hadoop snappycodec source : +// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc +// Example: +// OriginData(The original data will be divided into several large data block.) : +// large data block1 | large data block2 | large data block3 | .... +// The large data block will be divided into several small data block. +// Suppose a large data block is divided into three small blocks: +// large data block1: | small block1 | small block2 | small block3 | +// CompressData: <A [B1 compress(small block1) ] [B2 compress(small block1) ] [B3 compress(small block1)]> +// +// A : original length of the current block of large data block. +// sizeof(A) = 4 bytes. +// A = length(small block1) + length(small block2) + length(small block3) +// Bx : length of small data block bx. +// sizeof(Bx) = 4 bytes. 
+// Bx = length(compress(small blockx)) Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output, size_t output_max_len, size_t* decompressed_len, bool* stream_end, size_t* more_input_bytes, size_t* more_output_bytes) { - uint8_t* src = input; - size_t remaining_input_size = input_len; - int64_t uncompressed_total_len = 0; - *input_bytes_read = 0; + auto* input_ptr = input; + auto* output_ptr = output; - // The hadoop snappy codec is as: - // <4 byte big endian uncompressed size> - // <4 byte big endian compressed size> - // <snappy compressed block> - // .... - // <4 byte big endian uncompressed size> - // <4 byte big endian compressed size> - // <snappy compressed block> - // - // See: - // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc - while (remaining_input_size > 0) { - if (remaining_input_size < 4) { - *more_input_bytes = 4 - remaining_input_size; - break; + while (input_len > 0) { + //if faild , fall back to large block begin + auto* large_block_input_ptr = input_ptr; + auto* large_block_output_ptr = output_ptr; + + if (input_len < sizeof(uint32_t)) { + return Status::InvalidArgument(strings::Substitute( + "fail to do hadoop-snappy decompress, input_len=$0", input_len)); } - // Read uncompressed size - uint32_t uncompressed_block_len = Decompressor::_read_int32(src); - int64_t remaining_output_len = output_max_len - uncompressed_total_len; - if (remaining_output_len < uncompressed_block_len) { + + uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr); + + input_ptr += sizeof(uint32_t); + input_len -= sizeof(uint32_t); + + std::size_t remaining_output_len = output_max_len - *decompressed_len; + + if (remaining_output_len < remaining_decompressed_large_block_len) { // Need more output buffer - *more_output_bytes = uncompressed_block_len - 
remaining_output_len; - break; - } + *more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len; + input_ptr = large_block_input_ptr; + output_ptr = large_block_output_ptr; - if (uncompressed_block_len == 0) { - remaining_input_size -= sizeof(uint32_t); break; } - if (remaining_input_size <= 2 * sizeof(uint32_t)) { - // The remaining input size should be larger then <uncompressed size><compressed size><compressed data> - // +1 means we need at least 1 bytes of compressed data. - *more_input_bytes = 2 * sizeof(uint32_t) + 1 - remaining_input_size; - break; - } + std::size_t decompressed_large_block_len = 0; + while (remaining_decompressed_large_block_len > 0) { + // Check that input length should not be negative. + if (input_len < sizeof(uint32_t)) { + *more_input_bytes = sizeof(uint32_t) - input_len; + break; + } - // Read compressed size - size_t tmp_remaining_size = remaining_input_size - 2 * sizeof(uint32_t); - size_t compressed_len = _read_int32(src + sizeof(uint32_t)); - if (compressed_len > tmp_remaining_size) { - // Need more input data - *more_input_bytes = compressed_len - tmp_remaining_size; - break; - } + // Read the length of the next snappy compressed block. + size_t compressed_small_block_len = BigEndian::Load32(input_ptr); - src += 2 * sizeof(uint32_t); - remaining_input_size -= 2 * sizeof(uint32_t); - - // ATTN: the uncompressed len from GetUncompressedLength() is same as - // uncompressed_block_len, so I think it is unnecessary to get it again. 
- // Get uncompressed len from snappy - // size_t uncompressed_len; - // if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(src), - // compressed_len, &uncompressed_len)) { - // return Status::InternalError("snappy block decompress failed to get uncompressed len"); - // } - - // Decompress - if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), compressed_len, - reinterpret_cast<char*>(output))) { - return Status::InternalError( - "snappy block decompress failed. uncompressed_len: {}, compressed_len: {}", - uncompressed_block_len, compressed_len); + input_ptr += sizeof(uint32_t); + input_len -= sizeof(uint32_t); + + if (compressed_small_block_len == 0) { + continue; + } + + if (compressed_small_block_len > input_len) { + // Need more input buffer + *more_input_bytes = compressed_small_block_len - input_len; + break; + } + + // Decompress this block. + size_t decompressed_small_block_len; + if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input_ptr), + compressed_small_block_len, + &decompressed_small_block_len)) { + return Status::InternalError( + "snappy block decompress failed to get uncompressed len"); + } + if (!snappy::RawUncompress(reinterpret_cast<const char*>(input_ptr), + compressed_small_block_len, + reinterpret_cast<char*>(output_ptr))) { + return Status::InternalError( + "snappy block decompress failed. 
uncompressed_len: {}, compressed_len: {}", + decompressed_small_block_len, compressed_small_block_len); + } + input_ptr += compressed_small_block_len; + input_len -= compressed_small_block_len; + + output_ptr += decompressed_small_block_len; + remaining_decompressed_large_block_len -= decompressed_small_block_len; + decompressed_large_block_len += decompressed_small_block_len; + }; + + if (*more_input_bytes != 0) { + // Need more input buffer + input_ptr = large_block_input_ptr; + output_ptr = large_block_output_ptr; + break; } - output += uncompressed_block_len; - src += compressed_len; - remaining_input_size -= compressed_len; - uncompressed_total_len += uncompressed_block_len; + *decompressed_len += decompressed_large_block_len; } - - *input_bytes_read += (input_len - remaining_input_size); - *decompressed_len = uncompressed_total_len; + *input_bytes_read += (input_ptr - input); // If no more input and output need, means this is the end of a compressed block *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0); diff --git a/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy index 9ac2b7ae299..054613c5146 100644 Binary files a/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy and b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy differ diff --git a/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out b/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out index 8120427ea6c..5f1a4f5d463 100644 --- a/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out +++ b/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out @@ -123,28 +123,28 @@ 2023-09-18 7 -- !snappy_1 -- -1 694832 buHDwfGeNHfpRFdNaogneddi 2024-02-09 4.899588807225554 -10 218729 goZsLvvWFOIjlzSAitC 2024-06-10 4.137732740231178 -100 813423 zICskqgcdPc 2024-03-23 
8.486529018746493 -1000 612650 RzOXeYpKOmuJOogUyeIEDNDmvq 2023-12-05 7.8741752707933435 -1001 29486 WoUAFJFuJNnwyqMnoDhX 2024-03-11 9.758244908785949 -1002 445363 OdTEeeWtxfcRwx 2024-08-01 0.3934945460194128 -1003 707035 JAYnKxusVpGzYueACf 2023-11-14 5.377110182643222 -1004 227858 JIFyjKzmbjkt 2024-03-24 5.748037621519263 -1005 539305 PlruLkSUSXZgaHafFriklrhCi 2023-11-08 4.122635188836725 -1006 145518 KCwqEcSCGuXrHerwn 2024-06-22 8.482290064407216 -1007 939028 KzXhEMelsKVLbDMsEKh 2024-01-01 8.144449761594585 -1008 913569 CHlqPKqkIdqwBCBUHreXbFAkCt 2024-05-25 1.5683842369495904 -1009 757881 AjcSyYMIMzS 2024-05-04 7.5674012939461255 -101 326164 QWLnalYNmYDt 2024-01-07 3.8159876011523854 -1010 427079 AlRUfmxfAuoLnPqUTvQVMtrS 2024-06-04 3.8087069699523313 -1011 252076 gHmFDhtytYzWETIxdpkpMUpnLd 2023-09-17 6.773606843056635 -1012 819615 rFfRHquexplDJvSeUK 2023-11-02 3.220639250504097 -1013 413456 uvNPelHXYjJKiOkwdNbmUkGzxiiqLo 2024-03-15 8.305048700108081 -1014 308042 vnzcsvHxnWFhvLwJkAtUqe 2024-06-15 1.5668867233009998 -1015 603837 VBEsRVGyhRNWQeKzDaBnJHmFDnXAOU 2024-08-17 3.8287482122289007 -1016 912679 eEjldPhxojSjTnE 2024-01-09 1.3717891874157961 -1017 630392 TcczYHXbwaCYzFSfXJlhsFjN 2023-10-07 4.733337480058437 +1 694832 buHDwfGeNHfpRFdNaogneddi 2024-02-09 4.8995886 +10 218729 goZsLvvWFOIjlzSAitC 2024-06-10 4.1377325 +100 813423 zICskqgcdPc 2024-03-23 8.4865294 +1000 612650 RzOXeYpKOmuJOogUyeIEDNDmvq 2023-12-05 7.8741751 +1001 29486 WoUAFJFuJNnwyqMnoDhX 2024-03-11 9.7582445 +1002 445363 OdTEeeWtxfcRwx 2024-08-01 0.39349455 +1003 707035 JAYnKxusVpGzYueACf 2023-11-14 5.37711 +1004 227858 JIFyjKzmbjkt 2024-03-24 5.7480378 +1005 539305 PlruLkSUSXZgaHafFriklrhCi 2023-11-08 4.1226354 +1006 145518 KCwqEcSCGuXrHerwn 2024-06-22 8.48229 +1007 939028 KzXhEMelsKVLbDMsEKh 2024-01-01 8.14445 +1008 913569 CHlqPKqkIdqwBCBUHreXbFAkCt 2024-05-25 1.5683843 +1009 757881 AjcSyYMIMzS 2024-05-04 7.5674014 +101 326164 QWLnalYNmYDt 2024-01-07 3.8159876 +1010 427079 AlRUfmxfAuoLnPqUTvQVMtrS 
2024-06-04 3.808707 +1011 252076 gHmFDhtytYzWETIxdpkpMUpnLd 2023-09-17 6.7736068 +1012 819615 rFfRHquexplDJvSeUK 2023-11-02 3.2206392 +1013 413456 uvNPelHXYjJKiOkwdNbmUkGzxiiqLo 2024-03-15 8.3050489 +1014 308042 vnzcsvHxnWFhvLwJkAtUqe 2024-06-15 1.5668868 +1015 603837 VBEsRVGyhRNWQeKzDaBnJHmFDnXAOU 2024-08-17 3.8287482 +1016 912679 eEjldPhxojSjTnE 2024-01-09 1.3717892 +1017 630392 TcczYHXbwaCYzFSfXJlhsFjN 2023-10-07 4.7333374 -- !snappy_2 -- --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org