This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 8caa5a9ba4  [Fix](multi-catalog) Fix null partitions error in iceberg tables. (#22185)
8caa5a9ba4 is described below

commit 8caa5a9ba45febf662ffc6efc14039086f3185f5
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Thu Jul 27 23:57:35 2023 +0800

    [Fix](multi-catalog) Fix null partitions error in iceberg tables. (#22185)

    ### Issue

    When a table has null partitions, queries fail with the error
    `Failed to fill partition column: t_int=null`.

    ### Resolution

    - Fix the null-partition error in Iceberg tables by replacing the literal
      "null" partition value with '\N'.
    - Add regression tests for Hive and Iceberg null partitions.
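The essence of the change is easiest to see in isolation. Below is a minimal sketch of the substitution; the `normalize_partition_value` helper is hypothetical and written only for this illustration (in the actual patch the logic lives inline in `VFileScanner::_generate_fill_columns`, shown in the diff that follows):

```cpp
#include <cstring>
#include <string>

// Sketch only: Iceberg surfaces a null partition as the literal string
// "null", which cannot be parsed as a value for non-string partition
// columns. Substituting the "\N" sentinel lets the text converter fill
// the column slot with SQL NULL instead of failing.
static constexpr char NULL_STR[3] = {'\\', 'N', '\0'};  // the string "\N"

// Hypothetical helper name; the real patch inlines this logic.
const char* normalize_partition_value(const std::string& column_from_path) {
    if (column_from_path.size() == 4 &&
        std::memcmp(column_from_path.c_str(), "null", 4) == 0) {
        return NULL_STR;  // interpreted as NULL when the column is filled
    }
    return column_from_path.c_str();  // ordinary partition value, used as-is
}
```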
---
 be/src/exec/text_converter.h                       |  2 +
 be/src/vec/exec/scan/vfile_scanner.cpp             | 79 +++++++++-------
 be/src/vec/exec/scan/vfile_scanner.h               |  4 ++
 .../hive/test_external_catalog_hive_partition.out  | 73 ++++++++++++++++++++
 .../test_external_catalog_iceberg_partition.out    | 73 ++++++++++++++++++++
 .../test_external_catalog_hive_partition.groovy    | 67 ++++++++++++++++++
 .../test_external_catalog_iceberg_partition.groovy | 63 +++++++++++++++++
 7 files changed, 313 insertions(+), 48 deletions(-)

diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h
index b46594fd04..083c7c6881 100644
--- a/be/src/exec/text_converter.h
+++ b/be/src/exec/text_converter.h
@@ -29,6 +29,8 @@ class SlotDescriptor;
 // numeric types, etc.
 class TextConverter {
 public:
+    static constexpr char NULL_STR[3] = {'\\', 'N', '\0'};
+
     TextConverter(char escape_char, char array_delimiter = '\2');
 
     void write_string_column(const SlotDescriptor* slot_desc,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index eed73e98cb..4413f3deac 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -356,27 +356,14 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
 }
 
 Status VFileScanner::_fill_columns_from_path(size_t rows) {
-    const TFileRangeDesc& range = _ranges.at(_next_range - 1);
-    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
-        SCOPED_TIMER(_fill_path_columns_timer);
-        for (const auto& slot_desc : _partition_slot_descs) {
-            if (slot_desc == nullptr) continue;
-            auto it = _partition_slot_index_map.find(slot_desc->id());
-            if (it == std::end(_partition_slot_index_map)) {
-                std::stringstream ss;
-                ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id();
-                return Status::InternalError(ss.str());
-            }
-            const std::string& column_from_path = range.columns_from_path[it->second];
-            auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column;
-            IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-
-            if (!_text_converter->write_vec_column(slot_desc, col_ptr,
-                                                   const_cast<char*>(column_from_path.c_str()),
-                                                   column_from_path.size(), true, false, rows)) {
-                return Status::InternalError("Failed to fill partition column: {}={}",
-                                             slot_desc->col_name(), column_from_path);
-            }
+    for (auto& kv : *_partition_columns) {
+        auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
+        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
+        auto& [value, slot_desc] = kv.second;
+        if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()),
+                                               value.size(), true, false, rows)) {
+            return Status::InternalError("Failed to fill partition column: {}={}",
+                                         slot_desc->col_name(), value);
         }
     }
     return Status::OK();
@@ -388,29 +375,15 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
     }
 
     SCOPED_TIMER(_fill_missing_columns_timer);
-    for (auto slot_desc : _real_tuple_desc->slots()) {
-        if (!slot_desc->is_materialized()) {
-            continue;
-        }
-        if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
-            continue;
-        }
-
-        auto it = _col_default_value_ctx.find(slot_desc->col_name());
-        if (it == _col_default_value_ctx.end()) {
-            return Status::InternalError("failed to find default value expr for slot: {}",
-                                         slot_desc->col_name());
-        }
-        if (it->second == nullptr) {
+    for (auto& kv : *_missing_columns) {
+        if (kv.second == nullptr) {
             // no default column, fill with null
             auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-                    (*std::move(_src_block_ptr->get_by_name(slot_desc->col_name()).column))
-                            .mutate()
-                            .get());
+                    (*std::move(_src_block_ptr->get_by_name(kv.first).column)).mutate().get());
             nullable_column->insert_many_defaults(rows);
         } else {
             // fill with default value
-            auto& ctx = it->second;
+            auto& ctx = kv.second;
             auto origin_column_num = _src_block_ptr->columns();
             int result_column_id = -1;
             // PT1 => dest primitive type
@@ -426,10 +399,10 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
             auto result_column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
             // result_column_ptr maybe a ColumnConst, convert it to a normal column
             result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
-            auto origin_column_type = _src_block_ptr->get_by_name(slot_desc->col_name()).type;
+            auto origin_column_type = _src_block_ptr->get_by_name(kv.first).type;
             bool is_nullable = origin_column_type->is_nullable();
             _src_block_ptr->replace_by_position(
-                    _src_block_ptr->get_position_by_name(slot_desc->col_name()),
+                    _src_block_ptr->get_position_by_name(kv.first),
                     is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
             _src_block_ptr->erase(result_column_id);
         }
@@ -754,9 +727,9 @@ Status VFileScanner::_get_next_reader() {
 }
 
 Status VFileScanner::_generate_fill_columns() {
-    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
-            partition_columns;
-    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
+    _partition_columns.reset(
+            new std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>());
+    _missing_columns.reset(new std::unordered_map<std::string, VExprContextSPtr>());
 
     const TFileRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -768,8 +741,13 @@ Status VFileScanner::_generate_fill_columns() {
                                              slot_desc->id());
             }
             const std::string& column_from_path = range.columns_from_path[it->second];
-            partition_columns.emplace(slot_desc->col_name(),
-                                      std::make_tuple(column_from_path, slot_desc));
+            const char* data = column_from_path.c_str();
+            size_t size = column_from_path.size();
+            if (size == 4 && memcmp(data, "null", 4) == 0) {
+                data = TextConverter::NULL_STR;
+            }
+            _partition_columns->emplace(slot_desc->col_name(),
+                                        std::make_tuple(data, slot_desc));
         }
     }
 
@@ -788,11 +766,16 @@ Status VFileScanner::_generate_fill_columns() {
                 return Status::InternalError("failed to find default value expr for slot: {}",
                                              slot_desc->col_name());
            }
-            missing_columns.emplace(slot_desc->col_name(), it->second);
+            _missing_columns->emplace(slot_desc->col_name(), it->second);
         }
     }
 
-    return _cur_reader->set_fill_columns(partition_columns, missing_columns);
+    RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns, *_missing_columns));
+    if (_cur_reader->fill_all_columns()) {
+        _partition_columns.reset(nullptr);
+        _missing_columns.reset(nullptr);
+    }
+    return Status::OK();
 }
 
 Status VFileScanner::_init_expr_ctxes() {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 9a16d056ed..3518a4e733 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -156,6 +156,10 @@ protected:
     std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
     std::unique_ptr<io::IOContext> _io_ctx;
 
+    std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>>
+            _partition_columns;
+    std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>> _missing_columns;
+
 private:
     RuntimeProfile::Counter* _get_block_timer = nullptr;
     RuntimeProfile::Counter* _open_reader_timer = nullptr;
diff --git a/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive_partition.out b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive_partition.out
new file mode 100644
index 0000000000..5608999eb5
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive_partition.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+0.3 test3 2023-01-03T00:00 100
+
+-- !q02 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+
+-- !q03 --
+0.3 test3 2023-01-03T00:00 100
+
+-- !q04 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+2023-01-03T00:00 100 0.3 test3
+
+-- !q05 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+
+-- !q06 --
+2023-01-03T00:00 100 0.3 test3
+
+-- !q01 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+0.3 test3 2023-01-03T00:00 100
+
+-- !q02 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+
+-- !q03 --
+0.3 test3 2023-01-03T00:00 100
+
+-- !q04 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+2023-01-03T00:00 100 0.3 test3
+
+-- !q05 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+
+-- !q06 --
+2023-01-03T00:00 100 0.3 test3
+
+-- !q01 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+0.3 test3 2023-01-03T00:00 100
+
+-- !q02 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+
+-- !q03 --
+0.3 test3 2023-01-03T00:00 100
+
+-- !q04 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+2023-01-03T00:00 100 0.3 test3
+
+-- !q05 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+
+-- !q06 --
+2023-01-03T00:00 100 0.3 test3
+
diff --git a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.out b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.out
new file mode 100644
index 0000000000..c2582691cc
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q02 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q03 --
+100 0.3 test3 2023-01-03T00:00
+
+-- !q04 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q05 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q06 --
+100 0.3 test3 2023-01-03T00:00
+
+-- !q07 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q08 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q09 --
+100 0.3 test3 2023-01-03T00:00
+
+-- !q01 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q02 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q03 --
+100 0.3 test3 2023-01-03T00:00
+
+-- !q04 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q05 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q06 --
+100 0.3 test3 2023-01-03T00:00
+
+-- !q07 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q08 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q09 --
+100 0.3 test3 2023-01-03T00:00
+
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive_partition.groovy b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive_partition.groovy
new file mode 100644
index 0000000000..fc6e7fbc23
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive_partition.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +suite("test_external_catalog_hive_partition", "p2") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + String catalog_name = "test_external_catalog_hive_partition" + + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'type'='hms', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + + sql """switch ${catalog_name};""" + // test parquet format + def q01_parquet = { + qt_q01 """ select * from multi_catalog.parquet_partitioned_one_column order by t_float """ + qt_q02 """ select * from multi_catalog.parquet_partitioned_one_column where t_int is null order by t_float """ + qt_q03 """ select * from multi_catalog.parquet_partitioned_one_column where t_int is not null order by t_float """ + qt_q04 """ select * from multi_catalog.parquet_partitioned_columns order by t_float """ + qt_q05 """ select * from multi_catalog.parquet_partitioned_columns where t_int is null order by t_float """ + qt_q06 """ select * from multi_catalog.parquet_partitioned_columns where t_int is not null order by t_float """ + } + // test orc format + def q01_orc = { + qt_q01 """ select * from multi_catalog.orc_partitioned_one_column order by t_float """ + qt_q02 """ select * from multi_catalog.orc_partitioned_one_column where t_int is null order by t_float """ + qt_q03 """ select * from multi_catalog.orc_partitioned_one_column where t_int is not null order by t_float """ + qt_q04 """ select * from multi_catalog.orc_partitioned_columns order by t_float """ + qt_q05 """ select * from multi_catalog.orc_partitioned_columns where t_int is null order by t_float """ + qt_q06 """ select * from multi_catalog.orc_partitioned_columns where t_int is not null order by t_float """ + } + // test text format + def q01_text = { + qt_q01 """ select * from multi_catalog.text_partitioned_one_column order by t_float """ + qt_q02 """ select * from multi_catalog.text_partitioned_one_column where t_int is null order by t_float """ + qt_q03 """ select * from multi_catalog.text_partitioned_one_column where t_int is not null order by t_float """ + qt_q04 """ select * from multi_catalog.text_partitioned_columns order by t_float """ + qt_q05 """ select * from multi_catalog.text_partitioned_columns where t_int is null order by t_float """ + qt_q06 """ select * from multi_catalog.text_partitioned_columns where t_int is not null order by t_float """ + } + sql """ use `multi_catalog`; """ + q01_parquet() + q01_orc() + q01_text() + } +} + diff --git a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.groovy b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.groovy new file mode 100644 index 0000000000..9179f22a06 --- /dev/null +++ b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.groovy @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_catalog_iceberg_partition", "p2") {
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+        String catalog_name = "test_external_catalog_iceberg_partition"
+
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+            );
+        """
+
+        sql """switch ${catalog_name};"""
+        // test parquet format
+        def q01_parquet = {
+            qt_q01 """ select * from iceberg_catalog.parquet_partitioned_one_column order by t_float """
+            qt_q02 """ select * from iceberg_catalog.parquet_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from iceberg_catalog.parquet_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from iceberg_catalog.parquet_partitioned_columns order by t_float """
+            qt_q05 """ select * from iceberg_catalog.parquet_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from iceberg_catalog.parquet_partitioned_columns where t_int is not null order by t_float """
+            qt_q07 """ select * from iceberg_catalog.parquet_partitioned_truncate_and_fields order by t_float """
+            qt_q08 """ select * from iceberg_catalog.parquet_partitioned_truncate_and_fields where t_int is null order by t_float """
+            qt_q09 """ select * from iceberg_catalog.parquet_partitioned_truncate_and_fields where t_int is not null order by t_float """
+        }
+        // test orc format
+        def q01_orc = {
+            qt_q01 """ select * from iceberg_catalog.orc_partitioned_one_column order by t_float """
+            qt_q02 """ select * from iceberg_catalog.orc_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from iceberg_catalog.orc_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from iceberg_catalog.orc_partitioned_columns order by t_float """
+            qt_q05 """ select * from iceberg_catalog.orc_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from iceberg_catalog.orc_partitioned_columns where t_int is not null order by t_float """
+            qt_q07 """ select * from iceberg_catalog.orc_partitioned_truncate_and_fields order by t_float """
+            qt_q08 """ select * from iceberg_catalog.orc_partitioned_truncate_and_fields where t_int is null order by t_float """
+            qt_q09 """ select * from iceberg_catalog.orc_partitioned_truncate_and_fields where t_int is not null order by t_float """
+        }
+        sql """ use `iceberg_catalog`; """
+        q01_parquet()
+        q01_orc()
+    }
+}
+

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org