This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0e26f28bf2 [Enhancement](runtime-filter) enlarge runtime filter in predicate threshold (#13581) 0e26f28bf2 is described below commit 0e26f28bf2b64f01b5d71c15bdc60636817a2fa1 Author: Pxl <pxl...@qq.com> AuthorDate: Thu Nov 10 15:48:46 2022 +0800 [Enhancement](runtime-filter) enlarge runtime filter in predicate threshold (#13581) enlarge runtime filter in predicate threshold --- be/src/exprs/create_predicate_function.h | 13 +- be/src/exprs/hybrid_set.h | 159 +++++++++--------- be/src/exprs/in_predicate.cpp | 15 +- be/src/exprs/in_predicate.h | 22 ++- be/src/exprs/literal.h | 47 +++--- be/src/exprs/runtime_filter.cpp | 177 +++++++++++---------- be/src/olap/CMakeLists.txt | 1 - be/src/olap/bloom_filter_predicate.cpp | 69 -------- be/src/olap/bloom_filter_predicate.h | 7 - be/src/olap/in_list_predicate.h | 135 ++++++++++++---- be/src/olap/predicate_creator.h | 68 ++++++++ be/src/olap/reader.cpp | 23 ++- be/src/olap/reader.h | 6 +- be/src/runtime/primitive_type.cpp | 61 ------- be/src/runtime/primitive_type.h | 64 +++++++- be/src/vec/columns/predicate_column.h | 3 +- be/src/vec/exec/scan/new_olap_scan_node.cpp | 26 +-- be/src/vec/exec/scan/new_olap_scan_node.h | 2 + be/src/vec/exec/scan/new_olap_scanner.cpp | 12 +- be/src/vec/exec/scan/new_olap_scanner.h | 3 + be/src/vec/exec/scan/vscan_node.cpp | 68 ++++---- be/src/vec/exec/scan/vscan_node.h | 4 + be/src/vec/exprs/vbloom_predicate.cpp | 8 +- be/src/vec/exprs/vdirect_in_predicate.h | 94 +++++++++++ be/src/vec/exprs/vexpr.h | 5 +- be/src/vec/exprs/vin_predicate.h | 25 ++- be/src/vec/exprs/vliteral.cpp | 26 ++- be/src/vec/exprs/vliteral.h | 14 +- be/src/vec/exprs/vruntimefilter_wrapper.h | 2 +- be/src/vec/functions/in.h | 17 +- .../olap/bloom_filter_column_predicate_test.cpp | 10 +- be/test/olap/in_list_predicate_test.cpp | 28 ++-- be/test/olap/rowset/segment_v2/segment_test.cpp | 11 +- .../advanced/join-optimization/runtime-filter.md | 5 +- 
.../advanced/join-optimization/runtime-filter.md | 5 +- .../java/org/apache/doris/qe/SessionVariable.java | 2 +- 36 files changed, 704 insertions(+), 533 deletions(-) diff --git a/be/src/exprs/create_predicate_function.h b/be/src/exprs/create_predicate_function.h index bc1ca659ea..445df3a7e3 100644 --- a/be/src/exprs/create_predicate_function.h +++ b/be/src/exprs/create_predicate_function.h @@ -122,13 +122,12 @@ inline auto create_minmax_filter(PrimitiveType type) { return create_predicate_function<MinmaxFunctionTraits>(type); } -inline auto create_set(PrimitiveType type) { - return create_predicate_function<HybridSetTraits<false>>(type); -} - -// used for VInPredicate -inline auto vec_create_set(PrimitiveType type) { - return create_predicate_function<HybridSetTraits<true>>(type); +inline auto create_set(PrimitiveType type, bool is_vectorized = false) { + if (is_vectorized) { + return create_predicate_function<HybridSetTraits<true>>(type); + } else { + return create_predicate_function<HybridSetTraits<false>>(type); + } } inline auto create_bloom_filter(PrimitiveType type) { diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h index 571aa82b87..5941906308 100644 --- a/be/src/exprs/hybrid_set.h +++ b/be/src/exprs/hybrid_set.h @@ -19,17 +19,11 @@ #include <parallel_hashmap/phmap.h> -#include <cstring> - #include "common/object_pool.h" -#include "common/status.h" -#include "exprs/expr.h" -#include "runtime/datetime_value.h" #include "runtime/decimalv2_value.h" -#include "runtime/large_int_value.h" +#include "runtime/define_primitive_type.h" #include "runtime/primitive_type.h" #include "runtime/string_value.h" -#include "vec/exprs/vliteral.h" namespace doris { @@ -46,19 +40,19 @@ public: virtual void insert(HybridSetBase* set) = 0; virtual int size() = 0; - virtual bool find(void* data) = 0; + virtual bool find(const void* data) = 0; // use in vectorize execute engine - virtual bool find(void* data, size_t) = 0; + virtual bool find(const void* data, 
size_t) = 0; - virtual Status to_vexpr_list(doris::ObjectPool* pool, - std::vector<doris::vectorized::VExpr*>* vexpr_list, int precision, - int scale) = 0; + virtual void find_fixed_len(const char* data, const uint8* nullmap, int number, + uint8* results) { + LOG(FATAL) << "HybridSetBase not support find_fixed_len"; + } - virtual bool is_date_v2() { return false; } class IteratorBase { public: - IteratorBase() {} - virtual ~IteratorBase() {} + IteratorBase() = default; + virtual ~IteratorBase() = default; virtual const void* get_value() = 0; virtual bool has_next() const = 0; virtual void next() = 0; @@ -77,25 +71,10 @@ public: ~HybridSet() override = default; - bool is_date_v2() override { return T == TYPE_DATEV2; } - - Status to_vexpr_list(doris::ObjectPool* pool, - std::vector<doris::vectorized::VExpr*>* vexpr_list, int precision, - int scale) override { - HybridSetBase::IteratorBase* it = begin(); - DCHECK(it != nullptr); - while (it->has_next()) { - TExprNode node; - const void* v = it->get_value(); - create_texpr_literal_node<T>(v, &node, precision, scale); - vexpr_list->push_back(pool->add(new doris::vectorized::VLiteral(node))); - it->next(); - } - return Status::OK(); - }; - void insert(const void* data) override { - if (data == nullptr) return; + if (data == nullptr) { + return; + } if constexpr (sizeof(CppType) >= 16) { // for large int, it will core dump with no memcpy @@ -110,7 +89,7 @@ public: void insert_fixed_len(const char* data, const int* offsets, int number) override { for (int i = 0; i < number; i++) { - _set.insert(*((CppType*)data + offsets[i])); + insert((void*)((CppType*)data + offsets[i])); } } @@ -121,12 +100,27 @@ public: int size() override { return _set.size(); } - bool find(void* data) override { - auto it = _set.find(*reinterpret_cast<CppType*>(data)); + bool find(const void* data) override { + if (data == nullptr) { + return false; + } + + auto it = _set.find(*reinterpret_cast<const CppType*>(data)); return !(it == _set.end()); } - 
bool find(void* data, size_t) override { return find(data); } + bool find(const void* data, size_t) override { return find(data); } + + void find_fixed_len(const char* data, const uint8* nullmap, int number, + uint8* results) override { + for (int i = 0; i < number; i++) { + if (nullmap != nullptr && nullmap[i]) { + results[i] = false; + } else { + results[i] = _set.count(*((CppType*)data + i)); + } + } + } template <class _iT> class Iterator : public IteratorBase { @@ -135,9 +129,9 @@ public: typename phmap::flat_hash_set<_iT>::iterator end) : _begin(begin), _end(end) {} ~Iterator() override = default; - virtual bool has_next() const override { return !(_begin == _end); } - virtual const void* get_value() override { return _begin.operator->(); } - virtual void next() override { ++_begin; } + bool has_next() const override { return !(_begin == _end); } + const void* get_value() override { return _begin.operator->(); } + void next() override { ++_begin; } private: typename phmap::flat_hash_set<_iT>::iterator _begin; @@ -148,6 +142,8 @@ public: return _pool.add(new (std::nothrow) Iterator<CppType>(_set.begin(), _set.end())); } + phmap::flat_hash_set<CppType>* get_inner_set() { return &_set; } + private: phmap::flat_hash_set<CppType> _set; ObjectPool _pool; @@ -159,23 +155,10 @@ public: ~StringSet() override = default; - Status to_vexpr_list(doris::ObjectPool* pool, - std::vector<doris::vectorized::VExpr*>* vexpr_list, int precision, - int scale) override { - HybridSetBase::IteratorBase* it = begin(); - DCHECK(it != nullptr); - while (it->has_next()) { - TExprNode node; - const void* v = it->get_value(); - create_texpr_literal_node<TYPE_STRING>(v, &node); - vexpr_list->push_back(pool->add(new doris::vectorized::VLiteral(node))); - it->next(); - } - return Status::OK(); - }; - void insert(const void* data) override { - if (data == nullptr) return; + if (data == nullptr) { + return; + } const auto* value = reinterpret_cast<const StringValue*>(data); std::string 
str_value(value->ptr, value->len); @@ -198,16 +181,20 @@ public: int size() override { return _set.size(); } - bool find(void* data) override { - auto* value = reinterpret_cast<StringValue*>(data); + bool find(const void* data) override { + if (data == nullptr) { + return false; + } + + auto* value = reinterpret_cast<const StringValue*>(data); std::string_view str_value(const_cast<const char*>(value->ptr), value->len); auto it = _set.find(str_value); return !(it == _set.end()); } - bool find(void* data, size_t size) override { - std::string str_value(reinterpret_cast<char*>(data), size); + bool find(const void* data, size_t size) override { + std::string str_value(reinterpret_cast<const char*>(data), size); auto it = _set.find(str_value); return !(it == _set.end()); } @@ -218,13 +205,13 @@ public: phmap::flat_hash_set<std::string>::iterator end) : _begin(begin), _end(end) {} ~Iterator() override = default; - virtual bool has_next() const override { return !(_begin == _end); } - virtual const void* get_value() override { + bool has_next() const override { return !(_begin == _end); } + const void* get_value() override { _value.ptr = const_cast<char*>(_begin->data()); _value.len = _begin->length(); return &_value; } - virtual void next() override { ++_begin; } + void next() override { ++_begin; } private: typename phmap::flat_hash_set<std::string>::iterator _begin; @@ -236,6 +223,8 @@ public: return _pool.add(new (std::nothrow) Iterator(_set.begin(), _set.end())); } + phmap::flat_hash_set<std::string>* get_inner_set() { return &_set; } + private: phmap::flat_hash_set<std::string> _set; ObjectPool _pool; @@ -250,23 +239,10 @@ public: ~StringValueSet() override = default; - Status to_vexpr_list(doris::ObjectPool* pool, - std::vector<doris::vectorized::VExpr*>* vexpr_list, int precision, - int scale) override { - HybridSetBase::IteratorBase* it = begin(); - DCHECK(it != nullptr); - while (it->has_next()) { - TExprNode node; - const void* v = it->get_value(); - 
create_texpr_literal_node<TYPE_STRING>(v, &node); - vexpr_list->push_back(pool->add(new doris::vectorized::VLiteral(node))); - it->next(); - } - return Status::OK(); - }; - void insert(const void* data) override { - if (data == nullptr) return; + if (data == nullptr) { + return; + } const auto* value = reinterpret_cast<const StringValue*>(data); StringValue sv(value->ptr, value->len); @@ -289,16 +265,23 @@ public: int size() override { return _set.size(); } - bool find(void* data) override { - auto* value = reinterpret_cast<StringValue*>(data); + bool find(const void* data) override { + if (data == nullptr) { + return false; + } + + auto* value = reinterpret_cast<const StringValue*>(data); auto it = _set.find(*value); return !(it == _set.end()); } - bool find(void* data, size_t size) override { - // std::string str_value(reinterpret_cast<char*>(data), size); - StringValue sv(reinterpret_cast<char*>(data), size); + bool find(const void* data, size_t size) override { + if (data == nullptr) { + return false; + } + + StringValue sv(reinterpret_cast<const char*>(data), size); auto it = _set.find(sv); return !(it == _set.end()); } @@ -309,13 +292,13 @@ public: phmap::flat_hash_set<StringValue>::iterator end) : _begin(begin), _end(end) {} ~Iterator() override = default; - virtual bool has_next() const override { return !(_begin == _end); } - virtual const void* get_value() override { + bool has_next() const override { return !(_begin == _end); } + const void* get_value() override { _value.ptr = const_cast<char*>(_begin->ptr); _value.len = _begin->len; return &_value; } - virtual void next() override { ++_begin; } + void next() override { ++_begin; } private: typename phmap::flat_hash_set<StringValue>::iterator _begin; @@ -327,6 +310,8 @@ public: return _pool.add(new (std::nothrow) Iterator(_set.begin(), _set.end())); } + phmap::flat_hash_set<StringValue>* get_inner_set() { return &_set; } + private: phmap::flat_hash_set<StringValue> _set; ObjectPool _pool; diff --git 
a/be/src/exprs/in_predicate.cpp b/be/src/exprs/in_predicate.cpp index a0db24592b..32acb5fdeb 100644 --- a/be/src/exprs/in_predicate.cpp +++ b/be/src/exprs/in_predicate.cpp @@ -20,8 +20,6 @@ #include "exprs/in_predicate.h" -#include <sstream> - #include "exprs/create_predicate_function.h" #include "exprs/expr_context.h" #include "runtime/runtime_state.h" @@ -35,13 +33,17 @@ InPredicate::InPredicate(const TExprNode& node) _null_in_set(false), _hybrid_set() {} -InPredicate::~InPredicate() {} +InPredicate::~InPredicate() { + if (_should_delete) { + delete _hybrid_set; + } +} Status InPredicate::prepare(RuntimeState* state, HybridSetBase* hset) { if (_is_prepare) { return Status::OK(); } - _hybrid_set.reset(hset); + _hybrid_set = hset; if (nullptr == _hybrid_set) { return Status::InternalError("Unknown column type."); } @@ -87,10 +89,11 @@ Status InPredicate::prepare(RuntimeState* state, const RowDescriptor& row_desc, return Status::InternalError("no Function operator in."); } - _hybrid_set.reset(create_set(_children[0]->type().type)); - if (nullptr == _hybrid_set.get()) { + _hybrid_set = create_set(_children[0]->type().type); + if (nullptr == _hybrid_set) { return Status::InternalError("Unknown column type."); } + _should_delete = true; _is_prepare = true; diff --git a/be/src/exprs/in_predicate.h b/be/src/exprs/in_predicate.h index 0d43b4cefc..41e5585676 100644 --- a/be/src/exprs/in_predicate.h +++ b/be/src/exprs/in_predicate.h @@ -20,9 +20,6 @@ #pragma once -#include <string> -#include <unordered_set> - #include "exprs/hybrid_set.h" #include "exprs/predicate.h" @@ -33,24 +30,22 @@ namespace doris { // 2. construct by new one, and push child. 
class InPredicate : public Predicate { public: - virtual ~InPredicate(); - virtual Expr* clone(ObjectPool* pool) const override { - return pool->add(new InPredicate(*this)); - } + ~InPredicate() override; + Expr* clone(ObjectPool* pool) const override { return pool->add(new InPredicate(*this)); } Status prepare(RuntimeState* state, HybridSetBase* hset); Status open(RuntimeState* state, ExprContext* context, FunctionContext::FunctionStateScope scope) override; - virtual Status prepare(RuntimeState* state, const RowDescriptor& row_desc, - ExprContext* context) override; + Status prepare(RuntimeState* state, const RowDescriptor& row_desc, + ExprContext* context) override; - virtual BooleanVal get_boolean_val(ExprContext* context, TupleRow* row) override; + BooleanVal get_boolean_val(ExprContext* context, TupleRow* row) override; // this function add one item in hashset, not add to children. // if add to children, when List is long, copy is a expensive op. void insert(void* value); - HybridSetBase* hybrid_set() const { return _hybrid_set.get(); } + HybridSetBase* hybrid_set() const { return _hybrid_set; } bool is_not_in() const { return _is_not_in; } @@ -62,13 +57,14 @@ protected: InPredicate(const TExprNode& node); // virtual Status prepare(RuntimeState* state, const RowDescriptor& desc); - virtual std::string debug_string() const override; + std::string debug_string() const override; private: const bool _is_not_in; bool _is_prepare; bool _null_in_set; - std::shared_ptr<HybridSetBase> _hybrid_set; + HybridSetBase* _hybrid_set; + bool _should_delete = false; }; } // namespace doris diff --git a/be/src/exprs/literal.h b/be/src/exprs/literal.h index 9a6f7ae793..06ddb0b398 100644 --- a/be/src/exprs/literal.h +++ b/be/src/exprs/literal.h @@ -22,6 +22,7 @@ #include "common/object_pool.h" #include "exprs/expr.h" +#include "exprs/expr_value.h" namespace doris { @@ -30,30 +31,30 @@ class TExprNode; class Literal final : public Expr { public: Literal(const TExprNode& node); - 
virtual ~Literal(); - - virtual Expr* clone(ObjectPool* pool) const override { return pool->add(new Literal(*this)); } - - virtual BooleanVal get_boolean_val(ExprContext* context, TupleRow*) override; - virtual TinyIntVal get_tiny_int_val(ExprContext* context, TupleRow*) override; - virtual SmallIntVal get_small_int_val(ExprContext* context, TupleRow*) override; - virtual IntVal get_int_val(ExprContext* context, TupleRow*) override; - virtual BigIntVal get_big_int_val(ExprContext* context, TupleRow*) override; - virtual LargeIntVal get_large_int_val(ExprContext* context, TupleRow*) override; - virtual FloatVal get_float_val(ExprContext* context, TupleRow*) override; - virtual DoubleVal get_double_val(ExprContext* context, TupleRow*) override; - virtual DecimalV2Val get_decimalv2_val(ExprContext* context, TupleRow*) override; - virtual DateTimeVal get_datetime_val(ExprContext* context, TupleRow*) override; - virtual DateV2Val get_datev2_val(ExprContext* context, TupleRow*) override; - virtual DateTimeV2Val get_datetimev2_val(ExprContext* context, TupleRow*) override; - virtual StringVal get_string_val(ExprContext* context, TupleRow* row) override; - virtual CollectionVal get_array_val(ExprContext* context, TupleRow*) override; - virtual Decimal32Val get_decimal32_val(ExprContext* context, TupleRow*) override; - virtual Decimal64Val get_decimal64_val(ExprContext* context, TupleRow*) override; - virtual Decimal128Val get_decimal128_val(ExprContext* context, TupleRow*) override; + ~Literal() override; + + Expr* clone(ObjectPool* pool) const override { return pool->add(new Literal(*this)); } + + BooleanVal get_boolean_val(ExprContext* context, TupleRow*) override; + TinyIntVal get_tiny_int_val(ExprContext* context, TupleRow*) override; + SmallIntVal get_small_int_val(ExprContext* context, TupleRow*) override; + IntVal get_int_val(ExprContext* context, TupleRow*) override; + BigIntVal get_big_int_val(ExprContext* context, TupleRow*) override; + LargeIntVal 
get_large_int_val(ExprContext* context, TupleRow*) override; + FloatVal get_float_val(ExprContext* context, TupleRow*) override; + DoubleVal get_double_val(ExprContext* context, TupleRow*) override; + DecimalV2Val get_decimalv2_val(ExprContext* context, TupleRow*) override; + DateTimeVal get_datetime_val(ExprContext* context, TupleRow*) override; + DateV2Val get_datev2_val(ExprContext* context, TupleRow*) override; + DateTimeV2Val get_datetimev2_val(ExprContext* context, TupleRow*) override; + StringVal get_string_val(ExprContext* context, TupleRow* row) override; + CollectionVal get_array_val(ExprContext* context, TupleRow*) override; + Decimal32Val get_decimal32_val(ExprContext* context, TupleRow*) override; + Decimal64Val get_decimal64_val(ExprContext* context, TupleRow*) override; + Decimal128Val get_decimal128_val(ExprContext* context, TupleRow*) override; // init val before use - virtual Status prepare(RuntimeState* state, const RowDescriptor& row_desc, - ExprContext* context) override; + Status prepare(RuntimeState* state, const RowDescriptor& row_desc, + ExprContext* context) override; private: ExprValue _value; diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 1245ce370d..346da7fdf8 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -17,9 +17,6 @@ #include "runtime_filter.h" -#include <memory> -#include <type_traits> - #include "common/object_pool.h" #include "common/status.h" #include "exprs/binary_predicate.h" @@ -41,7 +38,9 @@ #include "util/string_parser.hpp" #include "vec/columns/column.h" #include "vec/exprs/vbloom_predicate.h" +#include "vec/exprs/vdirect_in_predicate.h" #include "vec/exprs/vexpr.h" +#include "vec/exprs/vliteral.h" #include "vec/exprs/vruntimefilter_wrapper.h" namespace doris { @@ -312,8 +311,7 @@ Status create_literal(ObjectPool* pool, const TypeDescriptor& type, const void* } if constexpr (is_vectorized) { - *reinterpret_cast<doris::vectorized::VExpr**>(expr) = 
- pool->add(new doris::vectorized::VLiteral(node)); + *reinterpret_cast<vectorized::VExpr**>(expr) = pool->add(new vectorized::VLiteral(node)); } else { *reinterpret_cast<Expr**>(expr) = pool->add(new Literal(node)); } @@ -409,7 +407,10 @@ public: _column_return_type(params->column_return_type), _filter_type(params->filter_type), _fragment_instance_id(params->fragment_instance_id), - _filter_id(params->filter_id) {} + _filter_id(params->filter_id), + _use_batch(_state->enable_vectorized_exec() && + IRuntimeFilter::enable_use_batch(_state->be_exec_version(), + _column_return_type)) {} // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, PrimitiveType column_type, @@ -420,14 +421,17 @@ public: _column_return_type(column_type), _filter_type(type), _fragment_instance_id(fragment_instance_id), - _filter_id(filter_id) {} + _filter_id(filter_id), + _use_batch(_state->enable_vectorized_exec() && + IRuntimeFilter::enable_use_batch(_state->be_exec_version(), + _column_return_type)) {} // init runtime filter wrapper // alloc memory to init runtime filter function Status init(const RuntimeFilterParams* params) { _max_in_num = params->max_in_num; switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - _hybrid_set.reset(create_set(_column_return_type)); + _hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec())); break; } case RuntimeFilterType::MINMAX_FILTER: { @@ -441,7 +445,7 @@ public: return Status::OK(); } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - _hybrid_set.reset(create_set(_column_return_type)); + _hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec())); _bloomfilter_func.reset(create_bloom_filter(_column_return_type)); _bloomfilter_func->set_length(params->bloom_filter_size); return Status::OK(); @@ -459,17 +463,14 @@ public: _is_bloomfilter = true; 
insert_to_bloom_filter(_bloomfilter_func.get()); // release in filter - _hybrid_set.reset(create_set(_column_return_type)); + _hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec())); } void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { if (_hybrid_set->size() > 0) { - bool use_batch = _state->enable_vectorized_exec() && - IRuntimeFilter::enable_use_batch(_state->be_exec_version(), - _column_return_type); auto it = _hybrid_set->begin(); - if (use_batch) { + if (_use_batch) { while (it->has_next()) { bloom_filter->insert_fixed_len((char*)it->get_value()); it->next(); @@ -549,20 +550,6 @@ public: void insert(const StringRef& value) { switch (_column_return_type) { - // todo: rethink logic of hll/bitmap/date - case TYPE_DATE: - case TYPE_DATETIME: { - // DateTime->DateTimeValue - vectorized::DateTime date_time = - *reinterpret_cast<const vectorized::DateTime*>(value.data); - vectorized::VecDateTimeValue vec_date_time_value = - binary_cast<vectorized::Int64, vectorized::VecDateTimeValue>(date_time); - doris::DateTimeValue date_time_value; - vec_date_time_value.convert_vec_dt_to_dt(&date_time_value); - insert(reinterpret_cast<const void*>(&date_time_value)); - break; - } - case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_HLL: @@ -631,7 +618,8 @@ public: _is_ignored_in_filter = true; _ignored_in_filter_msg = wrapper->_ignored_in_filter_msg; // release in filter - _hybrid_set.reset(create_set(_column_return_type)); + _hybrid_set.reset( + create_set(_column_return_type, _state->enable_vectorized_exec())); break; } // try insert set @@ -650,7 +638,8 @@ public: _is_ignored_in_filter = true; // release in filter - _hybrid_set.reset(create_set(_column_return_type)); + _hybrid_set.reset( + create_set(_column_return_type, _state->enable_vectorized_exec())); } break; } @@ -720,10 +709,10 @@ public: _ignored_in_filter_msg = _pool->add(new std::string(in_filter->ignored_msg())); return Status::OK(); } - 
_hybrid_set.reset(create_set(type)); + _hybrid_set.reset(create_set(type, _state->enable_vectorized_exec())); switch (type) { case TYPE_BOOLEAN: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { bool bool_val = column.boolval(); set->insert(&bool_val); @@ -731,7 +720,7 @@ public: break; } case TYPE_TINYINT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { int8_t int_val = static_cast<int8_t>(column.intval()); set->insert(&int_val); @@ -739,7 +728,7 @@ public: break; } case TYPE_SMALLINT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { int16_t int_val = static_cast<int16_t>(column.intval()); set->insert(&int_val); @@ -747,7 +736,7 @@ public: break; } case TYPE_INT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { int32_t int_val = column.intval(); set->insert(&int_val); @@ -755,7 +744,7 @@ public: break; } case TYPE_BIGINT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { int64_t long_val = column.longval(); set->insert(&long_val); @@ -763,7 +752,7 @@ public: break; } case TYPE_LARGEINT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { auto string_val = column.stringval(); 
StringParser::ParseResult result; @@ -775,7 +764,7 @@ public: break; } case TYPE_FLOAT: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { float float_val = static_cast<float>(column.doubleval()); set->insert(&float_val); @@ -783,7 +772,7 @@ public: break; } case TYPE_DOUBLE: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { double double_val = column.doubleval(); set->insert(&double_val); @@ -791,7 +780,7 @@ public: break; } case TYPE_DATEV2: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { auto date_v2_val = column.intval(); set->insert(&date_v2_val); @@ -799,7 +788,7 @@ public: break; } case TYPE_DATETIMEV2: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { auto date_v2_val = column.longval(); set->insert(&date_v2_val); @@ -808,17 +797,27 @@ public: } case TYPE_DATETIME: case TYPE_DATE: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, - ObjectPool* pool) { - auto& string_val_ref = column.stringval(); - DateTimeValue datetime_val; - datetime_val.from_date_str(string_val_ref.c_str(), string_val_ref.length()); - set->insert(&datetime_val); - }); + if (_state->enable_vectorized_exec()) { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, + PColumnValue& column, ObjectPool* pool) { + auto& string_val_ref = column.stringval(); + vectorized::VecDateTimeValue datetime_val; + datetime_val.from_date_str(string_val_ref.c_str(), 
string_val_ref.length()); + set->insert(&datetime_val); + }); + } else { + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, + PColumnValue& column, ObjectPool* pool) { + auto& string_val_ref = column.stringval(); + DateTimeValue datetime_val; + datetime_val.from_date_str(string_val_ref.c_str(), string_val_ref.length()); + set->insert(&datetime_val); + }); + } break; } case TYPE_DECIMALV2: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { auto& string_val_ref = column.stringval(); DecimalV2Value decimal_val(string_val_ref); @@ -827,7 +826,7 @@ public: break; } case TYPE_DECIMAL32: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { int32_t decimal_32_val = column.intval(); set->insert(&decimal_32_val); @@ -835,7 +834,7 @@ public: break; } case TYPE_DECIMAL64: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { int64_t decimal_64_val = column.longval(); set->insert(&decimal_64_val); @@ -843,7 +842,7 @@ public: break; } case TYPE_DECIMAL128: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { auto string_val = column.stringval(); StringParser::ParseResult result; @@ -857,7 +856,7 @@ public: case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { - batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set, PColumnValue& column, + batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column, ObjectPool* pool) { auto& string_val_ref = column.stringval(); 
auto val_ptr = pool->add(new std::string(string_val_ref)); @@ -1049,7 +1048,7 @@ public: std::string* get_ignored_in_filter_msg() const { return _ignored_in_filter_msg; } void batch_assign(const PInFilter* filter, - void (*assign_func)(std::unique_ptr<HybridSetBase>& _hybrid_set, + void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, PColumnValue&, ObjectPool*)) { for (int i = 0; i < filter->values_size(); ++i) { PColumnValue column = filter->values(i); @@ -1057,22 +1056,29 @@ public: } } + size_t get_in_filter_size() const { return _hybrid_set->size(); } + friend class IRuntimeFilter; private: RuntimeState* _state; ObjectPool* _pool; + + // When a runtime filter received from remote and it is a bloom filter, _column_return_type will be invalid. PrimitiveType _column_return_type; // column type RuntimeFilterType _filter_type; int32_t _max_in_num = -1; std::unique_ptr<MinMaxFuncBase> _minmax_func; - std::unique_ptr<HybridSetBase> _hybrid_set; + std::shared_ptr<HybridSetBase> _hybrid_set; std::shared_ptr<BloomFilterFuncBase> _bloomfilter_func; bool _is_bloomfilter = false; bool _is_ignored_in_filter = false; std::string* _ignored_in_filter_msg = nullptr; UniqueId _fragment_instance_id; uint32_t _filter_id; + + // When _column_return_type is invalid, _use_batch will be always false. 
+ bool _use_batch; }; Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc, @@ -1237,6 +1243,10 @@ void IRuntimeFilter::signal() { _is_ready = true; _inner_cv.notify_all(); _effect_timer.reset(); + + if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER) { + _profile->add_info_string("InFilterSize", std::to_string(_wrapper->get_in_filter_size())); + } } BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { @@ -1357,6 +1367,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { DCHECK(parent_profile != nullptr); _profile.reset(new RuntimeProfile("RuntimeFilter:" + ::doris::to_string(_runtime_filter_type))); parent_profile->add_child(_profile.get(), true, nullptr); + _profile->add_info_string("Ignored", _is_ignored ? "true" : "false"); _effect_time_cost = ADD_TIMER(_profile, "EffectTimeCost"); _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost"); @@ -1526,11 +1537,22 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) { } case TYPE_DATE: case TYPE_DATETIME: { - batch_copy<DateTimeValue>(filter, it, [](PColumnValue* column, const DateTimeValue* value) { - char convert_buffer[30]; - value->to_string(convert_buffer); - column->set_stringval(convert_buffer); - }); + if (_state->enable_vectorized_exec()) { + batch_copy<vectorized::VecDateTimeValue>( + filter, it, + [](PColumnValue* column, const vectorized::VecDateTimeValue* value) { + char convert_buffer[30]; + value->to_string(convert_buffer); + column->set_stringval(convert_buffer); + }); + } else { + batch_copy<DateTimeValue>(filter, it, + [](PColumnValue* column, const DateTimeValue* value) { + char convert_buffer[30]; + value->to_string(convert_buffer); + column->set_stringval(convert_buffer); + }); + } return; } case TYPE_DECIMALV2: { @@ -1735,7 +1757,7 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta node.__isset.vector_opcode = true; 
node.__set_vector_opcode(to_in_opcode(_column_return_type)); auto in_pred = _pool->add(new InPredicate(node)); - RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.release())); + RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.get())); in_pred->add_child(Expr::copy(_pool, prob_expr->root())); ExprContext* ctx = _pool->add(new ExprContext(in_pred)); container->push_back(ctx); @@ -1799,8 +1821,7 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V case RuntimeFilterType::IN_FILTER: { if (!_is_ignored_in_filter) { TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); - type_desc.__set_is_nullable( - _hybrid_set->size() > 0 ? true : vprob_expr->root()->is_nullable()); + type_desc.__set_is_nullable(false); TExprNode node; node.__set_type(type_desc); node.__set_node_type(TExprNodeType::IN_PRED); @@ -1808,20 +1829,14 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V node.__set_opcode(TExprOpcode::FILTER_IN); node.__isset.vector_opcode = true; node.__set_vector_opcode(to_in_opcode(_column_return_type)); - node.__set_is_nullable(_hybrid_set->size() > 0 ? 
true - : vprob_expr->root()->is_nullable()); + node.__set_is_nullable(false); - // VInPredicate - doris::vectorized::VExpr* expr = nullptr; - RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr(_pool, node, &expr)); + auto in_pred = _pool->add(new vectorized::VDirectInPredicate(node)); + in_pred->set_filter(_hybrid_set); auto cloned_vexpr = vprob_expr->root()->clone(_pool); - expr->add_child(cloned_vexpr); - - auto& children = const_cast<std::vector<doris::vectorized::VExpr*>&>(expr->children()); - _hybrid_set->to_vexpr_list(_pool, &children, vprob_expr->root()->type().precision, - vprob_expr->root()->type().scale); - container->push_back( - _pool->add(new doris::vectorized::VRuntimeFilterWrapper(node, expr))); + in_pred->add_child(cloned_vexpr); + auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, in_pred)); + container->push_back(wrapper); } break; } @@ -1838,7 +1853,7 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V max_pred->add_child(cloned_vexpr); max_pred->add_child(max_literal); container->push_back( - _pool->add(new doris::vectorized::VRuntimeFilterWrapper(max_pred_node, max_pred))); + _pool->add(new vectorized::VRuntimeFilterWrapper(max_pred_node, max_pred))); // create min filter doris::vectorized::VExpr* min_pred = nullptr; @@ -1852,25 +1867,25 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V min_pred->add_child(cloned_vexpr); min_pred->add_child(min_literal); container->push_back( - _pool->add(new doris::vectorized::VRuntimeFilterWrapper(min_pred_node, min_pred))); + _pool->add(new vectorized::VRuntimeFilterWrapper(min_pred_node, min_pred))); break; } case RuntimeFilterType::BLOOM_FILTER: { // create a bloom filter TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); - type_desc.__set_is_nullable(vprob_expr->root()->is_nullable()); + type_desc.__set_is_nullable(false); TExprNode node; node.__set_type(type_desc); 
node.__set_node_type(TExprNodeType::BLOOM_PRED); node.__set_opcode(TExprOpcode::RT_FILTER); node.__isset.vector_opcode = true; node.__set_vector_opcode(to_in_opcode(_column_return_type)); - node.__set_is_nullable(vprob_expr->root()->is_nullable()); - auto bloom_pred = _pool->add(new doris::vectorized::VBloomPredicate(node)); + node.__set_is_nullable(false); + auto bloom_pred = _pool->add(new vectorized::VBloomPredicate(node)); bloom_pred->set_filter(_bloomfilter_func); auto cloned_vexpr = vprob_expr->root()->clone(_pool); bloom_pred->add_child(cloned_vexpr); - auto wrapper = _pool->add(new doris::vectorized::VRuntimeFilterWrapper(node, bloom_pred)); + auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, bloom_pred)); container->push_back(wrapper); break; } diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index f46dd6259d..3512d13a7e 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -42,7 +42,6 @@ add_library(Olap STATIC generic_iterators.cpp hll.cpp inverted_index_parser.cpp - bloom_filter_predicate.cpp like_column_predicate.cpp key_coder.cpp lru_cache.cpp diff --git a/be/src/olap/bloom_filter_predicate.cpp b/be/src/olap/bloom_filter_predicate.cpp deleted file mode 100644 index 82688d4f0c..0000000000 --- a/be/src/olap/bloom_filter_predicate.cpp +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "olap/bloom_filter_predicate.h" - -#include "exprs/create_predicate_function.h" - -#define APPLY_FOR_PRIMTYPE(M) \ - M(TYPE_TINYINT) \ - M(TYPE_SMALLINT) \ - M(TYPE_INT) \ - M(TYPE_BIGINT) \ - M(TYPE_LARGEINT) \ - M(TYPE_FLOAT) \ - M(TYPE_DOUBLE) \ - M(TYPE_CHAR) \ - M(TYPE_DATE) \ - M(TYPE_DATETIME) \ - M(TYPE_DATEV2) \ - M(TYPE_DATETIMEV2) \ - M(TYPE_VARCHAR) \ - M(TYPE_STRING) \ - M(TYPE_DECIMAL32) \ - M(TYPE_DECIMAL64) \ - M(TYPE_DECIMAL128) - -namespace doris { -ColumnPredicate* BloomFilterColumnPredicateFactory::create_column_predicate( - uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& bloom_filter, - FieldType type, int be_exec_version) { - std::shared_ptr<BloomFilterFuncBase> filter; - switch (type) { -#define M(NAME) \ - case OLAP_FIELD_##NAME: { \ - filter.reset(create_bloom_filter(NAME)); \ - filter->light_copy(bloom_filter.get()); \ - return new BloomFilterColumnPredicate<NAME>(column_id, filter, be_exec_version); \ - } - APPLY_FOR_PRIMTYPE(M) -#undef M - case OLAP_FIELD_TYPE_DECIMAL: { - filter.reset(create_bloom_filter(TYPE_DECIMALV2)); - filter->light_copy(bloom_filter.get()); - return new BloomFilterColumnPredicate<TYPE_DECIMALV2>(column_id, filter, be_exec_version); - } - case OLAP_FIELD_TYPE_BOOL: { - filter.reset(create_bloom_filter(TYPE_BOOLEAN)); - filter->light_copy(bloom_filter.get()); - return new BloomFilterColumnPredicate<TYPE_BOOLEAN>(column_id, filter, be_exec_version); - } - default: - return nullptr; - } -} -} //namespace doris diff --git a/be/src/olap/bloom_filter_predicate.h 
b/be/src/olap/bloom_filter_predicate.h index 3712833a8d..d9d6a964f3 100644 --- a/be/src/olap/bloom_filter_predicate.h +++ b/be/src/olap/bloom_filter_predicate.h @@ -182,11 +182,4 @@ uint16_t BloomFilterColumnPredicate<T>::evaluate(const vectorized::IColumn& colu return new_size; } -class BloomFilterColumnPredicateFactory { -public: - static ColumnPredicate* create_column_predicate( - uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& filter, FieldType type, - int be_exec_version); -}; - } //namespace doris diff --git a/be/src/olap/in_list_predicate.h b/be/src/olap/in_list_predicate.h index bdadaf4e57..a4b608f160 100644 --- a/be/src/olap/in_list_predicate.h +++ b/be/src/olap/in_list_predicate.h @@ -18,18 +18,18 @@ #pragma once #include <parallel_hashmap/phmap.h> -#include <stdint.h> #include <cstdint> #include <roaring/roaring.hh> -#include <type_traits> #include "decimal12.h" +#include "exprs/hybrid_set.h" #include "olap/column_predicate.h" #include "olap/olap_common.h" #include "olap/rowset/segment_v2/bloom_filter.h" #include "olap/wrapper_field.h" #include "runtime/define_primitive_type.h" +#include "runtime/primitive_type.h" #include "runtime/string_value.h" #include "runtime/type_limit.h" #include "uint24.h" @@ -91,6 +91,7 @@ public: const ConvertFunc& convert, bool is_opposite = false, const TabletColumn* col = nullptr, MemPool* pool = nullptr) : ColumnPredicate(column_id, is_opposite), + _values(new phmap::flat_hash_set<T>()), _min_value(type_limit<T>::max()), _max_value(type_limit<T>::min()) { for (const auto& condition : conditions) { @@ -103,24 +104,79 @@ public: } else { tmp = convert(condition); } - _values.insert(tmp); - if (tmp > _max_value) { - _max_value = tmp; - } - if (tmp < _min_value) { - _min_value = tmp; + _values->insert(tmp); + _update_min_max(tmp); + } + } + + InListPredicateBase(uint32_t column_id, const std::shared_ptr<HybridSetBase>& hybrid_set, + size_t char_length = 0) + : ColumnPredicate(column_id, false), + 
_min_value(type_limit<T>::max()), + _max_value(type_limit<T>::min()) { + using HybridSetType = std::conditional_t<is_string_type(Type), StringSet, HybridSet<Type>>; + + CHECK(hybrid_set != nullptr); + + if constexpr (is_string_type(Type) || Type == TYPE_DECIMALV2 || is_date_type(Type)) { + _values = new phmap::flat_hash_set<T>(); + auto values = ((HybridSetType*)hybrid_set.get())->get_inner_set(); + + if constexpr (is_string_type(Type)) { + for (auto& value : *values) { + StringValue sv = {value.data(), int(value.size())}; + if constexpr (Type == TYPE_CHAR) { + _temp_datas.push_back(""); + _temp_datas.back().resize(std::max(char_length, value.size())); + memcpy(_temp_datas.back().data(), value.data(), value.size()); + sv = {_temp_datas.back().data(), int(_temp_datas.back().size())}; + } + _values->insert(sv); + } + } else if constexpr (Type == TYPE_DECIMALV2) { + for (auto& value : *values) { + _values->insert({value.int_value(), value.frac_value()}); + } + } else if constexpr (Type == TYPE_DATE) { + for (auto& value : *values) { + _values->insert(value.to_olap_date()); + } + } else if constexpr (Type == TYPE_DATETIME) { + for (auto& value : *values) { + _values->insert(value.to_olap_datetime()); + } + } else { + CHECK(Type == TYPE_DATETIMEV2 || Type == TYPE_DATEV2); + for (auto& value : *values) { + _values->insert(T(value)); + } } + } else { + should_delete = false; + _values = ((HybridSetType*)hybrid_set.get())->get_inner_set(); + } + + for (auto& value : *_values) { + _update_min_max(value); + } + } + + ~InListPredicateBase() override { + if (should_delete) { + delete _values; } } // Only for test - InListPredicateBase(uint32_t column_id, phmap::flat_hash_set<T>&& values, + InListPredicateBase(uint32_t column_id, phmap::flat_hash_set<T>& values, T min_value = type_limit<T>::min(), T max_value = type_limit<T>::max(), bool is_opposite = false) : ColumnPredicate(column_id, is_opposite), - _values(std::move(values)), + _values(&values), _min_value(min_value), - 
_max_value(max_value) {} + _max_value(max_value) { + should_delete = false; + } PredicateType type() const override { return PT; } @@ -160,7 +216,7 @@ public: *result -= null_bitmap; } roaring::Roaring indices; - for (auto value : _values) { + for (auto value : *_values) { bool exact_match; Status s = iterator->seek_dictionary(&value, &exact_match); rowid_t seeked_ordinal = iterator->current_ordinal(); @@ -292,7 +348,7 @@ public: bool evaluate_and(const segment_v2::BloomFilter* bf) const override { if constexpr (PT == PredicateType::IN_LIST) { - for (auto value : _values) { + for (auto value : *_values) { if constexpr (std::is_same_v<T, StringValue>) { if (bf->test_bytes(value.ptr, value.len)) { return true; @@ -320,7 +376,7 @@ private: template <typename LeftT, typename RightT> bool _operator(const LeftT& lhs, const RightT& rhs) const { if constexpr (Type == TYPE_BOOLEAN) { - DCHECK(_values.size() == 2); + DCHECK(_values->size() == 2); return PT == PredicateType::IN_LIST; } else if constexpr (PT == PredicateType::IN_LIST) { return lhs != rhs; @@ -338,20 +394,20 @@ private: T tmp_uint32_value = 0; memcpy((char*)(&tmp_uint32_value), block->cell(idx).cell_ptr(), sizeof(uint24_t)); if constexpr (is_nullable) { - new_size += - _opposite ^ (!block->cell(idx).is_null() && - _operator(_values.find(tmp_uint32_value), _values.end())); + new_size += _opposite ^ + (!block->cell(idx).is_null() && + _operator(_values->find(tmp_uint32_value), _values->end())); } else { new_size += - _opposite ^ _operator(_values.find(tmp_uint32_value), _values.end()); + _opposite ^ _operator(_values->find(tmp_uint32_value), _values->end()); } } else { const T* cell_value = reinterpret_cast<const T*>(block->cell(idx).cell_ptr()); if constexpr (is_nullable) { new_size += _opposite ^ (!block->cell(idx).is_null() && - _operator(_values.find(*cell_value), _values.end())); + _operator(_values->find(*cell_value), _values->end())); } else { - new_size += _opposite ^ _operator(_values.find(*cell_value), 
_values.end()); + new_size += _opposite ^ _operator(_values->find(*cell_value), _values->end()); } } } @@ -374,13 +430,13 @@ private: if constexpr (is_nullable) { result &= !block->cell(idx).is_null(); } - result &= _operator(_values.find(tmp_uint32_value), _values.end()); + result &= _operator(_values->find(tmp_uint32_value), _values->end()); } else { const T* cell_value = reinterpret_cast<const T*>(block->cell(idx).cell_ptr()); if constexpr (is_nullable) { result &= !block->cell(idx).is_null(); } - result &= _operator(_values.find(*cell_value), _values.end()); + result &= _operator(_values->find(*cell_value), _values->end()); } if constexpr (is_and) { @@ -405,7 +461,7 @@ private: auto& value_in_dict_flags = _segment_id_to_value_in_dict_flags[column->get_rowset_segment_id()]; if (value_in_dict_flags.empty()) { - nested_col_ptr->find_codes(_values, value_in_dict_flags); + nested_col_ptr->find_codes(*_values, value_in_dict_flags); } for (uint16_t i = 0; i < size; i++) { @@ -450,13 +506,13 @@ private: } if constexpr (!is_opposite) { - if (_operator(_values.find(reinterpret_cast<const T&>(data_array[idx])), - _values.end())) { + if (_operator(_values->find(reinterpret_cast<const T&>(data_array[idx])), + _values->end())) { sel[new_size++] = idx; } } else { - if (!_operator(_values.find(reinterpret_cast<const T&>(data_array[idx])), - _values.end())) { + if (!_operator(_values->find(reinterpret_cast<const T&>(data_array[idx])), + _values->end())) { sel[new_size++] = idx; } } @@ -478,7 +534,7 @@ private: auto& value_in_dict_flags = _segment_id_to_value_in_dict_flags[column->get_rowset_segment_id()]; if (value_in_dict_flags.empty()) { - nested_col_ptr->find_codes(_values, value_in_dict_flags); + nested_col_ptr->find_codes(*_values, value_in_dict_flags); } for (uint16_t i = 0; i < size; i++) { @@ -531,14 +587,14 @@ private: if constexpr (!is_opposite) { if (is_and ^ - _operator(_values.find(reinterpret_cast<const T&>(data_array[idx])), - _values.end())) { + 
_operator(_values->find(reinterpret_cast<const T&>(data_array[idx])), + _values->end())) { flags[i] = !is_and; } } else { if (is_and ^ - !_operator(_values.find(reinterpret_cast<const T&>(data_array[idx])), - _values.end())) { + !_operator(_values->find(reinterpret_cast<const T&>(data_array[idx])), + _values->end())) { flags[i] = !is_and; } } @@ -552,12 +608,25 @@ private: return info; } - phmap::flat_hash_set<T> _values; + void _update_min_max(const T& value) { + if (value > _max_value) { + _max_value = value; + } + if (value < _min_value) { + _min_value = value; + } + } + + phmap::flat_hash_set<T>* _values; + bool should_delete = true; mutable std::map<std::pair<RowsetId, uint32_t>, std::vector<vectorized::UInt8>> _segment_id_to_value_in_dict_flags; T _min_value; T _max_value; static constexpr PrimitiveType EvalType = (Type == TYPE_CHAR ? TYPE_STRING : Type); + + // temp string for char type column + std::list<std::string> _temp_datas; }; template <PrimitiveType Type, PredicateType PT> diff --git a/be/src/olap/predicate_creator.h b/be/src/olap/predicate_creator.h index 0b1e1a4c14..3e39b7cbc1 100644 --- a/be/src/olap/predicate_creator.h +++ b/be/src/olap/predicate_creator.h @@ -19,11 +19,17 @@ #include <charconv> +#include "exprs/bloomfilter_predicate.h" +#include "exprs/create_predicate_function.h" +#include "exprs/hybrid_set.h" +#include "olap/bloom_filter_predicate.h" #include "olap/column_predicate.h" #include "olap/comparison_predicate.h" #include "olap/in_list_predicate.h" #include "olap/null_predicate.h" #include "olap/tablet_schema.h" +#include "runtime/define_primitive_type.h" +#include "runtime/primitive_type.h" #include "util/date_func.h" #include "util/string_util.h" @@ -285,4 +291,66 @@ inline ColumnPredicate* parse_to_predicate(TabletSchemaSPtr tablet_schema, return create(column, index, condition.condition_values[0], opposite, mem_pool); } +#define APPLY_FOR_PRIMTYPE(M) \ + M(TYPE_TINYINT) \ + M(TYPE_SMALLINT) \ + M(TYPE_INT) \ + M(TYPE_BIGINT) \ + 
M(TYPE_LARGEINT) \ + M(TYPE_FLOAT) \ + M(TYPE_DOUBLE) \ + M(TYPE_CHAR) \ + M(TYPE_DATE) \ + M(TYPE_DATETIME) \ + M(TYPE_DATEV2) \ + M(TYPE_DATETIMEV2) \ + M(TYPE_VARCHAR) \ + M(TYPE_STRING) \ + M(TYPE_DECIMAL32) \ + M(TYPE_DECIMAL64) \ + M(TYPE_DECIMAL128) + +template <PrimitiveType PT> +inline ColumnPredicate* create_olap_column_predicate( + uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& filter, int be_exec_version, + const TabletColumn* column = nullptr) { + std::shared_ptr<BloomFilterFuncBase> filter_olap; + filter_olap.reset(create_bloom_filter(PT)); + filter_olap->light_copy(filter.get()); + return new BloomFilterColumnPredicate<PT>(column_id, filter, be_exec_version); +} + +template <PrimitiveType PT> +inline ColumnPredicate* create_olap_column_predicate(uint32_t column_id, + const std::shared_ptr<HybridSetBase>& filter, + int be_exec_version, + const TabletColumn* column = nullptr) { + return new InListPredicateBase<PT, PredicateType::IN_LIST>(column_id, filter, column->length()); +} + +template <typename T> +inline ColumnPredicate* create_column_predicate(uint32_t column_id, + const std::shared_ptr<T>& filter, FieldType type, + int be_exec_version, + const TabletColumn* column = nullptr) { + switch (type) { +#define M(NAME) \ + case OLAP_FIELD_##NAME: { \ + return create_olap_column_predicate<NAME>(column_id, filter, be_exec_version, column); \ + } + APPLY_FOR_PRIMTYPE(M) +#undef M + case OLAP_FIELD_TYPE_DECIMAL: { + return create_olap_column_predicate<TYPE_DECIMALV2>(column_id, filter, be_exec_version, + column); + } + case OLAP_FIELD_TYPE_BOOL: { + return create_olap_column_predicate<TYPE_BOOLEAN>(column_id, filter, be_exec_version, + column); + } + default: + return nullptr; + } +} + } //namespace doris diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index b1a1154346..1b03ed7e8d 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -20,7 +20,7 @@ #include <parallel_hashmap/phmap.h> #include "common/status.h" 
-#include "olap/bloom_filter_predicate.h" +#include "exprs/hybrid_set.h" #include "olap/like_column_predicate.h" #include "olap/olap_common.h" #include "olap/predicate_creator.h" @@ -431,6 +431,7 @@ void TabletReader::_init_conditions_param(const ReaderParams& read_params) { // These conditions is passed from OlapScannode, but not set column unique id here, so that set it here because it // is too complicated to modify related interface TCondition tmp_cond = condition; + auto condition_col_uid = _tablet_schema->column(tmp_cond.column_name).unique_id(); tmp_cond.__set_column_unique_id(condition_col_uid); ColumnPredicate* predicate = @@ -450,6 +451,10 @@ void TabletReader::_init_conditions_param(const ReaderParams& read_params) { _col_predicates.emplace_back(_parse_to_predicate(filter)); } + for (const auto& filter : read_params.in_filters) { + _col_predicates.emplace_back(_parse_to_predicate(filter)); + } + // Function filter push down to storage engine for (const auto& filter : read_params.function_filters) { _col_predicates.emplace_back(_parse_to_predicate(filter)); @@ -463,9 +468,19 @@ ColumnPredicate* TabletReader::_parse_to_predicate( return nullptr; } const TabletColumn& column = _tablet_schema->column(index); - return BloomFilterColumnPredicateFactory::create_column_predicate( - index, bloom_filter.second, column.type(), - _reader_context.runtime_state->be_exec_version()); + return create_column_predicate(index, bloom_filter.second, column.type(), + _reader_context.runtime_state->be_exec_version(), &column); +} + +ColumnPredicate* TabletReader::_parse_to_predicate( + const std::pair<std::string, std::shared_ptr<HybridSetBase>>& in_filter) { + int32_t index = _tablet_schema->field_index(in_filter.first); + if (index < 0) { + return nullptr; + } + const TabletColumn& column = _tablet_schema->column(index); + return create_column_predicate(index, in_filter.second, column.type(), + _reader_context.runtime_state->be_exec_version(), &column); } ColumnPredicate* 
TabletReader::_parse_to_predicate(const FunctionFilter& function_filter) { diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index e3f501e063..e3e29f0cc8 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -21,12 +21,12 @@ #include "exprs/bloomfilter_predicate.h" #include "exprs/function_filter.h" +#include "exprs/hybrid_set.h" #include "olap/delete_handler.h" #include "olap/row_cursor.h" #include "olap/rowset/rowset_reader.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" -#include "util/date_func.h" #include "util/runtime_profile.h" namespace doris { @@ -77,6 +77,7 @@ public: std::vector<TCondition> conditions; std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>> bloom_filters; + std::vector<std::pair<string, std::shared_ptr<HybridSetBase>>> in_filters; std::vector<FunctionFilter> function_filters; std::vector<RowsetMetaSharedPtr> delete_predicates; @@ -170,6 +171,9 @@ protected: ColumnPredicate* _parse_to_predicate( const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter); + ColumnPredicate* _parse_to_predicate( + const std::pair<std::string, std::shared_ptr<HybridSetBase>>& in_filter); + virtual ColumnPredicate* _parse_to_predicate(const FunctionFilter& function_filter); Status _init_delete_condition(const ReaderParams& read_params); diff --git a/be/src/runtime/primitive_type.cpp b/be/src/runtime/primitive_type.cpp index 9c7a771e59..9c17ba8fec 100644 --- a/be/src/runtime/primitive_type.cpp +++ b/be/src/runtime/primitive_type.cpp @@ -17,8 +17,6 @@ #include "runtime/primitive_type.h" -#include <sstream> - #include "gen_cpp/Types_types.h" #include "runtime/collection_value.h" #include "runtime/define_primitive_type.h" @@ -88,65 +86,6 @@ PrimitiveType convert_type_to_primitive(FunctionContext::Type type) { return PrimitiveType::INVALID_TYPE; } -bool is_enumeration_type(PrimitiveType type) { - switch (type) { - case TYPE_FLOAT: - case TYPE_DOUBLE: - case TYPE_NULL: - case TYPE_CHAR: - case 
TYPE_VARCHAR: - case TYPE_STRING: - case TYPE_DATETIME: - case TYPE_DATETIMEV2: - case TYPE_TIMEV2: - case TYPE_DECIMALV2: - case TYPE_DECIMAL32: - case TYPE_DECIMAL64: - case TYPE_DECIMAL128: - case TYPE_BOOLEAN: - case TYPE_ARRAY: - case TYPE_HLL: - return false; - case TYPE_TINYINT: - case TYPE_SMALLINT: - case TYPE_INT: - case TYPE_BIGINT: - case TYPE_LARGEINT: - case TYPE_DATE: - case TYPE_DATEV2: - return true; - - case INVALID_TYPE: - default: - DCHECK(false); - } - - return false; -} - -bool is_date_type(PrimitiveType type) { - return type == TYPE_DATETIME || type == TYPE_DATE || type == TYPE_DATETIMEV2 || - type == TYPE_DATEV2; -} - -bool is_string_type(PrimitiveType type) { - return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_STRING; -} - -bool is_float_or_double(PrimitiveType type) { - return type == TYPE_FLOAT || type == TYPE_DOUBLE; -} - -bool is_int_or_bool(PrimitiveType type) { - return type == TYPE_BOOLEAN || type == TYPE_TINYINT || type == TYPE_SMALLINT || - type == TYPE_INT || type == TYPE_BIGINT || type == TYPE_LARGEINT; -} - -bool has_variable_type(PrimitiveType type) { - return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_OBJECT || - type == TYPE_QUANTILE_STATE || type == TYPE_STRING; -} - // Returns the byte size of 'type' Returns 0 for variable length types. 
int get_byte_size(PrimitiveType type) { switch (type) { diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h index c00ef24805..104c738bac 100644 --- a/be/src/runtime/primitive_type.h +++ b/be/src/runtime/primitive_type.h @@ -38,12 +38,64 @@ struct JsonBinaryValue; PrimitiveType convert_type_to_primitive(FunctionContext::Type type); -bool is_enumeration_type(PrimitiveType type); -bool is_date_type(PrimitiveType type); -bool is_float_or_double(PrimitiveType type); -bool is_int_or_bool(PrimitiveType type); -bool is_string_type(PrimitiveType type); -bool has_variable_type(PrimitiveType type); +constexpr bool is_enumeration_type(PrimitiveType type) { + switch (type) { + case TYPE_FLOAT: + case TYPE_DOUBLE: + case TYPE_NULL: + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_STRING: + case TYPE_DATETIME: + case TYPE_DATETIMEV2: + case TYPE_TIMEV2: + case TYPE_DECIMALV2: + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128: + case TYPE_BOOLEAN: + case TYPE_ARRAY: + case TYPE_HLL: + return false; + case TYPE_TINYINT: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_BIGINT: + case TYPE_LARGEINT: + case TYPE_DATE: + case TYPE_DATEV2: + return true; + + case INVALID_TYPE: + default: + DCHECK(false); + } + + return false; +} + +constexpr bool is_date_type(PrimitiveType type) { + return type == TYPE_DATETIME || type == TYPE_DATE || type == TYPE_DATETIMEV2 || + type == TYPE_DATEV2; +} + +constexpr bool is_string_type(PrimitiveType type) { + return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_STRING; +} + +constexpr bool is_float_or_double(PrimitiveType type) { + return type == TYPE_FLOAT || type == TYPE_DOUBLE; +} + +constexpr bool is_int_or_bool(PrimitiveType type) { + return type == TYPE_BOOLEAN || type == TYPE_TINYINT || type == TYPE_SMALLINT || + type == TYPE_INT || type == TYPE_BIGINT || type == TYPE_LARGEINT; +} + +constexpr bool has_variable_type(PrimitiveType type) { + return type == TYPE_CHAR || type == 
TYPE_VARCHAR || type == TYPE_OBJECT || + type == TYPE_QUANTILE_STATE || type == TYPE_STRING; +} // Returns the byte size of 'type' Returns 0 for variable length types. int get_byte_size(PrimitiveType type); diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h index 23c165adcc..c6ab98a3c7 100644 --- a/be/src/vec/columns/predicate_column.h +++ b/be/src/vec/columns/predicate_column.h @@ -19,10 +19,11 @@ #include "olap/decimal12.h" #include "olap/uint24.h" +#include "runtime/mem_pool.h" +#include "runtime/primitive_type.h" #include "runtime/string_value.h" #include "vec/columns/column.h" #include "vec/columns/column_decimal.h" -#include "vec/columns/column_impl.h" #include "vec/columns/column_string.h" #include "vec/columns/column_vector.h" #include "vec/core/types.h" diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 1520c834f4..84e2a5c64a 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -22,7 +22,6 @@ #include "util/to_string.h" #include "vec/columns/column_const.h" #include "vec/exec/scan/new_olap_scanner.h" -#include "vec/functions/in.h" namespace doris::vectorized { @@ -53,6 +52,8 @@ Status NewOlapScanNode::_init_profile() { // 2. 
init timer and counters _reader_init_timer = ADD_TIMER(_scanner_profile, "ReaderInitTime"); + _scanner_init_timer = ADD_TIMER(_scanner_profile, "ScannerInitTime"); + _process_conjunct_timer = ADD_TIMER(_runtime_profile, "ProcessConjunctTime"); _read_compressed_counter = ADD_COUNTER(_segment_profile, "CompressedBytesRead", TUnit::BYTES); _read_uncompressed_counter = ADD_COUNTER(_segment_profile, "UncompressedBytesRead", TUnit::BYTES); @@ -116,7 +117,9 @@ static std::string olap_filter_to_string(const doris::TCondition& condition) { op_name = "NOT IN"; } return fmt::format("{{{} {} {}}}", condition.column_name, op_name, - to_string(condition.condition_values)); + condition.condition_values.size() > 128 + ? "[more than 128 elements]" + : to_string(condition.condition_values)); } static std::string olap_filters_to_string(const std::vector<doris::TCondition>& filters) { @@ -147,6 +150,7 @@ static std::string tablets_id_to_string( } Status NewOlapScanNode::_process_conjuncts() { + SCOPED_TIMER(_process_conjunct_timer); RETURN_IF_ERROR(VScanNode::_process_conjuncts()); if (_eos) { return Status::OK(); @@ -179,10 +183,12 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() { // because extend_scan_key method may change the first parameter. // but the original range may be converted to olap filters, if it's not a exact_range. 
auto temp_range = range; - RETURN_IF_ERROR(_scan_keys.extend_scan_key(temp_range, _max_scan_key_num, - &exact_range)); - if (exact_range) { - _colname_to_value_range.erase(iter->first); + if (range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) { + RETURN_IF_ERROR(_scan_keys.extend_scan_key(temp_range, _max_scan_key_num, + &exact_range)); + if (exact_range) { + _colname_to_value_range.erase(iter->first); + } } return Status::OK(); }, @@ -194,7 +200,7 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() { std::visit([&](auto&& range) { range.to_olap_filter(filters); }, iter.second); for (const auto& filter : filters) { - _olap_filters.push_back(std::move(filter)); + _olap_filters.push_back(filter); } } @@ -282,14 +288,14 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) { _eos = true; return Status::OK(); } + SCOPED_TIMER(_scanner_init_timer); + auto span = opentelemetry::trace::Tracer::GetCurrentSpan(); if (_vconjunct_ctx_ptr && (*_vconjunct_ctx_ptr)->root()) { _runtime_profile->add_info_string("RemainedDownPredicates", (*_vconjunct_ctx_ptr)->root()->debug_string()); } - auto span = opentelemetry::trace::Tracer::GetCurrentSpan(); - // ranges constructed from scan keys std::vector<std::unique_ptr<doris::OlapScanRange>> cond_ranges; RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges)); @@ -342,7 +348,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) { _scanner_pool.add(scanner); RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(), _olap_filters, _bloom_filters_push_down, - _push_down_functions)); + _in_filters_push_down, _push_down_functions)); scanners->push_back((VScanner*)scanner); disk_set.insert(scanner->scan_disk()); } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index 2315d67047..8a56792ac0 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ 
-65,6 +65,8 @@ private: RuntimeProfile::Counter* _tablet_counter = nullptr; RuntimeProfile::Counter* _rows_pushed_cond_filtered_counter = nullptr; RuntimeProfile::Counter* _reader_init_timer = nullptr; + RuntimeProfile::Counter* _scanner_init_timer = nullptr; + RuntimeProfile::Counter* _process_conjunct_timer = nullptr; RuntimeProfile::Counter* _io_timer = nullptr; RuntimeProfile::Counter* _read_compressed_counter = nullptr; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 8ad76adfb1..65c5df0c7f 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -38,6 +38,7 @@ Status NewOlapScanner::prepare( const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters, const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, + const std::vector<std::pair<string, std::shared_ptr<HybridSetBase>>>& in_filters, const std::vector<FunctionFilter>& function_filters) { if (vconjunct_ctx_ptr != nullptr) { // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. 
@@ -104,7 +105,7 @@ Status NewOlapScanner::prepare( // Initialize tablet_reader_params RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters, bloom_filters, - function_filters)); + in_filters, function_filters)); } } @@ -130,6 +131,7 @@ Status NewOlapScanner::open(RuntimeState* state) { Status NewOlapScanner::_init_tablet_reader_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, + const std::vector<std::pair<string, std::shared_ptr<HybridSetBase>>>& in_filters, const std::vector<FunctionFilter>& function_filters) { // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty bool single_version = @@ -162,9 +164,10 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.tablet_schema = _tablet_schema; _tablet_reader_params.reader_type = READER_QUERY; _tablet_reader_params.aggregation = _aggregation; - if (real_parent->_olap_scan_node.__isset.push_down_agg_type_opt) + if (real_parent->_olap_scan_node.__isset.push_down_agg_type_opt) { _tablet_reader_params.push_down_agg_type_opt = real_parent->_olap_scan_node.push_down_agg_type_opt; + } _tablet_reader_params.version = Version(0, _version); // Condition @@ -175,9 +178,14 @@ Status NewOlapScanner::_init_tablet_reader_params( std::inserter(_tablet_reader_params.bloom_filters, _tablet_reader_params.bloom_filters.begin())); + std::copy(in_filters.cbegin(), in_filters.cend(), + std::inserter(_tablet_reader_params.in_filters, + _tablet_reader_params.in_filters.begin())); + std::copy(function_filters.cbegin(), function_filters.cend(), std::inserter(_tablet_reader_params.function_filters, _tablet_reader_params.function_filters.begin())); + if (!_state->skip_delete_predicate()) { auto& delete_preds = _tablet->delete_predicates(); std::copy(delete_preds.cbegin(), delete_preds.cend(), diff --git a/be/src/vec/exec/scan/new_olap_scanner.h 
b/be/src/vec/exec/scan/new_olap_scanner.h index 8b812c5a8e..e2ac03c4e0 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -20,6 +20,7 @@ #include "exec/olap_utils.h" #include "exprs/bloomfilter_predicate.h" #include "exprs/function_filter.h" +#include "exprs/hybrid_set.h" #include "olap/reader.h" #include "util/runtime_profile.h" #include "vec/exec/scan/vscanner.h" @@ -47,6 +48,7 @@ public: VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters, const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, + const std::vector<std::pair<string, std::shared_ptr<HybridSetBase>>>& in_filters, const std::vector<FunctionFilter>& function_filters); const std::string& scan_disk() const { return _tablet->data_dir()->path(); } @@ -62,6 +64,7 @@ private: const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, + const std::vector<std::pair<string, std::shared_ptr<HybridSetBase>>>& in_filters, const std::vector<FunctionFilter>& function_filters); Status _init_return_columns(); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 614da06d70..c4c58b38f0 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -17,14 +17,14 @@ #include "vec/exec/scan/vscan_node.h" +#include "common/status.h" #include "exprs/hybrid_set.h" #include "runtime/runtime_filter_mgr.h" -#include "util/stack_util.h" -#include "util/threadpool.h" #include "vec/columns/column_const.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/exec/scan/vscanner.h" #include "vec/exprs/vcompound_pred.h" +#include "vec/exprs/vdirect_in_predicate.h" #include "vec/exprs/vslot_ref.h" #include "vec/functions/in.h" @@ -108,6 +108,7 @@ Status VScanNode::open(RuntimeState* state) { Status VScanNode::get_next(RuntimeState* 
state, vectorized::Block* block, bool* eos) { INIT_AND_SCOPE_REENTRANT_SPAN_IF(state->enable_profile(), state->get_tracer(), _get_next_span, "VScanNode::get_next"); + SCOPED_TIMER(_get_next_timer); SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); if (state->is_cancelled()) { @@ -146,6 +147,8 @@ Status VScanNode::_init_profile() { _total_throughput_counter = runtime_profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter); _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT); + _get_next_timer = ADD_TIMER(_runtime_profile, "GetNextTime"); + _acquire_runtime_filter_timer = ADD_TIMER(_runtime_profile, "AcuireRuntimeFilterTime"); // 2. counters for scanners _scanner_profile.reset(new RuntimeProfile("VScanner")); @@ -199,6 +202,7 @@ Status VScanNode::_register_runtime_filter() { } Status VScanNode::_acquire_runtime_filter() { + SCOPED_TIMER(_acquire_runtime_filter_timer); std::vector<VExpr*> vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; @@ -581,18 +585,33 @@ Status VScanNode::_normalize_in_and_eq_predicate(VExpr* expr, VExprContext* expr slot->type().scale); // 1. 
Normalize in conjuncts like 'where col in (v1, v2, v3)' if (TExprNodeType::IN_PRED == expr->node_type()) { - VInPredicate* pred = static_cast<VInPredicate*>(expr); - PushDownType temp_pdt = _should_push_down_in_predicate(pred, expr_ctx, false); - if (temp_pdt == PushDownType::UNACCEPTABLE) { - return Status::OK(); + HybridSetBase::IteratorBase* iter = nullptr; + auto hybrid_set = expr->get_set_func(); + + if (hybrid_set != nullptr) { + // runtime filter produce VDirectInPredicate + if (hybrid_set->size() <= _max_pushdown_conditions_per_column) { + iter = hybrid_set->begin(); + } else { + _in_filters_push_down.emplace_back(slot->col_name(), expr->get_set_func()); + *pdt = PushDownType::ACCEPTABLE; + return Status::OK(); + } + } else { + // normal in predicate + VInPredicate* pred = static_cast<VInPredicate*>(expr); + PushDownType temp_pdt = _should_push_down_in_predicate(pred, expr_ctx, false); + if (temp_pdt == PushDownType::UNACCEPTABLE) { + return Status::OK(); + } + + // begin to push InPredicate value into ColumnValueRange + InState* state = reinterpret_cast<InState*>( + expr_ctx->fn_context(pred->fn_context_index()) + ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + iter = state->hybrid_set->begin(); } - // begin to push InPredicate value into ColumnValueRange - InState* state = reinterpret_cast<InState*>( - expr_ctx->fn_context(pred->fn_context_index()) - ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); - HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); - auto fn_name = std::string(""); while (iter->has_next()) { // column in (nullptr) is always false so continue to // dispose next item @@ -602,11 +621,11 @@ Status VScanNode::_normalize_in_and_eq_predicate(VExpr* expr, VExprContext* expr } auto value = const_cast<void*>(iter->get_value()); RETURN_IF_ERROR(_change_value_range<true>( - temp_range, value, ColumnValueRange<T>::add_fixed_value_range, fn_name)); + temp_range, value, ColumnValueRange<T>::add_fixed_value_range, "")); 
iter->next(); } range.intersection(temp_range); - *pdt = temp_pdt; + *pdt = PushDownType::ACCEPTABLE; } else if (TExprNodeType::BINARY_PRED == expr->node_type()) { DCHECK(expr->children().size() == 2); auto eq_checker = [](const std::string& fn_name) { return fn_name == "eq"; }; @@ -640,11 +659,6 @@ Status VScanNode::_normalize_in_and_eq_predicate(VExpr* expr, VExprContext* expr *pdt = temp_pdt; } - // exceed limit, no conditions will be pushed down to storage engine. - if (range.get_fixed_value_size() > _max_pushdown_conditions_per_column) { - range.set_whole_value_range(); - *pdt = PushDownType::UNACCEPTABLE; - } return Status::OK(); } @@ -947,22 +961,6 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate* if (pred->is_not_in() != is_not_in) { return PushDownType::UNACCEPTABLE; } - InState* state = reinterpret_cast<InState*>( - expr_ctx->fn_context(pred->fn_context_index()) - ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); - HybridSetBase* set = state->hybrid_set.get(); - - // if there are too many elements in InPredicate, exceed the limit, - // we will not push any condition of this column to storage engine. - // because too many conditions pushed down to storage engine may even - // slow down the query process. - // ATTN: This is just an experience value. You may need to try - // different thresholds to improve performance. 
- if (set->size() > _max_pushdown_conditions_per_column) { - VLOG_NOTICE << "Predicate value num " << set->size() << " exceed limit " - << _max_pushdown_conditions_per_column; - return PushDownType::UNACCEPTABLE; - } return PushDownType::ACCEPTABLE; } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index fa51dd393f..a2aaf5ffd4 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -186,6 +186,8 @@ protected: std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>> _bloom_filters_push_down; + std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> _in_filters_push_down; + // Save all function predicates which may be pushed down to data source. std::vector<FunctionFilter> _push_down_functions; @@ -221,6 +223,8 @@ protected: RuntimeProfile::Counter* _total_throughput_counter; RuntimeProfile::Counter* _num_scanners; + RuntimeProfile::Counter* _get_next_timer = nullptr; + RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr; // time of get block from scanner RuntimeProfile::Counter* _scan_timer = nullptr; // time of prefilter input block from scanner diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp index 1a8d400ab4..5cdaa048ed 100644 --- a/be/src/vec/exprs/vbloom_predicate.cpp +++ b/be/src/vec/exprs/vbloom_predicate.cpp @@ -33,13 +33,6 @@ Status VBloomPredicate::prepare(RuntimeState* state, const RowDescriptor& desc, return Status::InternalError("Invalid argument for VBloomPredicate."); } - ColumnsWithTypeAndName argument_template; - argument_template.reserve(_children.size()); - for (auto child : _children) { - auto column = child->data_type()->create_column(); - argument_template.emplace_back(std::move(column), child->data_type(), child->expr_name()); - } - _be_exec_version = state->be_exec_version(); return Status::OK(); } @@ -95,6 +88,7 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* 
result reinterpret_cast<const void*>(argument_column->get_data_at(i).data)); } } + if (_data_type->is_nullable()) { auto null_map = ColumnVector<UInt8>::create(block->rows(), 0); block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)), diff --git a/be/src/vec/exprs/vdirect_in_predicate.h b/be/src/vec/exprs/vdirect_in_predicate.h new file mode 100644 index 0000000000..458e6c0a0e --- /dev/null +++ b/be/src/vec/exprs/vdirect_in_predicate.h @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "common/status.h" +#include "vec/exprs/vexpr.h" + +namespace doris::vectorized { +class VDirectInPredicate final : public VExpr { +public: + VDirectInPredicate(const TExprNode& node) + : VExpr(node), _filter(nullptr), _expr_name("direct_in_predicate") {} + ~VDirectInPredicate() override = default; + + Status execute(VExprContext* context, doris::vectorized::Block* block, + int* result_column_id) override { + doris::vectorized::ColumnNumbers arguments(_children.size()); + for (int i = 0; i < _children.size(); ++i) { + int column_id = -1; + RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id)); + arguments[i] = column_id; + } + + size_t num_columns_without_result = block->columns(); + auto res_data_column = ColumnVector<UInt8>::create(block->rows()); + ColumnPtr argument_column = + block->get_by_position(arguments[0]).column->convert_to_full_column_if_const(); + size_t sz = argument_column->size(); + res_data_column->resize(sz); + + auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data(); + auto type = WhichDataType(remove_nullable(block->get_by_position(arguments[0]).type)); + if (type.is_string_or_fixed_string()) { + for (size_t i = 0; i < sz; i++) { + auto ele = argument_column->get_data_at(i); + StringValue v(ele.data, ele.size); + ptr[i] = _filter->find(reinterpret_cast<const void*>(&v)); + } + } else if (type.is_int_or_uint() || type.is_float()) { + if (argument_column->is_nullable()) { + auto column_nested = reinterpret_cast<const ColumnNullable*>(argument_column.get()) + ->get_nested_column_ptr(); + auto column_nullmap = reinterpret_cast<const ColumnNullable*>(argument_column.get()) + ->get_null_map_column_ptr(); + _filter->find_fixed_len(column_nested->get_raw_data().data, + (uint8*)column_nullmap->get_raw_data().data, sz, ptr); + } else { + _filter->find_fixed_len(argument_column->get_raw_data().data, nullptr, sz, ptr); + } + } else { + for (size_t i = 0; i < sz; i++) { + ptr[i] = _filter->find( + 
reinterpret_cast<const void*>(argument_column->get_data_at(i).data)); + } + } + + DCHECK(!_data_type->is_nullable()); + + block->insert({std::move(res_data_column), _data_type, _expr_name}); + + *result_column_id = num_columns_without_result; + return Status::OK(); + } + + VExpr* clone(doris::ObjectPool* pool) const override { + return pool->add(new VDirectInPredicate(*this)); + } + + const std::string& expr_name() const override { return _expr_name; } + + void set_filter(std::shared_ptr<HybridSetBase>& filter) { _filter = filter; } + + std::shared_ptr<HybridSetBase> get_set_func() const override { return _filter; } + +private: + std::shared_ptr<HybridSetBase> _filter; + std::string _expr_name; +}; +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 0a82877b93..891c83360e 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -22,6 +22,7 @@ #include "common/status.h" #include "exprs/bloomfilter_predicate.h" +#include "exprs/hybrid_set.h" #include "gen_cpp/Exprs_types.h" #include "runtime/types.h" #include "udf/udf_internal.h" @@ -146,7 +147,7 @@ public: /// expr. 
virtual ColumnPtrWrapper* get_const_col(VExprContext* context); - int fn_context_index() { return _fn_context_index; }; + int fn_context_index() const { return _fn_context_index; }; static const VExpr* expr_without_cast(const VExpr* expr) { if (expr->node_type() == doris::TExprNodeType::CAST_EXPR) { @@ -165,6 +166,8 @@ public: return nullptr; } + virtual std::shared_ptr<HybridSetBase> get_set_func() const { return nullptr; } + protected: /// Simple debug string that provides no expr subclass-specific information std::string debug_string(const std::string& expr_name) const { diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h index 556c724471..3c494f8e53 100644 --- a/be/src/vec/exprs/vin_predicate.h +++ b/be/src/vec/exprs/vin_predicate.h @@ -17,7 +17,6 @@ #pragma once -#include "exprs/hybrid_set.h" #include "vec/exprs/vexpr.h" #include "vec/functions/function.h" @@ -25,25 +24,25 @@ namespace doris::vectorized { class VInPredicate final : public VExpr { public: VInPredicate(const TExprNode& node); - ~VInPredicate() = default; - virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block, - int* result_column_id) override; - virtual doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc, - VExprContext* context) override; - virtual doris::Status open(doris::RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; - virtual void close(doris::RuntimeState* state, VExprContext* context, + ~VInPredicate() override = default; + doris::Status execute(VExprContext* context, doris::vectorized::Block* block, + int* result_column_id) override; + doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc, + VExprContext* context) override; + doris::Status open(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; - virtual VExpr* clone(doris::ObjectPool* pool) const override { + void 
close(doris::RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) override; + VExpr* clone(doris::ObjectPool* pool) const override { return pool->add(new VInPredicate(*this)); } - virtual const std::string& expr_name() const override; + const std::string& expr_name() const override; - virtual std::string debug_string() const override; + std::string debug_string() const override; const FunctionBasePtr function() { return _function; }; - const bool is_not_in() { return _is_not_in; }; + const bool is_not_in() const { return _is_not_in; }; private: FunctionBasePtr _function; diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp index 4f9ad2e7a2..8851b53331 100644 --- a/be/src/vec/exprs/vliteral.cpp +++ b/be/src/vec/exprs/vliteral.cpp @@ -21,7 +21,6 @@ #include "runtime/jsonb_value.h" #include "runtime/large_int_value.h" -#include "util/jsonb_document.h" #include "util/string_parser.hpp" #include "vec/core/field.h" #include "vec/data_types/data_type_decimal.h" @@ -187,7 +186,7 @@ void VLiteral::init(const TExprNode& node) { Status VLiteral::execute(VExprContext* context, vectorized::Block* block, int* result_column_id) { // Literal expr should return least one row. 
- size_t row_size = std::max(block->rows(), size_t(1)); + size_t row_size = std::max(block->rows(), _column_ptr->size()); *result_column_id = VExpr::insert_param(block, {_column_ptr, _data_type, _expr_name}, row_size); return Status::OK(); } @@ -196,9 +195,12 @@ std::string VLiteral::debug_string() const { std::stringstream out; out << "VLiteral (name = " << _expr_name; out << ", type = " << _data_type->get_name(); - out << ", value = "; - if (_column_ptr->size() > 0) { - StringRef ref = _column_ptr->get_data_at(0); + out << ", value = ("; + for (size_t i = 0; i < _column_ptr->size(); i++) { + if (i != 0) { + out << ", "; + } + StringRef ref = _column_ptr->get_data_at(i); if (ref.data == nullptr) { out << "null"; } else { @@ -234,7 +236,17 @@ std::string VLiteral::debug_string() const { case TYPE_DATETIME: { auto value = *(reinterpret_cast<const int64_t*>(ref.data)); auto date_value = (VecDateTimeValue*)&value; - out << date_value; + out << *date_value; + break; + } + case TYPE_DATEV2: { + auto* value = (DateV2Value<DateV2ValueType>*)ref.data; + out << *value; + break; + } + case TYPE_DATETIMEV2: { + auto* value = (DateV2Value<DateTimeV2ValueType>*)ref.data; + out << *value; break; } case TYPE_STRING: @@ -271,7 +283,7 @@ std::string VLiteral::debug_string() const { } } } - out << ")"; + out << "))"; return out.str(); } } // namespace vectorized diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h index 3a34ff397f..c5c1c47066 100644 --- a/be/src/vec/exprs/vliteral.h +++ b/be/src/vec/exprs/vliteral.h @@ -18,7 +18,6 @@ #pragma once #include "vec/columns/column.h" -#include "vec/columns/column_const.h" #include "vec/exprs/vexpr.h" namespace doris { @@ -33,15 +32,10 @@ public: init(node); } }; - virtual ~VLiteral() = default; - virtual Status execute(VExprContext* context, vectorized::Block* block, - int* result_column_id) override; - virtual const std::string& expr_name() const override { return _expr_name; } - virtual VExpr* clone(doris::ObjectPool* 
pool) const override { - return pool->add(new VLiteral(*this)); - } - - virtual std::string debug_string() const override; + Status execute(VExprContext* context, vectorized::Block* block, int* result_column_id) override; + const std::string& expr_name() const override { return _expr_name; } + VExpr* clone(doris::ObjectPool* pool) const override { return pool->add(new VLiteral(*this)); } + std::string debug_string() const override; protected: ColumnPtr _column_ptr; diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index e24dba6b2f..624e4918d8 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -24,7 +24,7 @@ class VRuntimeFilterWrapper final : public VExpr { public: VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl); VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr); - ~VRuntimeFilterWrapper() = default; + ~VRuntimeFilterWrapper() override = default; doris::Status execute(VExprContext* context, doris::vectorized::Block* block, int* result_column_id) override; doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc, diff --git a/be/src/vec/functions/in.h b/be/src/vec/functions/in.h index bdf21cda96..79187abb2b 100644 --- a/be/src/vec/functions/in.h +++ b/be/src/vec/functions/in.h @@ -21,9 +21,7 @@ #include <fmt/format.h> #include "exprs/create_predicate_function.h" -#include "vec/columns/column_const.h" #include "vec/columns/column_nullable.h" -#include "vec/columns/column_set.h" #include "vec/columns/columns_number.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_number.h" @@ -54,7 +52,9 @@ public: DataTypePtr get_return_type_impl(const DataTypes& args) const override { for (const auto& arg : args) { - if (arg->is_nullable()) return make_nullable(std::make_shared<DataTypeUInt8>()); + if (arg->is_nullable()) { + return make_nullable(std::make_shared<DataTypeUInt8>()); + } } return 
std::make_shared<DataTypeUInt8>(); } @@ -74,7 +74,7 @@ public: state->hybrid_set.reset(new StringValueSet()); } else { state->hybrid_set.reset( - vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type))); + create_set(convert_type_to_primitive(context->get_arg_type(0)->type), true)); } DCHECK(context->get_num_args() >= 1); @@ -199,16 +199,17 @@ public: continue; } - std::unique_ptr<HybridSetBase> hybrid_set( - vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type))); + std::unique_ptr<HybridSetBase> hybrid_set(create_set( + convert_type_to_primitive(context->get_arg_type(0)->type), true)); bool null_in_set = false; for (const auto& set_column : set_columns) { auto set_data = set_column->get_data_at(i); - if (set_data.data == nullptr) + if (set_data.data == nullptr) { null_in_set = true; - else + } else { hybrid_set->insert((void*)(set_data.data), set_data.size); + } } vec_res[i] = negative ^ hybrid_set->find((void*)ref_data.data, ref_data.size); if (null_in_set) { diff --git a/be/test/olap/bloom_filter_column_predicate_test.cpp b/be/test/olap/bloom_filter_column_predicate_test.cpp index 2db411c654..ae27d609ba 100644 --- a/be/test/olap/bloom_filter_column_predicate_test.cpp +++ b/be/test/olap/bloom_filter_column_predicate_test.cpp @@ -21,8 +21,8 @@ #include "agent/be_exec_version_manager.h" #include "exprs/create_predicate_function.h" -#include "olap/bloom_filter_predicate.h" #include "olap/column_predicate.h" +#include "olap/predicate_creator.h" #include "olap/row_block2.h" #include "runtime/mem_pool.h" #include "vec/columns/column_nullable.h" @@ -87,8 +87,8 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) { bloom_filter->insert(reinterpret_cast<void*>(&value)); value = 6.1; bloom_filter->insert(reinterpret_cast<void*>(&value)); - ColumnPredicate* pred = BloomFilterColumnPredicateFactory::create_column_predicate( - 0, bloom_filter, OLAP_FIELD_TYPE_FLOAT, BeExecVersionManager::get_newest_version()); + ColumnPredicate* pred = 
create_column_predicate(0, bloom_filter, OLAP_FIELD_TYPE_FLOAT, + BeExecVersionManager::get_newest_version()); // for ColumnBlock no null init_row_block(tablet_schema, size); @@ -142,8 +142,8 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN_VEC) { int offsets[3] = {0, 1, 2}; bloom_filter->insert_fixed_len((char*)values, offsets, 3); - ColumnPredicate* pred = BloomFilterColumnPredicateFactory::create_column_predicate( - 0, bloom_filter, OLAP_FIELD_TYPE_FLOAT, BeExecVersionManager::get_newest_version()); + ColumnPredicate* pred = create_column_predicate(0, bloom_filter, OLAP_FIELD_TYPE_FLOAT, + BeExecVersionManager::get_newest_version()); auto* col_data = reinterpret_cast<float*>(_mem_pool->allocate(size * sizeof(float))); // for vectorized::Block no null diff --git a/be/test/olap/in_list_predicate_test.cpp b/be/test/olap/in_list_predicate_test.cpp index eccac83e97..73413c0c10 100644 --- a/be/test/olap/in_list_predicate_test.cpp +++ b/be/test/olap/in_list_predicate_test.cpp @@ -22,11 +22,8 @@ #include <time.h> #include "olap/column_predicate.h" -#include "olap/field.h" #include "olap/row_block2.h" #include "runtime/mem_pool.h" -#include "runtime/primitive_type.h" -#include "runtime/string_value.hpp" namespace doris { @@ -150,8 +147,8 @@ public: values.insert(4); \ values.insert(5); \ values.insert(6); \ - ColumnPredicate* pred = new InListPredicateBase<PRIMITIVE_TYPE, PredicateType::IN_LIST>( \ - 0, std::move(values)); \ + ColumnPredicate* pred = \ + new InListPredicateBase<PRIMITIVE_TYPE, PredicateType::IN_LIST>(0, values); \ uint16_t sel[10]; \ for (int i = 0; i < 10; ++i) { \ sel[i] = i; \ @@ -216,8 +213,7 @@ TEST_F(TestInListPredicate, FLOAT_COLUMN) { values.insert(4.1); values.insert(5.1); values.insert(6.1); - ColumnPredicate* pred = - new InListPredicateBase<TYPE_FLOAT, PredicateType::IN_LIST>(0, std::move(values)); + ColumnPredicate* pred = new InListPredicateBase<TYPE_FLOAT, PredicateType::IN_LIST>(0, values); // for ColumnBlock no null 
init_row_block(tablet_schema, size); @@ -267,8 +263,7 @@ TEST_F(TestInListPredicate, DOUBLE_COLUMN) { values.insert(5.1); values.insert(6.1); - ColumnPredicate* pred = - new InListPredicateBase<TYPE_DOUBLE, PredicateType::IN_LIST>(0, std::move(values)); + ColumnPredicate* pred = new InListPredicateBase<TYPE_DOUBLE, PredicateType::IN_LIST>(0, values); // for ColumnBlock no null init_row_block(tablet_schema, size); @@ -323,7 +318,7 @@ TEST_F(TestInListPredicate, DECIMAL_COLUMN) { values.insert(value3); ColumnPredicate* pred = - new InListPredicateBase<TYPE_DECIMALV2, PredicateType::IN_LIST>(0, std::move(values)); + new InListPredicateBase<TYPE_DECIMALV2, PredicateType::IN_LIST>(0, values); // for ColumnBlock no null init_row_block(tablet_schema, size); @@ -388,8 +383,7 @@ TEST_F(TestInListPredicate, CHAR_COLUMN) { value3.len = 5; values.insert(value3); - ColumnPredicate* pred = - new InListPredicateBase<TYPE_CHAR, PredicateType::IN_LIST>(0, std::move(values)); + ColumnPredicate* pred = new InListPredicateBase<TYPE_CHAR, PredicateType::IN_LIST>(0, values); // for ColumnBlock no null init_row_block(tablet_schema, size); @@ -467,7 +461,7 @@ TEST_F(TestInListPredicate, VARCHAR_COLUMN) { values.insert(value3); ColumnPredicate* pred = - new InListPredicateBase<TYPE_VARCHAR, PredicateType::IN_LIST>(0, std::move(values)); + new InListPredicateBase<TYPE_VARCHAR, PredicateType::IN_LIST>(0, values); // for ColumnBlock no null init_row_block(tablet_schema, size); @@ -533,8 +527,7 @@ TEST_F(TestInListPredicate, DATE_COLUMN) { uint24_t value3 = datetime::timestamp_from_date("2017-09-11"); values.insert(value3); - ColumnPredicate* pred = - new InListPredicateBase<TYPE_DATE, PredicateType::IN_LIST>(0, std::move(values)); + ColumnPredicate* pred = new InListPredicateBase<TYPE_DATE, PredicateType::IN_LIST>(0, values); std::vector<std::string> date_array; date_array.push_back("2017-09-07"); @@ -606,8 +599,7 @@ TEST_F(TestInListPredicate, DATE_V2_COLUMN) { uint32_t value3 = 
datetime::timestamp_from_date_v2("2017-09-11"); values.insert(value3); - ColumnPredicate* pred = - new InListPredicateBase<TYPE_DATEV2, PredicateType::IN_LIST>(0, std::move(values)); + ColumnPredicate* pred = new InListPredicateBase<TYPE_DATEV2, PredicateType::IN_LIST>(0, values); std::vector<std::string> date_array; date_array.push_back("2017-09-07"); @@ -681,7 +673,7 @@ TEST_F(TestInListPredicate, DATETIME_COLUMN) { values.insert(value3); ColumnPredicate* pred = - new InListPredicateBase<TYPE_DATETIME, PredicateType::IN_LIST>(0, std::move(values)); + new InListPredicateBase<TYPE_DATETIME, PredicateType::IN_LIST>(0, values); std::vector<std::string> date_array; date_array.push_back("2017-09-07 00:00:00"); diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp index e793d4acd3..068d5b3ee3 100644 --- a/be/test/olap/rowset/segment_v2/segment_test.cpp +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -25,7 +25,6 @@ #include <memory> #include <vector> -#include "common/logging.h" #include "io/fs/file_system.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" @@ -33,18 +32,14 @@ #include "olap/data_dir.h" #include "olap/in_list_predicate.h" #include "olap/olap_common.h" -#include "olap/row_block.h" #include "olap/row_block2.h" #include "olap/row_cursor.h" -#include "olap/rowset/segment_v2/segment_iterator.h" #include "olap/rowset/segment_v2/segment_writer.h" #include "olap/storage_engine.h" #include "olap/tablet_schema.h" #include "olap/tablet_schema_helper.h" -#include "olap/types.h" #include "runtime/mem_pool.h" #include "testutil/test_util.h" -#include "util/debug_util.h" #include "util/file_utils.h" #include "util/key_util.h" @@ -976,8 +971,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) { values.insert(20); values.insert(1); std::unique_ptr<ColumnPredicate> predicate( - new InListPredicateBase<TYPE_INT, PredicateType::IN_LIST>(0, - std::move(values))); + new 
InListPredicateBase<TYPE_INT, PredicateType::IN_LIST>(0, values)); column_predicates.emplace_back(predicate.get()); StorageReadOptions read_opts; @@ -1001,8 +995,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) { values.insert(10); values.insert(20); std::unique_ptr<ColumnPredicate> predicate( - new InListPredicateBase<TYPE_INT, PredicateType::NOT_IN_LIST>( - 0, std::move(values))); + new InListPredicateBase<TYPE_INT, PredicateType::NOT_IN_LIST>(0, values)); column_predicates.emplace_back(predicate.get()); StorageReadOptions read_opts; diff --git a/docs/en/docs/advanced/join-optimization/runtime-filter.md b/docs/en/docs/advanced/join-optimization/runtime-filter.md index de5bacb926..da657ea1f7 100644 --- a/docs/en/docs/advanced/join-optimization/runtime-filter.md +++ b/docs/en/docs/advanced/join-optimization/runtime-filter.md @@ -105,7 +105,7 @@ For query options related to Runtime Filter, please refer to the following secti - `runtime_bloom_filter_size`: The default length of Bloom Filter in Runtime Filter, the default is 2097152 (2M) - - `runtime_filter_max_in_num`: If the number of rows in the right table of the join is greater than this value, we will not generate an IN predicate, the default is 1024 + - `runtime_filter_max_in_num`: If the number of rows in the right table of the join is greater than this value, we will not generate an IN predicate, the default is 102400 The query options are further explained below. @@ -124,7 +124,7 @@ set runtime_filter_type=7; **Precautions for use** - **IN or Bloom Filter**: According to the actual number of rows in the right table during execution, the system automatically determines whether to use IN predicate or Bloom Filter. - - By default, IN Predicate will be used when the number of data rows in the right table is less than 1024 (which can be adjusted by ` runtime_filter_max_in_num 'in the session variable). Otherwise, use bloom filter. 
+ - By default, IN Predicate will be used when the number of data rows in the right table is less than 102400 (which can be adjusted by `runtime_filter_max_in_num` in the session variable). Otherwise, use bloom filter. - **Bloom Filter**: There is a certain misjudgment rate, which results in the filtered data being a little less than expected, but it will not cause the final result to be inaccurate. In most cases, Bloom Filter can improve performance or has no significant impact on performance, but in some cases it will cause performance degradation. - Bloom Filter construction and application overhead is high, so when the filtering rate is low, or the amount of data in the left table is small, Bloom Filter may cause performance degradation. - At present, only the Key column of the left table can be pushed down to the storage engine if the Bloom Filter is applied, and the test results show that the performance of the Bloom Filter is often reduced when the Bloom Filter is not pushed down to the storage engine. @@ -135,7 +135,6 @@ set runtime_filter_type=7; - When the type of the Key column in the join on clause is varchar, etc., applying the MinMax Filter will often cause performance degradation. - **IN predicate**: Construct IN predicate based on all the values of Key listed in the join on clause on the right table, and use the constructed IN predicate to filter on the left table. Compared with Bloom Filter, the cost of construction and application is lower, and it tends to perform better when the amount of data in the right table is small. - - By default, only the number of data rows in the right table is less than 1024 will be pushed down (can be adjusted by `runtime_filter_max_in_num` in the session variable). - Currently the IN predicate already implements a merge method. 
- When IN predicate and other filters are specified at the same time, and the number of filtering values of the IN predicate does not reach runtime_filter_max_in_num, the system will try to remove the other filters. The reason is that IN predicate is an accurate filtering condition. Even if there is no other filter, it can filter efficiently. If it is used at the same time, other filters will do useless work. Currently, only when the producer and consumer of the runtime filter are in the same fragment can there be [...] diff --git a/docs/zh-CN/docs/advanced/join-optimization/runtime-filter.md b/docs/zh-CN/docs/advanced/join-optimization/runtime-filter.md index af9e9409bb..6377b3d391 100644 --- a/docs/zh-CN/docs/advanced/join-optimization/runtime-filter.md +++ b/docs/zh-CN/docs/advanced/join-optimization/runtime-filter.md @@ -102,7 +102,7 @@ Runtime Filter主要用于大表join小表的优化,如果左表的数据量 - `runtime_bloom_filter_min_size`: Runtime Filter中Bloom Filter的最小长度,默认1048576(1M) - `runtime_bloom_filter_max_size`: Runtime Filter中Bloom Filter的最大长度,默认16777216(16M) - `runtime_bloom_filter_size`: Runtime Filter中Bloom Filter的默认长度,默认2097152(2M) - - `runtime_filter_max_in_num`: 如果join右表数据行数大于这个值,我们将不生成IN predicate,默认1024 + - `runtime_filter_max_in_num`: 如果join右表数据行数大于这个值,我们将不生成IN predicate,默认102400 下面对查询选项做进一步说明。 @@ -125,7 +125,7 @@ set runtime_filter_type=7; **使用注意事项** - **IN or Bloom Filter**: 根据右表在执行过程中的真实行数,由系统自动判断使用 IN predicate 还是 Bloom Filter - - 默认在右表数据行数少于1024时会使用IN predicate(可通过session变量中的`runtime_filter_max_in_num`调整),否则使用Bloom filter。 + - 默认在右表数据行数少于102400时会使用IN predicate(可通过session变量中的`runtime_filter_max_in_num`调整),否则使用Bloom filter。 - **Bloom Filter**: 有一定的误判率,导致过滤的数据比预期少一点,但不会导致最终结果不准确,在大部分情况下Bloom Filter都可以提升性能或对性能没有显著影响,但在部分情况下会导致性能降低。 - Bloom Filter构建和应用的开销较高,所以当过滤率较低时,或者左表数据量较少时,Bloom Filter可能会导致性能降低。 - 目前只有左表的Key列应用Bloom Filter才能下推到存储引擎,而测试结果显示Bloom Filter不下推到存储引擎时往往会导致性能降低。 @@ -134,7 +134,6 @@ set runtime_filter_type=7; - 当join on 
clause中Key列的类型为int/bigint/double等时,极端情况下,如果左右表的最大最小值相同则没有效果,反之右表最大值小于左表最小值,或右表最小值大于左表最大值,则效果最好。 - 当join on clause中Key列的类型为varchar等时,应用MinMax Filter往往会导致性能降低。 - **IN predicate**: 根据join on clause中Key列在右表上的所有值构建IN predicate,使用构建的IN predicate在左表上过滤,相比Bloom Filter构建和应用的开销更低,在右表数据量较少时往往性能更高。 - - 默认只有右表数据行数少于1024才会下推(可通过session变量中的`runtime_filter_max_in_num`调整)。 - 目前IN predicate已实现合并方法。 - 当同时指定In predicate和其他filter,并且in的过滤数值没达到runtime_filter_max_in_num时,会尝试把其他filter去除掉。原因是In predicate是精确的过滤条件,即使没有其他filter也可以高效过滤,如果同时使用则其他filter会做无用功。目前仅在Runtime filter的生产者和消费者处于同一个fragment时才会有去除非in filter的逻辑。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 4d7c4a8a80..64b8dad6d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -488,7 +488,7 @@ public class SessionVariable implements Serializable, Writable { private int runtimeFilterType = 8; @VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM) - private int runtimeFilterMaxInNum = 1024; + private int runtimeFilterMaxInNum = 102400; @VariableMgr.VarAttr(name = DISABLE_JOIN_REORDER) private boolean disableJoinReorder = false; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org