This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b1c16b96d6 [refactor](load) move validator out of VOlapTableSink (#21460)
b1c16b96d6 is described below

commit b1c16b96d63c96f1f6db1a21fbc4321d99897dec
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Tue Jul 4 10:16:56 2023 +0800

    [refactor](load) move validator out of VOlapTableSink (#21460)
---
 be/src/vec/sink/vtablet_sink.cpp      | 354 +----------------------------
 be/src/vec/sink/vtablet_sink.h        |  34 +--
 be/src/vec/sink/vtablet_validator.cpp | 404 ++++++++++++++++++++++++++++++++++
 be/src/vec/sink/vtablet_validator.h   |  78 +++++++
 4 files changed, 489 insertions(+), 381 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 39f3475563..f95c2b13c5 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -89,6 +89,7 @@
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
+#include "vec/sink/vtablet_validator.h"
 
 namespace doris {
 class TExpr;
@@ -1051,6 +1052,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
         return Status::InternalError("unknown destination tuple descriptor");
     }
 
+    _validator = std::make_unique<OlapTableValidator>(_output_tuple_desc);
     _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, 
false));
 
     // add all counter
@@ -1359,15 +1361,15 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
         SCOPED_RAW_TIMER(&_validate_data_ns);
         _filter_bitmap.Reset(block.rows());
         bool stop_processing = false;
-        RETURN_IF_ERROR(
-                _validate_data(state, &block, &_filter_bitmap, &filtered_rows, 
&stop_processing));
+        RETURN_IF_ERROR(_validator->validate_data(state, &block, 
&_filter_bitmap, &filtered_rows,
+                                                  &stop_processing));
         _number_filtered_rows += filtered_rows;
         if (stop_processing) {
             // should be returned after updating "_number_filtered_rows", to 
make sure that load job can be cancelled
             // because of "data unqualified"
             return Status::EndOfFile("Encountered unqualified data, stop 
processing");
         }
-        _convert_to_dest_desc_block(&block);
+        _validator->convert_to_dest_desc_block(&block);
     }
 
     SCOPED_RAW_TIMER(&_send_data_ns);
@@ -1595,351 +1597,5 @@ Status VOlapTableSink::close(RuntimeState* state, 
Status exec_status) {
     return status;
 }
 
-template <bool is_min>
-DecimalV2Value VOlapTableSink::_get_decimalv2_min_or_max(const TypeDescriptor& 
type) {
-    std::map<std::pair<int, int>, DecimalV2Value>* pmap = nullptr;
-    if constexpr (is_min) {
-        pmap = &_min_decimalv2_val;
-    } else {
-        pmap = &_max_decimalv2_val;
-    }
-
-    // found
-    auto iter = pmap->find({type.precision, type.scale});
-    if (iter != pmap->end()) {
-        return iter->second;
-    }
-
-    // save min or max DecimalV2Value for next time
-    DecimalV2Value value;
-    if constexpr (is_min) {
-        value.to_min_decimal(type.precision, type.scale);
-    } else {
-        value.to_max_decimal(type.precision, type.scale);
-    }
-    pmap->emplace(std::pair<int, int> {type.precision, type.scale}, value);
-    return value;
-}
-
-template <typename DecimalType, bool IsMin>
-DecimalType VOlapTableSink::_get_decimalv3_min_or_max(const TypeDescriptor& 
type) {
-    std::map<int, typename DecimalType::NativeType>* pmap = nullptr;
-    if constexpr (std::is_same_v<DecimalType, vectorized::Decimal32>) {
-        pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val;
-    } else if constexpr (std::is_same_v<DecimalType, vectorized::Decimal64>) {
-        pmap = IsMin ? &_min_decimal64_val : &_max_decimal64_val;
-    } else {
-        pmap = IsMin ? &_min_decimal128_val : &_max_decimal128_val;
-    }
-
-    // found
-    auto iter = pmap->find(type.precision);
-    if (iter != pmap->end()) {
-        return iter->second;
-    }
-
-    typename DecimalType::NativeType value;
-    if constexpr (IsMin) {
-        value = vectorized::min_decimal_value<DecimalType>(type.precision);
-    } else {
-        value = vectorized::max_decimal_value<DecimalType>(type.precision);
-    }
-    pmap->emplace(type.precision, value);
-    return value;
-}
-
-Status VOlapTableSink::_validate_column(RuntimeState* state, const 
TypeDescriptor& type,
-                                        bool is_nullable, 
vectorized::ColumnPtr column,
-                                        size_t slot_index, Bitmap* 
filter_bitmap,
-                                        bool* stop_processing, 
fmt::memory_buffer& error_prefix,
-                                        vectorized::IColumn::Permutation* 
rows) {
-    DCHECK((rows == nullptr) || (rows->size() == column->size()));
-    fmt::memory_buffer error_msg;
-    auto set_invalid_and_append_error_msg = [&](int row) {
-        filter_bitmap->Set(row, true);
-        auto ret = state->append_error_msg_to_file([]() -> std::string { 
return ""; },
-                                                   [&error_prefix, 
&error_msg]() -> std::string {
-                                                       return 
fmt::to_string(error_prefix) +
-                                                              
fmt::to_string(error_msg);
-                                                   },
-                                                   stop_processing);
-        error_msg.clear();
-        return ret;
-    };
-
-    auto column_ptr = 
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column);
-    auto& real_column_ptr = column_ptr == nullptr ? column : 
(column_ptr->get_nested_column_ptr());
-    auto null_map = column_ptr == nullptr ? nullptr : 
column_ptr->get_null_map_data().data();
-    auto need_to_validate = [&null_map, &filter_bitmap](size_t j, size_t row) {
-        return !filter_bitmap->Get(row) && (null_map == nullptr || null_map[j] 
== 0);
-    };
-
-    ssize_t last_invalid_row = -1;
-    switch (type.type) {
-    case TYPE_CHAR:
-    case TYPE_VARCHAR:
-    case TYPE_STRING: {
-        const auto column_string =
-                assert_cast<const 
vectorized::ColumnString*>(real_column_ptr.get());
-
-        size_t limit = config::string_type_length_soft_limit_bytes;
-        // when type.len is negative, std::min will return overflow value, so 
we need to check it
-        if (type.len > 0) {
-            limit = std::min(config::string_type_length_soft_limit_bytes, 
type.len);
-        }
-        for (size_t j = 0; j < column->size(); ++j) {
-            auto row = rows ? (*rows)[j] : j;
-            if (row == last_invalid_row) {
-                continue;
-            }
-            if (need_to_validate(j, row)) {
-                auto str_val = column_string->get_data_at(j);
-                bool invalid = str_val.size > limit;
-                if (invalid) {
-                    last_invalid_row = row;
-                    if (str_val.size > type.len) {
-                        fmt::format_to(error_msg, "{}",
-                                       "the length of input is too long than 
schema. ");
-                        fmt::format_to(error_msg, "first 32 bytes of input 
str: [{}] ",
-                                       str_val.to_prefix(32));
-                        fmt::format_to(error_msg, "schema length: {}; ", 
type.len);
-                        fmt::format_to(error_msg, "actual length: {}; ", 
str_val.size);
-                    } else if (str_val.size > limit) {
-                        fmt::format_to(error_msg, "{}",
-                                       "the length of input string is too long 
than vec schema. ");
-                        fmt::format_to(error_msg, "first 32 bytes of input 
str: [{}] ",
-                                       str_val.to_prefix(32));
-                        fmt::format_to(error_msg, "schema length: {}; ", 
type.len);
-                        fmt::format_to(error_msg, "limit length: {}; ", limit);
-                        fmt::format_to(error_msg, "actual length: {}; ", 
str_val.size);
-                    }
-                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
-                }
-            }
-        }
-        break;
-    }
-    case TYPE_JSONB: {
-        const auto column_string =
-                assert_cast<const 
vectorized::ColumnString*>(real_column_ptr.get());
-        for (size_t j = 0; j < column->size(); ++j) {
-            if (!filter_bitmap->Get(j)) {
-                if (is_nullable && column_ptr && column_ptr->is_null_at(j)) {
-                    continue;
-                }
-                auto str_val = column_string->get_data_at(j);
-                bool invalid = str_val.size == 0;
-                if (invalid) {
-                    error_msg.clear();
-                    fmt::format_to(error_msg, "{}", "jsonb with size 0 is 
invalid");
-                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
-                }
-            }
-        }
-        break;
-    }
-    case TYPE_DECIMALV2: {
-        auto column_decimal = 
const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>(
-                assert_cast<const 
vectorized::ColumnDecimal<vectorized::Decimal128>*>(
-                        real_column_ptr.get()));
-        const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type);
-        const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type);
-        for (size_t j = 0; j < column->size(); ++j) {
-            auto row = rows ? (*rows)[j] : j;
-            if (row == last_invalid_row) {
-                continue;
-            }
-            if (need_to_validate(j, row)) {
-                auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>(
-                        column_decimal->get_data()[j]);
-                bool invalid = false;
-
-                if (dec_val.greater_than_scale(type.scale)) {
-                    auto code = dec_val.round(&dec_val, type.scale, HALF_UP);
-                    column_decimal->get_data()[j] = dec_val.value();
-
-                    if (code != E_DEC_OK) {
-                        fmt::format_to(error_msg, "round one decimal 
failed.value={}; ",
-                                       dec_val.to_string());
-                        invalid = true;
-                    }
-                }
-                if (dec_val > max_decimalv2 || dec_val < min_decimalv2) {
-                    fmt::format_to(error_msg, "{}", "decimal value is not 
valid for definition");
-                    fmt::format_to(error_msg, ", value={}", 
dec_val.to_string());
-                    fmt::format_to(error_msg, ", precision={}, scale={}", 
type.precision,
-                                   type.scale);
-                    fmt::format_to(error_msg, ", min={}, max={}; ", 
min_decimalv2.to_string(),
-                                   max_decimalv2.to_string());
-                    invalid = true;
-                }
-
-                if (invalid) {
-                    last_invalid_row = row;
-                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
-                }
-            }
-        }
-        break;
-    }
-    case TYPE_DECIMAL32: {
-#define CHECK_VALIDATION_FOR_DECIMALV3(ColumnDecimalType, DecimalType)         
                    \
-    auto column_decimal = 
const_cast<vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>(   \
-            assert_cast<const 
vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>(          \
-                    real_column_ptr.get()));                                   
                    \
-    const auto& max_decimal = 
_get_decimalv3_min_or_max<vectorized::DecimalType, false>(type);     \
-    const auto& min_decimal = 
_get_decimalv3_min_or_max<vectorized::DecimalType, true>(type);      \
-    for (size_t j = 0; j < column->size(); ++j) {                              
                    \
-        auto row = rows ? (*rows)[j] : j;                                      
                    \
-        if (row == last_invalid_row) {                                         
                    \
-            continue;                                                          
                    \
-        }                                                                      
                    \
-        if (need_to_validate(j, row)) {                                        
                    \
-            auto dec_val = column_decimal->get_data()[j];                      
                    \
-            bool invalid = false;                                              
                    \
-            if (dec_val > max_decimal || dec_val < min_decimal) {              
                    \
-                fmt::format_to(error_msg, "{}", "decimal value is not valid 
for definition");      \
-                fmt::format_to(error_msg, ", value={}", dec_val);              
                    \
-                fmt::format_to(error_msg, ", precision={}, scale={}", 
type.precision, type.scale); \
-                fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal, 
max_decimal);         \
-                invalid = true;                                                
                    \
-            }                                                                  
                    \
-            if (invalid) {                                                     
                    \
-                last_invalid_row = row;                                        
                    \
-                RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));        
                    \
-            }                                                                  
                    \
-        }                                                                      
                    \
-    }
-        CHECK_VALIDATION_FOR_DECIMALV3(Decimal32, Decimal32);
-        break;
-    }
-    case TYPE_DECIMAL64: {
-        CHECK_VALIDATION_FOR_DECIMALV3(Decimal64, Decimal64);
-        break;
-    }
-    case TYPE_DECIMAL128I: {
-        CHECK_VALIDATION_FOR_DECIMALV3(Decimal128I, Decimal128);
-        break;
-    }
-    case TYPE_ARRAY: {
-        const auto column_array =
-                assert_cast<const 
vectorized::ColumnArray*>(real_column_ptr.get());
-        DCHECK(type.children.size() == 1);
-        auto nested_type = type.children[0];
-        const auto& offsets = column_array->get_offsets();
-        vectorized::IColumn::Permutation permutation(offsets.back());
-        for (size_t r = 0; r < offsets.size(); ++r) {
-            for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
-                permutation[c] = rows ? (*rows)[r] : r;
-            }
-        }
-        fmt::format_to(error_prefix, "ARRAY type failed: ");
-        RETURN_IF_ERROR(_validate_column(state, nested_type, 
type.contains_nulls[0],
-                                         column_array->get_data_ptr(), 
slot_index, filter_bitmap,
-                                         stop_processing, error_prefix, 
&permutation));
-        break;
-    }
-    case TYPE_MAP: {
-        const auto column_map = assert_cast<const 
vectorized::ColumnMap*>(real_column_ptr.get());
-        DCHECK(type.children.size() == 2);
-        auto key_type = type.children[0];
-        auto val_type = type.children[1];
-        const auto& offsets = column_map->get_offsets();
-        vectorized::IColumn::Permutation permutation(offsets.back());
-        for (size_t r = 0; r < offsets.size(); ++r) {
-            for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
-                permutation[c] = rows ? (*rows)[r] : r;
-            }
-        }
-        fmt::format_to(error_prefix, "MAP type failed: ");
-        RETURN_IF_ERROR(_validate_column(state, key_type, 
type.contains_nulls[0],
-                                         column_map->get_keys_ptr(), 
slot_index, filter_bitmap,
-                                         stop_processing, error_prefix, 
&permutation));
-        RETURN_IF_ERROR(_validate_column(state, val_type, 
type.contains_nulls[1],
-                                         column_map->get_values_ptr(), 
slot_index, filter_bitmap,
-                                         stop_processing, error_prefix, 
&permutation));
-        break;
-    }
-    case TYPE_STRUCT: {
-        const auto column_struct =
-                assert_cast<const 
vectorized::ColumnStruct*>(real_column_ptr.get());
-        DCHECK(type.children.size() == column_struct->tuple_size());
-        fmt::format_to(error_prefix, "STRUCT type failed: ");
-        for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
-            RETURN_IF_ERROR(_validate_column(state, type.children[sc], 
type.contains_nulls[sc],
-                                             
column_struct->get_column_ptr(sc), slot_index,
-                                             filter_bitmap, stop_processing, 
error_prefix));
-        }
-        break;
-    }
-    default:
-        break;
-    }
-
-    // Dispose the column should do not contain the NULL value
-    // Only two case:
-    // 1. column is nullable but the desc is not nullable
-    // 2. desc->type is BITMAP
-    if ((!is_nullable || type == TYPE_OBJECT) && column_ptr) {
-        for (int j = 0; j < column->size(); ++j) {
-            auto row = rows ? (*rows)[j] : j;
-            if (row == last_invalid_row) {
-                continue;
-            }
-            if (null_map[j] && !filter_bitmap->Get(row)) {
-                fmt::format_to(error_msg, "null value for not null column, 
type={}",
-                               type.debug_string());
-                last_invalid_row = row;
-                RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
-            }
-        }
-    }
-
-    return Status::OK();
-}
-
-Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* 
block,
-                                      Bitmap* filter_bitmap, int* 
filtered_rows,
-                                      bool* stop_processing) {
-    for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
-        SlotDescriptor* desc = _output_tuple_desc->slots()[i];
-        block->get_by_position(i).column =
-                
block->get_by_position(i).column->convert_to_full_column_if_const();
-        const auto& column = block->get_by_position(i).column;
-
-        fmt::memory_buffer error_prefix;
-        fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
-        RETURN_IF_ERROR(_validate_column(state, desc->type(), 
desc->is_nullable(), column, i,
-                                         filter_bitmap, stop_processing, 
error_prefix));
-    }
-
-    *filtered_rows = 0;
-    for (int i = 0; i < block->rows(); ++i) {
-        *filtered_rows += filter_bitmap->Get(i);
-    }
-    return Status::OK();
-}
-
-void VOlapTableSink::_convert_to_dest_desc_block(doris::vectorized::Block* 
block) {
-    for (int i = 0; i < _output_tuple_desc->slots().size() && i < 
block->columns(); ++i) {
-        SlotDescriptor* desc = _output_tuple_desc->slots()[i];
-        if (desc->is_nullable() != 
block->get_by_position(i).type->is_nullable()) {
-            if (desc->is_nullable()) {
-                block->get_by_position(i).type =
-                        
vectorized::make_nullable(block->get_by_position(i).type);
-                block->get_by_position(i).column =
-                        
vectorized::make_nullable(block->get_by_position(i).column);
-            } else {
-                block->get_by_position(i).type = assert_cast<const 
vectorized::DataTypeNullable&>(
-                                                         
*block->get_by_position(i).type)
-                                                         .get_nested_type();
-                block->get_by_position(i).column = assert_cast<const 
vectorized::ColumnNullable&>(
-                                                           
*block->get_by_position(i).column)
-                                                           
.get_nested_column_ptr();
-            }
-        }
-    }
-}
-
 } // namespace stream_load
 } // namespace doris
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index ebd7c40a81..9c608cda43 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -82,6 +82,7 @@ class RefCountClosure;
 
 namespace stream_load {
 
+class OlapTableValidator;
 class OpenPartitionClosure;
 
 // The counter of add_batch rpc of a single node
@@ -494,28 +495,6 @@ private:
                                       ChannelDistributionPayload& 
channel_to_payload,
                                       size_t num_rows, int32_t filtered_rows);
 
-    // make input data valid for OLAP table
-    // return number of invalid/filtered rows.
-    // invalid row number is set in Bitmap
-    // set stop_processing if we want to stop the whole process now.
-    Status _validate_data(RuntimeState* state, vectorized::Block* block, 
Bitmap* filter_bitmap,
-                          int* filtered_rows, bool* stop_processing);
-
-    template <bool is_min>
-    DecimalV2Value _get_decimalv2_min_or_max(const TypeDescriptor& type);
-
-    template <typename DecimalType, bool IsMin>
-    DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type);
-
-    Status _validate_column(RuntimeState* state, const TypeDescriptor& type, 
bool is_nullable,
-                            vectorized::ColumnPtr column, size_t slot_index, 
Bitmap* filter_bitmap,
-                            bool* stop_processing, fmt::memory_buffer& 
error_prefix,
-                            vectorized::IColumn::Permutation* rows = nullptr);
-
-    // some output column of output expr may have different nullable property 
with dest slot desc
-    // so here need to do the convert operation
-    void _convert_to_dest_desc_block(vectorized::Block* block);
-
     Status find_tablet(RuntimeState* state, vectorized::Block* block, int 
row_index,
                        const VOlapTablePartition** partition, uint32_t& 
tablet_index,
                        bool& stop_processing, bool& is_continue);
@@ -565,16 +544,7 @@ private:
     bthread_t _sender_thread = 0;
     std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
 
-    std::map<std::pair<int, int>, DecimalV2Value> _max_decimalv2_val;
-    std::map<std::pair<int, int>, DecimalV2Value> _min_decimalv2_val;
-
-    std::map<int, int32_t> _max_decimal32_val;
-    std::map<int, int32_t> _min_decimal32_val;
-    std::map<int, int64_t> _max_decimal64_val;
-    std::map<int, int64_t> _min_decimal64_val;
-    std::map<int, int128_t> _max_decimal128_val;
-    std::map<int, int128_t> _min_decimal128_val;
-
+    std::unique_ptr<OlapTableValidator> _validator;
     // Stats for this
     int64_t _validate_data_ns = 0;
     int64_t _send_data_ns = 0;
diff --git a/be/src/vec/sink/vtablet_validator.cpp 
b/be/src/vec/sink/vtablet_validator.cpp
new file mode 100644
index 0000000000..39cc92f0f4
--- /dev/null
+++ b/be/src/vec/sink/vtablet_validator.cpp
@@ -0,0 +1,404 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/vtablet_validator.h"
+
+#include <fmt/format.h>
+#include <google/protobuf/stubs/common.h>
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/status.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "service/brpc.h"
+#include "util/binary_cast.hpp"
+#include "util/brpc_client_cache.h"
+#include "util/thread.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_const.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_map.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_struct.h"
+#include "vec/common/assert_cast.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris {
+namespace stream_load {
+
+template <bool is_min>
+DecimalV2Value OlapTableValidator::_get_decimalv2_min_or_max(const 
TypeDescriptor& type) {
+    std::map<std::pair<int, int>, DecimalV2Value>* pmap;
+    if constexpr (is_min) {
+        pmap = &_min_decimalv2_val;
+    } else {
+        pmap = &_max_decimalv2_val;
+    }
+
+    // found
+    auto iter = pmap->find({type.precision, type.scale});
+    if (iter != pmap->end()) {
+        return iter->second;
+    }
+
+    // save min or max DecimalV2Value for next time
+    DecimalV2Value value;
+    if constexpr (is_min) {
+        value.to_min_decimal(type.precision, type.scale);
+    } else {
+        value.to_max_decimal(type.precision, type.scale);
+    }
+    pmap->emplace(std::pair<int, int> {type.precision, type.scale}, value);
+    return value;
+}
+
+template <typename DecimalType, bool IsMin>
+DecimalType OlapTableValidator::_get_decimalv3_min_or_max(const 
TypeDescriptor& type) {
+    std::map<int, typename DecimalType::NativeType>* pmap;
+    if constexpr (std::is_same_v<DecimalType, vectorized::Decimal32>) {
+        pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val;
+    } else if constexpr (std::is_same_v<DecimalType, vectorized::Decimal64>) {
+        pmap = IsMin ? &_min_decimal64_val : &_max_decimal64_val;
+    } else {
+        pmap = IsMin ? &_min_decimal128_val : &_max_decimal128_val;
+    }
+
+    // found
+    auto iter = pmap->find(type.precision);
+    if (iter != pmap->end()) {
+        return iter->second;
+    }
+
+    typename DecimalType::NativeType value;
+    if constexpr (IsMin) {
+        value = vectorized::min_decimal_value<DecimalType>(type.precision);
+    } else {
+        value = vectorized::max_decimal_value<DecimalType>(type.precision);
+    }
+    pmap->emplace(type.precision, value);
+    return value;
+}
+
+Status OlapTableValidator::_validate_column(RuntimeState* state, const 
TypeDescriptor& type,
+                                            bool is_nullable, 
vectorized::ColumnPtr column,
+                                            size_t slot_index, Bitmap* 
filter_bitmap,
+                                            bool* stop_processing, 
fmt::memory_buffer& error_prefix,
+                                            vectorized::IColumn::Permutation* 
rows) {
+    DCHECK((rows == nullptr) || (rows->size() == column->size()));
+    fmt::memory_buffer error_msg;
+    auto set_invalid_and_append_error_msg = [&](int row) {
+        filter_bitmap->Set(row, true);
+        auto ret = state->append_error_msg_to_file([]() -> std::string { 
return ""; },
+                                                   [&error_prefix, 
&error_msg]() -> std::string {
+                                                       return 
fmt::to_string(error_prefix) +
+                                                              
fmt::to_string(error_msg);
+                                                   },
+                                                   stop_processing);
+        error_msg.clear();
+        return ret;
+    };
+
+    auto column_ptr = 
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column);
+    auto& real_column_ptr = column_ptr == nullptr ? column : 
(column_ptr->get_nested_column_ptr());
+    auto null_map = column_ptr == nullptr ? nullptr : 
column_ptr->get_null_map_data().data();
+    auto need_to_validate = [&null_map, &filter_bitmap](size_t j, size_t row) {
+        return !filter_bitmap->Get(row) && (null_map == nullptr || null_map[j] 
== 0);
+    };
+
+    ssize_t last_invalid_row = -1;
+    switch (type.type) {
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+    case TYPE_STRING: {
+        const auto column_string =
+                assert_cast<const 
vectorized::ColumnString*>(real_column_ptr.get());
+
+        size_t limit = config::string_type_length_soft_limit_bytes;
+        // when type.len is negative, std::min will return overflow value, so 
we need to check it
+        if (type.len > 0) {
+            limit = std::min(config::string_type_length_soft_limit_bytes, 
type.len);
+        }
+        for (size_t j = 0; j < column->size(); ++j) {
+            auto row = rows ? (*rows)[j] : j;
+            if (row == last_invalid_row) {
+                continue;
+            }
+            if (need_to_validate(j, row)) {
+                auto str_val = column_string->get_data_at(j);
+                bool invalid = str_val.size > limit;
+                if (invalid) {
+                    last_invalid_row = row;
+                    if (str_val.size > type.len) {
+                        fmt::format_to(error_msg, "{}",
+                                       "the length of input is too long than 
schema. ");
+                        fmt::format_to(error_msg, "first 32 bytes of input 
str: [{}] ",
+                                       str_val.to_prefix(32));
+                        fmt::format_to(error_msg, "schema length: {}; ", 
type.len);
+                        fmt::format_to(error_msg, "actual length: {}; ", 
str_val.size);
+                    } else if (str_val.size > limit) {
+                        fmt::format_to(error_msg, "{}",
+                                       "the length of input string is too long 
than vec schema. ");
+                        fmt::format_to(error_msg, "first 32 bytes of input 
str: [{}] ",
+                                       str_val.to_prefix(32));
+                        fmt::format_to(error_msg, "schema length: {}; ", 
type.len);
+                        fmt::format_to(error_msg, "limit length: {}; ", limit);
+                        fmt::format_to(error_msg, "actual length: {}; ", 
str_val.size);
+                    }
+                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
+                }
+            }
+        }
+        break;
+    }
+    case TYPE_JSONB: {
+        const auto column_string =
+                assert_cast<const 
vectorized::ColumnString*>(real_column_ptr.get());
+        for (size_t j = 0; j < column->size(); ++j) {
+            if (!filter_bitmap->Get(j)) {
+                if (is_nullable && column_ptr && column_ptr->is_null_at(j)) {
+                    continue;
+                }
+                auto str_val = column_string->get_data_at(j);
+                bool invalid = str_val.size == 0;
+                if (invalid) {
+                    error_msg.clear();
+                    fmt::format_to(error_msg, "{}", "jsonb with size 0 is 
invalid");
+                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
+                }
+            }
+        }
+        break;
+    }
+    case TYPE_DECIMALV2: {
+        auto column_decimal = 
const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>(
+                assert_cast<const 
vectorized::ColumnDecimal<vectorized::Decimal128>*>(
+                        real_column_ptr.get()));
+        const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type);
+        const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type);
+        for (size_t j = 0; j < column->size(); ++j) {
+            auto row = rows ? (*rows)[j] : j;
+            if (row == last_invalid_row) {
+                continue;
+            }
+            if (need_to_validate(j, row)) {
+                auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>(
+                        column_decimal->get_data()[j]);
+                bool invalid = false;
+
+                if (dec_val.greater_than_scale(type.scale)) {
+                    auto code = dec_val.round(&dec_val, type.scale, HALF_UP);
+                    column_decimal->get_data()[j] = dec_val.value();
+
+                    if (code != E_DEC_OK) {
+                        fmt::format_to(error_msg, "round one decimal 
failed.value={}; ",
+                                       dec_val.to_string());
+                        invalid = true;
+                    }
+                }
+                if (dec_val > max_decimalv2 || dec_val < min_decimalv2) {
+                    fmt::format_to(error_msg, "{}", "decimal value is not 
valid for definition");
+                    fmt::format_to(error_msg, ", value={}", 
dec_val.to_string());
+                    fmt::format_to(error_msg, ", precision={}, scale={}", 
type.precision,
+                                   type.scale);
+                    fmt::format_to(error_msg, ", min={}, max={}; ", 
min_decimalv2.to_string(),
+                                   max_decimalv2.to_string());
+                    invalid = true;
+                }
+
+                if (invalid) {
+                    last_invalid_row = row;
+                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
+                }
+            }
+        }
+        break;
+    }
+    case TYPE_DECIMAL32: {
+#define CHECK_VALIDATION_FOR_DECIMALV3(ColumnDecimalType, DecimalType)         
                    \
+    auto column_decimal = 
const_cast<vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>(   \
+            assert_cast<const 
vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>(          \
+                    real_column_ptr.get()));                                   
                    \
+    const auto& max_decimal = 
_get_decimalv3_min_or_max<vectorized::DecimalType, false>(type);     \
+    const auto& min_decimal = 
_get_decimalv3_min_or_max<vectorized::DecimalType, true>(type);      \
+    for (size_t j = 0; j < column->size(); ++j) {                              
                    \
+        auto row = rows ? (*rows)[j] : j;                                      
                    \
+        if (row == last_invalid_row) {                                         
                    \
+            continue;                                                          
                    \
+        }                                                                      
                    \
+        if (need_to_validate(j, row)) {                                        
                    \
+            auto dec_val = column_decimal->get_data()[j];                      
                    \
+            bool invalid = false;                                              
                    \
+            if (dec_val > max_decimal || dec_val < min_decimal) {              
                    \
+                fmt::format_to(error_msg, "{}", "decimal value is not valid 
for definition");      \
+                fmt::format_to(error_msg, ", value={}", dec_val);              
                    \
+                fmt::format_to(error_msg, ", precision={}, scale={}", 
type.precision, type.scale); \
+                fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal, 
max_decimal);         \
+                invalid = true;                                                
                    \
+            }                                                                  
                    \
+            if (invalid) {                                                     
                    \
+                last_invalid_row = row;                                        
                    \
+                RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));        
                    \
+            }                                                                  
                    \
+        }                                                                      
                    \
+    }
+        CHECK_VALIDATION_FOR_DECIMALV3(Decimal32, Decimal32);
+        break;
+    }
+    case TYPE_DECIMAL64: {
+        CHECK_VALIDATION_FOR_DECIMALV3(Decimal64, Decimal64);
+        break;
+    }
+    case TYPE_DECIMAL128I: {
+        CHECK_VALIDATION_FOR_DECIMALV3(Decimal128I, Decimal128);
+        break;
+    }
+    case TYPE_ARRAY: {
+        const auto column_array =
+                assert_cast<const 
vectorized::ColumnArray*>(real_column_ptr.get());
+        DCHECK(type.children.size() == 1);
+        auto nested_type = type.children[0];
+        const auto& offsets = column_array->get_offsets();
+        vectorized::IColumn::Permutation permutation(offsets.back());
+        for (size_t r = 0; r < offsets.size(); ++r) {
+            for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
+                permutation[c] = rows ? (*rows)[r] : r;
+            }
+        }
+        fmt::format_to(error_prefix, "ARRAY type failed: ");
+        RETURN_IF_ERROR(_validate_column(state, nested_type, 
type.contains_nulls[0],
+                                         column_array->get_data_ptr(), 
slot_index, filter_bitmap,
+                                         stop_processing, error_prefix, 
&permutation));
+        break;
+    }
+    case TYPE_MAP: {
+        const auto column_map = assert_cast<const 
vectorized::ColumnMap*>(real_column_ptr.get());
+        DCHECK(type.children.size() == 2);
+        auto key_type = type.children[0];
+        auto val_type = type.children[1];
+        const auto& offsets = column_map->get_offsets();
+        vectorized::IColumn::Permutation permutation(offsets.back());
+        for (size_t r = 0; r < offsets.size(); ++r) {
+            for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
+                permutation[c] = rows ? (*rows)[r] : r;
+            }
+        }
+        fmt::format_to(error_prefix, "MAP type failed: ");
+        RETURN_IF_ERROR(_validate_column(state, key_type, 
type.contains_nulls[0],
+                                         column_map->get_keys_ptr(), 
slot_index, filter_bitmap,
+                                         stop_processing, error_prefix, 
&permutation));
+        RETURN_IF_ERROR(_validate_column(state, val_type, 
type.contains_nulls[1],
+                                         column_map->get_values_ptr(), 
slot_index, filter_bitmap,
+                                         stop_processing, error_prefix, 
&permutation));
+        break;
+    }
+    case TYPE_STRUCT: {
+        const auto column_struct =
+                assert_cast<const 
vectorized::ColumnStruct*>(real_column_ptr.get());
+        DCHECK(type.children.size() == column_struct->tuple_size());
+        fmt::format_to(error_prefix, "STRUCT type failed: ");
+        for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
+            RETURN_IF_ERROR(_validate_column(state, type.children[sc], 
type.contains_nulls[sc],
+                                             
column_struct->get_column_ptr(sc), slot_index,
+                                             filter_bitmap, stop_processing, 
error_prefix));
+        }
+        break;
+    }
+    default:
+        break;
+    }
+
+    // Flag rows that hold NULL in a column that must not contain NULL values.
+    // This check applies in only two cases:
+    // 1. the column is nullable but the slot descriptor is not nullable
+    // 2. the descriptor type is BITMAP (TYPE_OBJECT)
+    if ((!is_nullable || type == TYPE_OBJECT) && column_ptr) {
+        for (int j = 0; j < column->size(); ++j) {
+            auto row = rows ? (*rows)[j] : j;
+            if (row == last_invalid_row) {
+                continue;
+            }
+            if (null_map[j] && !filter_bitmap->Get(row)) {
+                fmt::format_to(error_msg, "null value for not null column, 
type={}",
+                               type.debug_string());
+                last_invalid_row = row;
+                RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
+            }
+        }
+    }
+
+    return Status::OK();
+}
+
+// Validates every output-slot column of `block` against its slot descriptor.
+//
+// For each slot i of the output tuple descriptor, column i of `block` is first
+// replaced by the result of convert_to_full_column_if_const(), then passed to
+// _validate_column() together with an error prefix naming the column. The first
+// non-OK status from _validate_column() is returned unchanged.
+//
+// On success `*filtered_rows` is set to the number of rows in [0, block->rows())
+// whose bit is set in `filter_bitmap`.
+Status OlapTableValidator::validate_data(RuntimeState* state, vectorized::Block* block,
+                                         Bitmap* filter_bitmap, int* filtered_rows,
+                                         bool* stop_processing) {
+    const auto& slots = _output_tuple_desc->slots();
+    // size_t index: avoids comparing a signed int with size() and matches the
+    // size_t slot_index parameter of _validate_column().
+    for (size_t i = 0; i < slots.size(); ++i) {
+        SlotDescriptor* desc = slots[i];
+        auto& entry = block->get_by_position(i);
+        entry.column = entry.column->convert_to_full_column_if_const();
+
+        // A fresh prefix per slot, so nested validators only ever append to this column's text.
+        fmt::memory_buffer error_prefix;
+        fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
+        RETURN_IF_ERROR(_validate_column(state, desc->type(), desc->is_nullable(), entry.column, i,
+                                         filter_bitmap, stop_processing, error_prefix));
+    }
+
+    // Count the rows flagged in the bitmap across the whole block.
+    int filtered = 0;
+    const size_t num_rows = block->rows();
+    for (size_t row = 0; row < num_rows; ++row) {
+        if (filter_bitmap->Get(row)) {
+            ++filtered;
+        }
+    }
+    *filtered_rows = filtered;
+    return Status::OK();
+}
+
+// Aligns the nullability of each block column with its destination slot descriptor.
+//
+// Only positions below both the slot count and block->columns() are visited.
+// Where the slot and the column's type disagree on nullability:
+//   * slot nullable, column not: type and column are both wrapped by make_nullable();
+//   * slot not nullable, column nullable: type and column are replaced by the
+//     nested type / nested column of the nullable wrapper.
+void OlapTableValidator::convert_to_dest_desc_block(doris::vectorized::Block* block) {
+    const auto& slots = _output_tuple_desc->slots();
+    // size_t index: avoids comparing a signed int with size() / columns().
+    for (size_t i = 0; i < slots.size() && i < block->columns(); ++i) {
+        const bool want_nullable = slots[i]->is_nullable();
+        auto& entry = block->get_by_position(i);
+        if (want_nullable == entry.type->is_nullable()) {
+            continue; // already consistent with the destination slot
+        }
+        if (want_nullable) {
+            entry.type = vectorized::make_nullable(entry.type);
+            entry.column = vectorized::make_nullable(entry.column);
+        } else {
+            entry.type = assert_cast<const vectorized::DataTypeNullable&>(*entry.type)
+                                 .get_nested_type();
+            entry.column = assert_cast<const vectorized::ColumnNullable&>(*entry.column)
+                                   .get_nested_column_ptr();
+        }
+    }
+}
+
+} // namespace stream_load
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/vtablet_validator.h 
b/be/src/vec/sink/vtablet_validator.h
new file mode 100644
index 0000000000..d610f7680d
--- /dev/null
+++ b/be/src/vec/sink/vtablet_validator.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <stddef.h>
+#include <stdint.h>
+
+// IWYU pragma: no_include <bits/chrono.h>
+#include <chrono> // IWYU pragma: keep
+#include <map>
+
+#include "common/status.h"
+#include "runtime/decimalv2_value.h"
+#include "runtime/types.h"
+#include "util/bitmap.h"
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
+
+namespace doris {
+namespace stream_load {
+
+// Validates blocks against the output tuple descriptor of an OLAP table and
+// adjusts column nullability to match the destination slots.
+class OlapTableValidator {
+public:
+    // `output_tuple_desc` is stored as a raw pointer; this class declares no
+    // destructor, so the descriptor is never freed here.
+    // explicit: prevents an accidental implicit conversion from TupleDescriptor*.
+    explicit OlapTableValidator(TupleDescriptor* output_tuple_desc)
+            : _output_tuple_desc(output_tuple_desc) {}
+
+    // make input data valid for OLAP table
+    // the number of invalid/filtered rows is written to `filtered_rows`.
+    // invalid row number is set in Bitmap
+    // set stop_processing if we want to stop the whole process now.
+    Status validate_data(RuntimeState* state, vectorized::Block* block, Bitmap* filter_bitmap,
+                         int* filtered_rows, bool* stop_processing);
+
+    // some output column of output expr may have different nullable property
+    // with dest slot desc, so here need to do the convert operation
+    void convert_to_dest_desc_block(vectorized::Block* block);
+
+private:
+    // Returns the minimum (is_min == true) or maximum DecimalV2 bound for `type`.
+    // NOTE(review): presumably cached in _min_decimalv2_val / _max_decimalv2_val
+    // keyed by (precision, scale) -- confirm against the definition.
+    template <bool is_min>
+    DecimalV2Value _get_decimalv2_min_or_max(const TypeDescriptor& type);
+
+    // Returns the minimum (IsMin == true) or maximum DecimalV3 bound for `type`.
+    // NOTE(review): presumably cached in the per-width maps below -- confirm.
+    template <typename DecimalType, bool IsMin>
+    DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type);
+
+    // Validates one column (or nested sub-column) of slot `slot_index`.
+    // `rows`, when non-null, maps element index j of `column` to the block row it
+    // belongs to; it is passed for the nested columns of ARRAY and MAP types.
+    // `error_prefix` is prepended context for error messages and may be appended to.
+    Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
+                            vectorized::ColumnPtr column, size_t slot_index, Bitmap* filter_bitmap,
+                            bool* stop_processing, fmt::memory_buffer& error_prefix,
+                            vectorized::IColumn::Permutation* rows = nullptr);
+
+    TupleDescriptor* _output_tuple_desc = nullptr;
+
+    // NOTE(review): key looks like (precision, scale) -- confirm against the .cpp.
+    std::map<std::pair<int, int>, DecimalV2Value> _max_decimalv2_val;
+    std::map<std::pair<int, int>, DecimalV2Value> _min_decimalv2_val;
+
+    // NOTE(review): key is presumably the decimal precision -- confirm against the .cpp.
+    std::map<int, int32_t> _max_decimal32_val;
+    std::map<int, int32_t> _min_decimal32_val;
+    std::map<int, int64_t> _max_decimal64_val;
+    std::map<int, int64_t> _min_decimal64_val;
+    std::map<int, int128_t> _max_decimal128_val;
+    std::map<int, int128_t> _min_decimal128_val;
+};
+
+} // namespace stream_load
+} // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to