This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8caa5a9ba4 [Fix](multi-catalog) Fix null partitions error in iceberg tables. (#22185)
8caa5a9ba4 is described below

commit 8caa5a9ba45febf662ffc6efc14039086f3185f5
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Thu Jul 27 23:57:35 2023 +0800

    [Fix](multi-catalog) Fix null partitions error in iceberg tables. (#22185)
    
    ### Issue
    When a partition column contains null values, queries throw the error
    `Failed to fill partition column: t_int=null`
    
    ### Resolution
    - Fix the null partition error in Iceberg tables by replacing the null partition value with '\N' (see the sketch below).
    - Add regression tests for Hive null partitions.
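    A minimal, standalone C++ sketch (not the Doris code itself) of the
    replacement applied in `VFileScanner::_generate_fill_columns()`: a partition
    value that arrives from the file path as the literal string "null" is
    rewritten to the Hive null marker "\N" before the text converter fills the
    column, so it is parsed as NULL instead of failing. The
    `normalize_partition_value` helper is hypothetical and exists only for
    illustration; `NULL_STR` mirrors the constant added to `TextConverter` in
    this commit.

    ```cpp
    #include <cstddef>
    #include <cstring>
    #include <iostream>
    #include <string>

    namespace {
    // Mirrors TextConverter::NULL_STR added in be/src/exec/text_converter.h.
    constexpr char NULL_STR[3] = {'\\', 'N', '\0'};

    // Hypothetical helper: normalize a value taken from range.columns_from_path.
    std::string normalize_partition_value(const std::string& column_from_path) {
        const char* data = column_from_path.c_str();
        const std::size_t size = column_from_path.size();
        if (size == 4 && std::memcmp(data, "null", 4) == 0) {
            data = NULL_STR; // "\N" is later interpreted as NULL, not as text
        }
        return std::string(data);
    }
    } // namespace

    int main() {
        std::cout << normalize_partition_value("null") << "\n";       // prints \N
        std::cout << normalize_partition_value("2023-01-01") << "\n"; // unchanged
        return 0;
    }
    ```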
---
 be/src/exec/text_converter.h                       |  2 +
 be/src/vec/exec/scan/vfile_scanner.cpp             | 79 +++++++++-------------
 be/src/vec/exec/scan/vfile_scanner.h               |  4 ++
 .../hive/test_external_catalog_hive_partition.out  | 73 ++++++++++++++++++++
 .../test_external_catalog_iceberg_partition.out    | 73 ++++++++++++++++++++
 .../test_external_catalog_hive_partition.groovy    | 67 ++++++++++++++++++
 .../test_external_catalog_iceberg_partition.groovy | 63 +++++++++++++++++
 7 files changed, 313 insertions(+), 48 deletions(-)

diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h
index b46594fd04..083c7c6881 100644
--- a/be/src/exec/text_converter.h
+++ b/be/src/exec/text_converter.h
@@ -29,6 +29,8 @@ class SlotDescriptor;
 // numeric types, etc.
 class TextConverter {
 public:
+    static constexpr char NULL_STR[3] = {'\\', 'N', '\0'};
+
     TextConverter(char escape_char, char array_delimiter = '\2');
 
     void write_string_column(const SlotDescriptor* slot_desc,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index eed73e98cb..4413f3deac 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -356,27 +356,14 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
 }
 
 Status VFileScanner::_fill_columns_from_path(size_t rows) {
-    const TFileRangeDesc& range = _ranges.at(_next_range - 1);
-    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
-        SCOPED_TIMER(_fill_path_columns_timer);
-        for (const auto& slot_desc : _partition_slot_descs) {
-            if (slot_desc == nullptr) continue;
-            auto it = _partition_slot_index_map.find(slot_desc->id());
-            if (it == std::end(_partition_slot_index_map)) {
-                std::stringstream ss;
-                ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id();
-                return Status::InternalError(ss.str());
-            }
-            const std::string& column_from_path = range.columns_from_path[it->second];
-            auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column;
-            IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-
-            if (!_text_converter->write_vec_column(slot_desc, col_ptr,
-                                                   const_cast<char*>(column_from_path.c_str()),
-                                                   column_from_path.size(), true, false, rows)) {
-                return Status::InternalError("Failed to fill partition column: {}={}",
-                                             slot_desc->col_name(), column_from_path);
-            }
+    for (auto& kv : *_partition_columns) {
+        auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
+        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
+        auto& [value, slot_desc] = kv.second;
+        if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()),
+                                               value.size(), true, false, rows)) {
+            return Status::InternalError("Failed to fill partition column: {}={}",
+                                         slot_desc->col_name(), value);
         }
     }
     return Status::OK();
@@ -388,29 +375,15 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
     }
 
     SCOPED_TIMER(_fill_missing_columns_timer);
-    for (auto slot_desc : _real_tuple_desc->slots()) {
-        if (!slot_desc->is_materialized()) {
-            continue;
-        }
-        if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
-            continue;
-        }
-
-        auto it = _col_default_value_ctx.find(slot_desc->col_name());
-        if (it == _col_default_value_ctx.end()) {
-            return Status::InternalError("failed to find default value expr for slot: {}",
-                                         slot_desc->col_name());
-        }
-        if (it->second == nullptr) {
+    for (auto& kv : *_missing_columns) {
+        if (kv.second == nullptr) {
             // no default column, fill with null
             auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-                    (*std::move(_src_block_ptr->get_by_name(slot_desc->col_name()).column))
-                            .mutate()
-                            .get());
+                    (*std::move(_src_block_ptr->get_by_name(kv.first).column)).mutate().get());
             nullable_column->insert_many_defaults(rows);
         } else {
             // fill with default value
-            auto& ctx = it->second;
+            auto& ctx = kv.second;
             auto origin_column_num = _src_block_ptr->columns();
             int result_column_id = -1;
             // PT1 => dest primitive type
@@ -426,10 +399,10 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
                 auto result_column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
                 // result_column_ptr maybe a ColumnConst, convert it to a normal column
                 result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
-                auto origin_column_type = _src_block_ptr->get_by_name(slot_desc->col_name()).type;
+                auto origin_column_type = _src_block_ptr->get_by_name(kv.first).type;
                 bool is_nullable = origin_column_type->is_nullable();
                 _src_block_ptr->replace_by_position(
-                        _src_block_ptr->get_position_by_name(slot_desc->col_name()),
+                        _src_block_ptr->get_position_by_name(kv.first),
                         is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
                 _src_block_ptr->erase(result_column_id);
             }
@@ -754,9 +727,9 @@ Status VFileScanner::_get_next_reader() {
 }
 
 Status VFileScanner::_generate_fill_columns() {
-    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
-            partition_columns;
-    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
+    _partition_columns.reset(
+            new std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>());
+    _missing_columns.reset(new std::unordered_map<std::string, VExprContextSPtr>());
 
     const TFileRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -768,8 +741,13 @@ Status VFileScanner::_generate_fill_columns() {
                                                  slot_desc->id());
                 }
                 const std::string& column_from_path = range.columns_from_path[it->second];
-                partition_columns.emplace(slot_desc->col_name(),
-                                          std::make_tuple(column_from_path, slot_desc));
+                const char* data = column_from_path.c_str();
+                size_t size = column_from_path.size();
+                if (size == 4 && memcmp(data, "null", 4) == 0) {
+                    data = TextConverter::NULL_STR;
+                }
+                _partition_columns->emplace(slot_desc->col_name(),
+                                            std::make_tuple(data, slot_desc));
             }
         }
     }
@@ -788,11 +766,16 @@ Status VFileScanner::_generate_fill_columns() {
                 return Status::InternalError("failed to find default value 
expr for slot: {}",
                                              slot_desc->col_name());
             }
-            missing_columns.emplace(slot_desc->col_name(), it->second);
+            _missing_columns->emplace(slot_desc->col_name(), it->second);
         }
     }
 
-    return _cur_reader->set_fill_columns(partition_columns, missing_columns);
+    RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns, *_missing_columns));
+    if (_cur_reader->fill_all_columns()) {
+        _partition_columns.reset(nullptr);
+        _missing_columns.reset(nullptr);
+    }
+    return Status::OK();
 }
 
 Status VFileScanner::_init_expr_ctxes() {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 9a16d056ed..3518a4e733 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -156,6 +156,10 @@ protected:
     std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
     std::unique_ptr<io::IOContext> _io_ctx;
 
+    std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>>
+            _partition_columns;
+    std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>> _missing_columns;
+
 private:
     RuntimeProfile::Counter* _get_block_timer = nullptr;
     RuntimeProfile::Counter* _open_reader_timer = nullptr;
diff --git a/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive_partition.out b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive_partition.out
new file mode 100644
index 0000000000..5608999eb5
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive_partition.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+0.1    test1   2023-01-01T00:00        \N
+0.2    test2   2023-01-02T00:00        \N
+0.3    test3   2023-01-03T00:00        100
+
+-- !q02 --
+0.1    test1   2023-01-01T00:00        \N
+0.2    test2   2023-01-02T00:00        \N
+
+-- !q03 --
+0.3    test3   2023-01-03T00:00        100
+
+-- !q04 --
+2023-01-01T00:00       \N      0.1     test1
+2023-01-02T00:00       \N      0.2     test2
+2023-01-03T00:00       100     0.3     test3
+
+-- !q05 --
+2023-01-01T00:00       \N      0.1     test1
+2023-01-02T00:00       \N      0.2     test2
+
+-- !q06 --
+2023-01-03T00:00       100     0.3     test3
+
+-- !q01 --
+0.1    test1   2023-01-01T00:00        \N
+0.2    test2   2023-01-02T00:00        \N
+0.3    test3   2023-01-03T00:00        100
+
+-- !q02 --
+0.1    test1   2023-01-01T00:00        \N
+0.2    test2   2023-01-02T00:00        \N
+
+-- !q03 --
+0.3    test3   2023-01-03T00:00        100
+
+-- !q04 --
+2023-01-01T00:00       \N      0.1     test1
+2023-01-02T00:00       \N      0.2     test2
+2023-01-03T00:00       100     0.3     test3
+
+-- !q05 --
+2023-01-01T00:00       \N      0.1     test1
+2023-01-02T00:00       \N      0.2     test2
+
+-- !q06 --
+2023-01-03T00:00       100     0.3     test3
+
+-- !q01 --
+0.1    test1   2023-01-01T00:00        \N
+0.2    test2   2023-01-02T00:00        \N
+0.3    test3   2023-01-03T00:00        100
+
+-- !q02 --
+0.1    test1   2023-01-01T00:00        \N
+0.2    test2   2023-01-02T00:00        \N
+
+-- !q03 --
+0.3    test3   2023-01-03T00:00        100
+
+-- !q04 --
+2023-01-01T00:00       \N      0.1     test1
+2023-01-02T00:00       \N      0.2     test2
+2023-01-03T00:00       100     0.3     test3
+
+-- !q05 --
+2023-01-01T00:00       \N      0.1     test1
+2023-01-02T00:00       \N      0.2     test2
+
+-- !q06 --
+2023-01-03T00:00       100     0.3     test3
+
diff --git a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.out b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.out
new file mode 100644
index 0000000000..c2582691cc
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+100    0.3     test3   2023-01-03T00:00
+
+-- !q02 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+
+-- !q03 --
+100    0.3     test3   2023-01-03T00:00
+
+-- !q04 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+100    0.3     test3   2023-01-03T00:00
+
+-- !q05 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+
+-- !q06 --
+100    0.3     test3   2023-01-03T00:00
+
+-- !q07 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+100    0.3     test3   2023-01-03T00:00
+
+-- !q08 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+
+-- !q09 --
+100    0.3     test3   2023-01-03T00:00
+
+-- !q01 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+100    0.3     test3   2023-01-03T00:00
+
+-- !q02 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+
+-- !q03 --
+100    0.3     test3   2023-01-03T00:00
+
+-- !q04 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+100    0.3     test3   2023-01-03T00:00
+
+-- !q05 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+
+-- !q06 --
+100    0.3     test3   2023-01-03T00:00
+
+-- !q07 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+100    0.3     test3   2023-01-03T00:00
+
+-- !q08 --
+\N     0.1     test1   2023-01-01T00:00
+\N     0.2     test2   2023-01-02T00:00
+
+-- !q09 --
+100    0.3     test3   2023-01-03T00:00
+
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive_partition.groovy b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive_partition.groovy
new file mode 100644
index 0000000000..fc6e7fbc23
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive_partition.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_catalog_hive_partition", "p2") {
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+        String catalog_name = "test_external_catalog_hive_partition"
+
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+            );
+        """
+
+        sql """switch ${catalog_name};"""
+        // test parquet format
+        def q01_parquet = {
+            qt_q01 """ select * from multi_catalog.parquet_partitioned_one_column order by t_float """
+            qt_q02 """ select * from multi_catalog.parquet_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from multi_catalog.parquet_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from multi_catalog.parquet_partitioned_columns order by t_float """
+            qt_q05 """ select * from multi_catalog.parquet_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from multi_catalog.parquet_partitioned_columns where t_int is not null order by t_float """
+        }
+        // test orc format
+        def q01_orc = {
+            qt_q01 """ select * from multi_catalog.orc_partitioned_one_column order by t_float """
+            qt_q02 """ select * from multi_catalog.orc_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from multi_catalog.orc_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from multi_catalog.orc_partitioned_columns order by t_float """
+            qt_q05 """ select * from multi_catalog.orc_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from multi_catalog.orc_partitioned_columns where t_int is not null order by t_float """
+        }
+        // test text format
+        def q01_text = {
+            qt_q01 """ select * from multi_catalog.text_partitioned_one_column order by t_float """
+            qt_q02 """ select * from multi_catalog.text_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from multi_catalog.text_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from multi_catalog.text_partitioned_columns order by t_float """
+            qt_q05 """ select * from multi_catalog.text_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from multi_catalog.text_partitioned_columns where t_int is not null order by t_float """
+        }
+        sql """ use `multi_catalog`; """
+        q01_parquet()
+        q01_orc()
+        q01_text()
+    }
+}
+
diff --git a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.groovy b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.groovy
new file mode 100644
index 0000000000..9179f22a06
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_catalog_iceberg_partition", "p2") {
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+        String catalog_name = "test_external_catalog_iceberg_partition"
+
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+            );
+        """
+
+        sql """switch ${catalog_name};"""
+        // test parquet format
+        def q01_parquet = {
+            qt_q01 """ select * from iceberg_catalog.parquet_partitioned_one_column order by t_float """
+            qt_q02 """ select * from iceberg_catalog.parquet_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from iceberg_catalog.parquet_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from iceberg_catalog.parquet_partitioned_columns order by t_float """
+            qt_q05 """ select * from iceberg_catalog.parquet_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from iceberg_catalog.parquet_partitioned_columns where t_int is not null order by t_float """
+            qt_q07 """ select * from iceberg_catalog.parquet_partitioned_truncate_and_fields order by t_float """
+            qt_q08 """ select * from iceberg_catalog.parquet_partitioned_truncate_and_fields where t_int is null order by t_float """
+            qt_q09 """ select * from iceberg_catalog.parquet_partitioned_truncate_and_fields where t_int is not null order by t_float """
+        }
+        // test orc format
+        def q01_orc = {
+            qt_q01 """ select * from iceberg_catalog.orc_partitioned_one_column order by t_float """
+            qt_q02 """ select * from iceberg_catalog.orc_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from iceberg_catalog.orc_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from iceberg_catalog.orc_partitioned_columns order by t_float """
+            qt_q05 """ select * from iceberg_catalog.orc_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from iceberg_catalog.orc_partitioned_columns where t_int is not null order by t_float """
+            qt_q07 """ select * from iceberg_catalog.orc_partitioned_truncate_and_fields order by t_float """
+            qt_q08 """ select * from iceberg_catalog.orc_partitioned_truncate_and_fields where t_int is null order by t_float """
+            qt_q09 """ select * from iceberg_catalog.orc_partitioned_truncate_and_fields where t_int is not null order by t_float """
+        }
+        sql """ use `iceberg_catalog`; """
+        q01_parquet()
+        q01_orc()
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
