This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit fed43a19b8aa7ba16d2ab3519f843635384260fb
Author: Pxl <952130...@qq.com>
AuthorDate: Thu Mar 3 22:44:49 2022 +0800

    [fix][improvement](runtime-filter) fix string type length limit error && 
add runtime filter decimal support (#8282)
---
 be/src/exprs/runtime_filter.cpp       | 200 ++++++++++++++++++---------------
 be/src/olap/olap_define.h             |   2 +-
 be/src/vec/core/block.cpp             |   9 +-
 be/src/vec/functions/function_case.h  |   5 +-
 be/src/vec/sink/vtablet_sink.cpp      | 202 +++++++++++++++++++---------------
 be/src/vec/utils/template_helpers.hpp |   8 +-
 6 files changed, 245 insertions(+), 181 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 4b603ea..2ab2e17 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -34,9 +34,11 @@
 #include "exprs/predicate.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "gen_cpp/types.pb.h"
+#include "runtime/primitive_type.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/type_limit.h"
+#include "udf/udf.h"
 #include "util/defer_op.h"
 #include "util/runtime_profile.h"
 #include "util/string_parser.hpp"
@@ -328,8 +330,13 @@ public:
               _filter_id(params->filter_id) {}
     // for a 'tmp' runtime predicate wrapper
     // only could called assign method or as a param for merge
-    RuntimePredicateWrapper(MemTracker* tracker, ObjectPool* pool, 
RuntimeFilterType type, UniqueId fragment_instance_id, uint32_t filter_id)
-            : _tracker(tracker), _pool(pool), _filter_type(type), 
_fragment_instance_id(fragment_instance_id), _filter_id(filter_id) {}
+    RuntimePredicateWrapper(MemTracker* tracker, ObjectPool* pool, 
RuntimeFilterType type,
+                            UniqueId fragment_instance_id, uint32_t filter_id)
+            : _tracker(tracker),
+              _pool(pool),
+              _filter_type(type),
+              _fragment_instance_id(fragment_instance_id),
+              _filter_id(filter_id) {}
     // init runtime filter wrapper
     // alloc memory to init runtime filter function
     Status init(const RuntimeFilterParams* params) {
@@ -410,8 +417,10 @@ public:
         case TYPE_DATE:
         case TYPE_DATETIME: {
             // DateTime->DateTimeValue
-            vectorized::DateTime date_time =*reinterpret_cast<const 
vectorized::DateTime*>(value.data);
-            vectorized::VecDateTimeValue vec_date_time_value = 
binary_cast<vectorized::Int64, vectorized::VecDateTimeValue>(date_time);
+            vectorized::DateTime date_time =
+                    *reinterpret_cast<const vectorized::DateTime*>(value.data);
+            vectorized::VecDateTimeValue vec_date_time_value =
+                    binary_cast<vectorized::Int64, 
vectorized::VecDateTimeValue>(date_time);
             doris::DateTimeValue date_time_value;
             vec_date_time_value.convert_vec_dt_to_dt(&date_time_value);
             insert(reinterpret_cast<const void*>(&date_time_value));
@@ -438,9 +447,8 @@ public:
     RuntimeFilterType get_real_type() {
         auto real_filter_type = _filter_type;
         if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-            real_filter_type = _is_bloomfilter
-                                       ? RuntimeFilterType::BLOOM_FILTER
-                                       : RuntimeFilterType::IN_FILTER;
+            real_filter_type = _is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER
+                                               : RuntimeFilterType::IN_FILTER;
         }
         return real_filter_type;
     }
@@ -511,18 +519,18 @@ public:
     }
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
-        bool can_not_merge_in_or_bloom
-            = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
-                  (wrapper->_filter_type != RuntimeFilterType::IN_FILTER
-                   && wrapper->_filter_type != 
RuntimeFilterType::BLOOM_FILTER);
+        bool can_not_merge_in_or_bloom = _filter_type == 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+                                         (wrapper->_filter_type != 
RuntimeFilterType::IN_FILTER &&
+                                          wrapper->_filter_type != 
RuntimeFilterType::BLOOM_FILTER);
 
-        bool can_not_merge_other = _filter_type != 
RuntimeFilterType::IN_OR_BLOOM_FILTER
-                               && _filter_type != wrapper->_filter_type;
+        bool can_not_merge_other = _filter_type != 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+                                   _filter_type != wrapper->_filter_type;
 
         CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
                 << "fragment instance " << _fragment_instance_id.to_string()
-                << " can not merge runtime filter(id=" << _filter_id << "), 
current is filter type is "
-                << to_string(_filter_type) << ", other filter type is " << 
to_string(wrapper->_filter_type);
+                << " can not merge runtime filter(id=" << _filter_id
+                << "), current is filter type is " << to_string(_filter_type)
+                << ", other filter type is " << 
to_string(wrapper->_filter_type);
 
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
@@ -530,8 +538,8 @@ public:
                 break;
             } else if (wrapper->_is_ignored_in_filter) {
                 VLOG_DEBUG << "fragment instance " << 
_fragment_instance_id.to_string()
-                          << " ignore merge runtime filter(in filter id "
-                          << _filter_id << ") because: " << 
*(wrapper->get_ignored_in_filter_msg());
+                           << " ignore merge runtime filter(in filter id " << 
_filter_id
+                           << ") because: " << 
*(wrapper->get_ignored_in_filter_msg());
 
                 _is_ignored_in_filter = true;
                 _ignored_in_filter_msg = wrapper->_ignored_in_filter_msg;
@@ -545,9 +553,9 @@ public:
 #ifdef VLOG_DEBUG_IS_ON
                 std::stringstream msg;
                 msg << "fragment instance " << 
_fragment_instance_id.to_string()
-                    << " ignore merge runtime filter(in filter id "
-                    << _filter_id << ") because: in_num(" << 
_hybrid_set->size()
-                    << ") >= max_in_num(" << _max_in_num << ")";
+                    << " ignore merge runtime filter(in filter id " << 
_filter_id
+                    << ") because: in_num(" << _hybrid_set->size() << ") >= 
max_in_num("
+                    << _max_in_num << ")";
                 _ignored_in_filter_msg = _pool->add(new 
std::string(msg.str()));
 #else
                 _ignored_in_filter_msg = _pool->add(new 
std::string("ignored"));
@@ -568,9 +576,8 @@ public:
             break;
         }
         case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
-            auto real_filter_type = _is_bloomfilter
-                                       ? RuntimeFilterType::BLOOM_FILTER
-                                       : RuntimeFilterType::IN_FILTER;
+            auto real_filter_type = _is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER
+                                                    : 
RuntimeFilterType::IN_FILTER;
             if (real_filter_type == RuntimeFilterType::IN_FILTER) {
                 if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { 
// in merge in
                     CHECK(!wrapper->_is_ignored_in_filter)
@@ -581,21 +588,22 @@ public:
                     _hybrid_set->insert(wrapper->_hybrid_set.get());
                     if (_max_in_num >= 0 && _hybrid_set->size() >= 
_max_in_num) {
                         VLOG_DEBUG << "fragment instance " << 
_fragment_instance_id.to_string()
-                            << " change runtime filter to bloom filter(id=" << 
_filter_id
-                            << ") because: in_num(" << _hybrid_set->size()
-                            << ") >= max_in_num(" << _max_in_num << ")";
+                                   << " change runtime filter to bloom 
filter(id=" << _filter_id
+                                   << ") because: in_num(" << 
_hybrid_set->size()
+                                   << ") >= max_in_num(" << _max_in_num << ")";
                         change_to_bloom_filter();
                     }
-                // in merge bloom filter
+                    // in merge bloom filter
                 } else {
                     VLOG_DEBUG << "fragment instance " << 
_fragment_instance_id.to_string()
-                        << " change runtime filter to bloom filter(id=" << 
_filter_id
-                        << ") because: already exist a bloom filter";
+                               << " change runtime filter to bloom filter(id=" 
<< _filter_id
+                               << ") because: already exist a bloom filter";
                     change_to_bloom_filter();
                     _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
                 }
             } else {
-                if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { 
// bloom filter merge in
+                if (wrapper->_filter_type ==
+                    RuntimeFilterType::IN_FILTER) { // bloom filter merge in
                     CHECK(!wrapper->_is_ignored_in_filter)
                             << "fragment instance " << 
_fragment_instance_id.to_string()
                             << " can not ignore merge runtime filter(in filter 
id "
@@ -607,7 +615,7 @@ public:
                         _bloomfilter_func->insert(value);
                         it->next();
                     }
-                // bloom filter merge bloom filter
+                    // bloom filter merge bloom filter
                 } else {
                     _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
                 }
@@ -626,7 +634,8 @@ public:
 
         PrimitiveType type = to_primitive_type(in_filter->column_type());
         if (in_filter->has_ignored_msg()) {
-            VLOG_DEBUG << "Ignore in filter(id=" << _filter_id << ") because: 
" << in_filter->ignored_msg();
+            VLOG_DEBUG << "Ignore in filter(id=" << _filter_id
+                       << ") because: " << in_filter->ignored_msg();
             _is_ignored_in_filter = true;
             _ignored_in_filter_msg = _pool->add(new 
std::string(in_filter->ignored_msg()));
             return Status::OK();
@@ -634,60 +643,68 @@ public:
         _hybrid_set.reset(create_set(type));
         switch (type) {
         case TYPE_BOOLEAN: {
-            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, 
PColumnValue &column, ObjectPool *pool) {
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
                 bool bool_val = column.boolval();
                 set->insert(&bool_val);
             });
             break;
         }
         case TYPE_TINYINT: {
-            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, 
PColumnValue &column, ObjectPool *pool) {
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
                 int8_t int_val = static_cast<int8_t>(column.intval());
                 set->insert(&int_val);
             });
             break;
         }
         case TYPE_SMALLINT: {
-            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, 
PColumnValue &column, ObjectPool *pool) {
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
                 int16_t int_val = static_cast<int16_t>(column.intval());
                 set->insert(&int_val);
             });
             break;
         }
         case TYPE_INT: {
-            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, 
PColumnValue &column, ObjectPool *pool) {
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
                 int32_t int_val = column.intval();
                 set->insert(&int_val);
             });
             break;
         }
         case TYPE_BIGINT: {
-            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, 
PColumnValue &column, ObjectPool *pool) {
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
                 int64_t long_val = column.longval();
                 set->insert(&long_val);
             });
             break;
         }
         case TYPE_LARGEINT: {
-            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, 
PColumnValue &column, ObjectPool *pool) {
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
                 auto string_val = column.stringval();
                 StringParser::ParseResult result;
-                int128_t int128_val = 
StringParser::string_to_int<int128_t>(string_val.c_str(),
-                                                                            
string_val.length(), &result);
+                int128_t int128_val = StringParser::string_to_int<int128_t>(
+                        string_val.c_str(), string_val.length(), &result);
                 DCHECK(result == StringParser::PARSE_SUCCESS);
                 set->insert(&int128_val);
             });
             break;
         }
         case TYPE_FLOAT: {
-            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, 
PColumnValue &column, ObjectPool *pool) {
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
                 float float_val = static_cast<float>(column.doubleval());
                 set->insert(&float_val);
             });
             break;
         }
         case TYPE_DOUBLE: {
-            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, 
PColumnValue &column, ObjectPool *pool) {
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
                 double double_val = column.doubleval();
                 set->insert(&double_val);
             });
@@ -695,19 +712,30 @@ public:
         }
         case TYPE_DATETIME:
         case TYPE_DATE: {
-            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, 
PColumnValue &column, ObjectPool *pool) {
-                auto &string_val_ref = column.stringval();
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
+                auto& string_val_ref = column.stringval();
                 DateTimeValue datetime_val;
                 datetime_val.from_date_str(string_val_ref.c_str(), 
string_val_ref.length());
                 set->insert(&datetime_val);
             });
             break;
         }
+        case TYPE_DECIMALV2: {
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
+                auto& string_val_ref = column.stringval();
+                DecimalV2Value decimal_val(string_val_ref);
+                set->insert(&decimal_val);
+            });
+            break;
+        }
         case TYPE_VARCHAR:
         case TYPE_CHAR:
         case TYPE_STRING: {
-            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set, 
PColumnValue &column, ObjectPool *pool) {
-                auto &string_val_ref = column.stringval();
+            batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, 
PColumnValue& column,
+                                       ObjectPool* pool) {
+                auto& string_val_ref = column.stringval();
                 auto val_ptr = pool->add(new std::string(string_val_ref));
                 StringValue string_val(const_cast<char*>(val_ptr->c_str()), 
val_ptr->length());
                 set->insert(&string_val);
@@ -742,10 +770,8 @@ public:
         _minmax_func.reset(create_minmax_filter(type));
         switch (type) {
         case TYPE_BOOLEAN: {
-            bool min_val;
-            bool max_val;
-            min_val = minmax_filter->min_val().boolval();
-            max_val = minmax_filter->max_val().boolval();
+            bool min_val = minmax_filter->min_val().boolval();
+            bool max_val = minmax_filter->max_val().boolval();
             return _minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_TINYINT: {
@@ -791,17 +817,13 @@ public:
             return _minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_FLOAT: {
-            float min_val;
-            float max_val;
-            min_val = static_cast<float>(minmax_filter->min_val().doubleval());
-            max_val = static_cast<float>(minmax_filter->max_val().doubleval());
+            float min_val = 
static_cast<float>(minmax_filter->min_val().doubleval());
+            float max_val = 
static_cast<float>(minmax_filter->max_val().doubleval());
             return _minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DOUBLE: {
-            double min_val;
-            double max_val;
-            min_val = 
static_cast<double>(minmax_filter->min_val().doubleval());
-            max_val = 
static_cast<double>(minmax_filter->max_val().doubleval());
+            double min_val = 
static_cast<double>(minmax_filter->min_val().doubleval());
+            double max_val = 
static_cast<double>(minmax_filter->max_val().doubleval());
             return _minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DATETIME:
@@ -814,6 +836,13 @@ public:
             max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
             return _minmax_func->assign(&min_val, &max_val);
         }
+        case TYPE_DECIMALV2: {
+            auto& min_val_ref = minmax_filter->min_val().stringval();
+            auto& max_val_ref = minmax_filter->max_val().stringval();
+            DecimalV2Value min_val(min_val_ref);
+            DecimalV2Value max_val(max_val_ref);
+            return _minmax_func->assign(&min_val, &max_val);
+        }
         case TYPE_VARCHAR:
         case TYPE_CHAR:
         case TYPE_STRING: {
@@ -869,20 +898,15 @@ public:
         }
     }
 
-    bool is_bloomfilter() const {
-        return _is_bloomfilter;
-    }
+    bool is_bloomfilter() const { return _is_bloomfilter; }
 
-    bool is_ignored_in_filter() const {
-        return _is_ignored_in_filter;
-    }
+    bool is_ignored_in_filter() const { return _is_ignored_in_filter; }
 
-    std::string* get_ignored_in_filter_msg() const {
-        return _ignored_in_filter_msg;
-    }
+    std::string* get_ignored_in_filter_msg() const { return 
_ignored_in_filter_msg; }
 
     void batch_assign(const PInFilter* filter,
-                      void (*assign_func) (std::unique_ptr<HybridSetBase> 
&_hybrid_set, PColumnValue&, ObjectPool*)) {
+                      void (*assign_func)(std::unique_ptr<HybridSetBase>& 
_hybrid_set,
+                                          PColumnValue&, ObjectPool*)) {
         for (int i = 0; i < filter->values_size(); ++i) {
             PColumnValue column = filter->values(i);
             assign_func(_hybrid_set, column, _pool);
@@ -1085,7 +1109,8 @@ Status IRuntimeFilter::_create_wrapper(const T* param, 
MemTracker* tracker, Obje
                                        
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
     int filter_type = param->request->filter_type();
     wrapper->reset(new RuntimePredicateWrapper(tracker, pool, 
get_type(filter_type),
-                            UniqueId(param->request->fragment_id()), 
param->request->filter_id()));
+                                               
UniqueId(param->request->fragment_id()),
+                                               param->request->filter_id()));
 
     switch (filter_type) {
     case PFilterType::IN_FILTER: {
@@ -1122,7 +1147,8 @@ void IRuntimeFilter::init_profile(RuntimeProfile* 
parent_profile) {
 
 void IRuntimeFilter::update_runtime_filter_type_to_profile() {
     if (_profile.get() != nullptr) {
-        _profile->add_info_string("RealRuntimeFilterType", 
::doris::to_string(_wrapper->get_real_type()));
+        _profile->add_info_string("RealRuntimeFilterType",
+                                  
::doris::to_string(_wrapper->get_real_type()));
     }
 }
 
@@ -1157,7 +1183,7 @@ const RuntimePredicateWrapper* 
IRuntimeFilter::get_wrapper() {
 
 template <typename T>
 void batch_copy(PInFilter* filter, HybridSetBase::IteratorBase* it,
-                void (*set_func) (PColumnValue*, const T*)) {
+                void (*set_func)(PColumnValue*, const T*)) {
     while (it->has_next()) {
         const void* void_value = it->get_value();
         auto origin_value = reinterpret_cast<const T*>(void_value);
@@ -1170,9 +1196,8 @@ template <class T>
 Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) {
     auto real_runtime_filter_type = _runtime_filter_type;
     if (real_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-        real_runtime_filter_type = _wrapper->is_bloomfilter()
-                                      ? RuntimeFilterType::BLOOM_FILTER
-                                      : RuntimeFilterType::IN_FILTER;
+        real_runtime_filter_type = _wrapper->is_bloomfilter() ? 
RuntimeFilterType::BLOOM_FILTER
+                                                              : 
RuntimeFilterType::IN_FILTER;
     }
 
     request->set_filter_type(get_type(real_runtime_filter_type));
@@ -1209,56 +1234,56 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
 
     switch (column_type) {
     case TYPE_BOOLEAN: {
-        batch_copy<int32_t>(filter, it, [](PColumnValue *column, const int32_t 
*value) {
+        batch_copy<int32_t>(filter, it, [](PColumnValue* column, const 
int32_t* value) {
             column->set_boolval(*value);
         });
         return;
     }
     case TYPE_TINYINT: {
-        batch_copy<int8_t>(filter, it, [](PColumnValue *column, const int8_t 
*value) {
+        batch_copy<int8_t>(filter, it, [](PColumnValue* column, const int8_t* 
value) {
             column->set_intval(*value);
         });
         return;
     }
     case TYPE_SMALLINT: {
-        batch_copy<int16_t>(filter, it, [](PColumnValue *column, const int16_t 
*value) {
+        batch_copy<int16_t>(filter, it, [](PColumnValue* column, const 
int16_t* value) {
             column->set_intval(*value);
         });
         return;
     }
     case TYPE_INT: {
-        batch_copy<int32_t>(filter, it, [](PColumnValue *column, const int32_t 
*value) {
+        batch_copy<int32_t>(filter, it, [](PColumnValue* column, const 
int32_t* value) {
             column->set_intval(*value);
         });
         return;
     }
     case TYPE_BIGINT: {
-        batch_copy<int64_t>(filter, it, [](PColumnValue *column, const int64_t 
*value) {
+        batch_copy<int64_t>(filter, it, [](PColumnValue* column, const 
int64_t* value) {
             column->set_longval(*value);
         });
         return;
     }
     case TYPE_LARGEINT: {
-        batch_copy<int128_t>(filter, it, [](PColumnValue *column, const 
int128_t *value) {
+        batch_copy<int128_t>(filter, it, [](PColumnValue* column, const 
int128_t* value) {
             column->set_stringval(LargeIntValue::to_string(*value));
         });
         return;
     }
     case TYPE_FLOAT: {
-        batch_copy<float>(filter, it, [](PColumnValue *column, const float 
*value) {
+        batch_copy<float>(filter, it, [](PColumnValue* column, const float* 
value) {
             column->set_doubleval(*value);
         });
         return;
     }
     case TYPE_DOUBLE: {
-        batch_copy<double>(filter, it, [](PColumnValue *column, const double 
*value) {
+        batch_copy<double>(filter, it, [](PColumnValue* column, const double* 
value) {
             column->set_doubleval(*value);
         });
         return;
     }
     case TYPE_DATE:
     case TYPE_DATETIME: {
-        batch_copy<DateTimeValue>(filter, it, [](PColumnValue *column, const 
DateTimeValue *value) {
+        batch_copy<DateTimeValue>(filter, it, [](PColumnValue* column, const 
DateTimeValue* value) {
             char convert_buffer[30];
             value->to_string(convert_buffer);
             column->set_stringval(convert_buffer);
@@ -1266,15 +1291,16 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
         return;
     }
     case TYPE_DECIMALV2: {
-        batch_copy<DecimalV2Value>(filter, it, [](PColumnValue *column, const 
DecimalV2Value *value) {
-            column->set_stringval(value->to_string());
-        });
+        batch_copy<DecimalV2Value>(filter, it,
+                                   [](PColumnValue* column, const 
DecimalV2Value* value) {
+                                       
column->set_stringval(value->to_string());
+                                   });
         return;
     }
     case TYPE_CHAR:
     case TYPE_VARCHAR:
     case TYPE_STRING: {
-        batch_copy<StringValue>(filter, it, [](PColumnValue *column, const 
StringValue *value) {
+        batch_copy<StringValue>(filter, it, [](PColumnValue* column, const 
StringValue* value) {
             column->set_stringval(std::string(value->ptr, value->len));
         });
         return;
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 19205e6..b16c614 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -57,7 +57,7 @@ static const uint16_t OLAP_VARCHAR_MAX_LENGTH = 65535;
 static const uint32_t OLAP_STRING_MAX_LENGTH = 2147483647;
 
 // the max length supported for vec string type 1MB
-static constexpr auto MAX_SIZE_OF_VEC_STRING = 1024l * 1024;
+static constexpr size_t MAX_SIZE_OF_VEC_STRING = 1024 * 1024;
 
 // the max length supported for array
 static const uint16_t OLAP_ARRAY_MAX_LENGTH = 65535;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index f888ad1..1dda373 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -44,10 +44,10 @@
 #include "vec/data_types/data_type_date.h"
 #include "vec/data_types/data_type_date_time.h"
 #include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_number.h"
 #include "vec/data_types/data_type_string.h"
-#include "vec/data_types/data_type_hll.h"
 
 namespace doris::vectorized {
 
@@ -718,14 +718,12 @@ Status Block::serialize(PBlock* pblock, size_t* 
uncompressed_bytes, size_t* comp
     }
 
     // serialize data values
+    // when data type is HLL, content_uncompressed_size maybe larger than real 
size.
     allocated_buf->resize(content_uncompressed_size);
     char* buf = allocated_buf->data();
-    char* start_buf = buf;
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf);
     }
-    CHECK(content_uncompressed_size == (buf - start_buf))
-            << content_uncompressed_size << " vs. " << (buf - start_buf);
     *uncompressed_bytes = content_uncompressed_size;
 
     // compress
@@ -792,7 +790,8 @@ doris::Tuple* Block::deep_copy_tuple(const 
doris::TupleDescriptor& desc, MemPool
 
         if (!slot_desc->type().is_string_type() && 
!slot_desc->type().is_date_type()) {
             memcpy((void*)dst->get_slot(slot_desc->tuple_offset()), 
data_ref.data, data_ref.size);
-        } else if (slot_desc->type().is_string_type() && slot_desc->type() != 
TYPE_OBJECT && slot_desc->type() != TYPE_HLL) {
+        } else if (slot_desc->type().is_string_type() && slot_desc->type() != 
TYPE_OBJECT &&
+                   slot_desc->type() != TYPE_HLL) {
             memcpy((void*)dst->get_slot(slot_desc->tuple_offset()), (const 
void*)(&data_ref),
                    sizeof(data_ref));
             // Copy the content of string
diff --git a/be/src/vec/functions/function_case.h 
b/be/src/vec/functions/function_case.h
index 1b728e9..7cea64a 100644
--- a/be/src/vec/functions/function_case.h
+++ b/be/src/vec/functions/function_case.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "vec/columns/column_complex.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/functions/function.h"
 #include "vec/functions/function_helpers.h"
@@ -188,7 +189,9 @@ public:
                                  uint8* then_idx, CaseWhenColumnHolder& 
column_holder) {
         auto result_column_ptr = data_type->create_column();
 
-        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
+        if constexpr (std::is_same_v<ColumnType, ColumnString> ||
+                      std::is_same_v<ColumnType, ColumnBitmap> ||
+                      std::is_same_v<ColumnType, ColumnHLL>) {
             // result_column and all then_column is not nullable.
             // can't simd when type is string.
             update_result_normal(result_column_ptr, then_idx, column_holder);
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index f8df6a6..0c486ff 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -15,18 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "util/doris_metrics.h"
+#include "vec/sink/vtablet_sink.h"
 
+#include "util/doris_metrics.h"
+#include "vec/core/block.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
-#include "vec/sink/vtablet_sink.h"
-#include "vec/core/block.h"
 
 namespace doris {
 namespace stream_load {
 
 VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
-                             const std::vector<TExpr>& texprs, Status* status)
+                               const std::vector<TExpr>& texprs, Status* 
status)
         : OlapTableSink(pool, row_desc, texprs, status) {
     // From the thrift expressions create the real exprs.
     vectorized::VExpr::create_expr_trees(pool, texprs, &_output_vexpr_ctxs);
@@ -43,7 +43,8 @@ Status VOlapTableSink::init(const TDataSink& sink) {
 
 Status VOlapTableSink::prepare(RuntimeState* state) {
     // Prepare the exprs to run.
-    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_input_row_desc, _expr_mem_tracker));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_input_row_desc,
+                                               _expr_mem_tracker));
     return OlapTableSink::prepare(state);
 }
 
@@ -58,7 +59,9 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block)
 
     auto rows = input_block->rows();
     auto bytes = input_block->bytes();
-    if (UNLIKELY(rows == 0)) { return status; }
+    if (UNLIKELY(rows == 0)) {
+        return status;
+    }
 
     SCOPED_TIMER(_profile->total_time_counter());
     _number_input_rows += rows;
@@ -73,8 +76,10 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block)
     if (!_output_vexpr_ctxs.empty()) {
         // Do vectorized expr here to speed up load
         block = vectorized::VExprContext::get_output_block_after_execute_exprs(
-            _output_vexpr_ctxs, *input_block, status);
-        if (UNLIKELY(block.rows() == 0)) { return status; }
+                _output_vexpr_ctxs, *input_block, status);
+        if (UNLIKELY(block.rows() == 0)) {
+            return status;
+        }
     }
 
     auto num_rows = block.rows();
@@ -83,7 +88,8 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block)
         SCOPED_RAW_TIMER(&_validate_data_ns);
         _filter_bitmap.Reset(block.rows());
         bool stop_processing = false;
-        RETURN_IF_ERROR(_validate_data(state, &block, &_filter_bitmap, 
&filtered_rows, &stop_processing));
+        RETURN_IF_ERROR(
+                _validate_data(state, &block, &_filter_bitmap, &filtered_rows, 
&stop_processing));
         _number_filtered_rows += filtered_rows;
         if (stop_processing) {
             // should be returned after updating "_number_filtered_rows", to 
make sure that load job can be cancelled
@@ -106,10 +112,11 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block)
         if (!_vpartition->find_tablet(&block_row, &partition, &dist_hash)) {
             RETURN_IF_ERROR(state->append_error_msg_to_file([]() -> 
std::string { return ""; },
                     [&]() -> std::string {
-                    fmt::memory_buffer buf;
-                    fmt::format_to(buf, "no partition for this tuple. 
tuple=[]");
-                    return buf.data();
-                    }, &stop_processing));
+                        fmt::memory_buffer buf;
+                        fmt::format_to(buf, "no partition for this tuple. 
tuple=[]");
+                        return buf.data();
+                    },
+                    &stop_processing));
             _number_filtered_rows++;
             if (stop_processing) {
                 return Status::EndOfFile("Encountered unqualified data, stop 
processing");
@@ -138,24 +145,28 @@ Status VOlapTableSink::close(RuntimeState* state, Status 
exec_status) {
     return OlapTableSink::close(state, exec_status);
 }
 
-Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* 
block, Bitmap* filter_bitmap, int* filtered_rows,
-                          bool* stop_processing) {
+Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* 
block,
+                                      Bitmap* filter_bitmap, int* 
filtered_rows,
+                                      bool* stop_processing) {
     const auto num_rows = block->rows();
     fmt::memory_buffer error_msg;
     auto set_invalid_and_append_error_msg = [&](int row) {
-         filter_bitmap->Set(row, true);
-         return state->append_error_msg_to_file([]() -> std::string { return 
""; },
-                 [&error_msg]() -> std::string { return error_msg.data(); }, 
stop_processing);
+        filter_bitmap->Set(row, true);
+        return state->append_error_msg_to_file(
+                []() -> std::string { return ""; },
+                [&error_msg]() -> std::string { return error_msg.data(); }, 
stop_processing);
     };
 
     for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
         SlotDescriptor* desc = _output_tuple_desc->slots()[i];
-        block->get_by_position(i).column = 
block->get_by_position(i).column->convert_to_full_column_if_const();
+        block->get_by_position(i).column =
+                
block->get_by_position(i).column->convert_to_full_column_if_const();
         const auto& column = block->get_by_position(i).column;
 
         if (desc->is_nullable() && desc->type() == TYPE_OBJECT) {
-            const auto& null_map = 
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)
-                    ->get_null_map_data();
+            const auto& null_map =
+                    
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)
+                            ->get_null_map_data();
             fmt::format_to(error_msg, "null is not allowed for bitmap column, 
column_name: {}; ",
                            desc->col_name());
 
@@ -168,85 +179,103 @@ Status VOlapTableSink::_validate_data(RuntimeState* 
state, vectorized::Block* bl
             }
         } else {
             auto column_ptr = 
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column);
-            auto& real_column_ptr = column_ptr == nullptr ? column : 
(column_ptr->get_nested_column_ptr());
+            auto& real_column_ptr =
+                    column_ptr == nullptr ? column : 
(column_ptr->get_nested_column_ptr());
 
             switch (desc->type().type) {
-                case TYPE_CHAR:
-                case TYPE_VARCHAR:
-                case TYPE_STRING: {
-                    const auto column_string = assert_cast<const 
vectorized::ColumnString *>(real_column_ptr.get());
-
-                    for (int j = 0; j < num_rows; ++j) {
-                        if (!filter_bitmap->Get(j)) {
-                            auto str_val = column_string->get_data_at(j);
-                            bool invalid = str_val.size > 
std::min(desc->type().len, (int)MAX_SIZE_OF_VEC_STRING);
-
-                            error_msg.clear();
-                            if (str_val.size > desc->type().len) {
-                                fmt::format_to(error_msg, "{}", "the length of 
input is too long than schema. ");
-                                fmt::format_to(error_msg, "column_name: {}; ", 
desc->col_name());
-                                fmt::format_to(error_msg, "input str: [{}] ", 
str_val.to_string());
-                                fmt::format_to(error_msg, "schema length: {}; 
", desc->type().len);
-                                fmt::format_to(error_msg, "actual length: {}; 
", str_val.size);
-                            } else if (str_val.size > MAX_SIZE_OF_VEC_STRING) {
-                                fmt::format_to(error_msg, "{}", "the length of 
input string is too long than vec schema. ");
-                                fmt::format_to(error_msg, "column_name: {}; ", 
desc->col_name());
-                                fmt::format_to(error_msg, "input str: [{}] ", 
str_val.to_string());
-                                fmt::format_to(error_msg, "schema length: {}; 
", MAX_SIZE_OF_VEC_STRING);
-                                fmt::format_to(error_msg, "actual length: {}; 
", str_val.size);
-                            }
+            case TYPE_CHAR:
+            case TYPE_VARCHAR:
+            case TYPE_STRING: {
+                const auto column_string =
+                        assert_cast<const 
vectorized::ColumnString*>(real_column_ptr.get());
+
+                size_t limit = MAX_SIZE_OF_VEC_STRING;
+                if (desc->type().type != TYPE_STRING) {
+                    DCHECK(desc->type().len >= 0);
+                    limit = std::min(limit, (size_t)desc->type().len);
+                }
 
-                            if (invalid) {
-                                
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
-                            }
+                for (int j = 0; j < num_rows; ++j) {
+                    if (!filter_bitmap->Get(j)) {
+                        auto str_val = column_string->get_data_at(j);
+                        bool invalid = str_val.size > limit;
+
+                        error_msg.clear();
+                        if (str_val.size > desc->type().len) {
+                            fmt::format_to(error_msg, "{}",
+                                           "the length of input is too long 
than schema. ");
+                            fmt::format_to(error_msg, "column_name: {}; ", 
desc->col_name());
+                            fmt::format_to(error_msg, "input str: [{}] ", 
str_val.to_string());
+                            fmt::format_to(error_msg, "schema length: {}; ", 
desc->type().len);
+                            fmt::format_to(error_msg, "actual length: {}; ", 
str_val.size);
+                        } else if (str_val.size > MAX_SIZE_OF_VEC_STRING) {
+                            fmt::format_to(
+                                    error_msg, "{}",
+                                    "the length of input string is too long 
than vec schema. ");
+                            fmt::format_to(error_msg, "column_name: {}; ", 
desc->col_name());
+                            fmt::format_to(error_msg, "input str: [{}] ", 
str_val.to_string());
+                            fmt::format_to(error_msg, "schema length: {}; ",
+                                           MAX_SIZE_OF_VEC_STRING);
+                            fmt::format_to(error_msg, "actual length: {}; ", 
str_val.size);
+                        }
+
+                        if (invalid) {
+                            
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
                         }
                     }
-                    break;
                 }
-                case TYPE_DECIMALV2: {
-                    auto column_decimal = const_cast<vectorized::ColumnDecimal
-                            <vectorized::Decimal128> *>(assert_cast<const 
vectorized::ColumnDecimal
-                            <vectorized::Decimal128> 
*>(real_column_ptr.get()));
-
-                    for (int j = 0; j < num_rows; ++j) {
-                        if (!filter_bitmap->Get(j)) {
-                            auto dec_val = binary_cast<vectorized::Int128, 
DecimalV2Value>(
-                                    column_decimal->get_data()[j]);
-                            error_msg.clear();
-                            bool invalid = false;
-
-                            if 
(dec_val.greater_than_scale(desc->type().scale)) {
-                                auto code = dec_val.round(&dec_val, 
desc->type().scale, HALF_UP);
-                                column_decimal->get_data()[j] = 
binary_cast<DecimalV2Value, vectorized::Int128>(
-                                        dec_val);
-
-                                if (code != E_DEC_OK) {
-                                    fmt::format_to(error_msg, "round one 
decimal failed.value={}; ", dec_val.to_string());
-                                    invalid = true;
-                                }
-                            }
-                            if (dec_val > _max_decimalv2_val[i] || dec_val < 
_min_decimalv2_val[i]) {
-                                fmt::format_to(error_msg, "decimal value is 
not valid for definition, column={}", desc->col_name());
-                                fmt::format_to(error_msg, ", value={}", 
dec_val.to_string());
-                                fmt::format_to(error_msg, ", precision={}, 
scale={}; ", desc->type().precision, desc->type().scale);
+                break;
+            }
+            case TYPE_DECIMALV2: {
+                auto column_decimal = const_cast<
+                        vectorized::ColumnDecimal<vectorized::Decimal128>*>(
+                        assert_cast<const 
vectorized::ColumnDecimal<vectorized::Decimal128>*>(
+                                real_column_ptr.get()));
+
+                for (int j = 0; j < num_rows; ++j) {
+                    if (!filter_bitmap->Get(j)) {
+                        auto dec_val = binary_cast<vectorized::Int128, 
DecimalV2Value>(
+                                column_decimal->get_data()[j]);
+                        error_msg.clear();
+                        bool invalid = false;
+
+                        if (dec_val.greater_than_scale(desc->type().scale)) {
+                            auto code = dec_val.round(&dec_val, 
desc->type().scale, HALF_UP);
+                            column_decimal->get_data()[j] =
+                                    binary_cast<DecimalV2Value, 
vectorized::Int128>(dec_val);
+
+                            if (code != E_DEC_OK) {
+                                fmt::format_to(error_msg, "round one decimal 
failed.value={}; ",
+                                               dec_val.to_string());
                                 invalid = true;
                             }
+                        }
+                        if (dec_val > _max_decimalv2_val[i] || dec_val < 
_min_decimalv2_val[i]) {
+                            fmt::format_to(error_msg,
+                                           "decimal value is not valid for 
definition, column={}",
+                                           desc->col_name());
+                            fmt::format_to(error_msg, ", value={}", 
dec_val.to_string());
+                            fmt::format_to(error_msg, ", precision={}, 
scale={}; ",
+                                           desc->type().precision, 
desc->type().scale);
+                            invalid = true;
+                        }
 
-                            if (invalid) {
-                                
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
-                            }
+                        if (invalid) {
+                            
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
                         }
                     }
-                    break;
                 }
-                default:
-                    break;
+                break;
+            }
+            default:
+                break;
             }
 
             // Dispose the nullable column not match problem here, convert to 
nullable column
             if (desc->is_nullable() && !column_ptr) {
                 block->get_by_position(i).column = 
vectorized::make_nullable(column);
-                block->get_by_position(i).type = 
vectorized::make_nullable(block->get_by_position(i).type);
+                block->get_by_position(i).type =
+                        
vectorized::make_nullable(block->get_by_position(i).type);
             }
 
             // Dispose the nullable column not match problem here, convert to 
not nullable column
@@ -260,8 +289,10 @@ Status VOlapTableSink::_validate_data(RuntimeState* state, 
vectorized::Block* bl
                     }
                 }
                 block->get_by_position(i).column = 
column_ptr->get_nested_column_ptr();
-                block->get_by_position(i).type = (reinterpret_cast<const 
vectorized::DataTypeNullable*>(
-                        
block->get_by_position(i).type.get()))->get_nested_type();
+                block->get_by_position(i).type =
+                        (reinterpret_cast<const vectorized::DataTypeNullable*>(
+                                 block->get_by_position(i).type.get()))
+                                ->get_nested_type();
             }
         }
     }
@@ -275,4 +306,3 @@ Status VOlapTableSink::_validate_data(RuntimeState* state, 
vectorized::Block* bl
 
 } // namespace stream_load
 } // namespace doris
-
diff --git a/be/src/vec/utils/template_helpers.hpp 
b/be/src/vec/utils/template_helpers.hpp
index 10aac23..7fe681a 100644
--- a/be/src/vec/utils/template_helpers.hpp
+++ b/be/src/vec/utils/template_helpers.hpp
@@ -19,6 +19,7 @@
 
 #include "http/http_status.h"
 #include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_complex.h"
 #include "vec/columns/columns_number.h"
 #include "vec/data_types/data_type.h"
 #include "vec/functions/function.h"
@@ -44,11 +45,16 @@
     M(Date, ColumnInt64)            \
     M(DateTime, ColumnInt64)
 
+#define COMPLEX_TYPE_TO_COLUMN_TYPE(M) \
+    M(BitMap, ColumnBitmap)            \
+    M(HLL, ColumnHLL)
+
 #define TYPE_TO_COLUMN_TYPE(M)     \
     NUMERIC_TYPE_TO_COLUMN_TYPE(M) \
     DECIMAL_TYPE_TO_COLUMN_TYPE(M) \
     STRING_TYPE_TO_COLUMN_TYPE(M)  \
-    TIME_TYPE_TO_COLUMN_TYPE(M)
+    TIME_TYPE_TO_COLUMN_TYPE(M)    \
+    COMPLEX_TYPE_TO_COLUMN_TYPE(M)
 
 namespace doris::vectorized {
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to