This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.0 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit fed43a19b8aa7ba16d2ab3519f843635384260fb Author: Pxl <952130...@qq.com> AuthorDate: Thu Mar 3 22:44:49 2022 +0800 [fix][improvement](runtime-filter) fix string type length limit error && add runtime filter decimal support (#8282) --- be/src/exprs/runtime_filter.cpp | 200 ++++++++++++++++++--------------- be/src/olap/olap_define.h | 2 +- be/src/vec/core/block.cpp | 9 +- be/src/vec/functions/function_case.h | 5 +- be/src/vec/sink/vtablet_sink.cpp | 202 +++++++++++++++++++--------------- be/src/vec/utils/template_helpers.hpp | 8 +- 6 files changed, 245 insertions(+), 181 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 4b603ea..2ab2e17 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -34,9 +34,11 @@ #include "exprs/predicate.h" #include "gen_cpp/internal_service.pb.h" #include "gen_cpp/types.pb.h" +#include "runtime/primitive_type.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" #include "runtime/type_limit.h" +#include "udf/udf.h" #include "util/defer_op.h" #include "util/runtime_profile.h" #include "util/string_parser.hpp" @@ -328,8 +330,13 @@ public: _filter_id(params->filter_id) {} // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge - RuntimePredicateWrapper(MemTracker* tracker, ObjectPool* pool, RuntimeFilterType type, UniqueId fragment_instance_id, uint32_t filter_id) - : _tracker(tracker), _pool(pool), _filter_type(type), _fragment_instance_id(fragment_instance_id), _filter_id(filter_id) {} + RuntimePredicateWrapper(MemTracker* tracker, ObjectPool* pool, RuntimeFilterType type, + UniqueId fragment_instance_id, uint32_t filter_id) + : _tracker(tracker), + _pool(pool), + _filter_type(type), + _fragment_instance_id(fragment_instance_id), + _filter_id(filter_id) {} // init runtime filter wrapper // alloc memory to init runtime filter function Status init(const RuntimeFilterParams* params) { @@ -410,8 +417,10 @@ public: case TYPE_DATE: case TYPE_DATETIME: { // DateTime->DateTimeValue - vectorized::DateTime date_time =*reinterpret_cast<const vectorized::DateTime*>(value.data); - vectorized::VecDateTimeValue vec_date_time_value = binary_cast<vectorized::Int64, vectorized::VecDateTimeValue>(date_time); + vectorized::DateTime date_time = + *reinterpret_cast<const vectorized::DateTime*>(value.data); + vectorized::VecDateTimeValue vec_date_time_value = + binary_cast<vectorized::Int64, vectorized::VecDateTimeValue>(date_time); doris::DateTimeValue date_time_value; vec_date_time_value.convert_vec_dt_to_dt(&date_time_value); insert(reinterpret_cast<const void*>(&date_time_value)); @@ -438,9 +447,8 @@ public: RuntimeFilterType get_real_type() { auto real_filter_type = _filter_type; if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - real_filter_type = _is_bloomfilter - ? RuntimeFilterType::BLOOM_FILTER - : RuntimeFilterType::IN_FILTER; + real_filter_type = _is_bloomfilter ? RuntimeFilterType::BLOOM_FILTER + : RuntimeFilterType::IN_FILTER; } return real_filter_type; } @@ -511,18 +519,18 @@ public: } Status merge(const RuntimePredicateWrapper* wrapper) { - bool can_not_merge_in_or_bloom - = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && - (wrapper->_filter_type != RuntimeFilterType::IN_FILTER - && wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER); + bool can_not_merge_in_or_bloom = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && + (wrapper->_filter_type != RuntimeFilterType::IN_FILTER && + wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER); - bool can_not_merge_other = _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER - && _filter_type != wrapper->_filter_type; + bool can_not_merge_other = _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER && + _filter_type != wrapper->_filter_type; CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other) << "fragment instance " << _fragment_instance_id.to_string() - << " can not merge runtime filter(id=" << _filter_id << "), current is filter type is " - << to_string(_filter_type) << ", other filter type is " << to_string(wrapper->_filter_type); + << " can not merge runtime filter(id=" << _filter_id + << "), current is filter type is " << to_string(_filter_type) + << ", other filter type is " << to_string(wrapper->_filter_type); switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { @@ -530,8 +538,8 @@ public: break; } else if (wrapper->_is_ignored_in_filter) { VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string() - << " ignore merge runtime filter(in filter id " - << _filter_id << ") because: " << *(wrapper->get_ignored_in_filter_msg()); + << " ignore merge runtime filter(in filter id " << _filter_id + << ") because: " << *(wrapper->get_ignored_in_filter_msg()); _is_ignored_in_filter = true; _ignored_in_filter_msg = wrapper->_ignored_in_filter_msg; @@ -545,9 +553,9 @@ public: #ifdef VLOG_DEBUG_IS_ON std::stringstream msg; msg << "fragment instance " << _fragment_instance_id.to_string() - << " ignore merge runtime filter(in filter id " - << _filter_id << ") because: in_num(" << _hybrid_set->size() - << ") >= max_in_num(" << _max_in_num << ")"; + << " ignore merge runtime filter(in filter id " << _filter_id + << ") because: in_num(" << _hybrid_set->size() << ") >= max_in_num(" + << _max_in_num << ")"; _ignored_in_filter_msg = _pool->add(new std::string(msg.str())); #else _ignored_in_filter_msg = _pool->add(new std::string("ignored")); @@ -568,9 +576,8 @@ public: break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - auto real_filter_type = _is_bloomfilter - ? RuntimeFilterType::BLOOM_FILTER - : RuntimeFilterType::IN_FILTER; + auto real_filter_type = _is_bloomfilter ? RuntimeFilterType::BLOOM_FILTER + : RuntimeFilterType::IN_FILTER; if (real_filter_type == RuntimeFilterType::IN_FILTER) { if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in CHECK(!wrapper->_is_ignored_in_filter) @@ -581,21 +588,22 @@ public: _hybrid_set->insert(wrapper->_hybrid_set.get()); if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) { VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string() - << " change runtime filter to bloom filter(id=" << _filter_id - << ") because: in_num(" << _hybrid_set->size() - << ") >= max_in_num(" << _max_in_num << ")"; + << " change runtime filter to bloom filter(id=" << _filter_id + << ") because: in_num(" << _hybrid_set->size() + << ") >= max_in_num(" << _max_in_num << ")"; change_to_bloom_filter(); } - // in merge bloom filter + // in merge bloom filter } else { VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string() - << " change runtime filter to bloom filter(id=" << _filter_id - << ") because: already exist a bloom filter"; + << " change runtime filter to bloom filter(id=" << _filter_id + << ") because: already exist a bloom filter"; change_to_bloom_filter(); _bloomfilter_func->merge(wrapper->_bloomfilter_func.get()); } } else { - if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { // bloom filter merge in + if (wrapper->_filter_type == + RuntimeFilterType::IN_FILTER) { // bloom filter merge in CHECK(!wrapper->_is_ignored_in_filter) << "fragment instance " << _fragment_instance_id.to_string() << " can not ignore merge runtime filter(in filter id " @@ -607,7 +615,7 @@ public: _bloomfilter_func->insert(value); it->next(); } - // bloom filter merge bloom filter + // bloom filter merge bloom filter } else { _bloomfilter_func->merge(wrapper->_bloomfilter_func.get()); } @@ -626,7 +634,8 @@ public: PrimitiveType type = to_primitive_type(in_filter->column_type()); if (in_filter->has_ignored_msg()) { - VLOG_DEBUG << "Ignore in filter(id=" << _filter_id << ") because: " << in_filter->ignored_msg(); + VLOG_DEBUG << "Ignore in filter(id=" << _filter_id + << ") because: " << in_filter->ignored_msg(); _is_ignored_in_filter = true; _ignored_in_filter_msg = _pool->add(new std::string(in_filter->ignored_msg())); return Status::OK(); @@ -634,60 +643,68 @@ public: _hybrid_set.reset(create_set(type)); switch (type) { case TYPE_BOOLEAN: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, PColumnValue &column, ObjectPool *pool) { + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { bool bool_val = column.boolval(); set->insert(&bool_val); }); break; } case TYPE_TINYINT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, PColumnValue &column, ObjectPool *pool) { + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { int8_t int_val = static_cast<int8_t>(column.intval()); set->insert(&int_val); }); break; } case TYPE_SMALLINT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, PColumnValue &column, ObjectPool *pool) { + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { int16_t int_val = static_cast<int16_t>(column.intval()); set->insert(&int_val); }); break; } case TYPE_INT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, PColumnValue &column, ObjectPool *pool) { + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { int32_t int_val = column.intval(); set->insert(&int_val); }); break; } case TYPE_BIGINT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, PColumnValue &column, ObjectPool *pool) { + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { int64_t long_val = column.longval(); set->insert(&long_val); }); break; } case TYPE_LARGEINT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, PColumnValue &column, ObjectPool *pool) { + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { auto string_val = column.stringval(); StringParser::ParseResult result; - int128_t int128_val = StringParser::string_to_int<int128_t>(string_val.c_str(), - string_val.length(), &result); + int128_t int128_val = StringParser::string_to_int<int128_t>( + string_val.c_str(), string_val.length(), &result); DCHECK(result == StringParser::PARSE_SUCCESS); set->insert(&int128_val); }); break; } case TYPE_FLOAT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, PColumnValue &column, ObjectPool *pool) { + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { float float_val = static_cast<float>(column.doubleval()); set->insert(&float_val); }); break; } case TYPE_DOUBLE: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, PColumnValue &column, ObjectPool *pool) { + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { double double_val = column.doubleval(); set->insert(&double_val); }); @@ -695,19 +712,30 @@ public: } case TYPE_DATETIME: case TYPE_DATE: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, PColumnValue &column, ObjectPool *pool) { - auto &string_val_ref = column.stringval(); + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { + auto& string_val_ref = column.stringval(); DateTimeValue datetime_val; datetime_val.from_date_str(string_val_ref.c_str(), string_val_ref.length()); set->insert(&datetime_val); }); break; } + case TYPE_DECIMALV2: { + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { + auto& string_val_ref = column.stringval(); + DecimalV2Value decimal_val(string_val_ref); + set->insert(&decimal_val); + }); + break; + } case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, PColumnValue &column, ObjectPool *pool) { - auto &string_val_ref = column.stringval(); + batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + ObjectPool* pool) { + auto& string_val_ref = column.stringval(); auto val_ptr = pool->add(new std::string(string_val_ref)); StringValue string_val(const_cast<char*>(val_ptr->c_str()), val_ptr->length()); set->insert(&string_val); @@ -742,10 +770,8 @@ public: _minmax_func.reset(create_minmax_filter(type)); switch (type) { case TYPE_BOOLEAN: { - bool min_val; - bool max_val; - min_val = minmax_filter->min_val().boolval(); - max_val = minmax_filter->max_val().boolval(); + bool min_val = minmax_filter->min_val().boolval(); + bool max_val = minmax_filter->max_val().boolval(); return _minmax_func->assign(&min_val, &max_val); } case TYPE_TINYINT: { @@ -791,17 +817,13 @@ public: return _minmax_func->assign(&min_val, &max_val); } case TYPE_FLOAT: { - float min_val; - float max_val; - min_val = static_cast<float>(minmax_filter->min_val().doubleval()); - max_val = static_cast<float>(minmax_filter->max_val().doubleval()); + float min_val = static_cast<float>(minmax_filter->min_val().doubleval()); + float max_val = static_cast<float>(minmax_filter->max_val().doubleval()); return _minmax_func->assign(&min_val, &max_val); } case TYPE_DOUBLE: { - double min_val; - double max_val; - min_val = static_cast<double>(minmax_filter->min_val().doubleval()); - max_val = static_cast<double>(minmax_filter->max_val().doubleval()); + double min_val = static_cast<double>(minmax_filter->min_val().doubleval()); + double max_val = static_cast<double>(minmax_filter->max_val().doubleval()); return _minmax_func->assign(&min_val, &max_val); } case TYPE_DATETIME: @@ -814,6 +836,13 @@ public: max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length()); return _minmax_func->assign(&min_val, &max_val); } + case TYPE_DECIMALV2: { + auto& min_val_ref = minmax_filter->min_val().stringval(); + auto& max_val_ref = minmax_filter->max_val().stringval(); + DecimalV2Value min_val(min_val_ref); + DecimalV2Value max_val(max_val_ref); + return _minmax_func->assign(&min_val, &max_val); + } case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { @@ -869,20 +898,15 @@ public: } } - bool is_bloomfilter() const { - return _is_bloomfilter; - } + bool is_bloomfilter() const { return _is_bloomfilter; } - bool is_ignored_in_filter() const { - return _is_ignored_in_filter; - } + bool is_ignored_in_filter() const { return _is_ignored_in_filter; } - std::string* get_ignored_in_filter_msg() const { - return _ignored_in_filter_msg; - } + std::string* get_ignored_in_filter_msg() const { return _ignored_in_filter_msg; } void batch_assign(const PInFilter* filter, - void (*assign_func) (std::unique_ptr<HybridSetBase> &_hybrid_set, PColumnValue&, ObjectPool*)) { + void (*assign_func)(std::unique_ptr<HybridSetBase>& _hybrid_set, + PColumnValue&, ObjectPool*)) { for (int i = 0; i < filter->values_size(); ++i) { PColumnValue column = filter->values(i); assign_func(_hybrid_set, column, _pool); @@ -1085,7 +1109,8 @@ Status IRuntimeFilter::_create_wrapper(const T* param, MemTracker* tracker, Obje std::unique_ptr<RuntimePredicateWrapper>* wrapper) { int filter_type = param->request->filter_type(); wrapper->reset(new RuntimePredicateWrapper(tracker, pool, get_type(filter_type), - UniqueId(param->request->fragment_id()), param->request->filter_id())); + UniqueId(param->request->fragment_id()), + param->request->filter_id())); switch (filter_type) { case PFilterType::IN_FILTER: { @@ -1122,7 +1147,8 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { void IRuntimeFilter::update_runtime_filter_type_to_profile() { if (_profile.get() != nullptr) { - _profile->add_info_string("RealRuntimeFilterType", ::doris::to_string(_wrapper->get_real_type())); + _profile->add_info_string("RealRuntimeFilterType", + ::doris::to_string(_wrapper->get_real_type())); } } @@ -1157,7 +1183,7 @@ const RuntimePredicateWrapper* IRuntimeFilter::get_wrapper() { template <typename T> void batch_copy(PInFilter* filter, HybridSetBase::IteratorBase* it, - void (*set_func) (PColumnValue*, const T*)) { + void (*set_func)(PColumnValue*, const T*)) { while (it->has_next()) { const void* void_value = it->get_value(); auto origin_value = reinterpret_cast<const T*>(void_value); @@ -1170,9 +1196,8 @@ template <class T> Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) { auto real_runtime_filter_type = _runtime_filter_type; if (real_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - real_runtime_filter_type = _wrapper->is_bloomfilter() - ? RuntimeFilterType::BLOOM_FILTER - : RuntimeFilterType::IN_FILTER; + real_runtime_filter_type = _wrapper->is_bloomfilter() ? RuntimeFilterType::BLOOM_FILTER + : RuntimeFilterType::IN_FILTER; } request->set_filter_type(get_type(real_runtime_filter_type)); @@ -1209,56 +1234,56 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) { switch (column_type) { case TYPE_BOOLEAN: { - batch_copy<int32_t>(filter, it, [](PColumnValue *column, const int32_t *value) { + batch_copy<int32_t>(filter, it, [](PColumnValue* column, const int32_t* value) { column->set_boolval(*value); }); return; } case TYPE_TINYINT: { - batch_copy<int8_t>(filter, it, [](PColumnValue *column, const int8_t *value) { + batch_copy<int8_t>(filter, it, [](PColumnValue* column, const int8_t* value) { column->set_intval(*value); }); return; } case TYPE_SMALLINT: { - batch_copy<int16_t>(filter, it, [](PColumnValue *column, const int16_t *value) { + batch_copy<int16_t>(filter, it, [](PColumnValue* column, const int16_t* value) { column->set_intval(*value); }); return; } case TYPE_INT: { - batch_copy<int32_t>(filter, it, [](PColumnValue *column, const int32_t *value) { + batch_copy<int32_t>(filter, it, [](PColumnValue* column, const int32_t* value) { column->set_intval(*value); }); return; } case TYPE_BIGINT: { - batch_copy<int64_t>(filter, it, [](PColumnValue *column, const int64_t *value) { + batch_copy<int64_t>(filter, it, [](PColumnValue* column, const int64_t* value) { column->set_longval(*value); }); return; } case TYPE_LARGEINT: { - batch_copy<int128_t>(filter, it, [](PColumnValue *column, const int128_t *value) { + batch_copy<int128_t>(filter, it, [](PColumnValue* column, const int128_t* value) { column->set_stringval(LargeIntValue::to_string(*value)); }); return; } case TYPE_FLOAT: { - batch_copy<float>(filter, it, [](PColumnValue *column, const float *value) { + batch_copy<float>(filter, it, [](PColumnValue* column, const float* value) { column->set_doubleval(*value); }); return; } case TYPE_DOUBLE: { - batch_copy<double>(filter, it, [](PColumnValue *column, const double *value) { + batch_copy<double>(filter, it, [](PColumnValue* column, const double* value) { column->set_doubleval(*value); }); return; } case TYPE_DATE: case TYPE_DATETIME: { - batch_copy<DateTimeValue>(filter, it, [](PColumnValue *column, const DateTimeValue *value) { + batch_copy<DateTimeValue>(filter, it, [](PColumnValue* column, const DateTimeValue* value) { char convert_buffer[30]; value->to_string(convert_buffer); column->set_stringval(convert_buffer); @@ -1266,15 +1291,16 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) { return; } case TYPE_DECIMALV2: { - batch_copy<DecimalV2Value>(filter, it, [](PColumnValue *column, const DecimalV2Value *value) { - column->set_stringval(value->to_string()); - }); + batch_copy<DecimalV2Value>(filter, it, + [](PColumnValue* column, const DecimalV2Value* value) { + column->set_stringval(value->to_string()); + }); return; } case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_STRING: { - batch_copy<StringValue>(filter, it, [](PColumnValue *column, const StringValue *value) { + batch_copy<StringValue>(filter, it, [](PColumnValue* column, const StringValue* value) { column->set_stringval(std::string(value->ptr, value->len)); }); return; diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 19205e6..b16c614 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -57,7 +57,7 @@ static const uint16_t OLAP_VARCHAR_MAX_LENGTH = 65535; static const uint32_t OLAP_STRING_MAX_LENGTH = 2147483647; // the max length supported for vec string type 1MB -static constexpr auto MAX_SIZE_OF_VEC_STRING = 1024l * 1024; +static constexpr size_t MAX_SIZE_OF_VEC_STRING = 1024 * 1024; // the max length supported for array static const uint16_t OLAP_ARRAY_MAX_LENGTH = 65535; diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index f888ad1..1dda373 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -44,10 +44,10 @@ #include "vec/data_types/data_type_date.h" #include "vec/data_types/data_type_date_time.h" #include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_hll.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_number.h" #include "vec/data_types/data_type_string.h" -#include "vec/data_types/data_type_hll.h" namespace doris::vectorized { @@ -718,14 +718,12 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp } // serialize data values + // when data type is HLL, content_uncompressed_size maybe larger than real size. allocated_buf->resize(content_uncompressed_size); char* buf = allocated_buf->data(); - char* start_buf = buf; for (const auto& c : *this) { buf = c.type->serialize(*(c.column), buf); } - CHECK(content_uncompressed_size == (buf - start_buf)) - << content_uncompressed_size << " vs. " << (buf - start_buf); *uncompressed_bytes = content_uncompressed_size; // compress @@ -792,7 +790,8 @@ doris::Tuple* Block::deep_copy_tuple(const doris::TupleDescriptor& desc, MemPool if (!slot_desc->type().is_string_type() && !slot_desc->type().is_date_type()) { memcpy((void*)dst->get_slot(slot_desc->tuple_offset()), data_ref.data, data_ref.size); - } else if (slot_desc->type().is_string_type() && slot_desc->type() != TYPE_OBJECT && slot_desc->type() != TYPE_HLL) { + } else if (slot_desc->type().is_string_type() && slot_desc->type() != TYPE_OBJECT && + slot_desc->type() != TYPE_HLL) { memcpy((void*)dst->get_slot(slot_desc->tuple_offset()), (const void*)(&data_ref), sizeof(data_ref)); // Copy the content of string diff --git a/be/src/vec/functions/function_case.h b/be/src/vec/functions/function_case.h index 1b728e9..7cea64a 100644 --- a/be/src/vec/functions/function_case.h +++ b/be/src/vec/functions/function_case.h @@ -17,6 +17,7 @@ #pragma once +#include "vec/columns/column_complex.h" #include "vec/data_types/data_type_nullable.h" #include "vec/functions/function.h" #include "vec/functions/function_helpers.h" @@ -188,7 +189,9 @@ public: uint8* then_idx, CaseWhenColumnHolder& column_holder) { auto result_column_ptr = data_type->create_column(); - if constexpr (std::is_same_v<ColumnType, ColumnString>) { + if constexpr (std::is_same_v<ColumnType, ColumnString> || + std::is_same_v<ColumnType, ColumnBitmap> || + std::is_same_v<ColumnType, ColumnHLL>) { // result_column and all then_column is not nullable. // can't simd when type is string. update_result_normal(result_column_ptr, then_idx, column_holder); diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index f8df6a6..0c486ff 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -15,18 +15,18 @@ // specific language governing permissions and limitations // under the License. -#include "util/doris_metrics.h" +#include "vec/sink/vtablet_sink.h" +#include "util/doris_metrics.h" +#include "vec/core/block.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" -#include "vec/sink/vtablet_sink.h" -#include "vec/core/block.h" namespace doris { namespace stream_load { VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector<TExpr>& texprs, Status* status) + const std::vector<TExpr>& texprs, Status* status) : OlapTableSink(pool, row_desc, texprs, status) { // From the thrift expressions create the real exprs. vectorized::VExpr::create_expr_trees(pool, texprs, &_output_vexpr_ctxs); @@ -43,7 +43,8 @@ Status VOlapTableSink::init(const TDataSink& sink) { Status VOlapTableSink::prepare(RuntimeState* state) { // Prepare the exprs to run. - RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _input_row_desc, _expr_mem_tracker)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _input_row_desc, + _expr_mem_tracker)); return OlapTableSink::prepare(state); } @@ -58,7 +59,9 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) auto rows = input_block->rows(); auto bytes = input_block->bytes(); - if (UNLIKELY(rows == 0)) { return status; } + if (UNLIKELY(rows == 0)) { + return status; + } SCOPED_TIMER(_profile->total_time_counter()); _number_input_rows += rows; @@ -73,8 +76,10 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) if (!_output_vexpr_ctxs.empty()) { // Do vectorized expr here to speed up load block = vectorized::VExprContext::get_output_block_after_execute_exprs( - _output_vexpr_ctxs, *input_block, status); - if (UNLIKELY(block.rows() == 0)) { return status; } + _output_vexpr_ctxs, *input_block, status); + if (UNLIKELY(block.rows() == 0)) { + return status; + } } auto num_rows = block.rows(); @@ -83,7 +88,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) SCOPED_RAW_TIMER(&_validate_data_ns); _filter_bitmap.Reset(block.rows()); bool stop_processing = false; - RETURN_IF_ERROR(_validate_data(state, &block, &_filter_bitmap, &filtered_rows, &stop_processing)); + RETURN_IF_ERROR( + _validate_data(state, &block, &_filter_bitmap, &filtered_rows, &stop_processing)); _number_filtered_rows += filtered_rows; if (stop_processing) { // should be returned after updating "_number_filtered_rows", to make sure that load job can be cancelled @@ -106,10 +112,11 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) if (!_vpartition->find_tablet(&block_row, &partition, &dist_hash)) { RETURN_IF_ERROR(state->append_error_msg_to_file([]() -> std::string { return ""; }, [&]() -> std::string { - fmt::memory_buffer buf; - fmt::format_to(buf, "no partition for this tuple. tuple=[]"); - return buf.data(); - }, &stop_processing)); + fmt::memory_buffer buf; + fmt::format_to(buf, "no partition for this tuple. tuple=[]"); + return buf.data(); + }, + &stop_processing)); _number_filtered_rows++; if (stop_processing) { return Status::EndOfFile("Encountered unqualified data, stop processing"); @@ -138,24 +145,28 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { return OlapTableSink::close(state, exec_status); } -Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* block, Bitmap* filter_bitmap, int* filtered_rows, - bool* stop_processing) { +Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* block, + Bitmap* filter_bitmap, int* filtered_rows, + bool* stop_processing) { const auto num_rows = block->rows(); fmt::memory_buffer error_msg; auto set_invalid_and_append_error_msg = [&](int row) { - filter_bitmap->Set(row, true); - return state->append_error_msg_to_file([]() -> std::string { return ""; }, - [&error_msg]() -> std::string { return error_msg.data(); }, stop_processing); + filter_bitmap->Set(row, true); + return state->append_error_msg_to_file( + []() -> std::string { return ""; }, + [&error_msg]() -> std::string { return error_msg.data(); }, stop_processing); }; for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) { SlotDescriptor* desc = _output_tuple_desc->slots()[i]; - block->get_by_position(i).column = block->get_by_position(i).column->convert_to_full_column_if_const(); + block->get_by_position(i).column = + block->get_by_position(i).column->convert_to_full_column_if_const(); const auto& column = block->get_by_position(i).column; if (desc->is_nullable() && desc->type() == TYPE_OBJECT) { - const auto& null_map = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column) - ->get_null_map_data(); + const auto& null_map = + vectorized::check_and_get_column<vectorized::ColumnNullable>(*column) + ->get_null_map_data(); fmt::format_to(error_msg, "null is not allowed for bitmap column, column_name: {}; ", desc->col_name()); @@ -168,85 +179,103 @@ Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* bl } } else { auto column_ptr = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column); - auto& real_column_ptr = column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr()); + auto& real_column_ptr = + column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr()); switch (desc->type().type) { - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_STRING: { - const auto column_string = assert_cast<const vectorized::ColumnString *>(real_column_ptr.get()); - - for (int j = 0; j < num_rows; ++j) { - if (!filter_bitmap->Get(j)) { - auto str_val = column_string->get_data_at(j); - bool invalid = str_val.size > std::min(desc->type().len, (int)MAX_SIZE_OF_VEC_STRING); - - error_msg.clear(); - if (str_val.size > desc->type().len) { - fmt::format_to(error_msg, "{}", "the length of input is too long than schema. "); - fmt::format_to(error_msg, "column_name: {}; ", desc->col_name()); - fmt::format_to(error_msg, "input str: [{}] ", str_val.to_string()); - fmt::format_to(error_msg, "schema length: {}; ", desc->type().len); - fmt::format_to(error_msg, "actual length: {}; ", str_val.size); - } else if (str_val.size > MAX_SIZE_OF_VEC_STRING) { - fmt::format_to(error_msg, "{}", "the length of input string is too long than vec schema. "); - fmt::format_to(error_msg, "column_name: {}; ", desc->col_name()); - fmt::format_to(error_msg, "input str: [{}] ", str_val.to_string()); - fmt::format_to(error_msg, "schema length: {}; ", MAX_SIZE_OF_VEC_STRING); - fmt::format_to(error_msg, "actual length: {}; ", str_val.size); - } + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_STRING: { + const auto column_string = + assert_cast<const vectorized::ColumnString*>(real_column_ptr.get()); + + size_t limit = MAX_SIZE_OF_VEC_STRING; + if (desc->type().type != TYPE_STRING) { + DCHECK(desc->type().len >= 0); + limit = std::min(limit, (size_t)desc->type().len); + } - if (invalid) { - RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); - } + for (int j = 0; j < num_rows; ++j) { + if (!filter_bitmap->Get(j)) { + auto str_val = column_string->get_data_at(j); + bool invalid = str_val.size > limit; + + error_msg.clear(); + if (str_val.size > desc->type().len) { + fmt::format_to(error_msg, "{}", + "the length of input is too long than schema. "); + fmt::format_to(error_msg, "column_name: {}; ", desc->col_name()); + fmt::format_to(error_msg, "input str: [{}] ", str_val.to_string()); + fmt::format_to(error_msg, "schema length: {}; ", desc->type().len); + fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + } else if (str_val.size > MAX_SIZE_OF_VEC_STRING) { + fmt::format_to( + error_msg, "{}", + "the length of input string is too long than vec schema. "); + fmt::format_to(error_msg, "column_name: {}; ", desc->col_name()); + fmt::format_to(error_msg, "input str: [{}] ", str_val.to_string()); + fmt::format_to(error_msg, "schema length: {}; ", + MAX_SIZE_OF_VEC_STRING); + fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + } + + if (invalid) { + RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); } } - break; } - case TYPE_DECIMALV2: { - auto column_decimal = const_cast<vectorized::ColumnDecimal - <vectorized::Decimal128> *>(assert_cast<const vectorized::ColumnDecimal - <vectorized::Decimal128> *>(real_column_ptr.get())); - - for (int j = 0; j < num_rows; ++j) { - if (!filter_bitmap->Get(j)) { - auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>( - column_decimal->get_data()[j]); - error_msg.clear(); - bool invalid = false; - - if (dec_val.greater_than_scale(desc->type().scale)) { - auto code = dec_val.round(&dec_val, desc->type().scale, HALF_UP); - column_decimal->get_data()[j] = binary_cast<DecimalV2Value, vectorized::Int128>( - dec_val); - - if (code != E_DEC_OK) { - fmt::format_to(error_msg, "round one decimal failed.value={}; ", dec_val.to_string()); - invalid = true; - } - } - if (dec_val > _max_decimalv2_val[i] || dec_val < _min_decimalv2_val[i]) { - fmt::format_to(error_msg, "decimal value is not valid for definition, column={}", desc->col_name()); - fmt::format_to(error_msg, ", value={}", dec_val.to_string()); - fmt::format_to(error_msg, ", precision={}, scale={}; ", desc->type().precision, desc->type().scale); + break; + } + case TYPE_DECIMALV2: { + auto column_decimal = const_cast< + vectorized::ColumnDecimal<vectorized::Decimal128>*>( + assert_cast<const vectorized::ColumnDecimal<vectorized::Decimal128>*>( + real_column_ptr.get())); + + for (int j = 0; j < num_rows; ++j) { + if (!filter_bitmap->Get(j)) { + auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>( + column_decimal->get_data()[j]); + error_msg.clear(); + bool invalid = false; + + if (dec_val.greater_than_scale(desc->type().scale)) { + auto code = dec_val.round(&dec_val, desc->type().scale, HALF_UP); + column_decimal->get_data()[j] = + binary_cast<DecimalV2Value, vectorized::Int128>(dec_val); + + if (code != E_DEC_OK) { + fmt::format_to(error_msg, "round one decimal failed.value={}; ", + dec_val.to_string()); invalid = true; } + } + if (dec_val > _max_decimalv2_val[i] || dec_val < _min_decimalv2_val[i]) { + fmt::format_to(error_msg, + "decimal value is not valid for definition, column={}", + desc->col_name()); + fmt::format_to(error_msg, ", value={}", dec_val.to_string()); + fmt::format_to(error_msg, ", precision={}, scale={}; ", + desc->type().precision, desc->type().scale); + invalid = true; + } - if (invalid) { - RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); - } + if (invalid) { + RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); } } - break; } - default: - break; + break; + } + default: + break; } // Dispose the nullable column not match problem here, convert to nullable column if (desc->is_nullable() && !column_ptr) { block->get_by_position(i).column = vectorized::make_nullable(column); - block->get_by_position(i).type = vectorized::make_nullable(block->get_by_position(i).type); + block->get_by_position(i).type = + vectorized::make_nullable(block->get_by_position(i).type); } // Dispose the nullable column not match problem here, convert to not nullable column @@ -260,8 +289,10 @@ Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* bl } } block->get_by_position(i).column = column_ptr->get_nested_column_ptr(); - block->get_by_position(i).type = (reinterpret_cast<const vectorized::DataTypeNullable*>( - block->get_by_position(i).type.get()))->get_nested_type(); + block->get_by_position(i).type = + (reinterpret_cast<const vectorized::DataTypeNullable*>( + block->get_by_position(i).type.get())) + ->get_nested_type(); } } } @@ -275,4 +306,3 @@ Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* bl } // namespace stream_load } // namespace doris - diff --git a/be/src/vec/utils/template_helpers.hpp b/be/src/vec/utils/template_helpers.hpp index 10aac23..7fe681a 100644 --- a/be/src/vec/utils/template_helpers.hpp +++ b/be/src/vec/utils/template_helpers.hpp @@ -19,6 +19,7 @@ #include "http/http_status.h" #include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column_complex.h" #include "vec/columns/columns_number.h" #include "vec/data_types/data_type.h" #include "vec/functions/function.h" @@ -44,11 +45,16 @@ M(Date, ColumnInt64) \ M(DateTime, ColumnInt64) +#define COMPLEX_TYPE_TO_COLUMN_TYPE(M) \ + M(BitMap, ColumnBitmap) \ + M(HLL, ColumnHLL) + #define TYPE_TO_COLUMN_TYPE(M) \ NUMERIC_TYPE_TO_COLUMN_TYPE(M) \ DECIMAL_TYPE_TO_COLUMN_TYPE(M) \ STRING_TYPE_TO_COLUMN_TYPE(M) \ - TIME_TYPE_TO_COLUMN_TYPE(M) + TIME_TYPE_TO_COLUMN_TYPE(M) \ + COMPLEX_TYPE_TO_COLUMN_TYPE(M) namespace doris::vectorized { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org