This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ca5de24d6ab035fee7089f4dcd473da37abf0d73 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Mon May 22 18:44:30 2023 +0200 IMPALA-12153: Parquet STRUCT reader should fill position slots Before this patch the Parquet STRUCT reader didn't fill the position slots: collection position, file position. When users queried these virtual columns Impala crashed or returned incorrect results. The ORC scanner already worked correctly, but there were no tests written for it. Test: * e2e tests for both ORC / Parquet Change-Id: I32a808a11f4543cd404ed9f3958e9b4e971ca1f4 Reviewed-on: http://gerrit.cloudera.org:8080/19911 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/parquet/parquet-column-readers.cc | 12 -- be/src/exec/parquet/parquet-column-readers.h | 6 + .../exec/parquet/parquet-struct-column-reader.cc | 15 ++ .../queries/QueryTest/struct-positions.test | 200 +++++++++++++++++++++ tests/query_test/test_nested_types.py | 4 + 5 files changed, 225 insertions(+), 12 deletions(-) diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc index 7e58b1c24..07b5e92de 100644 --- a/be/src/exec/parquet/parquet-column-readers.cc +++ b/be/src/exec/parquet/parquet-column-readers.cc @@ -158,10 +158,6 @@ class ScalarColumnReader : public BaseScalarColumnReader { /// It updates 'current_row_' when 'rep_level' is 0. inline ALWAYS_INLINE void ReadFilePositionBatched(int16_t rep_level, int64_t* file_pos); - /// Reads position into 'pos' and updates 'pos_current_value_' based on 'rep_level'. - /// It updates 'current_row_' when 'rep_level' is 0. 
- inline ALWAYS_INLINE void ReadItemPositionBatched(int16_t rep_level, int64_t* pos); - virtual Status CreateDictionaryDecoder( uint8_t* values, int size, DictDecoderBase** decoder) override { DCHECK(slot_desc_->type().type != TYPE_BOOLEAN) @@ -910,14 +906,6 @@ void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>:: *file_pos = FilePositionOfCurrentRow(); } -template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> -void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>:: - ReadItemPositionBatched(int16_t rep_level, int64_t* pos) { - // Reset position counter if we are at the start of a new parent collection. - if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0; - *pos = pos_current_value_++; -} - template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositions( int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT { diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h index 7e5f21432..fa6189c1c 100644 --- a/be/src/exec/parquet/parquet-column-readers.h +++ b/be/src/exec/parquet/parquet-column-readers.h @@ -214,6 +214,12 @@ class ParquetColumnReader { return pos_slot_desc_ != nullptr || file_pos_slot_desc_ != nullptr; } + void ReadItemPositionBatched(int16_t rep_level, int64_t* pos) { + // Reset position counter if we are at the start of a new parent collection. 
+ if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0; + *pos = pos_current_value_++; + } + protected: HdfsParquetScanner* parent_; const SchemaNode& node_; diff --git a/be/src/exec/parquet/parquet-struct-column-reader.cc b/be/src/exec/parquet/parquet-struct-column-reader.cc index c58420d7c..675cb377b 100644 --- a/be/src/exec/parquet/parquet-struct-column-reader.cc +++ b/be/src/exec/parquet/parquet-struct-column-reader.cc @@ -27,6 +27,7 @@ bool StructColumnReader::NextLevels() { } def_level_ = children_[0]->def_level(); rep_level_ = children_[0]->rep_level(); + if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0; return result; } @@ -54,6 +55,7 @@ bool StructColumnReader::ReadValue(MemPool* pool, Tuple* tuple, bool* read_row) def_level_ = children_[0]->def_level(); rep_level_ = children_[0]->rep_level(); + if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0; return should_abort; } @@ -98,6 +100,19 @@ bool StructColumnReader::ReadValueBatch(MemPool* pool, int max_values, int tuple while (val_count < max_values && !RowGroupAtEnd() && continue_execution) { Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size); bool read_row = false; + // Fill in position slots if applicable + if (pos_slot_desc() != nullptr) { + DCHECK(file_pos_slot_desc() == nullptr); + ReadItemPositionBatched(rep_level_, + tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset())); + } else if (file_pos_slot_desc() != nullptr) { + DCHECK(pos_slot_desc() == nullptr); + // It is OK to call the non-batched version because we let the child readers + // determine the LastProcessedRow() and we use the non-bached ReadValue() functions + // of the children. 
+ ReadFilePositionNonBatched( + tuple->GetBigIntSlot(file_pos_slot_desc()->tuple_offset())); + } continue_execution = ReadValue<IN_COLLECTION>(pool, tuple, &read_row); if (read_row) ++val_count; if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(val_count)) { diff --git a/testdata/workloads/functional-query/queries/QueryTest/struct-positions.test b/testdata/workloads/functional-query/queries/QueryTest/struct-positions.test new file mode 100644 index 000000000..c69d076ba --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/struct-positions.test @@ -0,0 +1,200 @@ +==== +---- QUERY +# Let's just have the positions here for reference. +select id, file__position from complextypestbl; +---- RESULTS +1,0 +2,1 +3,2 +4,3 +5,4 +6,5 +7,6 +8,0 +---- TYPES +BIGINT, BIGINT +==== +---- QUERY +# Let's query the top-level sruct so we can verify the following results. +select id, file__position, nested_struct from complextypestbl; +---- RESULTS +1,0,'{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}' +2,1,'{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}' +3,2,'{"a":null,"b":null,"c":{"d":[]},"g":{}}' +4,3,'{"a":null,"b":null,"c":{"d":null},"g":null}' +5,4,'{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}' +6,5,'NULL' +7,6,'{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}' +8,0,'{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}' +---- TYPES +BIGINT, BIGINT, STRING +==== +---- QUERY +# We only query the 'nested_struct' non-virtual column, so the STRUCT reader +# needs to set the file positions +select file__position, nested_struct from complextypestbl; +---- RESULTS 
+0,'{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}' +1,'{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}' +2,'{"a":null,"b":null,"c":{"d":[]},"g":{}}' +3,'{"a":null,"b":null,"c":{"d":null},"g":null}' +4,'{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}' +5,'NULL' +6,'{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}' +0,'{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}' +---- TYPES +BIGINT, STRING +==== +---- QUERY +select file__position, nested_struct from complextypestbl +where file__position = 2; +---- RESULTS +2,'{"a":null,"b":null,"c":{"d":[]},"g":{}}' +---- TYPES +BIGINT, STRING +==== +---- QUERY +select file__position, nested_struct from complextypestbl +where nested_struct.a > 0; +---- RESULTS +0,'{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}' +6,'{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}' +---- TYPES +BIGINT, STRING +==== +---- QUERY +select id, file__position, item from complextypestbl c, c.nested_struct.c.d.item; +---- RESULTS +1,0,'{"e":10,"f":"aaa"}' +1,0,'{"e":-10,"f":"bbb"}' +1,0,'{"e":11,"f":"c"}' +2,1,'{"e":null,"f":null}' +2,1,'{"e":10,"f":"aaa"}' +2,1,'{"e":null,"f":null}' +2,1,'{"e":-10,"f":"bbb"}' +2,1,'{"e":null,"f":null}' +2,1,'{"e":11,"f":"c"}' +2,1,'NULL' +7,6,'NULL' +8,0,'{"e":-1,"f":"nonnullable"}' +---- TYPES +BIGINT, BIGINT, STRING +==== +---- QUERY +select file__position, item from complextypestbl c, c.nested_struct.c.d.item; +---- RESULTS +0,'{"e":10,"f":"aaa"}' +0,'{"e":-10,"f":"bbb"}' +0,'{"e":11,"f":"c"}' +1,'{"e":null,"f":null}' +1,'{"e":10,"f":"aaa"}' +1,'{"e":null,"f":null}' +1,'{"e":-10,"f":"bbb"}' 
+1,'{"e":null,"f":null}' +1,'{"e":11,"f":"c"}' +1,'NULL' +6,'NULL' +0,'{"e":-1,"f":"nonnullable"}' +---- TYPES +BIGINT, STRING +==== +---- QUERY +select pos, item from complextypestbl c, c.nested_struct.c.d.item; +---- RESULTS +0,'{"e":10,"f":"aaa"}' +1,'{"e":-10,"f":"bbb"}' +0,'{"e":11,"f":"c"}' +0,'{"e":null,"f":null}' +1,'{"e":10,"f":"aaa"}' +2,'{"e":null,"f":null}' +3,'{"e":-10,"f":"bbb"}' +4,'{"e":null,"f":null}' +0,'{"e":11,"f":"c"}' +1,'NULL' +0,'NULL' +0,'{"e":-1,"f":"nonnullable"}' +---- TYPES +BIGINT, STRING +==== +---- QUERY +select pos, item from complextypestbl c, c.nested_struct.c.d.item +where pos = 1; +---- RESULTS +1,'{"e":-10,"f":"bbb"}' +1,'{"e":10,"f":"aaa"}' +1,'NULL' +---- TYPES +BIGINT, STRING +==== +---- QUERY +select pos, item from complextypestbl c, c.nested_struct.c.d.item it +where it.e < 11; +---- RESULTS +0,'{"e":10,"f":"aaa"}' +1,'{"e":-10,"f":"bbb"}' +1,'{"e":10,"f":"aaa"}' +3,'{"e":-10,"f":"bbb"}' +0,'{"e":-1,"f":"nonnullable"}' +---- TYPES +BIGINT, STRING +==== +---- QUERY +# Queries like above, but with a flat tuple structure, i.e. the items +# are in the top-level (and only) tuple. 
+select pos, item from complextypestbl.nested_struct.c.d.item; +---- RESULTS +0,'{"e":10,"f":"aaa"}' +1,'{"e":-10,"f":"bbb"}' +0,'{"e":11,"f":"c"}' +0,'{"e":null,"f":null}' +1,'{"e":10,"f":"aaa"}' +2,'{"e":null,"f":null}' +3,'{"e":-10,"f":"bbb"}' +4,'{"e":null,"f":null}' +0,'{"e":11,"f":"c"}' +1,'NULL' +0,'NULL' +0,'{"e":-1,"f":"nonnullable"}' +---- TYPES +BIGINT, STRING +==== +---- QUERY +select pos, item from complextypestbl.nested_struct.c.d.item +where pos = 1; +---- RESULTS +1,'{"e":-10,"f":"bbb"}' +1,'{"e":10,"f":"aaa"}' +1,'NULL' +---- TYPES +BIGINT, STRING +==== +---- QUERY +select pos, item from complextypestbl.nested_struct.c.d.item it +where it.e < 11; +---- RESULTS +0,'{"e":10,"f":"aaa"}' +1,'{"e":-10,"f":"bbb"}' +1,'{"e":10,"f":"aaa"}' +3,'{"e":-10,"f":"bbb"}' +0,'{"e":-1,"f":"nonnullable"}' +---- TYPES +BIGINT, STRING +==== +---- QUERY +select file__position, pos, item from complextypestbl c, c.nested_struct.c.d.item; +---- RESULTS +0,0,'{"e":10,"f":"aaa"}' +0,1,'{"e":-10,"f":"bbb"}' +0,0,'{"e":11,"f":"c"}' +1,0,'{"e":null,"f":null}' +1,1,'{"e":10,"f":"aaa"}' +1,2,'{"e":null,"f":null}' +1,3,'{"e":-10,"f":"bbb"}' +1,4,'{"e":null,"f":null}' +1,0,'{"e":11,"f":"c"}' +1,1,'NULL' +6,0,'NULL' +0,0,'{"e":-1,"f":"nonnullable"}' +---- TYPES +BIGINT, BIGINT, STRING +==== diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py index 17d924f48..ce3d060ff 100644 --- a/tests/query_test/test_nested_types.py +++ b/tests/query_test/test_nested_types.py @@ -152,6 +152,10 @@ class TestNestedStructsInSelectList(ImpalaTestSuite): new_vector.get_value('exec_option')['convert_legacy_hive_parquet_utc_timestamps'] = 1 self.run_test_case('QueryTest/nested-struct-in-select-list', new_vector) + def test_struct_positions(self, vector): + """Queries where structs and (file/collection) positions are used together""" + self.run_test_case('QueryTest/struct-positions', vector) + class TestNestedCollectionsInSelectList(ImpalaTestSuite): """Functional 
tests for nested arrays provided in the select list."""
