This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9db838121b6a802660cbd9947131aaa4b4fde30b Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Mon Mar 3 17:33:30 2025 +0800 [runtime filter](UT) test in filter (#48562) --- be/src/exprs/bloom_filter_func.h | 5 +- be/src/exprs/filter_base.h | 8 +- be/src/exprs/hybrid_set.h | 11 + be/src/runtime_filter/runtime_filter.h | 4 +- be/src/runtime_filter/runtime_filter_definitions.h | 9 +- be/src/runtime_filter/runtime_filter_producer.h | 6 +- .../runtime_filter_producer_helper.cpp | 7 +- .../runtime_filter_producer_helper.h | 2 +- .../runtime_filter_producer_helper_cross.h | 2 +- be/src/runtime_filter/runtime_filter_wrapper.cpp | 71 +++++- be/src/runtime_filter/runtime_filter_wrapper.h | 46 +--- .../runtime_filter/runtime_filter_wrapper_test.cpp | 239 +++++++++++++++++++-- 12 files changed, 324 insertions(+), 86 deletions(-) diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index 64123250e48..774cf42e1e6 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -42,9 +42,6 @@ public: _enable_fixed_len_to_uint32_v2 = params->enable_fixed_len_to_uint32_v2; _limit_length(); } - - bool build_bf_by_runtime_size() const { return _build_bf_by_runtime_size; } - Status init_with_fixed_length(size_t runtime_size) { if (_build_bf_by_runtime_size) { // Use the same algorithm as org.apache.doris.planner.RuntimeFilter#calculateFilterSize @@ -74,6 +71,8 @@ public: return Status::OK(); } + bool build_bf_by_runtime_size() const { return _build_bf_by_runtime_size; } + Status merge(BloomFilterFuncBase* other) { if (other == nullptr) { return Status::InternalError("bloomfilter_func is nullptr"); diff --git a/be/src/exprs/filter_base.h b/be/src/exprs/filter_base.h index 5534dc75589..25778059886 100644 --- a/be/src/exprs/filter_base.h +++ b/be/src/exprs/filter_base.h @@ -26,13 +26,7 @@ public: FilterBase(bool null_aware) : _null_aware(null_aware) {} bool contain_null() const { return _null_aware && _contain_null; } - void 
set_contain_null(bool contain_null) { - if (_contain_null && !contain_null) { - throw Exception(ErrorCode::INTERNAL_ERROR, - "contain_null cannot be changed from true to false"); - } - _contain_null = contain_null; - } + void set_contain_null(bool contain_null) { _contain_null |= contain_null; } protected: // Indicates whether a null datum exists to build this filter. diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h index 1358c479dca..094150f3de0 100644 --- a/be/src/exprs/hybrid_set.h +++ b/be/src/exprs/hybrid_set.h @@ -148,6 +148,11 @@ public: Iterator begin() { return Iterator(_data, 0); } Iterator end() { return Iterator(_data, _size); } + void clear() { + std::array<T, N> {}.swap(_data); + _size = 0; + } + private: std::array<T, N> _data; size_t _size {}; @@ -179,6 +184,8 @@ public: bool find(const T& value) const { return _set.contains(value); } + void clear() { _set.clear(); } + Iterator begin() { return _set.begin(); } Iterator end() { return _set.end(); } @@ -210,6 +217,7 @@ public: _contain_null |= set->_contain_null; } + virtual void clear() = 0; bool empty() { return !_contain_null && size() == 0; } virtual int size() = 0; virtual bool find(const void* data) const = 0; @@ -262,6 +270,7 @@ public: } _set.insert(*reinterpret_cast<const ElementType*>(data)); } + void clear() override { _set.clear(); } void insert(void* data, size_t /*unused*/) override { insert(data); } @@ -390,6 +399,7 @@ public: ~StringSet() override = default; + void clear() override { _set.clear(); } void insert(const void* data) override { if (data == nullptr) { _contain_null = true; @@ -558,6 +568,7 @@ public: StringValueSet(bool null_aware) : HybridSetBase(null_aware) {} ~StringValueSet() override = default; + void clear() override { _set.clear(); } void insert(const void* data) override { if (data == nullptr) { diff --git a/be/src/runtime_filter/runtime_filter.h b/be/src/runtime_filter/runtime_filter.h index dc2fbd20d2e..660f0ad88ce 100644 --- 
a/be/src/runtime_filter/runtime_filter.h +++ b/be/src/runtime_filter/runtime_filter.h @@ -95,10 +95,10 @@ protected: template <typename T> void _to_protobuf(T* filter) { - _wrapper->_to_protobuf(filter); + _wrapper->to_protobuf(filter); } void _to_protobuf(PBloomFilter* filter, char** data, int* filter_length) { - _wrapper->_to_protobuf(filter, data, filter_length); + _wrapper->to_protobuf(filter, data, filter_length); } Status _push_to_remote(RuntimeState* state, const TNetworkAddress* addr); diff --git a/be/src/runtime_filter/runtime_filter_definitions.h b/be/src/runtime_filter/runtime_filter_definitions.h index ed732b99532..693e62beea9 100644 --- a/be/src/runtime_filter/runtime_filter_definitions.h +++ b/be/src/runtime_filter/runtime_filter_definitions.h @@ -42,11 +42,14 @@ struct RuntimeFilterParams { RuntimeFilterType filter_type {}; // Data type of build column PrimitiveType column_return_type {}; + // Whether this runtime filter is null-aware bool null_aware {}; - bool enable_fixed_len_to_uint32_v2 {}; + // In filter // The max limitation of in-set int32_t max_in_num {}; + + // Bloom filter // The min size limitation of bloom filter int64_t runtime_bloom_filter_min_size {}; // The max size limitation of bloom filter @@ -57,7 +60,11 @@ struct RuntimeFilterParams { bool build_bf_by_runtime_size {}; // Whether an estimated size by NDV is used to build bloom filter bool bloom_filter_size_calculated_by_ndv {}; + // Whether an optimized way to build BF using fixed-length values + bool enable_fixed_len_to_uint32_v2 {}; + // Bitmap filter + // Whether a join expression is `not in` bool bitmap_filter_not_in {}; }; diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h index 1bb7ac6530a..022fe6450d8 100644 --- a/be/src/runtime_filter/runtime_filter_producer.h +++ b/be/src/runtime_filter/runtime_filter_producer.h @@ -59,13 +59,13 @@ public: const std::shared_ptr<pipeline::CountedFinishDependency>& dependency); // 
insert data to build filter - void insert(vectorized::ColumnPtr column, size_t start) { + Status insert(vectorized::ColumnPtr column, size_t start) { if (_rf_state == State::READY_TO_PUBLISH || _rf_state == State::PUBLISHED) { DCHECK(!_wrapper->is_valid()); - return; + return Status::OK(); } _check_state({State::WAITING_FOR_DATA}); - _wrapper->insert(column, start); + return _wrapper->insert(column, start); } Status publish(RuntimeState* state, bool build_hash_table); diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp b/be/src/runtime_filter/runtime_filter_producer_helper.cpp index b68e1c00221..53038a7fa61 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp +++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp @@ -64,7 +64,7 @@ Status RuntimeFilterProducerHelper::_init_filters(RuntimeState* state, return Status::OK(); } -void RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, size_t start) { +Status RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, size_t start) { SCOPED_TIMER(_runtime_filter_compute_timer); for (int i = 0; i < _producers.size(); i++) { auto filter = _producers[i]; @@ -74,8 +74,9 @@ void RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, size_t } int result_column_id = _filter_expr_contexts[i]->get_last_result_column_id(); const auto& column = block->get_by_position(result_column_id).column; - filter->insert(column, start); + RETURN_IF_ERROR(filter->insert(column, start)); } + return Status::OK(); } Status RuntimeFilterProducerHelper::_publish(RuntimeState* state) { @@ -103,7 +104,7 @@ Status RuntimeFilterProducerHelper::process( uint64_t hash_table_size = block ? 
block->rows() : 0; RETURN_IF_ERROR(_init_filters(state, hash_table_size)); if (hash_table_size > 1) { - _insert(block, 1); + RETURN_IF_ERROR(_insert(block, 1)); } } diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h b/be/src/runtime_filter/runtime_filter_producer_helper.h index e85a5fe008a..f8b7d985f50 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper.h @@ -65,7 +65,7 @@ protected: virtual void _init_expr(const vectorized::VExprContextSPtrs& build_expr_ctxs, const std::vector<TRuntimeFilterDesc>& runtime_filter_descs); Status _init_filters(RuntimeState* state, uint64_t local_hash_table_size); - void _insert(const vectorized::Block* block, size_t start); + Status _insert(const vectorized::Block* block, size_t start); Status _publish(RuntimeState* state); std::vector<std::shared_ptr<RuntimeFilterProducer>> _producers; diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h index 2c6cd4e8234..a7525fba4df 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h @@ -56,7 +56,7 @@ private: block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); } - _insert(block, 0); + RETURN_IF_ERROR(_insert(block, 0)); return Status::OK(); } diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp b/be/src/runtime_filter/runtime_filter_wrapper.cpp index 99889ce2b72..c3c5217d775 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.cpp +++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp @@ -89,10 +89,18 @@ Status RuntimeFilterWrapper::init(const size_t real_size) { return Status::OK(); } -void RuntimeFilterWrapper::insert(const vectorized::ColumnPtr& column, size_t start) { +Status RuntimeFilterWrapper::insert(const vectorized::ColumnPtr& column, size_t start) { switch (_filter_type) { case 
RuntimeFilterType::IN_FILTER: { _hybrid_set->insert_fixed_len(column, start); + if (_hybrid_set->size() > _max_in_num) [[unlikely]] { + _hybrid_set->clear(); + set_state(State::DISABLED, fmt::format("reach max in num: {}", _max_in_num)); + return Status::InternalError( + "Size of in set with actual size {} should be less than the limitation {} in " + "runtime filter {}.", + _hybrid_set->size(), _max_in_num, _filter_id); + } break; } case RuntimeFilterType::MIN_FILTER: @@ -140,6 +148,7 @@ void RuntimeFilterWrapper::insert(const vectorized::ColumnPtr& column, size_t st DCHECK(false); break; } + return Status::OK(); } bool RuntimeFilterWrapper::build_bf_by_runtime_size() const { @@ -147,11 +156,12 @@ bool RuntimeFilterWrapper::build_bf_by_runtime_size() const { } Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) { + if (other->_state == State::IGNORED || _state == State::DISABLED) { + return Status::OK(); + } if (other->_state == State::DISABLED) { + _hybrid_set->clear(); set_state(State::DISABLED, other->_disabled_reason); - } - - if (other->_state == State::IGNORED || _state == State::DISABLED) { return Status::OK(); } @@ -165,7 +175,8 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { _hybrid_set->insert(other->_hybrid_set.get()); - if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) { + if (_max_in_num >= 0 && _hybrid_set->size() > _max_in_num) { + _hybrid_set->clear(); set_state(State::DISABLED, fmt::format("reach max in num: {}", _max_in_num)); } break; @@ -194,7 +205,7 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) { // case2: all input-filter's build_bf_by_runtime_size is false, inited by default size if (other_filter_type == RuntimeFilterType::IN_FILTER) { _hybrid_set->insert(other->_hybrid_set.get()); - if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) { + if (_max_in_num >= 0 && _hybrid_set->size() > 
_max_in_num) { // case2: use default size to init bf RETURN_IF_ERROR(_bloom_filter_func->init_with_fixed_length(0)); RETURN_IF_ERROR(_change_to_bloom_filter()); @@ -600,20 +611,62 @@ std::string RuntimeFilterWrapper::debug_string() const { return result + "]"; } -void RuntimeFilterWrapper::_to_protobuf(PInFilter* filter) { +void RuntimeFilterWrapper::to_protobuf(PInFilter* filter) { filter->set_column_type(to_proto(column_type())); _hybrid_set->to_pb(filter); } -void RuntimeFilterWrapper::_to_protobuf(PMinMaxFilter* filter) { +void RuntimeFilterWrapper::to_protobuf(PMinMaxFilter* filter) { filter->set_column_type(to_proto(column_type())); _minmax_func->to_pb(filter); } -void RuntimeFilterWrapper::_to_protobuf(PBloomFilter* filter, char** data, int* filter_length) { +void RuntimeFilterWrapper::to_protobuf(PBloomFilter* filter, char** data, int* filter_length) { _bloom_filter_func->get_data(data, filter_length); filter->set_filter_length(*filter_length); filter->set_always_true(false); } +template <class T> +Status RuntimeFilterWrapper::assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) { + PFilterType filter_type = request.filter_type(); + + if (request.has_disabled() && request.disabled()) { + set_state(State::DISABLED, "get disabled from remote"); + return Status::OK(); + } + + if (request.has_ignored() && request.ignored()) { + set_state(State::IGNORED, "get ignored from remote"); + return Status::OK(); + } + + set_state(State::READY); + + switch (filter_type) { + case PFilterType::IN_FILTER: { + DCHECK(request.has_in_filter()); + return _assign(request.in_filter(), request.contain_null()); + } + case PFilterType::BLOOM_FILTER: { + DCHECK(request.has_bloom_filter()); + _hybrid_set.reset(); // change in_or_bloom filter to bloom filter + return _assign(request.bloom_filter(), data, request.contain_null()); + } + case PFilterType::MIN_FILTER: + case PFilterType::MAX_FILTER: + case PFilterType::MINMAX_FILTER: { + DCHECK(request.has_minmax_filter()); + 
return _assign(request.minmax_filter(), request.contain_null()); + } + default: + return Status::InternalError("unknown filter type {}", int(filter_type)); + } +} + +template Status RuntimeFilterWrapper::assign<doris::PMergeFilterRequest>( + doris::PMergeFilterRequest const&, butil::IOBufAsZeroCopyInputStream*); +template Status RuntimeFilterWrapper::assign<doris::PPublishFilterRequestV2>( + doris::PPublishFilterRequestV2 const&, butil::IOBufAsZeroCopyInputStream*); + } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h index baf02e92e3c..5695d9328d7 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.h +++ b/be/src/runtime_filter/runtime_filter_wrapper.h @@ -52,44 +52,10 @@ public: _max_in_num(max_in_num) {} Status init(const size_t runtime_size); - void insert(const vectorized::ColumnPtr& column, size_t start); + Status insert(const vectorized::ColumnPtr& column, size_t start); Status merge(const RuntimeFilterWrapper* wrapper); template <class T> - Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) { - PFilterType filter_type = request.filter_type(); - - if (request.has_disabled() && request.disabled()) { - set_state(State::DISABLED, "get disabled from remote"); - return Status::OK(); - } - - if (request.has_ignored() && request.ignored()) { - set_state(State::IGNORED, "get ignored from remote"); - return Status::OK(); - } - - set_state(State::READY); - - switch (filter_type) { - case PFilterType::IN_FILTER: { - DCHECK(request.has_in_filter()); - return _assign(request.in_filter(), request.contain_null()); - } - case PFilterType::BLOOM_FILTER: { - DCHECK(request.has_bloom_filter()); - _hybrid_set.reset(); // change in_or_bloom filter to bloom filter - return _assign(request.bloom_filter(), data, request.contain_null()); - } - case PFilterType::MIN_FILTER: - case PFilterType::MAX_FILTER: - case PFilterType::MINMAX_FILTER: { - 
DCHECK(request.has_minmax_filter()); - return _assign(request.minmax_filter(), request.contain_null()); - } - default: - return Status::InternalError("unknown filter type {}", int(filter_type)); - } - } + Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data); bool is_valid() const { return _state != State::DISABLED && _state != State::IGNORED; } int filter_id() const { return _filter_id; } @@ -110,6 +76,10 @@ public: std::shared_ptr<BloomFilterFuncBase> bloom_filter_func() const { return _bloom_filter_func; } std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func() const { return _bitmap_filter_func; } + void to_protobuf(PInFilter* filter); + void to_protobuf(PMinMaxFilter* filter); + void to_protobuf(PBloomFilter* filter, char** data, int* filter_length); + PrimitiveType column_type() const { return _column_return_type; } bool contain_null() const; @@ -148,16 +118,12 @@ public: } private: - friend class RuntimeFilter; // used by shuffle runtime filter // assign this filter by protobuf Status _assign(const PInFilter& in_filter, bool contain_null); Status _assign(const PBloomFilter& bloom_filter, butil::IOBufAsZeroCopyInputStream* data, bool contain_null); Status _assign(const PMinMaxFilter& minmax_filter, bool contain_null); - void _to_protobuf(PInFilter* filter); - void _to_protobuf(PMinMaxFilter* filter); - void _to_protobuf(PBloomFilter* filter, char** data, int* filter_length); Status _change_to_bloom_filter(); // When a runtime filter received from remote and it is a bloom filter, _column_return_type will be invalid. 
const PrimitiveType _column_return_type; // column type diff --git a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp index 963bbfc223b..8cd3c1e32b6 100644 --- a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp +++ b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp @@ -20,6 +20,10 @@ #include <glog/logging.h> #include <gtest/gtest.h> +#include "exprs/hybrid_set.h" +#include "testutil/column_helper.h" +#include "vec/data_types/data_type_number.h" + namespace doris { class RuntimeFilterWrapperTest : public testing::Test { @@ -30,35 +34,238 @@ public: void TearDown() override {} }; -TEST_F(RuntimeFilterWrapperTest, basic) { +TEST_F(RuntimeFilterWrapperTest, TestIn) { + using DataType = vectorized::DataTypeInt32; int32_t filter_id = 0; RuntimeFilterType filter_type = RuntimeFilterType::IN_FILTER; + bool null_aware = true; PrimitiveType column_return_type = PrimitiveType::TYPE_INT; - int32_t max_in_num = 0; + + int32_t max_in_num = 2; + int64_t runtime_bloom_filter_min_size = 0; int64_t runtime_bloom_filter_max_size = 0; bool build_bf_by_runtime_size = true; int64_t bloom_filter_size = 0; bool bloom_filter_size_calculated_by_ndv = true; - bool null_aware = true; bool enable_fixed_len_to_uint32_v2 = true; + bool bitmap_filter_not_in = false; - RuntimeFilterParams params; - params.filter_id = filter_id; - params.filter_type = filter_type; - params.column_return_type = column_return_type; - params.max_in_num = max_in_num; - params.runtime_bloom_filter_min_size = runtime_bloom_filter_min_size; - params.runtime_bloom_filter_max_size = runtime_bloom_filter_max_size; - params.build_bf_by_runtime_size = build_bf_by_runtime_size; - params.bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv; - params.bloom_filter_size = bloom_filter_size; - params.null_aware = null_aware; - params.enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2; - params.bitmap_filter_not_in = 
bitmap_filter_not_in; + PMergeFilterRequest valid_request; + RuntimeFilterParams params { + .filter_id = filter_id, + .filter_type = filter_type, + .column_return_type = column_return_type, + .null_aware = null_aware, + .max_in_num = max_in_num, + .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size, + .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size, + .bloom_filter_size = bloom_filter_size, + .build_bf_by_runtime_size = build_bf_by_runtime_size, + .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv, + .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2, + .bitmap_filter_not_in = bitmap_filter_not_in}; auto wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + EXPECT_EQ(wrapper->minmax_func(), nullptr); + EXPECT_EQ(wrapper->bloom_filter_func(), nullptr); + EXPECT_EQ(wrapper->bitmap_filter_func(), nullptr); + EXPECT_NE(wrapper->hybrid_set(), nullptr); + { + // Init + EXPECT_TRUE(wrapper->init(2).ok()); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + + EXPECT_TRUE(wrapper->init(3).ok()); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED); + + wrapper->_state = RuntimeFilterWrapper::State::UNINITED; + wrapper->_disabled_reason = ""; + } + { + // Insert + auto col = + vectorized::ColumnHelper::create_column<DataType>({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); + EXPECT_EQ(wrapper->insert(col, 0).code(), ErrorCode::INTERNAL_ERROR); + wrapper->_state = RuntimeFilterWrapper::State::UNINITED; + wrapper->_disabled_reason = ""; + + col = vectorized::ColumnHelper::create_column<DataType>({0}); + EXPECT_TRUE(wrapper->insert(col, 0).ok()); + EXPECT_EQ(wrapper->hybrid_set()->size(), col->size()); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + } + { + // Merge 1 (valid filter) + auto another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + EXPECT_TRUE(another_wrapper->init(2).ok()); + EXPECT_EQ(another_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED); + + auto col = vectorized::ColumnHelper::create_column<DataType>({1}); + EXPECT_TRUE(another_wrapper->insert(col, 0).ok()); + another_wrapper->_state = RuntimeFilterWrapper::State::READY; + EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok()); + EXPECT_EQ(wrapper->hybrid_set()->size(), 2); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY); + wrapper->to_protobuf(valid_request.mutable_in_filter()); + + // Merge 2 (ignored filter) + another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + EXPECT_TRUE(another_wrapper->init(2).ok()); + EXPECT_EQ(another_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + another_wrapper->_state = RuntimeFilterWrapper::State::IGNORED; + EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok()); + EXPECT_EQ(wrapper->hybrid_set()->size(), 2); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY); + + // Merge 3 (disabled filter) + another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + EXPECT_TRUE(another_wrapper->init(2).ok()); + EXPECT_EQ(another_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + another_wrapper->_state = RuntimeFilterWrapper::State::DISABLED; + EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok()); + EXPECT_EQ(wrapper->hybrid_set()->size(), 0); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED); + + // Merge 4 (valid filter) + another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + EXPECT_TRUE(another_wrapper->init(2).ok()); + EXPECT_EQ(another_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + col = vectorized::ColumnHelper::create_column<DataType>({1}); + EXPECT_TRUE(another_wrapper->insert(col, 0).ok()); + another_wrapper->_state = RuntimeFilterWrapper::State::READY; + EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok()); + EXPECT_EQ(wrapper->hybrid_set()->size(), 0); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED) + << 
RuntimeFilterWrapper::to_string(wrapper->get_state()); + wrapper->_state = RuntimeFilterWrapper::State::UNINITED; + wrapper->_disabled_reason = ""; + + // Merge 5 (valid filter) + another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + EXPECT_TRUE(another_wrapper->init(2).ok()); + EXPECT_EQ(another_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + col = vectorized::ColumnHelper::create_column<DataType>({0, 1}); + EXPECT_TRUE(another_wrapper->insert(col, 0).ok()); + another_wrapper->_state = RuntimeFilterWrapper::State::READY; + EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok()); + EXPECT_EQ(wrapper->hybrid_set()->size(), 2); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY) + << RuntimeFilterWrapper::to_string(wrapper->get_state()); + + // Merge 6 (Exceed the max in set limitation) + another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + EXPECT_TRUE(another_wrapper->init(2).ok()); + EXPECT_EQ(another_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + col = vectorized::ColumnHelper::create_column<DataType>({3, 4}); + EXPECT_TRUE(another_wrapper->insert(col, 0).ok()); + another_wrapper->_state = RuntimeFilterWrapper::State::READY; + EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok()); + EXPECT_EQ(wrapper->hybrid_set()->size(), 0); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED); + wrapper->_state = RuntimeFilterWrapper::State::UNINITED; + wrapper->_disabled_reason = ""; + } + { + // Assign disabled filter + PMergeFilterRequest request; + request.set_disabled(true); + EXPECT_TRUE(wrapper->assign(request, nullptr).ok()); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED); + wrapper->_state = RuntimeFilterWrapper::State::UNINITED; + wrapper->_disabled_reason = ""; + + // Assign ignored filter + PMergeFilterRequest request2; + request2.set_ignored(true); + EXPECT_TRUE(wrapper->assign(request2, nullptr).ok()); + EXPECT_EQ(wrapper->get_state(), 
RuntimeFilterWrapper::State::IGNORED); + wrapper->_state = RuntimeFilterWrapper::State::UNINITED; + wrapper->_disabled_reason = ""; + + // Assign valid filter + valid_request.set_contain_null(false); + valid_request.set_filter_type(PFilterType::IN_FILTER); + EXPECT_TRUE(wrapper->assign(valid_request, nullptr).ok()); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY); + EXPECT_EQ(wrapper->hybrid_set()->size(), 2); + } +} + +TEST_F(RuntimeFilterWrapperTest, TestInAssign) { + int32_t filter_id = 0; + RuntimeFilterType filter_type = RuntimeFilterType::IN_FILTER; + bool null_aware = true; + + int32_t max_in_num = 2; + + int64_t runtime_bloom_filter_min_size = 0; + int64_t runtime_bloom_filter_max_size = 0; + bool build_bf_by_runtime_size = true; + int64_t bloom_filter_size = 0; + bool bloom_filter_size_calculated_by_ndv = true; + bool enable_fixed_len_to_uint32_v2 = true; + + bool bitmap_filter_not_in = false; + +#define APPLY_FOR_PRIMITIVE_TYPE(TYPE, value1, value2) \ + { \ + static constexpr PrimitiveType column_return_type = PrimitiveType::TYPE; \ + RuntimeFilterParams params { \ + .filter_id = filter_id, \ + .filter_type = filter_type, \ + .column_return_type = column_return_type, \ + .null_aware = null_aware, \ + .max_in_num = max_in_num, \ + .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size, \ + .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size, \ + .bloom_filter_size = bloom_filter_size, \ + .build_bf_by_runtime_size = build_bf_by_runtime_size, \ + .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv, \ + .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2, \ + .bitmap_filter_not_in = bitmap_filter_not_in}; \ + auto wrapper = std::make_shared<RuntimeFilterWrapper>(&params); \ + PMergeFilterRequest valid_request; \ + auto* in_filter = valid_request.mutable_in_filter(); \ + in_filter->set_column_type(to_proto(column_return_type)); \ + 
get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(in_filter->add_values(), \ + value1); \ + get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(in_filter->add_values(), \ + value2); \ + valid_request.set_contain_null(false); \ + valid_request.set_filter_type(PFilterType::IN_FILTER); \ + EXPECT_TRUE(wrapper->assign(valid_request, nullptr).ok()); \ + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY); \ + EXPECT_EQ(wrapper->hybrid_set()->size(), 2); \ + } + +#define APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE) APPLY_FOR_PRIMITIVE_TYPE(TYPE, 0, 1) + + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_BOOLEAN); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_TINYINT); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_SMALLINT); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_INT); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_BIGINT); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_LARGEINT); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_FLOAT); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DOUBLE); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DATEV2); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DATETIMEV2); + APPLY_FOR_PRIMITIVE_TYPE(TYPE_DATETIME, VecDateTimeValue(0, 3, 0, 0, 0, 2020, 1, 1), + VecDateTimeValue(0, 3, 0, 0, 0, 2020, 1, 2)); + APPLY_FOR_PRIMITIVE_TYPE(TYPE_DATE, VecDateTimeValue(0, 2, 0, 0, 0, 2020, 1, 1), + VecDateTimeValue(0, 2, 0, 0, 0, 2020, 1, 2)); + APPLY_FOR_PRIMITIVE_TYPE(TYPE_DECIMALV2, DecimalV2Value(1, 1), DecimalV2Value(1, 2)); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DECIMAL32); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DECIMAL64); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DECIMAL128I); + APPLY_FOR_PRIMITIVE_TYPE(TYPE_DECIMAL256, vectorized::Decimal256(0), vectorized::Decimal256(1)); + APPLY_FOR_PRIMITIVE_TYPE(TYPE_VARCHAR, StringRef("1"), StringRef("2")); + APPLY_FOR_PRIMITIVE_TYPE(TYPE_CHAR, StringRef("1"), StringRef("2")); + APPLY_FOR_PRIMITIVE_TYPE(TYPE_STRING, StringRef("1"), StringRef("2")); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_IPV4); + APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_IPV6); } } // namespace doris 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org