This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit c56cd7b214dae8e4d31e093c71421f524452f53c
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Wed Dec 7 13:40:13 2022 +0100

    IMPALA-11780: Wrong FILE__POSITION values for multi-row-group Parquet
    files when page filtering is used

    Impala generated wrong values for the FILE__POSITION column when a
    Parquet file contained multiple row groups and page filtering was also
    in use. The Parquet column readers populate the file position slot
    from 'current_row_', but 'current_row_' is the index of the row within
    the row group, not within the file. We cannot simply redefine
    'current_row_' as a file-relative index, because page filtering
    depends on it: the page index also stores row-group-relative row
    indexes, not file-relative ones.

    It also turned out that FILE__POSITION was set incorrectly by the
    Parquet late materialization code, because
    BaseScalarColumnReader::SkipRowsInternal() didn't update
    'current_row_' in some code paths.

    Correct FILE__POSITION values are critical for Iceberg V2 tables, as
    position delete files store the file positions of the deleted rows.

    Testing:
     * added e2e tests
     * the tests now also run without PARQUET_READ_STATISTICS to exercise
       more code paths

    Change-Id: I5ef37a1aa731eb54930d6689621cd6169fed6605
    Reviewed-on: http://gerrit.cloudera.org:8080/19328
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
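To make the off-by-a-row-group arithmetic concrete, the following is a
minimal standalone C++ sketch (not Impala code; the member names only
mirror the patch) of the accounting the fix introduces:

    #include <cassert>
    #include <cstdint>

    // Simplified model of the column reader's file-position accounting.
    struct ColumnReaderModel {
      // Index within the file of the first row in the row group.
      int64_t row_group_first_row_ = 0;
      // Index of the current row *within the row group*; -1 means no row
      // has been processed yet.
      int64_t current_row_ = -1;

      int64_t FilePositionOfCurrentRow() const {
        return row_group_first_row_ + current_row_;
      }
    };

    int main() {
      // First row of the second row group of a file whose first row
      // group holds 1000 rows.
      ColumnReaderModel reader;
      reader.row_group_first_row_ = 1000;
      reader.current_row_ = 0;
      // Before the fix the slot was populated from 'current_row_' alone,
      // yielding 0; the correct FILE__POSITION is 1000.
      assert(reader.FilePositionOfCurrentRow() == 1000);
      return 0;
    }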
---
 be/src/exec/parquet/parquet-column-readers.cc      |  8 +--
 be/src/exec/parquet/parquet-column-readers.h       | 14 +++++-
 testdata/data/README                               | 13 ++++-
 .../customer_nested_multiblock_multipage.parquet   | Bin 0 -> 807255 bytes
 .../virtual-column-file-position-parquet.test      | 56 +++++++++++++++++++++
 tests/query_test/test_scanners.py                  | 10 +++-
 6 files changed, 93 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index 20cfe38b7..15bd9bfbe 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -903,7 +903,7 @@ bool ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::DecodeValues(
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::
     ReadFilePositionBatched(int16_t rep_level, int64_t* file_pos) {
-  *file_pos = current_row_;
+  *file_pos = FilePositionOfCurrentRow();
 }
@@ -1068,7 +1068,8 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
   // See ColumnReader constructor.
   rep_level_ = max_rep_level() == 0 ? 0 : ParquetLevel::INVALID_LEVEL;
   pos_current_value_ = ParquetLevel::INVALID_POS;
-  current_row_ = row_group_first_row - 1;
+  row_group_first_row_ = row_group_first_row;
+  current_row_ = -1;

   vector<ScanRange::SubRange> sub_ranges;
   CreateSubRanges(&sub_ranges);
@@ -1466,7 +1467,7 @@ int BaseScalarColumnReader::FillPositionsInCandidateRange(int rows_remaining,
     }
     ++val_count;
     if (file_pos_writer.IsValid()) {
-      *file_pos_writer.Advance() = current_row_;
+      *file_pos_writer.Advance() = FilePositionOfCurrentRow();
     } else if (pos_writer.IsValid()) {
       if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
       *pos_writer.Advance() = pos_current_value_++;
@@ -1672,6 +1673,7 @@ bool BaseScalarColumnReader::SkipRowsInternal(int64_t num_rows, int64_t skip_row
     while (num_rows > current_page_values) {
       COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
       num_rows -= current_page_values;
+      current_row_ += current_page_values;
       if (!col_chunk_reader_.SkipPageData().ok() || !AdvanceNextPageHeader()) {
         return false;
       }
diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h
index c5aa058bd..d461c94e7 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -225,6 +225,9 @@ class ParquetColumnReader {
   const SlotDescriptor* pos_slot_desc_ = nullptr;
   const SlotDescriptor* file_pos_slot_desc_ = nullptr;

+  /// Index within the file of the first row in the row group.
+  int64_t row_group_first_row_ = 0;
+
   /// The next value to write into the position slot, if there is one. 64-bit int because
   /// the pos slot is always a BIGINT. Set to ParquetLevel::INVALID_POS when this column
   /// reader does not have a current rep and def level (i.e. before the first NextLevels()
@@ -376,6 +379,12 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   template <bool ADVANCE_REP_LEVEL>
   bool NextLevels();

+  /// Returns file position of current row ('current_row_' is the index of the row
+  /// within the row group).
+  int64_t FilePositionOfCurrentRow() const {
+    return row_group_first_row_ + current_row_;
+  }
+
 protected:
   // Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup())
   friend class HdfsParquetScanner;
@@ -414,7 +423,8 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// Metadata for the column for the current row group.
   const parquet::ColumnMetaData* metadata_ = nullptr;

-  /// Index of the current top-level row. It is updated together with the rep/def levels.
+  /// Index of the current top-level row within the row group. It is updated together
+  /// with the rep/def levels.
   /// When updated, and its value is N, it means that we already processed the Nth row
   /// completely, hence the initial value is '-1', because '0' would mean that we already
   /// processed the first (zeroeth) row.
@@ -685,7 +695,7 @@ inline void ParquetColumnReader::ReadFilePositionNonBatched(int64_t* file_pos) {
   DCHECK_GE(def_level_, 0);
   DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
       "Caller should have called NextLevels() until we are ready to read a value";
-  *file_pos = LastProcessedRow() + 1;
+  *file_pos = row_group_first_row_ + LastProcessedRow() + 1;
 }

 // Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
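The one-line change in SkipRowsInternal() above is easy to gloss over.
The sketch below (a simplified standalone model, not the real reader,
assuming one value per top-level row) shows why skipping whole pages
must advance 'current_row_' by each skipped page's row count, otherwise
every row read after the skip reports a file position that is too
small:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    int main() {
      std::vector<int64_t> page_row_counts = {20, 20, 20};  // rows per page
      int64_t current_row = -1;   // last fully processed row, row-group-relative
      int64_t rows_to_skip = 45;  // late materialization wants to skip 45 rows

      // Skip whole pages first, mirroring the loop the patch touches.
      std::size_t page = 0;
      while (page < page_row_counts.size() && rows_to_skip > page_row_counts[page]) {
        rows_to_skip -= page_row_counts[page];
        current_row += page_row_counts[page];  // the accounting the patch adds
        ++page;
      }
      // Skip the remainder inside the current page.
      current_row += rows_to_skip;
      assert(current_row == 44);  // rows 0..44 skipped; the next row read is row 45
      return 0;
    }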
diff --git a/testdata/data/README b/testdata/data/README
index 6ac0a363e..9772cc2de 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -633,6 +633,17 @@ In order to generate this file, execute the following instruments:
 WHERE c_current_cdemo_sk IS NOT NULL ORDER BY c_current_cdemo_sk LIMIT 2000;
 generated file will contains multi blocks, multi pages per block.

+customer_nested_multiblock_multipage.parquet
+Parquet file that contains multiple row groups, multiple pages, and stores
+nested data.
+Used Hive (version 3.1.3000.7.2.16.0-233) to generate the Parquet file:
+1. SET parquet.block.size=8192;
+2. SET parquet.page.row.count.limit=20;
+3. CREATE TABLE customer_nested_multiblock_multipage
+   LIKE tpch_nested_parquet.customer STORED AS PARQUET;
+4. INSERT INTO customer_nested_multiblock_multipage
+   SELECT * FROM tpch_nested_parquet.customer ORDER BY c_custkey LIMIT 300;
+
 IMPALA-10361: Use field id to resolve columns for Iceberg tables
 We generated data by spark-shell, version is 2.4.x, and table data is in
 testdata/data/iceberg_test/hadoop_catalog/iceberg_resolution_test, this table
@@ -912,4 +923,4 @@ Converted similarly to iceberg_v2_no_deletes
 create_table_like_parquet_test.parquet:
 Generated by Hive
 create table iceberg_create_table_like_parquet_test (col_int int, col_float float, col_double double, col_string string, col_struct struct<col_int:int, col_float:float>, col_array array<string>, col_map map<string,array<int>>) stored as parquet;
-insert into iceberg_create_table_like_parquet_test values (0, 1.0, 2.0, "3", named_struct("col_int", 4, "col_float", cast(5.0 as float)), array("6","7","8"), map("A", array(11,12), "B", array(21,22)));
\ No newline at end of file
+insert into iceberg_create_table_like_parquet_test values (0, 1.0, 2.0, "3", named_struct("col_int", 4, "col_float", cast(5.0 as float)), array("6","7","8"), map("A", array(11,12), "B", array(21,22)));
diff --git a/testdata/data/customer_nested_multiblock_multipage.parquet b/testdata/data/customer_nested_multiblock_multipage.parquet
new file mode 100644
index 000000000..b4484c276
Binary files /dev/null and b/testdata/data/customer_nested_multiblock_multipage.parquet differ
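The tests below rely on the generated file actually containing several
row groups. As an optional sanity check (a sketch using Apache Arrow's
parquet-cpp reader, which is not part of this patch), the layout the
Hive settings were meant to produce can be confirmed like this:

    #include <iostream>
    #include <memory>
    #include <parquet/api/reader.h>  // Apache Arrow parquet-cpp

    int main() {
      std::unique_ptr<parquet::ParquetFileReader> reader =
          parquet::ParquetFileReader::OpenFile(
              "testdata/data/customer_nested_multiblock_multipage.parquet");
      std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
      // The regression is only reproducible with more than one row group.
      std::cout << "row groups: " << metadata->num_row_groups() << std::endl;
      return 0;
    }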
diff --git a/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test b/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test
index 7ff2f9d19..bf54e6e5c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test
@@ -221,3 +221,59 @@ where id = l_orderkey and
 ---- TYPES
 BIGINT, BIGINT, INT, BIGINT
 ====
+---- QUERY
+select file__position, * from customer_multiblock_page_index where c_customer_sk = 99331;
+---- RESULTS
+1119,99331,'AAAAAAAADAEIBAAA',22643,7071,15946,2452620,2452590,'Mr.','Joseph','Mueller','N',2,10,1970,'VANUATU','NULL','[email protected]','2452638'
+---- TYPES
+BIGINT, INT, STRING, INT, INT, INT, INT, INT, STRING, STRING, STRING, STRING, INT, INT, INT, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select file__position, * from customer_multiblock_page_index where c_customer_sk > 10000 and c_customer_sk < 10500;
+---- RESULTS
+303,10380,'AAAAAAAAMIICAAAA',6555,6205,18255,2452491,2452461,'Sir','William','Hunter','N',4,7,1964,'NAURU','NULL','[email protected]','2452386'
+1219,10396,'AAAAAAAAMJICAAAA',24791,6556,15968,2450865,2450835,'Mr.','Gerald','Manley','N',14,10,1939,'PORTUGAL','NULL','[email protected]','2452430'
+1292,10208,'AAAAAAAAAOHCAAAA',26660,2359,9211,2452596,2452566,'Dr.','Lowell','Amos','N',26,10,1977,'SENEGAL','NULL','[email protected]','2452391'
+1300,10343,'AAAAAAAAHGICAAAA',26829,222,42462,2451479,2451449,'Sir','Bruce','Grice','N',23,1,1949,'CAYMAN ISLANDS','NULL','[email protected]','2452313'
+1738,10153,'AAAAAAAAJKHCAAAA',35544,1696,35848,2449126,2449096,'Mr.','Wesley','Hooker','Y',23,2,1986,'NEW ZEALAND','NULL','[email protected]','2452527'
+1842,10353,'AAAAAAAABHICAAAA',37434,1053,15825,2451525,2451495,'Mrs.','Lucinda','Pierson','N',13,3,1980,'ARUBA','NULL','[email protected]','2452450'
+1856,10316,'AAAAAAAAMEICAAAA',37920,2586,49299,2452571,2452541,'Ms.','Carolyn','Guajardo','Y',11,7,1931,'MYANMAR','NULL','[email protected]','2452600'
+---- TYPES
+BIGINT, INT, STRING, INT, INT, INT, INT, INT, STRING, STRING, STRING, STRING, INT, INT, INT, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select file__position, * from customer_multiblock_page_index where c_first_sales_date_sk = 2449000;
+---- RESULTS
+1452,99451,'AAAAAAAALHEIBAAA',29723,723,35323,2449030,2449000,'Mrs.','Judy','Grant','N',28,1,1955,'INDIA','NULL','[email protected]','2452566'
+---- TYPES
+BIGINT, INT, STRING, INT, INT, INT, INT, INT, STRING, STRING, STRING, STRING, INT, INT, INT, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select * from customer_nested_multiblock_multipage where c_custkey = 300;
+---- RESULTS
+300,'Customer#000000300','I0fJfo60DRqQ',7,'17-165-193-5964',8084.92,'AUTOMOBILE','p fluffily among the slyly express grouches. furiously express instruct'
+---- TYPES
+BIGINT, STRING, STRING, SMALLINT, STRING, DECIMAL, STRING, STRING
+====
+---- QUERY
+select * from customer_nested_multiblock_multipage where c_name = 'Customer#000000295';
+---- RESULTS
+295,'Customer#000000295','mk649IH6njR14woTVZ1cxtlNs URxBHD5o5z2',0,'10-340-773-4322',9497.89,'HOUSEHOLD','play according to the quickly ironic instructions-- unusual, bol'
+---- TYPES
+BIGINT, STRING, STRING, SMALLINT, STRING, DECIMAL, STRING, STRING
+====
+---- QUERY
+select file__position, l_shipdate from customer_nested_multiblock_multipage c, c.c_orders.o_lineitems where l_shipdate='1998-11-26';
+---- RESULTS
+80,'1998-11-26'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select file__position, l_shipdate from customer_nested_multiblock_multipage c, c.c_orders.o_lineitems where l_partkey = 199994;
+---- RESULTS
+51,'1993-05-13'
+82,'1994-01-17'
+---- TYPES
+BIGINT, STRING
+====
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 76ed7a254..b4ab8b15b 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -500,12 +500,18 @@ class TestParquet(ImpalaTestSuite):
   def test_virtual_column_file_position_parquet(self, vector, unique_database):
     # Parquet-specific tests for virtual column FILE__POSITION
     create_table_from_parquet(self.client, unique_database, 'alltypes_tiny_pages')
+    create_table_from_parquet(self.client, unique_database,
+        'customer_multiblock_page_index')
+    create_table_from_parquet(self.client, unique_database,
+        'customer_nested_multiblock_multipage')
     new_vector = deepcopy(vector)
     for late_mat in [-1, 1, 17]:
       new_vector.get_value('exec_option')['parquet_late_materialization_threshold'] = \
           late_mat
-      self.run_test_case('QueryTest/virtual-column-file-position-parquet', new_vector,
-          unique_database)
+      for read_stats in ['true', 'false']:
+        new_vector.get_value('exec_option')['parquet_read_statistics'] = read_stats
+        self.run_test_case('QueryTest/virtual-column-file-position-parquet', new_vector,
+            unique_database)

   def test_corrupt_files(self, vector):
     new_vector = deepcopy(vector)
