This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9db838121b6a802660cbd9947131aaa4b4fde30b
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Mon Mar 3 17:33:30 2025 +0800

    [runtime filter](UT) test in filter (#48562)
---
 be/src/exprs/bloom_filter_func.h                   |   5 +-
 be/src/exprs/filter_base.h                         |   8 +-
 be/src/exprs/hybrid_set.h                          |  11 +
 be/src/runtime_filter/runtime_filter.h             |   4 +-
 be/src/runtime_filter/runtime_filter_definitions.h |   9 +-
 be/src/runtime_filter/runtime_filter_producer.h    |   6 +-
 .../runtime_filter_producer_helper.cpp             |   7 +-
 .../runtime_filter_producer_helper.h               |   2 +-
 .../runtime_filter_producer_helper_cross.h         |   2 +-
 be/src/runtime_filter/runtime_filter_wrapper.cpp   |  71 +++++-
 be/src/runtime_filter/runtime_filter_wrapper.h     |  46 +---
 .../runtime_filter/runtime_filter_wrapper_test.cpp | 239 +++++++++++++++++++--
 12 files changed, 324 insertions(+), 86 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 64123250e48..774cf42e1e6 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -42,9 +42,6 @@ public:
         _enable_fixed_len_to_uint32_v2 = params->enable_fixed_len_to_uint32_v2;
         _limit_length();
     }
-
-    bool build_bf_by_runtime_size() const { return _build_bf_by_runtime_size; }
-
     Status init_with_fixed_length(size_t runtime_size) {
         if (_build_bf_by_runtime_size) {
             // Use the same algorithm as org.apache.doris.planner.RuntimeFilter#calculateFilterSize
@@ -74,6 +71,8 @@ public:
         return Status::OK();
     }
 
+    bool build_bf_by_runtime_size() const { return _build_bf_by_runtime_size; }
+
     Status merge(BloomFilterFuncBase* other) {
         if (other == nullptr) {
             return Status::InternalError("bloomfilter_func is nullptr");
diff --git a/be/src/exprs/filter_base.h b/be/src/exprs/filter_base.h
index 5534dc75589..25778059886 100644
--- a/be/src/exprs/filter_base.h
+++ b/be/src/exprs/filter_base.h
@@ -26,13 +26,7 @@ public:
     FilterBase(bool null_aware) : _null_aware(null_aware) {}
     bool contain_null() const { return _null_aware && _contain_null; }
 
-    void set_contain_null(bool contain_null) {
-        if (_contain_null && !contain_null) {
-            throw Exception(ErrorCode::INTERNAL_ERROR,
-                            "contain_null cannot be changed from true to 
false");
-        }
-        _contain_null = contain_null;
-    }
+    void set_contain_null(bool contain_null) { _contain_null |= contain_null; }
 
 protected:
     // Indicates whether a null datum exists to build this filter.
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 1358c479dca..094150f3de0 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -148,6 +148,11 @@ public:
     Iterator begin() { return Iterator(_data, 0); }
     Iterator end() { return Iterator(_data, _size); }
 
+    void clear() {
+        std::array<T, N> {}.swap(_data);
+        _size = 0;
+    }
+
 private:
     std::array<T, N> _data;
     size_t _size {};
@@ -179,6 +184,8 @@ public:
 
     bool find(const T& value) const { return _set.contains(value); }
 
+    void clear() { _set.clear(); }
+
     Iterator begin() { return _set.begin(); }
 
     Iterator end() { return _set.end(); }
@@ -210,6 +217,7 @@ public:
         _contain_null |= set->_contain_null;
     }
 
+    virtual void clear() = 0;
     bool empty() { return !_contain_null && size() == 0; }
     virtual int size() = 0;
     virtual bool find(const void* data) const = 0;
@@ -262,6 +270,7 @@ public:
         }
         _set.insert(*reinterpret_cast<const ElementType*>(data));
     }
+    void clear() override { _set.clear(); }
 
     void insert(void* data, size_t /*unused*/) override { insert(data); }
 
@@ -390,6 +399,7 @@ public:
 
     ~StringSet() override = default;
 
+    void clear() override { _set.clear(); }
     void insert(const void* data) override {
         if (data == nullptr) {
             _contain_null = true;
@@ -558,6 +568,7 @@ public:
     StringValueSet(bool null_aware) : HybridSetBase(null_aware) {}
 
     ~StringValueSet() override = default;
+    void clear() override { _set.clear(); }
 
     void insert(const void* data) override {
         if (data == nullptr) {
diff --git a/be/src/runtime_filter/runtime_filter.h b/be/src/runtime_filter/runtime_filter.h
index dc2fbd20d2e..660f0ad88ce 100644
--- a/be/src/runtime_filter/runtime_filter.h
+++ b/be/src/runtime_filter/runtime_filter.h
@@ -95,10 +95,10 @@ protected:
 
     template <typename T>
     void _to_protobuf(T* filter) {
-        _wrapper->_to_protobuf(filter);
+        _wrapper->to_protobuf(filter);
     }
     void _to_protobuf(PBloomFilter* filter, char** data, int* filter_length) {
-        _wrapper->_to_protobuf(filter, data, filter_length);
+        _wrapper->to_protobuf(filter, data, filter_length);
     }
 
     Status _push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
diff --git a/be/src/runtime_filter/runtime_filter_definitions.h b/be/src/runtime_filter/runtime_filter_definitions.h
index ed732b99532..693e62beea9 100644
--- a/be/src/runtime_filter/runtime_filter_definitions.h
+++ b/be/src/runtime_filter/runtime_filter_definitions.h
@@ -42,11 +42,14 @@ struct RuntimeFilterParams {
     RuntimeFilterType filter_type {};
     // Data type of build column
     PrimitiveType column_return_type {};
+    // Whether this runtime filter is null-aware
     bool null_aware {};
-    bool enable_fixed_len_to_uint32_v2 {};
 
+    // In filter
     // The max limitation of in-set
     int32_t max_in_num {};
+
+    // Bloom filter
     // The min size limitation of bloom filter
     int64_t runtime_bloom_filter_min_size {};
     // The max size limitation of bloom filter
@@ -57,7 +60,11 @@ struct RuntimeFilterParams {
     bool build_bf_by_runtime_size {};
     // Whether an estimated size by NDV is used to build bloom filter
     bool bloom_filter_size_calculated_by_ndv {};
+    // Whether an optimized way to build BF using fixed-length values
+    bool enable_fixed_len_to_uint32_v2 {};
 
+    // Bitmap filter
+    // Whether a join expression is `not in`
     bool bitmap_filter_not_in {};
 };
 
diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h
index 1bb7ac6530a..022fe6450d8 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -59,13 +59,13 @@ public:
                      const std::shared_ptr<pipeline::CountedFinishDependency>& 
dependency);
 
     // insert data to build filter
-    void insert(vectorized::ColumnPtr column, size_t start) {
+    Status insert(vectorized::ColumnPtr column, size_t start) {
         if (_rf_state == State::READY_TO_PUBLISH || _rf_state == 
State::PUBLISHED) {
             DCHECK(!_wrapper->is_valid());
-            return;
+            return Status::OK();
         }
         _check_state({State::WAITING_FOR_DATA});
-        _wrapper->insert(column, start);
+        return _wrapper->insert(column, start);
     }
     Status publish(RuntimeState* state, bool build_hash_table);
 
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
index b68e1c00221..53038a7fa61 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
@@ -64,7 +64,7 @@ Status RuntimeFilterProducerHelper::_init_filters(RuntimeState* state,
     return Status::OK();
 }
 
-void RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, 
size_t start) {
+Status RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, 
size_t start) {
     SCOPED_TIMER(_runtime_filter_compute_timer);
     for (int i = 0; i < _producers.size(); i++) {
         auto filter = _producers[i];
@@ -74,8 +74,9 @@ void RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, size_t
         }
         int result_column_id = 
_filter_expr_contexts[i]->get_last_result_column_id();
         const auto& column = block->get_by_position(result_column_id).column;
-        filter->insert(column, start);
+        RETURN_IF_ERROR(filter->insert(column, start));
     }
+    return Status::OK();
 }
 
 Status RuntimeFilterProducerHelper::_publish(RuntimeState* state) {
@@ -103,7 +104,7 @@ Status RuntimeFilterProducerHelper::process(
         uint64_t hash_table_size = block ? block->rows() : 0;
         RETURN_IF_ERROR(_init_filters(state, hash_table_size));
         if (hash_table_size > 1) {
-            _insert(block, 1);
+            RETURN_IF_ERROR(_insert(block, 1));
         }
     }
 
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h b/be/src/runtime_filter/runtime_filter_producer_helper.h
index e85a5fe008a..f8b7d985f50 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.h
@@ -65,7 +65,7 @@ protected:
     virtual void _init_expr(const vectorized::VExprContextSPtrs& 
build_expr_ctxs,
                             const std::vector<TRuntimeFilterDesc>& 
runtime_filter_descs);
     Status _init_filters(RuntimeState* state, uint64_t local_hash_table_size);
-    void _insert(const vectorized::Block* block, size_t start);
+    Status _insert(const vectorized::Block* block, size_t start);
     Status _publish(RuntimeState* state);
 
     std::vector<std::shared_ptr<RuntimeFilterProducer>> _producers;
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
index 2c6cd4e8234..a7525fba4df 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
@@ -56,7 +56,7 @@ private:
                     block->get_by_position(result_column_id)
                             .column->convert_to_full_column_if_const();
         }
-        _insert(block, 0);
+        RETURN_IF_ERROR(_insert(block, 0));
         return Status::OK();
     }
 
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp b/be/src/runtime_filter/runtime_filter_wrapper.cpp
index 99889ce2b72..c3c5217d775 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.cpp
+++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp
@@ -89,10 +89,18 @@ Status RuntimeFilterWrapper::init(const size_t real_size) {
     return Status::OK();
 }
 
-void RuntimeFilterWrapper::insert(const vectorized::ColumnPtr& column, size_t 
start) {
+Status RuntimeFilterWrapper::insert(const vectorized::ColumnPtr& column, 
size_t start) {
     switch (_filter_type) {
     case RuntimeFilterType::IN_FILTER: {
         _hybrid_set->insert_fixed_len(column, start);
+        // Capture the in-set cardinality once, before clear(): the error below must report
+        // the size that actually exceeded the limit, not the post-clear size (always 0).
+        if (const auto in_set_size = _hybrid_set->size(); in_set_size > _max_in_num) [[unlikely]] {
+            // Drop the collected values and disable the filter; an oversized in-set is unusable.
+            _hybrid_set->clear();
+            set_state(State::DISABLED, fmt::format("reach max in num: {}", _max_in_num));
+            return Status::InternalError(
+                    "Size of in set with actual size {} should be less than the limitation {} in "
+                    "runtime filter {}.",
+                    in_set_size, _max_in_num, _filter_id);
+        }
         break;
     }
     case RuntimeFilterType::MIN_FILTER:
@@ -140,6 +148,7 @@ void RuntimeFilterWrapper::insert(const vectorized::ColumnPtr& column, size_t st
         DCHECK(false);
         break;
     }
+    return Status::OK();
 }
 
 bool RuntimeFilterWrapper::build_bf_by_runtime_size() const {
@@ -147,11 +156,12 @@ bool RuntimeFilterWrapper::build_bf_by_runtime_size() const {
 }
 
 Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) {
+    if (other->_state == State::IGNORED || _state == State::DISABLED) {
+        return Status::OK();
+    }
     if (other->_state == State::DISABLED) {
+        _hybrid_set->clear();
         set_state(State::DISABLED, other->_disabled_reason);
-    }
-
-    if (other->_state == State::IGNORED || _state == State::DISABLED) {
         return Status::OK();
     }
 
@@ -165,7 +175,8 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) {
     switch (_filter_type) {
     case RuntimeFilterType::IN_FILTER: {
         _hybrid_set->insert(other->_hybrid_set.get());
-        if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) {
+        if (_max_in_num >= 0 && _hybrid_set->size() > _max_in_num) {
+            _hybrid_set->clear();
             set_state(State::DISABLED, fmt::format("reach max in num: {}", 
_max_in_num));
         }
         break;
@@ -194,7 +205,7 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) {
             // case2: all input-filter's build_bf_by_runtime_size is false, 
inited by default size
             if (other_filter_type == RuntimeFilterType::IN_FILTER) {
                 _hybrid_set->insert(other->_hybrid_set.get());
-                if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) {
+                if (_max_in_num >= 0 && _hybrid_set->size() > _max_in_num) {
                     // case2: use default size to init bf
                     
RETURN_IF_ERROR(_bloom_filter_func->init_with_fixed_length(0));
                     RETURN_IF_ERROR(_change_to_bloom_filter());
@@ -600,20 +611,62 @@ std::string RuntimeFilterWrapper::debug_string() const {
     return result + "]";
 }
 
-void RuntimeFilterWrapper::_to_protobuf(PInFilter* filter) {
+void RuntimeFilterWrapper::to_protobuf(PInFilter* filter) {
     filter->set_column_type(to_proto(column_type()));
     _hybrid_set->to_pb(filter);
 }
 
-void RuntimeFilterWrapper::_to_protobuf(PMinMaxFilter* filter) {
+void RuntimeFilterWrapper::to_protobuf(PMinMaxFilter* filter) {
     filter->set_column_type(to_proto(column_type()));
     _minmax_func->to_pb(filter);
 }
 
-void RuntimeFilterWrapper::_to_protobuf(PBloomFilter* filter, char** data, 
int* filter_length) {
+void RuntimeFilterWrapper::to_protobuf(PBloomFilter* filter, char** data, int* 
filter_length) {
     _bloom_filter_func->get_data(data, filter_length);
     filter->set_filter_length(*filter_length);
     filter->set_always_true(false);
 }
 
+template <class T>
+Status RuntimeFilterWrapper::assign(const T& request, 
butil::IOBufAsZeroCopyInputStream* data) {
+    PFilterType filter_type = request.filter_type();
+
+    if (request.has_disabled() && request.disabled()) {
+        set_state(State::DISABLED, "get disabled from remote");
+        return Status::OK();
+    }
+
+    if (request.has_ignored() && request.ignored()) {
+        set_state(State::IGNORED, "get ignored from remote");
+        return Status::OK();
+    }
+
+    set_state(State::READY);
+
+    switch (filter_type) {
+    case PFilterType::IN_FILTER: {
+        DCHECK(request.has_in_filter());
+        return _assign(request.in_filter(), request.contain_null());
+    }
+    case PFilterType::BLOOM_FILTER: {
+        DCHECK(request.has_bloom_filter());
+        _hybrid_set.reset(); // change in_or_bloom filter to bloom filter
+        return _assign(request.bloom_filter(), data, request.contain_null());
+    }
+    case PFilterType::MIN_FILTER:
+    case PFilterType::MAX_FILTER:
+    case PFilterType::MINMAX_FILTER: {
+        DCHECK(request.has_minmax_filter());
+        return _assign(request.minmax_filter(), request.contain_null());
+    }
+    default:
+        return Status::InternalError("unknown filter type {}", 
int(filter_type));
+    }
+}
+
+template Status RuntimeFilterWrapper::assign<doris::PMergeFilterRequest>(
+        doris::PMergeFilterRequest const&, butil::IOBufAsZeroCopyInputStream*);
+template Status RuntimeFilterWrapper::assign<doris::PPublishFilterRequestV2>(
+        doris::PPublishFilterRequestV2 const&, 
butil::IOBufAsZeroCopyInputStream*);
+
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h
index baf02e92e3c..5695d9328d7 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -52,44 +52,10 @@ public:
               _max_in_num(max_in_num) {}
 
     Status init(const size_t runtime_size);
-    void insert(const vectorized::ColumnPtr& column, size_t start);
+    Status insert(const vectorized::ColumnPtr& column, size_t start);
     Status merge(const RuntimeFilterWrapper* wrapper);
     template <class T>
-    Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) {
-        PFilterType filter_type = request.filter_type();
-
-        if (request.has_disabled() && request.disabled()) {
-            set_state(State::DISABLED, "get disabled from remote");
-            return Status::OK();
-        }
-
-        if (request.has_ignored() && request.ignored()) {
-            set_state(State::IGNORED, "get ignored from remote");
-            return Status::OK();
-        }
-
-        set_state(State::READY);
-
-        switch (filter_type) {
-        case PFilterType::IN_FILTER: {
-            DCHECK(request.has_in_filter());
-            return _assign(request.in_filter(), request.contain_null());
-        }
-        case PFilterType::BLOOM_FILTER: {
-            DCHECK(request.has_bloom_filter());
-            _hybrid_set.reset(); // change in_or_bloom filter to bloom filter
-            return _assign(request.bloom_filter(), data, 
request.contain_null());
-        }
-        case PFilterType::MIN_FILTER:
-        case PFilterType::MAX_FILTER:
-        case PFilterType::MINMAX_FILTER: {
-            DCHECK(request.has_minmax_filter());
-            return _assign(request.minmax_filter(), request.contain_null());
-        }
-        default:
-            return Status::InternalError("unknown filter type {}", 
int(filter_type));
-        }
-    }
+    Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data);
 
     bool is_valid() const { return _state != State::DISABLED && _state != 
State::IGNORED; }
     int filter_id() const { return _filter_id; }
@@ -110,6 +76,10 @@ public:
     std::shared_ptr<BloomFilterFuncBase> bloom_filter_func() const { return 
_bloom_filter_func; }
     std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func() const { return 
_bitmap_filter_func; }
 
+    void to_protobuf(PInFilter* filter);
+    void to_protobuf(PMinMaxFilter* filter);
+    void to_protobuf(PBloomFilter* filter, char** data, int* filter_length);
+
     PrimitiveType column_type() const { return _column_return_type; }
 
     bool contain_null() const;
@@ -148,16 +118,12 @@ public:
     }
 
 private:
-    friend class RuntimeFilter;
     // used by shuffle runtime filter
     // assign this filter by protobuf
     Status _assign(const PInFilter& in_filter, bool contain_null);
     Status _assign(const PBloomFilter& bloom_filter, 
butil::IOBufAsZeroCopyInputStream* data,
                    bool contain_null);
     Status _assign(const PMinMaxFilter& minmax_filter, bool contain_null);
-    void _to_protobuf(PInFilter* filter);
-    void _to_protobuf(PMinMaxFilter* filter);
-    void _to_protobuf(PBloomFilter* filter, char** data, int* filter_length);
     Status _change_to_bloom_filter();
     // When a runtime filter received from remote and it is a bloom filter, 
_column_return_type will be invalid.
     const PrimitiveType _column_return_type; // column type
diff --git a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
index 963bbfc223b..8cd3c1e32b6 100644
--- a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
@@ -20,6 +20,10 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "exprs/hybrid_set.h"
+#include "testutil/column_helper.h"
+#include "vec/data_types/data_type_number.h"
+
 namespace doris {
 
 class RuntimeFilterWrapperTest : public testing::Test {
@@ -30,35 +34,238 @@ public:
     void TearDown() override {}
 };
 
-TEST_F(RuntimeFilterWrapperTest, basic) {
+TEST_F(RuntimeFilterWrapperTest, TestIn) {
+    using DataType = vectorized::DataTypeInt32;
     int32_t filter_id = 0;
     RuntimeFilterType filter_type = RuntimeFilterType::IN_FILTER;
+    bool null_aware = true;
     PrimitiveType column_return_type = PrimitiveType::TYPE_INT;
-    int32_t max_in_num = 0;
+
+    int32_t max_in_num = 2;
+
     int64_t runtime_bloom_filter_min_size = 0;
     int64_t runtime_bloom_filter_max_size = 0;
     bool build_bf_by_runtime_size = true;
     int64_t bloom_filter_size = 0;
     bool bloom_filter_size_calculated_by_ndv = true;
-    bool null_aware = true;
     bool enable_fixed_len_to_uint32_v2 = true;
+
     bool bitmap_filter_not_in = false;
 
-    RuntimeFilterParams params;
-    params.filter_id = filter_id;
-    params.filter_type = filter_type;
-    params.column_return_type = column_return_type;
-    params.max_in_num = max_in_num;
-    params.runtime_bloom_filter_min_size = runtime_bloom_filter_min_size;
-    params.runtime_bloom_filter_max_size = runtime_bloom_filter_max_size;
-    params.build_bf_by_runtime_size = build_bf_by_runtime_size;
-    params.bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv;
-    params.bloom_filter_size = bloom_filter_size;
-    params.null_aware = null_aware;
-    params.enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2;
-    params.bitmap_filter_not_in = bitmap_filter_not_in;
+    PMergeFilterRequest valid_request;
+    RuntimeFilterParams params {
+            .filter_id = filter_id,
+            .filter_type = filter_type,
+            .column_return_type = column_return_type,
+            .null_aware = null_aware,
+            .max_in_num = max_in_num,
+            .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+            .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+            .bloom_filter_size = bloom_filter_size,
+            .build_bf_by_runtime_size = build_bf_by_runtime_size,
+            .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+            .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+            .bitmap_filter_not_in = bitmap_filter_not_in};
 
     auto wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+    EXPECT_EQ(wrapper->minmax_func(), nullptr);
+    EXPECT_EQ(wrapper->bloom_filter_func(), nullptr);
+    EXPECT_EQ(wrapper->bitmap_filter_func(), nullptr);
+    EXPECT_NE(wrapper->hybrid_set(), nullptr);
+    {
+        // Init
+        EXPECT_TRUE(wrapper->init(2).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+
+        EXPECT_TRUE(wrapper->init(3).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED);
+
+        wrapper->_state = RuntimeFilterWrapper::State::UNINITED;
+        wrapper->_disabled_reason = "";
+    }
+    {
+        // Insert
+        auto col =
+                vectorized::ColumnHelper::create_column<DataType>({0, 1, 2, 3, 
4, 5, 6, 7, 8, 9});
+        EXPECT_EQ(wrapper->insert(col, 0).code(), ErrorCode::INTERNAL_ERROR);
+        wrapper->_state = RuntimeFilterWrapper::State::UNINITED;
+        wrapper->_disabled_reason = "";
+
+        col = vectorized::ColumnHelper::create_column<DataType>({0});
+        EXPECT_TRUE(wrapper->insert(col, 0).ok());
+        EXPECT_EQ(wrapper->hybrid_set()->size(), col->size());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+    }
+    {
+        // Merge 1 (valid filter)
+        auto another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_TRUE(another_wrapper->init(2).ok());
+        EXPECT_EQ(another_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+
+        auto col = vectorized::ColumnHelper::create_column<DataType>({1});
+        EXPECT_TRUE(another_wrapper->insert(col, 0).ok());
+        another_wrapper->_state = RuntimeFilterWrapper::State::READY;
+        EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok());
+        EXPECT_EQ(wrapper->hybrid_set()->size(), 2);
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY);
+        wrapper->to_protobuf(valid_request.mutable_in_filter());
+
+        // Merge 2 (ignored filter)
+        another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_TRUE(another_wrapper->init(2).ok());
+        EXPECT_EQ(another_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+        another_wrapper->_state = RuntimeFilterWrapper::State::IGNORED;
+        EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok());
+        EXPECT_EQ(wrapper->hybrid_set()->size(), 2);
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY);
+
+        // Merge 3 (disabled filter)
+        another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_TRUE(another_wrapper->init(2).ok());
+        EXPECT_EQ(another_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+        another_wrapper->_state = RuntimeFilterWrapper::State::DISABLED;
+        EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok());
+        EXPECT_EQ(wrapper->hybrid_set()->size(), 0);
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED);
+
+        // Merge 4 (valid filter)
+        another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_TRUE(another_wrapper->init(2).ok());
+        EXPECT_EQ(another_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+        col = vectorized::ColumnHelper::create_column<DataType>({1});
+        EXPECT_TRUE(another_wrapper->insert(col, 0).ok());
+        another_wrapper->_state = RuntimeFilterWrapper::State::READY;
+        EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok());
+        EXPECT_EQ(wrapper->hybrid_set()->size(), 0);
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED)
+                << RuntimeFilterWrapper::to_string(wrapper->get_state());
+        wrapper->_state = RuntimeFilterWrapper::State::UNINITED;
+        wrapper->_disabled_reason = "";
+
+        // Merge 5 (valid filter)
+        another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_TRUE(another_wrapper->init(2).ok());
+        EXPECT_EQ(another_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+        col = vectorized::ColumnHelper::create_column<DataType>({0, 1});
+        EXPECT_TRUE(another_wrapper->insert(col, 0).ok());
+        another_wrapper->_state = RuntimeFilterWrapper::State::READY;
+        EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok());
+        EXPECT_EQ(wrapper->hybrid_set()->size(), 2);
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY)
+                << RuntimeFilterWrapper::to_string(wrapper->get_state());
+
+        // Merge 6 (Exceed the max in set limitation)
+        another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_TRUE(another_wrapper->init(2).ok());
+        EXPECT_EQ(another_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+        col = vectorized::ColumnHelper::create_column<DataType>({3, 4});
+        EXPECT_TRUE(another_wrapper->insert(col, 0).ok());
+        another_wrapper->_state = RuntimeFilterWrapper::State::READY;
+        EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok());
+        EXPECT_EQ(wrapper->hybrid_set()->size(), 0);
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED);
+        wrapper->_state = RuntimeFilterWrapper::State::UNINITED;
+        wrapper->_disabled_reason = "";
+    }
+    {
+        // Assign disabled filter
+        PMergeFilterRequest request;
+        request.set_disabled(true);
+        EXPECT_TRUE(wrapper->assign(request, nullptr).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED);
+        wrapper->_state = RuntimeFilterWrapper::State::UNINITED;
+        wrapper->_disabled_reason = "";
+
+        // Assign ignored filter
+        PMergeFilterRequest request2;
+        request2.set_ignored(true);
+        EXPECT_TRUE(wrapper->assign(request2, nullptr).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::IGNORED);
+        wrapper->_state = RuntimeFilterWrapper::State::UNINITED;
+        wrapper->_disabled_reason = "";
+
+        // Assign valid filter
+        valid_request.set_contain_null(false);
+        valid_request.set_filter_type(PFilterType::IN_FILTER);
+        EXPECT_TRUE(wrapper->assign(valid_request, nullptr).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY);
+        EXPECT_EQ(wrapper->hybrid_set()->size(), 2);
+    }
+}
+
+TEST_F(RuntimeFilterWrapperTest, TestInAssign) {
+    int32_t filter_id = 0;
+    RuntimeFilterType filter_type = RuntimeFilterType::IN_FILTER;
+    bool null_aware = true;
+
+    int32_t max_in_num = 2;
+
+    int64_t runtime_bloom_filter_min_size = 0;
+    int64_t runtime_bloom_filter_max_size = 0;
+    bool build_bf_by_runtime_size = true;
+    int64_t bloom_filter_size = 0;
+    bool bloom_filter_size_calculated_by_ndv = true;
+    bool enable_fixed_len_to_uint32_v2 = true;
+
+    bool bitmap_filter_not_in = false;
+
+#define APPLY_FOR_PRIMITIVE_TYPE(TYPE, value1, value2)                         
                    \
+    {                                                                          
                    \
+        static constexpr PrimitiveType column_return_type = 
PrimitiveType::TYPE;                   \
+        RuntimeFilterParams params {                                           
                    \
+                .filter_id = filter_id,                                        
                    \
+                .filter_type = filter_type,                                    
                    \
+                .column_return_type = column_return_type,                      
                    \
+                .null_aware = null_aware,                                      
                    \
+                .max_in_num = max_in_num,                                      
                    \
+                .runtime_bloom_filter_min_size = 
runtime_bloom_filter_min_size,                    \
+                .runtime_bloom_filter_max_size = 
runtime_bloom_filter_max_size,                    \
+                .bloom_filter_size = bloom_filter_size,                        
                    \
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,          
                    \
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,        \
+                .enable_fixed_len_to_uint32_v2 = 
enable_fixed_len_to_uint32_v2,                    \
+                .bitmap_filter_not_in = bitmap_filter_not_in};                 
                    \
+        auto wrapper = std::make_shared<RuntimeFilterWrapper>(&params);        
                    \
+        PMergeFilterRequest valid_request;                                     
                    \
+        auto* in_filter = valid_request.mutable_in_filter();                   
                    \
+        in_filter->set_column_type(to_proto(column_return_type));              
                    \
+        
get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(in_filter->add_values(),
 \
+                                                                          
value1);                 \
+        
get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(in_filter->add_values(),
 \
+                                                                          
value2);                 \
+        valid_request.set_contain_null(false);                                 
                    \
+        valid_request.set_filter_type(PFilterType::IN_FILTER);                 
                    \
+        EXPECT_TRUE(wrapper->assign(valid_request, nullptr).ok());             
                    \
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY);   
                    \
+        EXPECT_EQ(wrapper->hybrid_set()->size(), 2);                           
                    \
+    }
+
+#define APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE) APPLY_FOR_PRIMITIVE_TYPE(TYPE, 0, 
1)
+
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_BOOLEAN);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_TINYINT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_SMALLINT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_INT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_BIGINT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_LARGEINT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_FLOAT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DOUBLE);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DATEV2);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DATETIMEV2);
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_DATETIME, VecDateTimeValue(0, 3, 0, 0, 0, 
2020, 1, 1),
+                             VecDateTimeValue(0, 3, 0, 0, 0, 2020, 1, 2));
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_DATE, VecDateTimeValue(0, 2, 0, 0, 0, 2020, 
1, 1),
+                             VecDateTimeValue(0, 2, 0, 0, 0, 2020, 1, 2));
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_DECIMALV2, DecimalV2Value(1, 1), 
DecimalV2Value(1, 2));
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DECIMAL32);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DECIMAL64);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DECIMAL128I);
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_DECIMAL256, vectorized::Decimal256(0), 
vectorized::Decimal256(1));
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_VARCHAR, StringRef("1"), StringRef("2"));
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_CHAR, StringRef("1"), StringRef("2"));
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_STRING, StringRef("1"), StringRef("2"));
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_IPV4);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_IPV6);
 }
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to