This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 245490d6b7 [Enhancement](runtime filter) optimize for runtime filter (#12856) 245490d6b7 is described below commit 245490d6b778db6a0eb2e14f62132464fbd96969 Author: Pxl <pxl...@qq.com> AuthorDate: Sun Oct 9 14:11:03 2022 +0800 [Enhancement](runtime filter) optimize for runtime filter (#12856) optimize for runtime filter --- be/src/exec/olap_scan_node.h | 2 +- be/src/exec/olap_scanner.cpp | 4 +- be/src/exec/olap_scanner.h | 4 +- be/src/exprs/bloomfilter_predicate.cpp | 4 +- be/src/exprs/bloomfilter_predicate.h | 312 ++++++++++++--------- be/src/exprs/create_predicate_function.h | 8 +- be/src/exprs/hybrid_set.h | 18 ++ be/src/exprs/minmax_predicate.h | 16 ++ be/src/exprs/runtime_filter.cpp | 55 +++- be/src/exprs/runtime_filter.h | 11 +- be/src/exprs/runtime_filter_slots.h | 31 +- be/src/olap/bloom_filter_predicate.cpp | 4 +- be/src/olap/bloom_filter_predicate.h | 18 +- be/src/olap/reader.cpp | 2 +- be/src/olap/reader.h | 4 +- be/src/runtime/primitive_type.cpp | 10 + be/src/runtime/primitive_type.h | 2 + be/src/runtime/row_batch.cpp | 3 +- be/src/util/hash_util.hpp | 8 + be/src/vec/columns/column.h | 1 + be/src/vec/data_types/data_type.h | 1 + be/src/vec/exec/scan/new_olap_scanner.cpp | 4 +- be/src/vec/exec/scan/new_olap_scanner.h | 4 +- be/src/vec/exec/scan/vscan_node.h | 2 +- be/src/vec/exec/volap_scan_node.h | 2 +- be/src/vec/exec/volap_scanner.cpp | 4 +- be/src/vec/exec/volap_scanner.h | 4 +- be/src/vec/exprs/vbloom_predicate.cpp | 18 +- be/src/vec/exprs/vbloom_predicate.h | 6 +- be/src/vec/exprs/vexpr.h | 2 +- be/test/exprs/bloom_filter_predicate_test.cpp | 6 +- .../olap/bloom_filter_column_predicate_test.cpp | 33 ++- 32 files changed, 401 insertions(+), 202 deletions(-) diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 4097c710fe..f5d6e38cfd 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -188,7 +188,7 @@ protected: // push down bloom filters to storage engine. // 1. std::pair.first :: column name // 2. std::pair.second :: shared_ptr of BloomFilterFuncBase - std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>> + std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>> _bloom_filters_push_down; // push down functions to storage engine diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 75a45b9a46..036025f075 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -59,7 +59,7 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool Status OlapScanner::prepare( const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, - const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters, + const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); set_tablet_reader(); @@ -165,7 +165,7 @@ Status OlapScanner::open() { // it will be called under tablet read lock because capture rs readers need Status OlapScanner::_init_tablet_reader_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, - const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters, + const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters) { // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty bool single_version = diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h index 1bb890dfd3..134a515219 100644 --- a/be/src/exec/olap_scanner.h +++ b/be/src/exec/olap_scanner.h @@ -48,7 +48,7 @@ public: Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, - const std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>& + const std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters); @@ -95,7 +95,7 @@ public: protected: Status _init_tablet_reader_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, - const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& + const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters); Status _init_return_columns(bool need_seq_col); diff --git a/be/src/exprs/bloomfilter_predicate.cpp b/be/src/exprs/bloomfilter_predicate.cpp index fcf8589ac6..a2ba47e7fc 100644 --- a/be/src/exprs/bloomfilter_predicate.cpp +++ b/be/src/exprs/bloomfilter_predicate.cpp @@ -44,13 +44,13 @@ BloomFilterPredicate::BloomFilterPredicate(const BloomFilterPredicate& other) _scan_rows() {} Status BloomFilterPredicate::prepare(RuntimeState* state, - std::shared_ptr<IBloomFilterFuncBase> filter) { + std::shared_ptr<BloomFilterFuncBase> filter) { // DCHECK(filter != nullptr); if (_is_prepare) { return Status::OK(); } _filter = filter; - if (nullptr == _filter.get()) { + if (nullptr == _filter) { return Status::InternalError("Unknown column type."); } _is_prepare = true; diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h index 0028e31c8c..2540dc657d 100644 --- a/be/src/exprs/bloomfilter_predicate.h +++ b/be/src/exprs/bloomfilter_predicate.h @@ -19,8 +19,10 @@ #include <algorithm> #include <cmath> +#include <cstdint> #include <memory> #include <string> +#include <type_traits> #include "common/object_pool.h" #include "exprs/block_bloom_filter.hpp" @@ -28,21 +30,19 @@ #include "olap/decimal12.h" #include "olap/rowset/segment_v2/bloom_filter.h" #include "olap/uint24.h" +#include "util/hash_util.hpp" namespace doris { -namespace detail { -class BlockBloomFilterAdaptor { +class BloomFilterAdaptor { public: - BlockBloomFilterAdaptor() { _bloom_filter = std::make_shared<doris::BlockBloomFilter>(); } + BloomFilterAdaptor() { _bloom_filter = std::make_shared<doris::BlockBloomFilter>(); } static int64_t optimal_bit_num(int64_t expect_num, double fpp) { return doris::segment_v2::BloomFilter::optimal_bit_num(expect_num, fpp) / 8; } - static BlockBloomFilterAdaptor* create() { return new BlockBloomFilterAdaptor(); } + static BloomFilterAdaptor* create() { return new BloomFilterAdaptor(); } - Status merge(BlockBloomFilterAdaptor* other) { - return _bloom_filter->merge(*other->_bloom_filter); - } + Status merge(BloomFilterAdaptor* other) { return _bloom_filter->merge(*other->_bloom_filter); } Status init(int len) { int log_space = log2(len); @@ -65,44 +65,42 @@ public: void add_bytes(const char* data, size_t len) { _bloom_filter->insert(Slice(data, len)); } + // test_element/find_element only used on vectorized engine + template <typename T> + bool test_element(T element) const { + if constexpr (std::is_same_v<T, Slice>) { + return _bloom_filter->find(element); + } else { + return _bloom_filter->find(HashUtil::fixed_len_to_uint32(element)); + } + } + + template <typename T> + void add_element(T element) { + if constexpr (std::is_same_v<T, Slice>) { + _bloom_filter->insert(element); + } else { + _bloom_filter->insert(HashUtil::fixed_len_to_uint32(element)); + } + } + private: std::shared_ptr<doris::BlockBloomFilter> _bloom_filter; }; -} // namespace detail -using CurrentBloomFilterAdaptor = detail::BlockBloomFilterAdaptor; // Only Used In RuntimeFilter -class IBloomFilterFuncBase { -public: - virtual ~IBloomFilterFuncBase() {} - virtual Status init(int64_t expect_num, double fpp) = 0; - virtual Status init_with_fixed_length(int64_t bloom_filter_length) = 0; - - virtual void insert(const void* data) = 0; - virtual bool find(const void* data) const = 0; - virtual bool find_olap_engine(const void* data) const = 0; - virtual bool find_uint32_t(uint32_t data) const = 0; - - virtual Status merge(IBloomFilterFuncBase* bloomfilter_func) = 0; - virtual Status assign(const char* data, int len) = 0; - - virtual Status get_data(char** data, int* len) = 0; - virtual void light_copy(IBloomFilterFuncBase* other) = 0; -}; - -template <class BloomFilterAdaptor> -class BloomFilterFuncBase : public IBloomFilterFuncBase { +class BloomFilterFuncBase { public: BloomFilterFuncBase() : _inited(false) {} - virtual ~BloomFilterFuncBase() {} + virtual ~BloomFilterFuncBase() = default; - Status init(int64_t expect_num, double fpp) override { + Status init(int64_t expect_num, double fpp) { size_t filter_size = BloomFilterAdaptor::optimal_bit_num(expect_num, fpp); return init_with_fixed_length(filter_size); } - Status init_with_fixed_length(int64_t bloom_filter_length) override { + Status init_with_fixed_length(int64_t bloom_filter_length) { DCHECK(!_inited); DCHECK(bloom_filter_length >= 0); DCHECK_EQ((bloom_filter_length & (bloom_filter_length - 1)), 0); @@ -113,7 +111,7 @@ public: return Status::OK(); } - Status merge(IBloomFilterFuncBase* bloomfilter_func) override { + Status merge(BloomFilterFuncBase* bloomfilter_func) { auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); if (bloomfilter_func == nullptr) { _bloom_filter.reset(BloomFilterAdaptor::create()); @@ -125,7 +123,7 @@ public: return _bloom_filter->merge(other_func->_bloom_filter.get()); } - Status assign(const char* data, int len) override { + Status assign(const char* data, int len) { if (_bloom_filter == nullptr) { _bloom_filter.reset(BloomFilterAdaptor::create()); } @@ -134,19 +132,35 @@ public: return _bloom_filter->init(data, len); } - Status get_data(char** data, int* len) override { + Status get_data(char** data, int* len) { *data = _bloom_filter->data(); *len = _bloom_filter->size(); return Status::OK(); } - void light_copy(IBloomFilterFuncBase* bloomfilter_func) override { + void light_copy(BloomFilterFuncBase* bloomfilter_func) { auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); _bloom_filter_alloced = other_func->_bloom_filter_alloced; _bloom_filter = other_func->_bloom_filter; _inited = other_func->_inited; } + virtual void insert(const void* data) = 0; + + virtual bool find(const void* data) const = 0; + + virtual bool find_olap_engine(const void* data) const = 0; + + virtual bool find_uint32_t(uint32_t data) const = 0; + + virtual void insert_fixed_len(const char* data, const int* offsets, int number) = 0; + + virtual uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, + uint16_t* offsets, int number) = 0; + + virtual void find_fixed_len(const char* data, const uint8* nullmap, int number, + uint8* results) = 0; + protected: // bloom filter size int32_t _bloom_filter_alloced; @@ -154,63 +168,113 @@ protected: bool _inited; }; -template <class T, class BloomFilterAdaptor> +template <class T> struct CommonFindOp { - ALWAYS_INLINE void insert(BloomFilterAdaptor& bloom_filter, const void* data) const { + // test_batch/find_batch/find_batch_olap_engine only used on vectorized engine + void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data, const int* offsets, + int number) const { + for (int i = 0; i < number; i++) { + bloom_filter.add_element(*((T*)data + offsets[i])); + } + } + + uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data, + const uint8* nullmap, uint16_t* offsets, int number) const { + uint16_t new_size = 0; + for (int i = 0; i < number; i++) { + uint16_t idx = offsets[i]; + if (nullmap != nullptr && nullmap[idx]) { + continue; + } + if (!bloom_filter.test_element(*((T*)data + idx))) { + continue; + } + offsets[new_size++] = idx; + } + return new_size; + } + + void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, const uint8* nullmap, + int number, uint8* results) const { + for (int i = 0; i < number; i++) { + results[i] = false; + if (nullmap != nullptr && nullmap[i]) { + continue; + } + if (!bloom_filter.test_element(*((T*)data + i))) { + continue; + } + results[i] = true; + } + } + + void insert(BloomFilterAdaptor& bloom_filter, const void* data) const { bloom_filter.add_bytes((char*)data, sizeof(T)); } - ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const { + bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const { return bloom_filter.test(Slice((char*)data, sizeof(T))); } - ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, - const void* data) const { - return this->find(bloom_filter, data); + bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { + return find(bloom_filter, data); } - ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const { + bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const { return bloom_filter.test(data); } }; -template <class BloomFilterAdaptor> struct StringFindOp { - ALWAYS_INLINE void insert(BloomFilterAdaptor& bloom_filter, const void* data) const { + void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data, const int* offsets, + int number) const { + LOG(FATAL) << "StringFindOp does not support insert_batch"; + } + + uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data, + const uint8* nullmap, uint16_t* offsets, int number) const { + LOG(FATAL) << "StringFindOp does not support find_batch_olap_engine"; + return 0; + } + + void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, const uint8* nullmap, + int number, uint8* results) const { + LOG(FATAL) << "StringFindOp does not support find_batch"; + } + + void insert(BloomFilterAdaptor& bloom_filter, const void* data) const { const auto* value = reinterpret_cast<const StringValue*>(data); if (value) { bloom_filter.add_bytes(value->ptr, value->len); } } - ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const { + bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const { const auto* value = reinterpret_cast<const StringValue*>(data); if (value == nullptr) { return false; } return bloom_filter.test(Slice(value->ptr, value->len)); } - ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, - const void* data) const { + bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { return StringFindOp::find(bloom_filter, data); } - ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const { + bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const { return bloom_filter.test(data); } }; // We do not need to judge whether data is empty, because null will not appear // when filer used by the storage engine -template <class BloomFilterAdaptor> -struct FixedStringFindOp : public StringFindOp<BloomFilterAdaptor> { - ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, - const void* input_data) const { +struct FixedStringFindOp : public StringFindOp { + bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* input_data) const { const auto* value = reinterpret_cast<const StringValue*>(input_data); int64_t size = value->len; char* data = value->ptr; - while (size > 0 && data[size - 1] == '\0') size--; + while (size > 0 && data[size - 1] == '\0') { + size--; + } return bloom_filter.test(Slice(value->ptr, size)); } }; -template <class BloomFilterAdaptor> -struct DateTimeFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> { +struct DateTimeFindOp : public CommonFindOp<DateTimeValue> { bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { DateTimeValue value; value.from_olap_datetime(*reinterpret_cast<const uint64_t*>(data)); @@ -221,8 +285,7 @@ struct DateTimeFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> { // avoid violating C/C++ aliasing rules. // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=101684 -template <class BloomFilterAdaptor> -struct DateFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> { +struct DateFindOp : public CommonFindOp<DateTimeValue> { bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { uint24_t date = *static_cast<const uint24_t*>(data); uint64_t value = uint32_t(date); @@ -237,31 +300,7 @@ struct DateFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> { } }; -template <class BloomFilterAdaptor> -struct DateV2FindOp - : public CommonFindOp<doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType>, - BloomFilterAdaptor> { - bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { - return bloom_filter.test( - Slice((char*)data, - sizeof(doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType>))); - } -}; - -template <class BloomFilterAdaptor> -struct DateTimeV2FindOp - : public CommonFindOp< - doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType>, - BloomFilterAdaptor> { - bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { - return bloom_filter.test(Slice( - (char*)data, - sizeof(doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType>))); - } -}; - -template <class BloomFilterAdaptor> -struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value, BloomFilterAdaptor> { +struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value> { bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { auto packed_decimal = *static_cast<const decimal12_t*>(data); DecimalV2Value value; @@ -276,103 +315,106 @@ struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value, BloomFilterAdaptor> } }; -template <PrimitiveType type, class BloomFilterAdaptor> +template <PrimitiveType type> struct BloomFilterTypeTraits { using T = typename PrimitiveTypeTraits<type>::CppType; - using FindOp = CommonFindOp<T, BloomFilterAdaptor>; -}; - -template <class BloomFilterAdaptor> -struct BloomFilterTypeTraits<TYPE_DATE, BloomFilterAdaptor> { - using FindOp = DateFindOp<BloomFilterAdaptor>; -}; - -template <class BloomFilterAdaptor> -struct BloomFilterTypeTraits<TYPE_DATEV2, BloomFilterAdaptor> { - using FindOp = DateV2FindOp<BloomFilterAdaptor>; + using FindOp = CommonFindOp<T>; }; -template <class BloomFilterAdaptor> -struct BloomFilterTypeTraits<TYPE_DATETIMEV2, BloomFilterAdaptor> { - using FindOp = DateTimeV2FindOp<BloomFilterAdaptor>; +template <> +struct BloomFilterTypeTraits<TYPE_DATE> { + using FindOp = DateFindOp; }; -template <class BloomFilterAdaptor> -struct BloomFilterTypeTraits<TYPE_DATETIME, BloomFilterAdaptor> { - using FindOp = DateTimeFindOp<BloomFilterAdaptor>; +template <> +struct BloomFilterTypeTraits<TYPE_DATETIME> { + using FindOp = DateTimeFindOp; }; -template <class BloomFilterAdaptor> -struct BloomFilterTypeTraits<TYPE_DECIMALV2, BloomFilterAdaptor> { - using FindOp = DecimalV2FindOp<BloomFilterAdaptor>; +template <> +struct BloomFilterTypeTraits<TYPE_DECIMALV2> { + using FindOp = DecimalV2FindOp; }; -template <class BloomFilterAdaptor> -struct BloomFilterTypeTraits<TYPE_CHAR, BloomFilterAdaptor> { - using FindOp = FixedStringFindOp<BloomFilterAdaptor>; +template <> +struct BloomFilterTypeTraits<TYPE_CHAR> { + using FindOp = FixedStringFindOp; }; -template <class BloomFilterAdaptor> -struct BloomFilterTypeTraits<TYPE_VARCHAR, BloomFilterAdaptor> { - using FindOp = StringFindOp<BloomFilterAdaptor>; +template <> +struct BloomFilterTypeTraits<TYPE_VARCHAR> { + using FindOp = StringFindOp; }; -template <class BloomFilterAdaptor> -struct BloomFilterTypeTraits<TYPE_STRING, BloomFilterAdaptor> { - using FindOp = StringFindOp<BloomFilterAdaptor>; +template <> +struct BloomFilterTypeTraits<TYPE_STRING> { + using FindOp = StringFindOp; }; -template <PrimitiveType type, class BloomFilterAdaptor> -class BloomFilterFunc final : public BloomFilterFuncBase<BloomFilterAdaptor> { +template <PrimitiveType type> +class BloomFilterFunc final : public BloomFilterFuncBase { public: - BloomFilterFunc() : BloomFilterFuncBase<BloomFilterAdaptor>() {} + BloomFilterFunc() : BloomFilterFuncBase() {} - ~BloomFilterFunc() = default; + ~BloomFilterFunc() override = default; void insert(const void* data) override { - DCHECK(this->_bloom_filter != nullptr); - dummy.insert(*this->_bloom_filter, data); + DCHECK(_bloom_filter != nullptr); + dummy.insert(*_bloom_filter, data); + } + + void insert_fixed_len(const char* data, const int* offsets, int number) override { + DCHECK(_bloom_filter != nullptr); + dummy.insert_batch(*_bloom_filter, data, offsets, number); + } + + uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, uint16_t* offsets, + int number) override { + return dummy.find_batch_olap_engine(*_bloom_filter, data, nullmap, offsets, number); + } + + void find_fixed_len(const char* data, const uint8* nullmap, int number, + uint8* results) override { + dummy.find_batch(*_bloom_filter, data, nullmap, number, results); } bool find(const void* data) const override { - DCHECK(this->_bloom_filter != nullptr); - return dummy.find(*this->_bloom_filter, data); + DCHECK(_bloom_filter != nullptr); + return dummy.find(*_bloom_filter, data); } bool find_olap_engine(const void* data) const override { - return dummy.find_olap_engine(*this->_bloom_filter, data); + return dummy.find_olap_engine(*_bloom_filter, data); } - bool find_uint32_t(uint32_t data) const override { - return dummy.find(*this->_bloom_filter, data); - } + bool find_uint32_t(uint32_t data) const override { return dummy.find(*_bloom_filter, data); } private: - typename BloomFilterTypeTraits<type, BloomFilterAdaptor>::FindOp dummy; + typename BloomFilterTypeTraits<type>::FindOp dummy; }; // BloomFilterPredicate only used in runtime filter class BloomFilterPredicate : public Predicate { public: - virtual ~BloomFilterPredicate(); + ~BloomFilterPredicate() override; BloomFilterPredicate(const TExprNode& node); BloomFilterPredicate(const BloomFilterPredicate& other); - virtual Expr* clone(ObjectPool* pool) const override { + Expr* clone(ObjectPool* pool) const override { return pool->add(new BloomFilterPredicate(*this)); } using Predicate::prepare; - Status prepare(RuntimeState* state, std::shared_ptr<IBloomFilterFuncBase> bloomfilterfunc); + Status prepare(RuntimeState* state, std::shared_ptr<BloomFilterFuncBase> bloomfilterfunc); - std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() { return _filter; } + std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() { return _filter; } - virtual BooleanVal get_boolean_val(ExprContext* context, TupleRow* row) override; + BooleanVal get_boolean_val(ExprContext* context, TupleRow* row) override; - virtual Status open(RuntimeState* state, ExprContext* context, - FunctionContext::FunctionStateScope scope) override; + Status open(RuntimeState* state, ExprContext* context, + FunctionContext::FunctionStateScope scope) override; protected: friend class Expr; - virtual std::string debug_string() const override; + std::string debug_string() const override; private: bool _is_prepare; @@ -382,7 +424,7 @@ private: std::atomic<int64_t> _filtered_rows; std::atomic<int64_t> _scan_rows; - std::shared_ptr<IBloomFilterFuncBase> _filter; + std::shared_ptr<BloomFilterFuncBase> _filter; bool _has_calculate_filter = false; // loop size must be power of 2 constexpr static int64_t _loop_size = 8192; diff --git a/be/src/exprs/create_predicate_function.h b/be/src/exprs/create_predicate_function.h index 5aa0e2347f..bc1ca659ea 100644 --- a/be/src/exprs/create_predicate_function.h +++ b/be/src/exprs/create_predicate_function.h @@ -28,7 +28,7 @@ public: using BasePtr = MinMaxFuncBase*; template <PrimitiveType type> static BasePtr get_function() { - return new (std::nothrow) MinMaxNumFunc<typename PrimitiveTypeTraits<type>::CppType>(); + return new MinMaxNumFunc<typename PrimitiveTypeTraits<type>::CppType>(); }; }; @@ -41,16 +41,16 @@ public: using CppType = typename PrimitiveTypeTraits<type>::CppType; using Set = std::conditional_t<std::is_same_v<CppType, StringValue>, StringSet, HybridSet<type, is_vec>>; - return new (std::nothrow) Set(); + return new Set(); }; }; class BloomFilterTraits { public: - using BasePtr = IBloomFilterFuncBase*; + using BasePtr = BloomFilterFuncBase*; template <PrimitiveType type> static BasePtr get_function() { - return new BloomFilterFunc<type, CurrentBloomFilterAdaptor>(); + return new BloomFilterFunc<type>(); }; }; diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h index 20c9721675..571aa82b87 100644 --- a/be/src/exprs/hybrid_set.h +++ b/be/src/exprs/hybrid_set.h @@ -41,6 +41,8 @@ public: // use in vectorize execute engine virtual void insert(void* data, size_t) = 0; + virtual void insert_fixed_len(const char* data, const int* offsets, int number) = 0; + virtual void insert(HybridSetBase* set) = 0; virtual int size() = 0; @@ -106,6 +108,12 @@ public: } void insert(void* data, size_t) override { insert(data); } + void insert_fixed_len(const char* data, const int* offsets, int number) override { + for (int i = 0; i < number; i++) { + _set.insert(*((CppType*)data + offsets[i])); + } + } + void insert(HybridSetBase* set) override { HybridSet<T, is_vec>* hybrid_set = reinterpret_cast<HybridSet<T, is_vec>*>(set); _set.insert(hybrid_set->_set.begin(), hybrid_set->_set.end()); @@ -173,11 +181,16 @@ public: std::string str_value(value->ptr, value->len); _set.insert(str_value); } + void insert(void* data, size_t size) override { std::string str_value(reinterpret_cast<char*>(data), size); _set.insert(str_value); } + void insert_fixed_len(const char* data, const int* offsets, int number) override { + LOG(FATAL) << "string set not support insert_fixed_len"; + } + void insert(HybridSetBase* set) override { StringSet* string_set = reinterpret_cast<StringSet*>(set); _set.insert(string_set->_set.begin(), string_set->_set.end()); @@ -259,11 +272,16 @@ public: StringValue sv(value->ptr, value->len); _set.insert(sv); } + void insert(void* data, size_t size) override { StringValue sv(reinterpret_cast<char*>(data), size); _set.insert(sv); } + void insert_fixed_len(const char* data, const int* offsets, int number) override { + LOG(FATAL) << "string set not support insert_fixed_len"; + } + void insert(HybridSetBase* set) override { StringValueSet* string_set = reinterpret_cast<StringValueSet*>(set); _set.insert(string_set->_set.begin(), string_set->_set.end()); diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h index ef811c5c9e..a237f7f8b2 100644 --- a/be/src/exprs/minmax_predicate.h +++ b/be/src/exprs/minmax_predicate.h @@ -25,6 +25,7 @@ namespace doris { class MinMaxFuncBase { public: virtual void insert(const void* data) = 0; + virtual void insert_fixed_len(const char* data, const int* offsets, int number) = 0; virtual bool find(void* data) = 0; virtual bool is_empty() = 0; virtual void* get_max() = 0; @@ -68,6 +69,21 @@ public: } } + void insert_fixed_len(const char* data, const int* offsets, int number) override { + if (!number) { + return; + } + if (_empty) { + _min = *((T*)data + offsets[0]); + _max = *((T*)data + offsets[0]); + } + for (int i = _empty; i < number; i++) { + _min = std::min(_min, *((T*)data + offsets[i])); + _max = std::max(_max, *((T*)data + offsets[i])); + } + _empty = false; + } + bool find(void* data) override { if (data == nullptr) { return false; diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 89e832f3d0..1311f2b928 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -31,12 +31,14 @@ #include "exprs/literal.h" #include "exprs/minmax_predicate.h" #include "gen_cpp/internal_service.pb.h" +#include "runtime/define_primitive_type.h" #include "runtime/large_int_value.h" #include "runtime/primitive_type.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "util/string_parser.hpp" +#include "vec/columns/column.h" #include "vec/exprs/vbloom_predicate.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vruntimefilter_wrapper.h" @@ -489,8 +491,41 @@ public: break; } } + + void insert_fixed_len(const char* data, const int* offsets, int number) { + switch (_filter_type) { + case RuntimeFilterType::IN_FILTER: { + if (_is_ignored_in_filter) { + break; + } + _hybrid_set->insert_fixed_len(data, offsets, number); + break; + } + case RuntimeFilterType::MINMAX_FILTER: { + _minmax_func->insert_fixed_len(data, offsets, number); + break; + } + case RuntimeFilterType::BLOOM_FILTER: { + _bloomfilter_func->insert_fixed_len(data, offsets, number); + break; + } + case RuntimeFilterType::IN_OR_BLOOM_FILTER: { + if (_is_bloomfilter) { + _bloomfilter_func->insert_fixed_len(data, offsets, number); + } else { + _hybrid_set->insert_fixed_len(data, offsets, number); + } + break; + } + default: + DCHECK(false); + break; + } + } + void insert(const StringRef& value) { switch (_column_return_type) { + // todo: rethink logic of hll/bitmap/date case TYPE_DATE: case TYPE_DATETIME: { // DateTime->DateTimeValue @@ -521,6 +556,16 @@ public: } } + void insert_batch(const vectorized::ColumnPtr column, const std::vector<int>& rows) { + if (IRuntimeFilter::enable_use_batch(_column_return_type)) { + insert_fixed_len(column->get_raw_data().data, rows.data(), rows.size()); + } else { + for (int index : rows) { + insert(column->get_data_at(index)); + } + } + } + RuntimeFilterType get_real_type() { auto real_filter_type = _filter_type; if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { @@ -1000,7 +1045,7 @@ private: int32_t _max_in_num = -1; std::unique_ptr<MinMaxFuncBase> _minmax_func; std::unique_ptr<HybridSetBase> _hybrid_set; - std::shared_ptr<IBloomFilterFuncBase> _bloomfilter_func; + std::shared_ptr<BloomFilterFuncBase> _bloomfilter_func; bool _is_bloomfilter = false; bool _is_ignored_in_filter = false; std::string* _ignored_in_filter_msg = nullptr; @@ -1029,6 +1074,12 @@ void IRuntimeFilter::insert(const StringRef& value) { _wrapper->insert(value); } +void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, + const std::vector<int>& rows) { + DCHECK(is_producer()); + _wrapper->insert_batch(column, rows); +} + Status IRuntimeFilter::publish() { DCHECK(is_producer()); if (_has_local_target) { @@ -1255,7 +1306,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { } void IRuntimeFilter::update_runtime_filter_type_to_profile() { - if (_profile.get() != nullptr) { + if (_profile != nullptr) { _profile->add_info_string("RealRuntimeFilterType", ::doris::to_string(_wrapper->get_real_type())); } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 587c0aa87a..60162b98f0 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -139,6 +139,7 @@ public: // only used for producer void insert(const void* data); void insert(const StringRef& data); + void insert_batch(vectorized::ColumnPtr column, const std::vector<int>& rows); // publish filter // push filter to remote node or push down it to scan_node @@ -175,7 +176,7 @@ public: bool is_producer() const { return _role == RuntimeFilterRole::PRODUCER; } bool is_consumer() const { return _role == RuntimeFilterRole::CONSUMER; } void set_role(const RuntimeFilterRole role) { _role = role; } - int expr_order() { return _expr_order; } + int expr_order() const { return _expr_order; } // only used for consumer // if filter is not ready for filter data scan_node @@ -209,7 +210,7 @@ public: void set_ignored() { _is_ignored = true; } // for ut - bool is_ignored() { return _is_ignored; } + bool is_ignored() const { return _is_ignored; } void set_ignored_msg(std::string& msg) { _ignored_msg = msg; } @@ -231,6 +232,10 @@ public: void ready_for_publish(); + static bool enable_use_batch(PrimitiveType type) { + return is_int_or_bool(type) || is_float_or_double(type); + } + protected: // serialize _wrapper to protobuf void to_protobuf(PInFilter* filter); @@ -282,7 +287,7 @@ protected: // Indicate whether runtime filter expr has been ignored bool _is_ignored; - std::string _ignored_msg = ""; + std::string _ignored_msg; // some runtime filter will generate // multiple contexts such as minmax filter diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 144a31224c..43fca0a71f 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -20,6 +20,9 @@ #include "exprs/runtime_filter.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" namespace doris { // this class used in a hash join node @@ -164,10 +167,13 @@ public: } } } + void insert(std::unordered_map<const vectorized::Block*, std::vector<int>>& datas) { for (int i = 0; i < _build_expr_context.size(); ++i) { auto iter = _runtime_filters.find(i); - if (iter == _runtime_filters.end()) continue; + if (iter == _runtime_filters.end()) { + continue; + } int result_column_id = _build_expr_context[i]->get_last_result_column_id(); for (auto it : datas) { @@ -175,24 +181,23 @@ public: if (auto* nullable = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) { - auto& column_nested = nullable->get_nested_column(); - auto& column_nullmap = nullable->get_null_map_column(); + auto& column_nested = nullable->get_nested_column_ptr(); + auto& column_nullmap = nullable->get_null_map_column_ptr(); + std::vector<int> indexs; for (int row_num : it.second) { - if (column_nullmap.get_bool(row_num)) { + if (assert_cast<const vectorized::ColumnUInt8*>(column_nullmap.get()) + ->get_bool(row_num)) { continue; } - const auto& ref_data = column_nested.get_data_at(row_num); - for (auto filter : iter->second) { - filter->insert(ref_data); - } + indexs.push_back(row_num); + } + for (auto filter : iter->second) { + filter->insert_batch(column_nested, indexs); } } else { - for (int row_num : it.second) { - const auto& ref_data = column->get_data_at(row_num); - for (auto filter : iter->second) { - filter->insert(ref_data); - } + for (auto filter : iter->second) { + filter->insert_batch(column, it.second); } } } diff --git a/be/src/olap/bloom_filter_predicate.cpp b/be/src/olap/bloom_filter_predicate.cpp index 299e4e1e1c..8d281cb06a 100644 --- a/be/src/olap/bloom_filter_predicate.cpp +++ b/be/src/olap/bloom_filter_predicate.cpp @@ -40,9 +40,9 @@ namespace doris { ColumnPredicate* BloomFilterColumnPredicateFactory::create_column_predicate( - uint32_t column_id, const std::shared_ptr<IBloomFilterFuncBase>& bloom_filter, + uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& bloom_filter, FieldType type) { - std::shared_ptr<IBloomFilterFuncBase> filter; + std::shared_ptr<BloomFilterFuncBase> filter; switch (type) { #define M(NAME) \ case OLAP_FIELD_##NAME: { \ diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h index fd121302de..392fa5a438 100644 --- a/be/src/olap/bloom_filter_predicate.h +++ b/be/src/olap/bloom_filter_predicate.h @@ -19,9 +19,8 @@ #include <stdint.h> -#include <roaring/roaring.hh> - #include "exprs/bloomfilter_predicate.h" +#include "exprs/runtime_filter.h" #include "olap/column_predicate.h" #include "vec/columns/column_dictionary.h" #include "vec/columns/column_nullable.h" @@ -34,10 +33,10 @@ namespace doris { template <PrimitiveType T> class BloomFilterColumnPredicate : public ColumnPredicate { public: - using SpecificFilter = BloomFilterFunc<T, CurrentBloomFilterAdaptor>; + using SpecificFilter = BloomFilterFunc<T>; BloomFilterColumnPredicate(uint32_t column_id, - const std::shared_ptr<IBloomFilterFuncBase>& filter) + const std::shared_ptr<BloomFilterFuncBase>& filter) : ColumnPredicate(column_id), _filter(filter), _specific_filter(static_cast<SpecificFilter*>(_filter.get())) {} @@ -81,6 +80,12 @@ private: new_size += _specific_filter->find_uint32_t(dict_col->get_hash_value(idx)); } } + } else if (IRuntimeFilter::enable_use_batch(T)) { + new_size = _specific_filter->find_fixed_len_olap_engine( + (char*)reinterpret_cast<const vectorized::PredicateColumnType<T>*>(&column) + ->get_data() + .data(), + null_map, sel, size); } else { uint24_t tmp_uint24_value; auto get_cell_value = [&tmp_uint24_value](auto& data) { @@ -113,7 +118,7 @@ private: return new_size; } - std::shared_ptr<IBloomFilterFuncBase> _filter; + std::shared_ptr<BloomFilterFuncBase> _filter; SpecificFilter* _specific_filter; // owned by _filter mutable uint64_t _evaluated_rows = 1; mutable uint64_t _passed_rows = 0; @@ -174,8 +179,7 @@ uint16_t BloomFilterColumnPredicate<T>::evaluate(const vectorized::IColumn& colu class BloomFilterColumnPredicateFactory { public: static ColumnPredicate* create_column_predicate( - uint32_t column_id, const std::shared_ptr<IBloomFilterFuncBase>& filter, - FieldType type); + uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& filter, FieldType type); }; } //namespace doris diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index aaec382cb0..c757906d8d 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -464,7 +464,7 @@ void TabletReader::_init_conditions_param(const ReaderParams& read_params) { } ColumnPredicate* TabletReader::_parse_to_predicate( - const std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>& bloom_filter) { + const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter) { int32_t index = _tablet_schema->field_index(bloom_filter.first); if (index < 0) { return nullptr; diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index ae476e4fa2..7c5dc32fcf 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -76,7 +76,7 @@ public: bool end_key_include = false; std::vector<TCondition> conditions; - std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>> bloom_filters; + std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>> bloom_filters; std::vector<FunctionFilter> function_filters; std::vector<RowsetMetaSharedPtr> delete_predicates; @@ -166,7 +166,7 @@ protected: void _init_conditions_param(const ReaderParams& read_params); ColumnPredicate* _parse_to_predicate( - const std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>& bloom_filter); + const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter); virtual ColumnPredicate* _parse_to_predicate(const FunctionFilter& function_filter); diff --git a/be/src/runtime/primitive_type.cpp b/be/src/runtime/primitive_type.cpp index 61ee8bb0ca..9c7a771e59 100644 --- a/be/src/runtime/primitive_type.cpp +++ b/be/src/runtime/primitive_type.cpp @@ -21,6 +21,7 @@ #include "gen_cpp/Types_types.h" #include "runtime/collection_value.h" +#include "runtime/define_primitive_type.h" #include "runtime/jsonb_value.h" #include "runtime/string_value.h" @@ -132,6 +133,15 @@ bool is_string_type(PrimitiveType type) { return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_STRING; } +bool is_float_or_double(PrimitiveType type) { + return type == TYPE_FLOAT || type == TYPE_DOUBLE; +} + +bool is_int_or_bool(PrimitiveType type) { + return type == TYPE_BOOLEAN || type == TYPE_TINYINT || type == TYPE_SMALLINT || + type == TYPE_INT || type == TYPE_BIGINT || type == TYPE_LARGEINT; +} + bool has_variable_type(PrimitiveType type) { return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_OBJECT || type == TYPE_QUANTILE_STATE || type == TYPE_STRING; diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h index 18d7aa6605..c00ef24805 100644 --- a/be/src/runtime/primitive_type.h +++ b/be/src/runtime/primitive_type.h @@ -40,6 +40,8 @@ PrimitiveType convert_type_to_primitive(FunctionContext::Type type); bool is_enumeration_type(PrimitiveType type); bool is_date_type(PrimitiveType type); +bool is_float_or_double(PrimitiveType type); +bool is_int_or_bool(PrimitiveType type); bool is_string_type(PrimitiveType type); bool has_variable_type(PrimitiveType type); diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 79cfe1a946..b910d2c9b8 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -78,8 +78,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch) _num_tuples_per_row(input_batch.row_tuples_size()), _row_desc(row_desc), _auxiliary_mem_usage(0), - _need_to_return(false), - _tuple_data_pool() { + _need_to_return(false) { _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*); DCHECK_GT(_tuple_ptrs_size, 0); _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size)); diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp index 845ee754a7..330a35d028 100644 --- a/be/src/util/hash_util.hpp +++ b/be/src/util/hash_util.hpp @@ -45,6 +45,14 @@ namespace doris { // Utility class to compute hash values. class HashUtil { public: + template <typename T> + static uint32_t fixed_len_to_uint32(T value) { + if constexpr (sizeof(T) <= sizeof(uint32_t)) { + return value; + } + return std::hash<T>()(value); + } + static uint32_t zlib_crc_hash(const void* data, int32_t bytes, uint32_t hash) { return crc32(hash, (const unsigned char*)data, bytes); } diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 0459ef4298..ab6c103e86 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -299,6 +299,7 @@ public: /// This is for calculating the memory size for vectorized serialization of aggregation keys. virtual size_t get_max_row_byte_size() const { LOG(FATAL) << "get_max_row_byte_size not supported"; + return 0; } virtual void serialize_vec(std::vector<StringRef>& keys, size_t num_rows, diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h index 1e7f8a7af0..23aa6d7d4a 100644 --- a/be/src/vec/data_types/data_type.h +++ b/be/src/vec/data_types/data_type.h @@ -279,6 +279,7 @@ struct WhichDataType { bool is_int() const { return is_int8() || is_int16() || is_int32() || is_int64() || is_int128(); } + bool is_int_or_uint() const { return is_int() || is_uint(); } bool is_native_int() const { return is_int8() || is_int16() || is_int32() || is_int64(); } bool is_decimal32() const { return idx == TypeIndex::Decimal32; } diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index d6fe854e63..ea07536a76 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -36,7 +36,7 @@ NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int Status NewOlapScanner::prepare( const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters, - const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters, + const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); @@ -130,7 +130,7 @@ Status NewOlapScanner::open(RuntimeState* state) { // it will be called under tablet read lock because capture rs readers need Status NewOlapScanner::_init_tablet_reader_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, - const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters, + const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters) { // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty bool single_version = diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index 7e784d69fc..41b2888879 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -43,7 +43,7 @@ public: public: Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters, - const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& + const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters); @@ -58,7 +58,7 @@ private: Status _init_tablet_reader_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, - const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& + const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters); diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 4f08192cd0..fa51dd393f 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -183,7 +183,7 @@ protected: // Save all bloom filter predicates which may be pushed down to data source. // column name -> bloom filter function - std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>> + std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>> _bloom_filters_push_down; // Save all function predicates which may be pushed down to data source. diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h index c8ab58fae7..44b3829402 100644 --- a/be/src/vec/exec/volap_scan_node.h +++ b/be/src/vec/exec/volap_scan_node.h @@ -168,7 +168,7 @@ private: // push down bloom filters to storage engine. // 1. std::pair.first :: column name // 2. std::pair.second :: shared_ptr of BloomFilterFuncBase - std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>> + std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>> _bloom_filters_push_down; // push down functions to storage engine diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index fc082a6313..5761705a84 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -45,7 +45,7 @@ VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, b Status VOlapScanner::prepare( const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, - const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters, + const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); set_tablet_reader(); @@ -154,7 +154,7 @@ TabletStorageType VOlapScanner::get_storage_type() { // it will be called under tablet read lock because capture rs readers need Status VOlapScanner::_init_tablet_reader_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, - const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters, + const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters) { // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty bool single_version = diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h index 2e21dc5777..d3cd791b63 100644 --- a/be/src/vec/exec/volap_scanner.h +++ b/be/src/vec/exec/volap_scanner.h @@ -41,7 +41,7 @@ public: Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, - const std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>& + const std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters); @@ -94,7 +94,7 @@ public: private: Status _init_tablet_reader_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, - const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& + const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters); Status _init_return_columns(bool need_seq_col); diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp index dc3be41027..6ba96eb4f0 100644 --- a/be/src/vec/exprs/vbloom_predicate.cpp +++ b/be/src/vec/exprs/vbloom_predicate.cpp @@ -17,6 +17,7 @@ #include "vec/exprs/vbloom_predicate.h" +#include <cstdint> #include <string_view> #include "common/status.h" @@ -71,13 +72,24 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result size_t sz = argument_column->size(); res_data_column->resize(sz); auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data(); - if (WhichDataType(remove_nullable(block->get_by_position(arguments[0]).type)) - .is_string_or_fixed_string()) { + auto type = WhichDataType(remove_nullable(block->get_by_position(arguments[0]).type)); + if (type.is_string_or_fixed_string()) { for (size_t i = 0; i < sz; i++) { auto ele = argument_column->get_data_at(i); const StringValue v(ele.data, ele.size); ptr[i] = _filter->find(reinterpret_cast<const void*>(&v)); } + } else if (type.is_int_or_uint() || type.is_float()) { + if (argument_column->is_nullable()) { + auto column_nested = reinterpret_cast<const ColumnNullable*>(argument_column.get()) + ->get_nested_column_ptr(); + auto column_nullmap = reinterpret_cast<const ColumnNullable*>(argument_column.get()) + ->get_null_map_column_ptr(); + _filter->find_fixed_len(column_nested->get_raw_data().data, + (uint8*)column_nullmap->get_raw_data().data, sz, ptr); + } else { + _filter->find_fixed_len(argument_column->get_raw_data().data, nullptr, sz, ptr); + } } else { for (size_t i = 0; i < sz; i++) { ptr[i] = _filter->find( @@ -98,7 +110,7 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result const std::string& VBloomPredicate::expr_name() const { return _expr_name; } -void VBloomPredicate::set_filter(std::shared_ptr<IBloomFilterFuncBase>& filter) { +void VBloomPredicate::set_filter(std::shared_ptr<BloomFilterFuncBase>& filter) { _filter = filter; } } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h index b4e9f54a31..ad0f15820e 100644 --- a/be/src/vec/exprs/vbloom_predicate.h +++ b/be/src/vec/exprs/vbloom_predicate.h @@ -37,12 +37,12 @@ public: return pool->add(new VBloomPredicate(*this)); } const std::string& expr_name() const override; - void set_filter(std::shared_ptr<IBloomFilterFuncBase>& filter); + void set_filter(std::shared_ptr<BloomFilterFuncBase>& filter); - std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() const override { return _filter; } + std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() const override { return _filter; } private: - std::shared_ptr<IBloomFilterFuncBase> _filter; + std::shared_ptr<BloomFilterFuncBase> _filter; std::string _expr_name; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index e0e0defac4..0a82877b93 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -159,7 +159,7 @@ public: virtual const VExpr* get_impl() const { return nullptr; } // If this expr is a BloomPredicate, this method will return a BloomFilterFunc - virtual std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() const { + virtual std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() const { LOG(FATAL) << "Method 'get_bloom_filter_func()' is not supported in expression: " << this->debug_string(); return nullptr; diff --git a/be/test/exprs/bloom_filter_predicate_test.cpp b/be/test/exprs/bloom_filter_predicate_test.cpp index abedc9a59f..3f2fec11bc 100644 --- a/be/test/exprs/bloom_filter_predicate_test.cpp +++ b/be/test/exprs/bloom_filter_predicate_test.cpp @@ -31,7 +31,7 @@ public: }; TEST_F(BloomFilterPredicateTest, bloom_filter_func_int_test) { - std::unique_ptr<IBloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_INT)); + std::unique_ptr<BloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_INT)); EXPECT_TRUE(func->init(1024, 0.05).ok()); const int data_size = 1024; int data[data_size]; @@ -51,7 +51,7 @@ TEST_F(BloomFilterPredicateTest, bloom_filter_func_int_test) { } TEST_F(BloomFilterPredicateTest, bloom_filter_func_stringval_test) { - std::unique_ptr<IBloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_VARCHAR)); + std::unique_ptr<BloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_VARCHAR)); EXPECT_TRUE(func->init(1024, 0.05).ok()); ObjectPool obj_pool; const int data_size = 1024; @@ -100,7 +100,7 @@ TEST_F(BloomFilterPredicateTest, bloom_filter_func_stringval_test) { } TEST_F(BloomFilterPredicateTest, bloom_filter_size_test) { - std::unique_ptr<IBloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_VARCHAR)); + std::unique_ptr<BloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_VARCHAR)); int length = 4096; func->init_with_fixed_length(4096); char* data = nullptr; diff --git a/be/test/olap/bloom_filter_column_predicate_test.cpp b/be/test/olap/bloom_filter_column_predicate_test.cpp index 79ba5af940..cc98c90698 100644 --- a/be/test/olap/bloom_filter_column_predicate_test.cpp +++ b/be/test/olap/bloom_filter_column_predicate_test.cpp @@ -27,6 +27,7 @@ #include "runtime/mem_pool.h" #include "runtime/string_value.hpp" #include "vec/columns/column_nullable.h" +#include "vec/columns/columns_number.h" #include "vec/columns/predicate_column.h" #include "vec/core/block.h" @@ -78,7 +79,7 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) { return_columns.push_back(i); } - std::shared_ptr<IBloomFilterFuncBase> bloom_filter( + std::shared_ptr<BloomFilterFuncBase> bloom_filter( create_bloom_filter(PrimitiveType::TYPE_FLOAT)); bloom_filter->init(4096, 0.05); @@ -90,7 +91,6 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) { bloom_filter->insert(reinterpret_cast<void*>(&value)); ColumnPredicate* pred = BloomFilterColumnPredicateFactory::create_column_predicate( 0, bloom_filter, OLAP_FIELD_TYPE_FLOAT); - auto* col_data = reinterpret_cast<float*>(_mem_pool->allocate(size * sizeof(float))); // for ColumnBlock no null init_row_block(tablet_schema, size); @@ -123,6 +123,31 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) { EXPECT_EQ(select_size, 1); EXPECT_FLOAT_EQ(*(float*)col_block.cell(_row_block->selection_vector()[0]).cell_ptr(), 5.1); + delete pred; +} + +TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN_VEC) { + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + SetTabletSchema(std::string("FLOAT_COLUMN"), "FLOAT", "REPLACE", 1, true, true, tablet_schema); + const int size = 10; + std::vector<uint32_t> return_columns; + for (int i = 0; i < tablet_schema->num_columns(); ++i) { + return_columns.push_back(i); + } + + std::shared_ptr<BloomFilterFuncBase> bloom_filter( + create_bloom_filter(PrimitiveType::TYPE_FLOAT)); + + bloom_filter->init(4096, 0.05); + auto column_data = ColumnFloat32::create(); + float values[3] = {4.1, 5.1, 6.1}; + int offsets[3] = {0, 1, 2}; + + bloom_filter->insert_fixed_len((char*)values, offsets, 3); + ColumnPredicate* pred = BloomFilterColumnPredicateFactory::create_column_predicate( + 0, bloom_filter, OLAP_FIELD_TYPE_FLOAT); + auto* col_data = reinterpret_cast<float*>(_mem_pool->allocate(size * sizeof(float))); + // for vectorized::Block no null auto pred_col = PredicateColumnType<TYPE_FLOAT>::create(); pred_col->reserve(size); @@ -130,8 +155,9 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) { *(col_data + i) = i + 0.1f; pred_col->insert_data(reinterpret_cast<const char*>(col_data + i), 0); } + init_row_block(tablet_schema, size); _row_block->clear(); - select_size = _row_block->selected_size(); + auto select_size = _row_block->selected_size(); select_size = pred->evaluate(*pred_col, _row_block->selection_vector(), select_size); EXPECT_EQ(select_size, 3); EXPECT_FLOAT_EQ((float)pred_col->get_data()[_row_block->selection_vector()[0]], 4.1); @@ -156,5 +182,4 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) { delete pred; } - } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org