This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 6ddd69c605d4c594e33fdd39a2ca888538b4b8d7
Author: Zinway Liu <[email protected]>
AuthorDate: Wed Dec 27 14:32:17 2023 +0800

    IMPALA-12665: Adjust complete_micro_batch_ length to new
    scratch_batch_->capacity after ScratchTupleBatch::Reset

    **IMPALA-12665 Description:**
    The issue occurs when scanning Parquet tables with a row size > 4096
    bytes and a row batch size > 1024. AddressSanitizer detected a
    heap-buffer-overflow, indicating a write beyond the allocated buffer.

    **Root Cause Analysis:**
    The AddressSanitizer report points to a heap-buffer-overflow, where
    memory is accessed beyond the allocated region. This occurs in the
    `HdfsParquetScanner` and `ScratchTupleBatch` classes when handling
    rows larger than 4096 bytes.

    **Fault Reproduction:**
    The issue can be reproduced by creating a Parquet table with many
    columns, inserting data with Hive, and then querying the table with
    Impala. Bash and Hive client scripts attached to IMPALA-12665 create
    and populate such a table, triggering the bug.

    **Technical Analysis:**
    `ScratchTupleBatch::Reset` recalculates `capacity` from the tuple
    size and a fixed memory limit. When the row size exceeds 4096 bytes,
    `capacity` is set below 1024, but `HdfsParquetScanner` still assumes
    a `complete_micro_batch_` length of 1024, leading to the overflow.

    **Proposed Solution:**
    Update the `complete_micro_batch_` length after every
    `ScratchTupleBatch::Reset`. This keeps all accesses inside the
    allocated buffer, avoiding the heap-buffer-overflow.

    Change-Id: I966ff10ba734ed8b1b61325486de0dfcc7b58e4d
    Reviewed-on: http://gerrit.cloudera.org:8080/20834
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
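[Editor's note] To make the arithmetic in the Technical Analysis concrete before the patch below, here is a minimal, self-contained C++ sketch of the capacity recalculation. kDefaultBatchSize, kFixedMemLimit, and the division are illustrative assumptions for this example only, not Impala's actual constants or code.

#include <algorithm>
#include <cassert>

// Hypothetical numbers for illustration only; Impala's real limits differ.
constexpr int kDefaultBatchSize = 1024;          // rows assumed per scratch batch
constexpr int kFixedMemLimit = 4 * 1024 * 1024;  // assumed fixed tuple-buffer budget

// Sketch of a Reset()-style recalculation: capacity shrinks below the batch
// size once a single tuple exceeds kFixedMemLimit / kDefaultBatchSize
// (4096 bytes under these assumed numbers).
int RecalculatedCapacity(int tuple_byte_size) {
  return std::min(kDefaultBatchSize, kFixedMemLimit / tuple_byte_size);
}

int main() {
  // Rows of 4096 bytes or less: capacity stays at the assumed batch size.
  assert(RecalculatedCapacity(4096) == 1024);
  // Rows over 4096 bytes: capacity drops below 1024, so a micro batch that
  // still claims a length of 1024 would index past the scratch tuple buffer.
  assert(RecalculatedCapacity(5000) < 1024);
  return 0;
}

Under these assumed numbers, any tuple wider than 4096 bytes yields a capacity below 1024, which is exactly the mismatch the patch guards against.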
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc |  7 +++++++
 be/src/exec/scratch-tuple-batch.h           |  8 ++++++++
 tests/query_test/test_scanners.py           | 36 ++++++++++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index cc7c92a7b..8ff1cc312 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -2362,6 +2362,10 @@ Status HdfsParquetScanner::AssembleRows(RowBatch* row_batch, bool* skip_row_grou
     // Start a new scratch batch.
     RETURN_IF_ERROR(scratch_batch_->Reset(state_));
     InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, scratch_batch_->capacity);
+    // Adjust complete_micro_batch_ length to new scratch_batch_->capacity after
+    // ScratchTupleBatch::Reset
+    complete_micro_batch_.AdjustLength(scratch_batch_->capacity);
+
     // Late Materialization
     // 1. Filter rows only materializing the columns in 'filter_readers_'
     // 2. Transfer the surviving rows
@@ -2499,6 +2503,9 @@ Status HdfsParquetScanner::FillScratchMicroBatches(
             col_reader->schema_element().name, filename()));
       }
     }
+    // Ensure that the length of the micro_batch is less than
+    // or equal to the capacity of scratch_batch_.
+    DCHECK_LE(micro_batches[r].length, scratch_batch_->capacity);
     uint8_t* next_tuple_mem = scratch_batch_->tuple_mem +
         (scratch_batch_->tuple_byte_size * micro_batches[r].start);
     if (col_reader->max_rep_level() > 0) {
diff --git a/be/src/exec/scratch-tuple-batch.h b/be/src/exec/scratch-tuple-batch.h
index 6513c5c70..ca14c255f 100644
--- a/be/src/exec/scratch-tuple-batch.h
+++ b/be/src/exec/scratch-tuple-batch.h
@@ -30,6 +30,14 @@ struct ScratchMicroBatch {
   int start;
   int end;
   int length;
+
+  // Adjusts the micro batch length to new capacity if needed.
+  void AdjustLength(int new_capacity) {
+    if (length > new_capacity) {
+      length = new_capacity;
+      end = length - 1;
+    }
+  }
 };

 /// Helper struct that holds a batch of tuples allocated from a mem pool, as well
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 6a5e2b8d6..88e9abf83 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1268,6 +1268,42 @@ class TestParquet(ImpalaTestSuite):
     self.run_test_case(
         "QueryTest/parquet-decimal-precision-and-scale-altering", vector,
         unique_database)
+  def test_row_size_gt_4096_queries(self, unique_database):
+    table_format = 'parquet'
+    table_name = "{0}.{1}_{2}".format(
+        unique_database, "t_row_size_gt_4096", table_format)
+
+    # create table
+    field_string = ', '.join('field{} STRING'.format(i) for i in range(1, 601))
+    create_sql = "CREATE TABLE {} (id INT, {}) STORED AS {}".format(
+        table_name, field_string, table_format)
+    self.client.execute(create_sql)
+
+    # insert data
+    id_generation_sql = """
+      WITH ten AS (
+        SELECT 0 AS n
+        UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3
+        UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6
+        UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9
+      )
+      SELECT
+        row_number() OVER (ORDER BY a.n) AS id
+      FROM
+        ten a, ten b, ten c, ten d
+      LIMIT
+        2000
+    """
+    field_string = ', '.join(['CAST(RAND() AS STRING) AS field{}'.format(i)
+                              for i in range(1, 601)])
+    insert_sql = "INSERT INTO {} SELECT CAST(s.id AS INT), {} FROM ({}) s;".format(
+        table_name, field_string, id_generation_sql)
+    self.execute_query_expect_success(self.client, insert_sql)
+
+    # do a query
+    query_sql = "SELECT * FROM {} where field1 = '123'".format(table_name)
+    self.execute_query_expect_success(self.client, query_sql)
+
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise:
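[Editor's note] As a standalone illustration of the clamp that the new AdjustLength() performs, here is a reduced copy of the struct with a small usage check. The struct is limited to the fields shown in the diff, and the capacity value 838 is only an example (it matches the 5000-byte-tuple case in the earlier sketch).

#include <cassert>

// Reduced copy of ScratchMicroBatch, for standalone illustration only.
struct ScratchMicroBatch {
  int start;
  int end;
  int length;

  // Same logic as the AdjustLength() added in the patch: clamp the micro
  // batch to the post-Reset capacity so accesses stay inside the buffer.
  void AdjustLength(int new_capacity) {
    if (length > new_capacity) {
      length = new_capacity;
      end = length - 1;
    }
  }
};

int main() {
  // A micro batch built for the old 1024-row assumption...
  ScratchMicroBatch batch{0, 1023, 1024};
  // ...clamped after Reset() shrank capacity to 838 rows.
  batch.AdjustLength(838);
  assert(batch.length == 838 && batch.end == 837);
  return 0;
}

Clamping both length and end keeps the micro batch's [start, end] range consistent with the shrunken scratch buffer; the end arithmetic assumes the batch starts at row 0, as a full-coverage micro batch like complete_micro_batch_ would.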
