This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b1c16b96d6 [refactor](load) move validator out of VOlapTableSink (#21460) b1c16b96d6 is described below commit b1c16b96d63c96f1f6db1a21fbc4321d99897dec Author: Kaijie Chen <c...@apache.org> AuthorDate: Tue Jul 4 10:16:56 2023 +0800 [refactor](load) move validator out of VOlapTableSink (#21460) --- be/src/vec/sink/vtablet_sink.cpp | 354 +---------------------------- be/src/vec/sink/vtablet_sink.h | 34 +-- be/src/vec/sink/vtablet_validator.cpp | 404 ++++++++++++++++++++++++++++++++++ be/src/vec/sink/vtablet_validator.h | 78 +++++++ 4 files changed, 489 insertions(+), 381 deletions(-) diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 39f3475563..f95c2b13c5 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -89,6 +89,7 @@ #include "vec/data_types/data_type_nullable.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" +#include "vec/sink/vtablet_validator.h" namespace doris { class TExpr; @@ -1051,6 +1052,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) { return Status::InternalError("unknown destination tuple descriptor"); } + _validator = std::make_unique<OlapTableValidator>(_output_tuple_desc); _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); // add all counter @@ -1359,15 +1361,15 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, SCOPED_RAW_TIMER(&_validate_data_ns); _filter_bitmap.Reset(block.rows()); bool stop_processing = false; - RETURN_IF_ERROR( - _validate_data(state, &block, &_filter_bitmap, &filtered_rows, &stop_processing)); + RETURN_IF_ERROR(_validator->validate_data(state, &block, &_filter_bitmap, &filtered_rows, + &stop_processing)); _number_filtered_rows += filtered_rows; if (stop_processing) { // should be returned after updating "_number_filtered_rows", to make sure that load job can be cancelled // because of "data unqualified" return Status::EndOfFile("Encountered unqualified data, stop processing"); } - _convert_to_dest_desc_block(&block); + _validator->convert_to_dest_desc_block(&block); } SCOPED_RAW_TIMER(&_send_data_ns); @@ -1595,351 +1597,5 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { return status; } -template <bool is_min> -DecimalV2Value VOlapTableSink::_get_decimalv2_min_or_max(const TypeDescriptor& type) { - std::map<std::pair<int, int>, DecimalV2Value>* pmap = nullptr; - if constexpr (is_min) { - pmap = &_min_decimalv2_val; - } else { - pmap = &_max_decimalv2_val; - } - - // found - auto iter = pmap->find({type.precision, type.scale}); - if (iter != pmap->end()) { - return iter->second; - } - - // save min or max DecimalV2Value for next time - DecimalV2Value value; - if constexpr (is_min) { - value.to_min_decimal(type.precision, type.scale); - } else { - value.to_max_decimal(type.precision, type.scale); - } - pmap->emplace(std::pair<int, int> {type.precision, type.scale}, value); - return value; -} - -template <typename DecimalType, bool IsMin> -DecimalType VOlapTableSink::_get_decimalv3_min_or_max(const TypeDescriptor& type) { - std::map<int, typename DecimalType::NativeType>* pmap = nullptr; - if constexpr (std::is_same_v<DecimalType, vectorized::Decimal32>) { - pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val; - } else if constexpr (std::is_same_v<DecimalType, vectorized::Decimal64>) { - pmap = IsMin ? &_min_decimal64_val : &_max_decimal64_val; - } else { - pmap = IsMin ? &_min_decimal128_val : &_max_decimal128_val; - } - - // found - auto iter = pmap->find(type.precision); - if (iter != pmap->end()) { - return iter->second; - } - - typename DecimalType::NativeType value; - if constexpr (IsMin) { - value = vectorized::min_decimal_value<DecimalType>(type.precision); - } else { - value = vectorized::max_decimal_value<DecimalType>(type.precision); - } - pmap->emplace(type.precision, value); - return value; -} - -Status VOlapTableSink::_validate_column(RuntimeState* state, const TypeDescriptor& type, - bool is_nullable, vectorized::ColumnPtr column, - size_t slot_index, Bitmap* filter_bitmap, - bool* stop_processing, fmt::memory_buffer& error_prefix, - vectorized::IColumn::Permutation* rows) { - DCHECK((rows == nullptr) || (rows->size() == column->size())); - fmt::memory_buffer error_msg; - auto set_invalid_and_append_error_msg = [&](int row) { - filter_bitmap->Set(row, true); - auto ret = state->append_error_msg_to_file([]() -> std::string { return ""; }, - [&error_prefix, &error_msg]() -> std::string { - return fmt::to_string(error_prefix) + - fmt::to_string(error_msg); - }, - stop_processing); - error_msg.clear(); - return ret; - }; - - auto column_ptr = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column); - auto& real_column_ptr = column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr()); - auto null_map = column_ptr == nullptr ? nullptr : column_ptr->get_null_map_data().data(); - auto need_to_validate = [&null_map, &filter_bitmap](size_t j, size_t row) { - return !filter_bitmap->Get(row) && (null_map == nullptr || null_map[j] == 0); - }; - - ssize_t last_invalid_row = -1; - switch (type.type) { - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_STRING: { - const auto column_string = - assert_cast<const vectorized::ColumnString*>(real_column_ptr.get()); - - size_t limit = config::string_type_length_soft_limit_bytes; - // when type.len is negative, std::min will return overflow value, so we need to check it - if (type.len > 0) { - limit = std::min(config::string_type_length_soft_limit_bytes, type.len); - } - for (size_t j = 0; j < column->size(); ++j) { - auto row = rows ? (*rows)[j] : j; - if (row == last_invalid_row) { - continue; - } - if (need_to_validate(j, row)) { - auto str_val = column_string->get_data_at(j); - bool invalid = str_val.size > limit; - if (invalid) { - last_invalid_row = row; - if (str_val.size > type.len) { - fmt::format_to(error_msg, "{}", - "the length of input is too long than schema. "); - fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", - str_val.to_prefix(32)); - fmt::format_to(error_msg, "schema length: {}; ", type.len); - fmt::format_to(error_msg, "actual length: {}; ", str_val.size); - } else if (str_val.size > limit) { - fmt::format_to(error_msg, "{}", - "the length of input string is too long than vec schema. "); - fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", - str_val.to_prefix(32)); - fmt::format_to(error_msg, "schema length: {}; ", type.len); - fmt::format_to(error_msg, "limit length: {}; ", limit); - fmt::format_to(error_msg, "actual length: {}; ", str_val.size); - } - RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); - } - } - } - break; - } - case TYPE_JSONB: { - const auto column_string = - assert_cast<const vectorized::ColumnString*>(real_column_ptr.get()); - for (size_t j = 0; j < column->size(); ++j) { - if (!filter_bitmap->Get(j)) { - if (is_nullable && column_ptr && column_ptr->is_null_at(j)) { - continue; - } - auto str_val = column_string->get_data_at(j); - bool invalid = str_val.size == 0; - if (invalid) { - error_msg.clear(); - fmt::format_to(error_msg, "{}", "jsonb with size 0 is invalid"); - RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); - } - } - } - break; - } - case TYPE_DECIMALV2: { - auto column_decimal = const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>( - assert_cast<const vectorized::ColumnDecimal<vectorized::Decimal128>*>( - real_column_ptr.get())); - const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type); - const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type); - for (size_t j = 0; j < column->size(); ++j) { - auto row = rows ? (*rows)[j] : j; - if (row == last_invalid_row) { - continue; - } - if (need_to_validate(j, row)) { - auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>( - column_decimal->get_data()[j]); - bool invalid = false; - - if (dec_val.greater_than_scale(type.scale)) { - auto code = dec_val.round(&dec_val, type.scale, HALF_UP); - column_decimal->get_data()[j] = dec_val.value(); - - if (code != E_DEC_OK) { - fmt::format_to(error_msg, "round one decimal failed.value={}; ", - dec_val.to_string()); - invalid = true; - } - } - if (dec_val > max_decimalv2 || dec_val < min_decimalv2) { - fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); - fmt::format_to(error_msg, ", value={}", dec_val.to_string()); - fmt::format_to(error_msg, ", precision={}, scale={}", type.precision, - type.scale); - fmt::format_to(error_msg, ", min={}, max={}; ", min_decimalv2.to_string(), - max_decimalv2.to_string()); - invalid = true; - } - - if (invalid) { - last_invalid_row = row; - RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); - } - } - } - break; - } - case TYPE_DECIMAL32: { -#define CHECK_VALIDATION_FOR_DECIMALV3(ColumnDecimalType, DecimalType) \ - auto column_decimal = const_cast<vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \ - assert_cast<const vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \ - real_column_ptr.get())); \ - const auto& max_decimal = _get_decimalv3_min_or_max<vectorized::DecimalType, false>(type); \ - const auto& min_decimal = _get_decimalv3_min_or_max<vectorized::DecimalType, true>(type); \ - for (size_t j = 0; j < column->size(); ++j) { \ - auto row = rows ? (*rows)[j] : j; \ - if (row == last_invalid_row) { \ - continue; \ - } \ - if (need_to_validate(j, row)) { \ - auto dec_val = column_decimal->get_data()[j]; \ - bool invalid = false; \ - if (dec_val > max_decimal || dec_val < min_decimal) { \ - fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); \ - fmt::format_to(error_msg, ", value={}", dec_val); \ - fmt::format_to(error_msg, ", precision={}, scale={}", type.precision, type.scale); \ - fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal, max_decimal); \ - invalid = true; \ - } \ - if (invalid) { \ - last_invalid_row = row; \ - RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); \ - } \ - } \ - } - CHECK_VALIDATION_FOR_DECIMALV3(Decimal32, Decimal32); - break; - } - case TYPE_DECIMAL64: { - CHECK_VALIDATION_FOR_DECIMALV3(Decimal64, Decimal64); - break; - } - case TYPE_DECIMAL128I: { - CHECK_VALIDATION_FOR_DECIMALV3(Decimal128I, Decimal128); - break; - } - case TYPE_ARRAY: { - const auto column_array = - assert_cast<const vectorized::ColumnArray*>(real_column_ptr.get()); - DCHECK(type.children.size() == 1); - auto nested_type = type.children[0]; - const auto& offsets = column_array->get_offsets(); - vectorized::IColumn::Permutation permutation(offsets.back()); - for (size_t r = 0; r < offsets.size(); ++r) { - for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) { - permutation[c] = rows ? (*rows)[r] : r; - } - } - fmt::format_to(error_prefix, "ARRAY type failed: "); - RETURN_IF_ERROR(_validate_column(state, nested_type, type.contains_nulls[0], - column_array->get_data_ptr(), slot_index, filter_bitmap, - stop_processing, error_prefix, &permutation)); - break; - } - case TYPE_MAP: { - const auto column_map = assert_cast<const vectorized::ColumnMap*>(real_column_ptr.get()); - DCHECK(type.children.size() == 2); - auto key_type = type.children[0]; - auto val_type = type.children[1]; - const auto& offsets = column_map->get_offsets(); - vectorized::IColumn::Permutation permutation(offsets.back()); - for (size_t r = 0; r < offsets.size(); ++r) { - for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) { - permutation[c] = rows ? (*rows)[r] : r; - } - } - fmt::format_to(error_prefix, "MAP type failed: "); - RETURN_IF_ERROR(_validate_column(state, key_type, type.contains_nulls[0], - column_map->get_keys_ptr(), slot_index, filter_bitmap, - stop_processing, error_prefix, &permutation)); - RETURN_IF_ERROR(_validate_column(state, val_type, type.contains_nulls[1], - column_map->get_values_ptr(), slot_index, filter_bitmap, - stop_processing, error_prefix, &permutation)); - break; - } - case TYPE_STRUCT: { - const auto column_struct = - assert_cast<const vectorized::ColumnStruct*>(real_column_ptr.get()); - DCHECK(type.children.size() == column_struct->tuple_size()); - fmt::format_to(error_prefix, "STRUCT type failed: "); - for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) { - RETURN_IF_ERROR(_validate_column(state, type.children[sc], type.contains_nulls[sc], - column_struct->get_column_ptr(sc), slot_index, - filter_bitmap, stop_processing, error_prefix)); - } - break; - } - default: - break; - } - - // Dispose the column should do not contain the NULL value - // Only two case: - // 1. column is nullable but the desc is not nullable - // 2. desc->type is BITMAP - if ((!is_nullable || type == TYPE_OBJECT) && column_ptr) { - for (int j = 0; j < column->size(); ++j) { - auto row = rows ? (*rows)[j] : j; - if (row == last_invalid_row) { - continue; - } - if (null_map[j] && !filter_bitmap->Get(row)) { - fmt::format_to(error_msg, "null value for not null column, type={}", - type.debug_string()); - last_invalid_row = row; - RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); - } - } - } - - return Status::OK(); -} - -Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* block, - Bitmap* filter_bitmap, int* filtered_rows, - bool* stop_processing) { - for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) { - SlotDescriptor* desc = _output_tuple_desc->slots()[i]; - block->get_by_position(i).column = - block->get_by_position(i).column->convert_to_full_column_if_const(); - const auto& column = block->get_by_position(i).column; - - fmt::memory_buffer error_prefix; - fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name()); - RETURN_IF_ERROR(_validate_column(state, desc->type(), desc->is_nullable(), column, i, - filter_bitmap, stop_processing, error_prefix)); - } - - *filtered_rows = 0; - for (int i = 0; i < block->rows(); ++i) { - *filtered_rows += filter_bitmap->Get(i); - } - return Status::OK(); -} - -void VOlapTableSink::_convert_to_dest_desc_block(doris::vectorized::Block* block) { - for (int i = 0; i < _output_tuple_desc->slots().size() && i < block->columns(); ++i) { - SlotDescriptor* desc = _output_tuple_desc->slots()[i]; - if (desc->is_nullable() != block->get_by_position(i).type->is_nullable()) { - if (desc->is_nullable()) { - block->get_by_position(i).type = - vectorized::make_nullable(block->get_by_position(i).type); - block->get_by_position(i).column = - vectorized::make_nullable(block->get_by_position(i).column); - } else { - block->get_by_position(i).type = assert_cast<const vectorized::DataTypeNullable&>( - *block->get_by_position(i).type) - .get_nested_type(); - block->get_by_position(i).column = assert_cast<const vectorized::ColumnNullable&>( - *block->get_by_position(i).column) - .get_nested_column_ptr(); - } - } - } -} - } // namespace stream_load } // namespace doris diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index ebd7c40a81..9c608cda43 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -82,6 +82,7 @@ class RefCountClosure; namespace stream_load { +class OlapTableValidator; class OpenPartitionClosure; // The counter of add_batch rpc of a single node @@ -494,28 +495,6 @@ private: ChannelDistributionPayload& channel_to_payload, size_t num_rows, int32_t filtered_rows); - // make input data valid for OLAP table - // return number of invalid/filtered rows. - // invalid row number is set in Bitmap - // set stop_processing if we want to stop the whole process now. - Status _validate_data(RuntimeState* state, vectorized::Block* block, Bitmap* filter_bitmap, - int* filtered_rows, bool* stop_processing); - - template <bool is_min> - DecimalV2Value _get_decimalv2_min_or_max(const TypeDescriptor& type); - - template <typename DecimalType, bool IsMin> - DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type); - - Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable, - vectorized::ColumnPtr column, size_t slot_index, Bitmap* filter_bitmap, - bool* stop_processing, fmt::memory_buffer& error_prefix, - vectorized::IColumn::Permutation* rows = nullptr); - - // some output column of output expr may have different nullable property with dest slot desc - // so here need to do the convert operation - void _convert_to_dest_desc_block(vectorized::Block* block); - Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index, const VOlapTablePartition** partition, uint32_t& tablet_index, bool& stop_processing, bool& is_continue); @@ -565,16 +544,7 @@ private: bthread_t _sender_thread = 0; std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token; - std::map<std::pair<int, int>, DecimalV2Value> _max_decimalv2_val; - std::map<std::pair<int, int>, DecimalV2Value> _min_decimalv2_val; - - std::map<int, int32_t> _max_decimal32_val; - std::map<int, int32_t> _min_decimal32_val; - std::map<int, int64_t> _max_decimal64_val; - std::map<int, int64_t> _min_decimal64_val; - std::map<int, int128_t> _max_decimal128_val; - std::map<int, int128_t> _min_decimal128_val; - + std::unique_ptr<OlapTableValidator> _validator; // Stats for this int64_t _validate_data_ns = 0; int64_t _send_data_ns = 0; diff --git a/be/src/vec/sink/vtablet_validator.cpp b/be/src/vec/sink/vtablet_validator.cpp new file mode 100644 index 0000000000..39cc92f0f4 --- /dev/null +++ b/be/src/vec/sink/vtablet_validator.cpp @@ -0,0 +1,404 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vtablet_validator.h" + +#include <fmt/format.h> +#include <google/protobuf/stubs/common.h> + +#include <algorithm> +#include <memory> +#include <string> +#include <unordered_map> +#include <utility> + +// IWYU pragma: no_include <opentelemetry/common/threadlocal.h> +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/status.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "service/brpc.h" +#include "util/binary_cast.hpp" +#include "util/brpc_client_cache.h" +#include "util/thread.h" +#include "vec/columns/column.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_const.h" +#include "vec/columns/column_decimal.h" +#include "vec/columns/column_map.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_struct.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { +namespace stream_load { + +template <bool is_min> +DecimalV2Value OlapTableValidator::_get_decimalv2_min_or_max(const TypeDescriptor& type) { + std::map<std::pair<int, int>, DecimalV2Value>* pmap; + if constexpr (is_min) { + pmap = &_min_decimalv2_val; + } else { + pmap = &_max_decimalv2_val; + } + + // found + auto iter = pmap->find({type.precision, type.scale}); + if (iter != pmap->end()) { + return iter->second; + } + + // save min or max DecimalV2Value for next time + DecimalV2Value value; + if constexpr (is_min) { + value.to_min_decimal(type.precision, type.scale); + } else { + value.to_max_decimal(type.precision, type.scale); + } + pmap->emplace(std::pair<int, int> {type.precision, type.scale}, value); + return value; +} + +template <typename DecimalType, bool IsMin> +DecimalType OlapTableValidator::_get_decimalv3_min_or_max(const TypeDescriptor& type) { + std::map<int, typename DecimalType::NativeType>* pmap; + if constexpr (std::is_same_v<DecimalType, vectorized::Decimal32>) { + pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val; + } else if constexpr (std::is_same_v<DecimalType, vectorized::Decimal64>) { + pmap = IsMin ? &_min_decimal64_val : &_max_decimal64_val; + } else { + pmap = IsMin ? &_min_decimal128_val : &_max_decimal128_val; + } + + // found + auto iter = pmap->find(type.precision); + if (iter != pmap->end()) { + return iter->second; + } + + typename DecimalType::NativeType value; + if constexpr (IsMin) { + value = vectorized::min_decimal_value<DecimalType>(type.precision); + } else { + value = vectorized::max_decimal_value<DecimalType>(type.precision); + } + pmap->emplace(type.precision, value); + return value; +} + +Status OlapTableValidator::_validate_column(RuntimeState* state, const TypeDescriptor& type, + bool is_nullable, vectorized::ColumnPtr column, + size_t slot_index, Bitmap* filter_bitmap, + bool* stop_processing, fmt::memory_buffer& error_prefix, + vectorized::IColumn::Permutation* rows) { + DCHECK((rows == nullptr) || (rows->size() == column->size())); + fmt::memory_buffer error_msg; + auto set_invalid_and_append_error_msg = [&](int row) { + filter_bitmap->Set(row, true); + auto ret = state->append_error_msg_to_file([]() -> std::string { return ""; }, + [&error_prefix, &error_msg]() -> std::string { + return fmt::to_string(error_prefix) + + fmt::to_string(error_msg); + }, + stop_processing); + error_msg.clear(); + return ret; + }; + + auto column_ptr = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column); + auto& real_column_ptr = column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr()); + auto null_map = column_ptr == nullptr ? nullptr : column_ptr->get_null_map_data().data(); + auto need_to_validate = [&null_map, &filter_bitmap](size_t j, size_t row) { + return !filter_bitmap->Get(row) && (null_map == nullptr || null_map[j] == 0); + }; + + ssize_t last_invalid_row = -1; + switch (type.type) { + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_STRING: { + const auto column_string = + assert_cast<const vectorized::ColumnString*>(real_column_ptr.get()); + + size_t limit = config::string_type_length_soft_limit_bytes; + // when type.len is negative, std::min will return overflow value, so we need to check it + if (type.len > 0) { + limit = std::min(config::string_type_length_soft_limit_bytes, type.len); + } + for (size_t j = 0; j < column->size(); ++j) { + auto row = rows ? (*rows)[j] : j; + if (row == last_invalid_row) { + continue; + } + if (need_to_validate(j, row)) { + auto str_val = column_string->get_data_at(j); + bool invalid = str_val.size > limit; + if (invalid) { + last_invalid_row = row; + if (str_val.size > type.len) { + fmt::format_to(error_msg, "{}", + "the length of input is too long than schema. "); + fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", + str_val.to_prefix(32)); + fmt::format_to(error_msg, "schema length: {}; ", type.len); + fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + } else if (str_val.size > limit) { + fmt::format_to(error_msg, "{}", + "the length of input string is too long than vec schema. "); + fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", + str_val.to_prefix(32)); + fmt::format_to(error_msg, "schema length: {}; ", type.len); + fmt::format_to(error_msg, "limit length: {}; ", limit); + fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + } + RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); + } + } + } + break; + } + case TYPE_JSONB: { + const auto column_string = + assert_cast<const vectorized::ColumnString*>(real_column_ptr.get()); + for (size_t j = 0; j < column->size(); ++j) { + if (!filter_bitmap->Get(j)) { + if (is_nullable && column_ptr && column_ptr->is_null_at(j)) { + continue; + } + auto str_val = column_string->get_data_at(j); + bool invalid = str_val.size == 0; + if (invalid) { + error_msg.clear(); + fmt::format_to(error_msg, "{}", "jsonb with size 0 is invalid"); + RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); + } + } + } + break; + } + case TYPE_DECIMALV2: { + auto column_decimal = const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>( + assert_cast<const vectorized::ColumnDecimal<vectorized::Decimal128>*>( + real_column_ptr.get())); + const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type); + const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type); + for (size_t j = 0; j < column->size(); ++j) { + auto row = rows ? (*rows)[j] : j; + if (row == last_invalid_row) { + continue; + } + if (need_to_validate(j, row)) { + auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>( + column_decimal->get_data()[j]); + bool invalid = false; + + if (dec_val.greater_than_scale(type.scale)) { + auto code = dec_val.round(&dec_val, type.scale, HALF_UP); + column_decimal->get_data()[j] = dec_val.value(); + + if (code != E_DEC_OK) { + fmt::format_to(error_msg, "round one decimal failed.value={}; ", + dec_val.to_string()); + invalid = true; + } + } + if (dec_val > max_decimalv2 || dec_val < min_decimalv2) { + fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); + fmt::format_to(error_msg, ", value={}", dec_val.to_string()); + fmt::format_to(error_msg, ", precision={}, scale={}", type.precision, + type.scale); + fmt::format_to(error_msg, ", min={}, max={}; ", min_decimalv2.to_string(), + max_decimalv2.to_string()); + invalid = true; + } + + if (invalid) { + last_invalid_row = row; + RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); + } + } + } + break; + } + case TYPE_DECIMAL32: { +#define CHECK_VALIDATION_FOR_DECIMALV3(ColumnDecimalType, DecimalType) \ + auto column_decimal = const_cast<vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \ + assert_cast<const vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \ + real_column_ptr.get())); \ + const auto& max_decimal = _get_decimalv3_min_or_max<vectorized::DecimalType, false>(type); \ + const auto& min_decimal = _get_decimalv3_min_or_max<vectorized::DecimalType, true>(type); \ + for (size_t j = 0; j < column->size(); ++j) { \ + auto row = rows ? (*rows)[j] : j; \ + if (row == last_invalid_row) { \ + continue; \ + } \ + if (need_to_validate(j, row)) { \ + auto dec_val = column_decimal->get_data()[j]; \ + bool invalid = false; \ + if (dec_val > max_decimal || dec_val < min_decimal) { \ + fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); \ + fmt::format_to(error_msg, ", value={}", dec_val); \ + fmt::format_to(error_msg, ", precision={}, scale={}", type.precision, type.scale); \ + fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal, max_decimal); \ + invalid = true; \ + } \ + if (invalid) { \ + last_invalid_row = row; \ + RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); \ + } \ + } \ + } + CHECK_VALIDATION_FOR_DECIMALV3(Decimal32, Decimal32); + break; + } + case TYPE_DECIMAL64: { + CHECK_VALIDATION_FOR_DECIMALV3(Decimal64, Decimal64); + break; + } + case TYPE_DECIMAL128I: { + CHECK_VALIDATION_FOR_DECIMALV3(Decimal128I, Decimal128); + break; + } + case TYPE_ARRAY: { + const auto column_array = + assert_cast<const vectorized::ColumnArray*>(real_column_ptr.get()); + DCHECK(type.children.size() == 1); + auto nested_type = type.children[0]; + const auto& offsets = column_array->get_offsets(); + vectorized::IColumn::Permutation permutation(offsets.back()); + for (size_t r = 0; r < offsets.size(); ++r) { + for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) { + permutation[c] = rows ? (*rows)[r] : r; + } + } + fmt::format_to(error_prefix, "ARRAY type failed: "); + RETURN_IF_ERROR(_validate_column(state, nested_type, type.contains_nulls[0], + column_array->get_data_ptr(), slot_index, filter_bitmap, + stop_processing, error_prefix, &permutation)); + break; + } + case TYPE_MAP: { + const auto column_map = assert_cast<const vectorized::ColumnMap*>(real_column_ptr.get()); + DCHECK(type.children.size() == 2); + auto key_type = type.children[0]; + auto val_type = type.children[1]; + const auto& offsets = column_map->get_offsets(); + vectorized::IColumn::Permutation permutation(offsets.back()); + for (size_t r = 0; r < offsets.size(); ++r) { + for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) { + permutation[c] = rows ? (*rows)[r] : r; + } + } + fmt::format_to(error_prefix, "MAP type failed: "); + RETURN_IF_ERROR(_validate_column(state, key_type, type.contains_nulls[0], + column_map->get_keys_ptr(), slot_index, filter_bitmap, + stop_processing, error_prefix, &permutation)); + RETURN_IF_ERROR(_validate_column(state, val_type, type.contains_nulls[1], + column_map->get_values_ptr(), slot_index, filter_bitmap, + stop_processing, error_prefix, &permutation)); + break; + } + case TYPE_STRUCT: { + const auto column_struct = + assert_cast<const vectorized::ColumnStruct*>(real_column_ptr.get()); + DCHECK(type.children.size() == column_struct->tuple_size()); + fmt::format_to(error_prefix, "STRUCT type failed: "); + for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) { + RETURN_IF_ERROR(_validate_column(state, type.children[sc], type.contains_nulls[sc], + column_struct->get_column_ptr(sc), slot_index, + filter_bitmap, stop_processing, error_prefix)); + } + break; + } + default: + break; + } + + // Dispose the column should do not contain the NULL value + // Only two case: + // 1. column is nullable but the desc is not nullable + // 2. desc->type is BITMAP + if ((!is_nullable || type == TYPE_OBJECT) && column_ptr) { + for (int j = 0; j < column->size(); ++j) { + auto row = rows ? (*rows)[j] : j; + if (row == last_invalid_row) { + continue; + } + if (null_map[j] && !filter_bitmap->Get(row)) { + fmt::format_to(error_msg, "null value for not null column, type={}", + type.debug_string()); + last_invalid_row = row; + RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); + } + } + } + + return Status::OK(); +} + +Status OlapTableValidator::validate_data(RuntimeState* state, vectorized::Block* block, + Bitmap* filter_bitmap, int* filtered_rows, + bool* stop_processing) { + for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) { + SlotDescriptor* desc = _output_tuple_desc->slots()[i]; + block->get_by_position(i).column = + block->get_by_position(i).column->convert_to_full_column_if_const(); + const auto& column = block->get_by_position(i).column; + + fmt::memory_buffer error_prefix; + fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name()); + RETURN_IF_ERROR(_validate_column(state, desc->type(), desc->is_nullable(), column, i, + filter_bitmap, stop_processing, error_prefix)); + } + + *filtered_rows = 0; + for (int i = 0; i < block->rows(); ++i) { + *filtered_rows += filter_bitmap->Get(i); + } + return Status::OK(); +} + +void OlapTableValidator::convert_to_dest_desc_block(doris::vectorized::Block* block) { + for (int i = 0; i < _output_tuple_desc->slots().size() && i < block->columns(); ++i) { + SlotDescriptor* desc = _output_tuple_desc->slots()[i]; + if (desc->is_nullable() != block->get_by_position(i).type->is_nullable()) { + if (desc->is_nullable()) { + block->get_by_position(i).type = + vectorized::make_nullable(block->get_by_position(i).type); + block->get_by_position(i).column = + vectorized::make_nullable(block->get_by_position(i).column); + } else { + block->get_by_position(i).type = assert_cast<const vectorized::DataTypeNullable&>( + *block->get_by_position(i).type) + .get_nested_type(); + block->get_by_position(i).column = assert_cast<const vectorized::ColumnNullable&>( + *block->get_by_position(i).column) + .get_nested_column_ptr(); + } + } + } +} + +} // namespace stream_load +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/vtablet_validator.h b/be/src/vec/sink/vtablet_validator.h new file mode 100644 index 0000000000..d610f7680d --- /dev/null +++ b/be/src/vec/sink/vtablet_validator.h @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include <stddef.h> +#include <stdint.h> + +// IWYU pragma: no_include <bits/chrono.h> +#include <chrono> // IWYU pragma: keep +#include <map> + +#include "common/status.h" +#include "runtime/decimalv2_value.h" +#include "runtime/types.h" +#include "util/bitmap.h" +#include "vec/columns/column.h" +#include "vec/core/block.h" + +namespace doris { +namespace stream_load { + +class OlapTableValidator { +public: + OlapTableValidator(TupleDescriptor* output_tuple_desc) + : _output_tuple_desc(output_tuple_desc) {} + + // make input data valid for OLAP table + // return number of invalid/filtered rows. + // invalid row number is set in Bitmap + // set stop_processing if we want to stop the whole process now. + Status validate_data(RuntimeState* state, vectorized::Block* block, Bitmap* filter_bitmap, + int* filtered_rows, bool* stop_processing); + + // some output column of output expr may have different nullable property with dest slot desc + // so here need to do the convert operation + void convert_to_dest_desc_block(vectorized::Block* block); + +private: + template <bool is_min> + DecimalV2Value _get_decimalv2_min_or_max(const TypeDescriptor& type); + + template <typename DecimalType, bool IsMin> + DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type); + + Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable, + vectorized::ColumnPtr column, size_t slot_index, Bitmap* filter_bitmap, + bool* stop_processing, fmt::memory_buffer& error_prefix, + vectorized::IColumn::Permutation* rows = nullptr); + + TupleDescriptor* _output_tuple_desc = nullptr; + + std::map<std::pair<int, int>, DecimalV2Value> _max_decimalv2_val; + std::map<std::pair<int, int>, DecimalV2Value> _min_decimalv2_val; + + std::map<int, int32_t> _max_decimal32_val; + std::map<int, int32_t> _min_decimal32_val; + std::map<int, int64_t> _max_decimal64_val; + std::map<int, int64_t> _min_decimal64_val; + std::map<int, int128_t> _max_decimal128_val; + std::map<int, int128_t> _min_decimal128_val; +}; + +} // namespace stream_load +} // namespace doris \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org