This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d5817710883a05e611863f43da520699ef3302ce
Author: BiteTheDDDDt <x...@selectdb.com>
AuthorDate: Wed Feb 26 18:18:30 2025 +0800

    some fix and add ut
    
    fix ut
    
    update
    
    update
    
    format
    
    update
---
 be/src/runtime/descriptor_helper.h                 |  55 +++++
 be/src/runtime_filter/runtime_filter_merger.h      |  10 +-
 be/src/runtime_filter/runtime_filter_producer.cpp  |   4 +-
 be/src/runtime_filter/runtime_filter_producer.h    |   8 +-
 be/src/runtime_filter/runtime_filter_wrapper.cpp   |   5 +-
 be/src/runtime_filter/runtime_filter_wrapper.h     |   5 +-
 be/test/pipeline/pipeline_test.cpp                 |  32 +--
 be/test/pipeline/thrift_builder.h                  | 245 +++++++++------------
 .../runtime_filter_consumer_test.cpp               |  67 ++++++
 .../runtime_filter/runtime_filter_merger_test.cpp  | 179 +++++++++++++++
 .../runtime_filter_producer_test.cpp               | 165 ++++++++++++++
 be/test/runtime_filter/runtime_filter_test_utils.h |  68 ++++++
 12 files changed, 657 insertions(+), 186 deletions(-)

diff --git a/be/src/runtime/descriptor_helper.h b/be/src/runtime/descriptor_helper.h
index 049947478d3..d61d887a5ab 100644
--- a/be/src/runtime/descriptor_helper.h
+++ b/be/src/runtime/descriptor_helper.h
@@ -41,6 +41,21 @@ public:
 
     TDescriptorTable desc_tbl() { return _desc_tbl; }
 
+    TDescriptorTableBuilder& append_slotDescriptors(TSlotDescriptor& desc) {
+        _desc_tbl.slotDescriptors.push_back(desc);
+        return *this;
+    }
+    TDescriptorTableBuilder& append_tupleDescriptors(TTupleDescriptor& desc) {
+        _desc_tbl.tupleDescriptors.push_back(desc);
+        return *this;
+    }
+    TDescriptorTableBuilder& append_tableDescriptors(TTableDescriptor& desc) {
+        _desc_tbl.tableDescriptors.push_back(desc);
+        return *this;
+    }
+
+    TDescriptorTable& build() { return _desc_tbl; }
+
 private:
     TSlotId _next_slot_id = 0;
     TTupleId _next_tuple_id = 0;
@@ -96,6 +111,39 @@ public:
     }
     TSlotDescriptor build() { return _slot_desc; }
 
+    TSlotDescriptorBuilder& set_id(TTupleId id) {
+        _slot_desc.id = id;
+        return *this;
+    }
+    TSlotDescriptorBuilder& set_parent(TTupleDescriptor& parent) {
+        _slot_desc.parent = parent.id;
+        return *this;
+    }
+    TSlotDescriptorBuilder& set_slotType(TTypeDesc& slotType) {
+        _slot_desc.slotType = slotType;
+        return *this;
+    }
+    TSlotDescriptorBuilder& set_nullIndicatorBit(int nullIndicatorBit) {
+        _slot_desc.nullIndicatorBit = nullIndicatorBit;
+        return *this;
+    }
+    TSlotDescriptorBuilder& set_byteOffset(int byteOffset) {
+        _slot_desc.byteOffset = byteOffset;
+        return *this;
+    }
+    TSlotDescriptorBuilder& set_slotIdx(int slotIdx) {
+        _slot_desc.slotIdx = slotIdx;
+        return *this;
+    }
+    TSlotDescriptorBuilder& set_isMaterialized(bool isMaterialized) {
+        _slot_desc.isMaterialized = isMaterialized;
+        return *this;
+    }
+    TSlotDescriptorBuilder& set_colName(std::string colName) {
+        _slot_desc.colName = colName;
+        return *this;
+    }
+
 private:
     friend TTupleDescriptorBuilder;
     TSlotDescriptor _slot_desc;
@@ -136,6 +184,13 @@ public:
         tb->add_tuple(_tuple_desc);
     }
 
+    TTupleDescriptorBuilder& set_id(TTupleId id) {
+        _tuple_desc.id = id;
+        return *this;
+    }
+
+    TTupleDescriptor& build() { return _tuple_desc; }
+
 private:
     TTupleId _tuple_id;
     std::vector<TSlotDescriptor> _slot_descs;
diff --git a/be/src/runtime_filter/runtime_filter_merger.h b/be/src/runtime_filter/runtime_filter_merger.h
index fbc8727f0ba..7c4f3ac639c 100644
--- a/be/src/runtime_filter/runtime_filter_merger.h
+++ b/be/src/runtime_filter/runtime_filter_merger.h
@@ -20,6 +20,7 @@
 #include "runtime_filter/runtime_filter.h"
 #include "runtime_filter/runtime_filter_definitions.h"
 #include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
 
 namespace doris {
 // The merger is divided into local merger and global merger
@@ -77,17 +78,18 @@ public:
     }
 
     void set_expected_producer_num(int num) {
+        DCHECK_EQ(_received_producer_num, 0);
+        DCHECK_EQ(_received_rf_size_num, 0);
         _expected_producer_num = num;
         _profile->add_info_string("Info", debug_string());
     }
 
     bool add_rf_size(uint64_t size) {
         _received_rf_size_num++;
-        _received_sum_size += size;
         if (_expected_producer_num < _received_rf_size_num) {
-            return Status::InternalError(
-                    "runtime filter merger input product size more than expected, {}",
-                    debug_string());
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "runtime filter merger input product size more than expected, {}",
+                            debug_string());
         }
         _received_sum_size += size;
         _profile->add_info_string("Info", debug_string());
diff --git a/be/src/runtime_filter/runtime_filter_producer.cpp b/be/src/runtime_filter/runtime_filter_producer.cpp
index b7ddf75fb9d..3f91c000a48 100644
--- a/be/src/runtime_filter/runtime_filter_producer.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer.cpp
@@ -113,7 +113,7 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
             return;
         }
 
-        wrapper->disable(cntl_->ErrorText());
+        wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, 
cntl_->ErrorText());
         Base::_process_if_rpc_failed();
     }
 
@@ -128,7 +128,7 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
             return;
         }
 
-        wrapper->disable(status.to_string());
+        wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, 
status.to_string());
     }
 
 public:
diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h
index ba3fea90816..1bb7ac6530a 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -27,7 +27,7 @@ namespace doris {
 // Work on (hash/corss) join build sink node, RuntimeFilterProducerHelper will 
manage all RuntimeFilterProducer
 // Used to generate specific predicate and publish it to consumer/merger
 /**
- * init -> send_size -> insert -> publish
+ * send_size -> init -> insert -> publish
  */
 class RuntimeFilterProducer : public RuntimeFilter {
 public:
@@ -148,14 +148,14 @@ private:
 
     Status _init_with_desc(const TRuntimeFilterDesc* desc, const 
TQueryOptions* options) override {
         RETURN_IF_ERROR(RuntimeFilter::_init_with_desc(desc, options));
-        bool need_sync_filter_size = _wrapper->build_bf_by_runtime_size() && 
!_is_broadcast_join;
-        _rf_state = need_sync_filter_size ? State::WAITING_FOR_SEND_SIZE : 
State::WAITING_FOR_DATA;
+        _need_sync_filter_size = _wrapper->build_bf_by_runtime_size() && 
!_is_broadcast_join;
+        _rf_state = _need_sync_filter_size ? State::WAITING_FOR_SEND_SIZE : 
State::WAITING_FOR_DATA;
         _profile->add_info_string("Info", debug_string());
         return Status::OK();
     }
 
     const bool _is_broadcast_join;
-    const bool _need_sync_filter_size = false;
+    bool _need_sync_filter_size = false;
 
     int64_t _synced_size = -1;
     std::shared_ptr<pipeline::CountedFinishDependency> _dependency;
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp b/be/src/runtime_filter/runtime_filter_wrapper.cpp
index fa8acd7ab0b..f07934de6a7 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.cpp
+++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp
@@ -151,7 +151,7 @@ bool RuntimeFilterWrapper::build_bf_by_runtime_size() const {
 
 Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) {
     if (other->_state == State::DISABLED) {
-        disable(other->_disabled_reason);
+        set_state(State::DISABLED, other->_disabled_reason);
     }
 
     if (other->_state == State::IGNORED || _state == State::DISABLED) {
@@ -159,6 +159,7 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) {
     }
 
     DCHECK(_state != State::IGNORED);
+    DCHECK(other->_state == State::READY);
 
     set_state(State::READY);
 
@@ -168,7 +169,7 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) {
     case RuntimeFilterType::IN_FILTER: {
         _hybrid_set->insert(other->_hybrid_set.get());
         if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) {
-            disable(fmt::format("reach max in num: {}", _max_in_num));
+            set_state(State::DISABLED, fmt::format("reach max in num: {}", 
_max_in_num));
         }
         break;
     }
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h
index 12ef6828dda..98de97ea0c4 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -57,12 +57,12 @@ public:
         PFilterType filter_type = request.filter_type();
 
         if (request.has_disabled() && request.disabled()) {
-            disable("get disabled from remote");
+            set_state(State::DISABLED, "get disabled from remote");
             return Status::OK();
         }
 
         if (request.has_ignored() && request.ignored()) {
-            set_state(State::IGNORED);
+            set_state(State::IGNORED, "get ignored from remote");
             return Status::OK();
         }
 
@@ -122,7 +122,6 @@ public:
         }
         _state = state;
     }
-    void disable(std::string reason) { set_state(State::DISABLED, reason); }
     State get_state() const { return _state; }
     void check_state(std::vector<State> assumed_states) const {
         if (!check_state_impl<RuntimeFilterWrapper>(_state, assumed_states)) {
diff --git a/be/test/pipeline/pipeline_test.cpp b/be/test/pipeline/pipeline_test.cpp
index e95abce1cd9..bd522621360 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -30,6 +30,7 @@
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/pipeline_fragment_context.h"
+#include "runtime/descriptor_helper.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime_filter/runtime_filter_definitions.h"
@@ -772,33 +773,10 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                                                     
.set_slot_ref(TSlotRefBuilder(1, 1).build())
                                                     .build())
                                     .build())
-                    .append_runtime_filters(
-                            TRuntimeFilterDescBuilder(
-                                    0,
-                                    TExprBuilder()
-                                            .append_nodes(
-                                                    TExprNodeBuilder(
-                                                            
TExprNodeType::SLOT_REF,
-                                                            TTypeDescBuilder()
-                                                                    .set_types(
-                                                                            
TTypeNodeBuilder()
-                                                                               
     .set_type(
-                                                                               
             TTypeNodeType::
-                                                                               
                     SCALAR)
-                                                                               
     .set_scalar_type(
-                                                                               
             TPrimitiveType::
-                                                                               
                     INT)
-                                                                               
     .build())
-                                                                    .build(),
-                                                            0)
-                                                            .set_slot_ref(
-                                                                    
TSlotRefBuilder(1, 1).build())
-                                                            .build())
-                                            .build(),
-                                    0, std::map<TPlanNodeId, TExpr> {})
-                                    .set_bloom_filter_size_bytes(1048576)
-                                    .set_build_bf_by_runtime_size(false)
-                                    .build())
+                    .append_runtime_filters(TRuntimeFilterDescBuilder()
+                                                    
.set_bloom_filter_size_bytes(1048576)
+                                                    
.set_build_bf_by_runtime_size(false)
+                                                    .build())
                     .build();
 
     {
diff --git a/be/test/pipeline/thrift_builder.h b/be/test/pipeline/thrift_builder.h
index e7ed50d5718..b1555a18c42 100644
--- a/be/test/pipeline/thrift_builder.h
+++ b/be/test/pipeline/thrift_builder.h
@@ -27,7 +27,21 @@
 #include "udf/udf.h"
 #include "vec/sink/writer/vhive_utils.h"
 
-namespace doris::pipeline {
+namespace doris {
+
+#define FAIL_IF_ERROR_OR_CATCH_EXCEPTION(stmt)                  \
+    do {                                                        \
+        try {                                                   \
+            {                                                   \
+                Status _status_ = (stmt);                       \
+                if (UNLIKELY(!_status_.ok())) {                 \
+                    EXPECT_TRUE(false) << _status_.to_string(); \
+                }                                               \
+            }                                                   \
+        } catch (const doris::Exception& e) {                   \
+            EXPECT_TRUE(false) << e.what();                     \
+        }                                                       \
+    } while (0)
 
 class TQueryOptionsBuilder {
 public:
@@ -148,56 +162,6 @@ public:
 private:
     TPlanNode _plan_node;
 };
-
-class TRuntimeFilterDescBuilder {
-public:
-    explicit TRuntimeFilterDescBuilder(
-            int filter_id, TExpr& src_expr, int expr_order,
-            std::map<TPlanNodeId, TExpr> planId_to_target_expr, bool 
is_broadcast_join = false,
-            bool has_local_targets = true, bool has_remote_targets = false,
-            TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM)
-            : _desc() {
-        _desc.__set_filter_id(filter_id);
-        _desc.__set_src_expr(src_expr);
-        _desc.__set_expr_order(expr_order);
-        _desc.__set_planId_to_target_expr(planId_to_target_expr);
-        _desc.__set_is_broadcast_join(is_broadcast_join);
-        _desc.__set_has_local_targets(has_local_targets);
-        _desc.__set_has_remote_targets(has_remote_targets);
-        _desc.__set_type(type);
-    }
-    explicit TRuntimeFilterDescBuilder(
-            int filter_id, TExpr&& src_expr, int expr_order,
-            std::map<TPlanNodeId, TExpr> planId_to_target_expr, bool 
is_broadcast_join = false,
-            bool has_local_targets = true, bool has_remote_targets = false,
-            TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM)
-            : _desc() {
-        _desc.__set_filter_id(filter_id);
-        _desc.__set_src_expr(src_expr);
-        _desc.__set_expr_order(expr_order);
-        _desc.__set_planId_to_target_expr(planId_to_target_expr);
-        _desc.__set_is_broadcast_join(is_broadcast_join);
-        _desc.__set_has_local_targets(has_local_targets);
-        _desc.__set_has_remote_targets(has_remote_targets);
-        _desc.__set_type(type);
-    }
-
-    TRuntimeFilterDescBuilder& set_bloom_filter_size_bytes(int64_t 
bloom_filter_size_bytes) {
-        _desc.__set_bloom_filter_size_bytes(bloom_filter_size_bytes);
-        return *this;
-    }
-    TRuntimeFilterDescBuilder& set_build_bf_by_runtime_size(bool 
build_bf_by_runtime_size) {
-        _desc.__set_build_bf_by_runtime_size(build_bf_by_runtime_size);
-        return *this;
-    }
-    TRuntimeFilterDesc& build() { return _desc; }
-    TRuntimeFilterDescBuilder(const TRuntimeFilterDescBuilder&) = delete;
-    void operator=(const TRuntimeFilterDescBuilder&) = delete;
-
-private:
-    TRuntimeFilterDesc _desc;
-};
-
 class TExchangeNodeBuilder {
 public:
     explicit TExchangeNodeBuilder() : _plan_node() {}
@@ -271,73 +235,6 @@ private:
     TPlanFragmentDestination _dest;
 };
 
-class TTupleDescriptorBuilder {
-public:
-    explicit TTupleDescriptorBuilder() : _desc() {
-        _desc.byteSize = 0;
-        _desc.numNullBytes = 0;
-    }
-
-    TTupleDescriptorBuilder& set_id(TTupleId id) {
-        _desc.id = id;
-        return *this;
-    }
-
-    TTupleDescriptor& build() { return _desc; }
-    TTupleDescriptorBuilder(const TTupleDescriptorBuilder&) = delete;
-    void operator=(const TTupleDescriptorBuilder&) = delete;
-
-private:
-    TTupleDescriptor _desc;
-};
-
-class TSlotDescriptorBuilder {
-public:
-    explicit TSlotDescriptorBuilder() : _desc() {
-        _desc.columnPos = -1;
-        _desc.nullIndicatorByte = 0;
-    }
-    TSlotDescriptorBuilder& set_id(TTupleId id) {
-        _desc.id = id;
-        return *this;
-    }
-    TSlotDescriptorBuilder& set_parent(TTupleDescriptor& parent) {
-        _desc.parent = parent.id;
-        return *this;
-    }
-    TSlotDescriptorBuilder& set_slotType(TTypeDesc& slotType) {
-        _desc.slotType = slotType;
-        return *this;
-    }
-    TSlotDescriptorBuilder& set_nullIndicatorBit(int nullIndicatorBit) {
-        _desc.nullIndicatorBit = nullIndicatorBit;
-        return *this;
-    }
-    TSlotDescriptorBuilder& set_byteOffset(int byteOffset) {
-        _desc.byteOffset = byteOffset;
-        return *this;
-    }
-    TSlotDescriptorBuilder& set_slotIdx(int slotIdx) {
-        _desc.slotIdx = slotIdx;
-        return *this;
-    }
-    TSlotDescriptorBuilder& set_isMaterialized(bool isMaterialized) {
-        _desc.isMaterialized = isMaterialized;
-        return *this;
-    }
-    TSlotDescriptorBuilder& set_colName(std::string colName) {
-        _desc.colName = colName;
-        return *this;
-    }
-
-    TSlotDescriptor& build() { return _desc; }
-    TSlotDescriptorBuilder(const TSlotDescriptorBuilder&) = delete;
-    void operator=(const TSlotDescriptorBuilder&) = delete;
-
-private:
-    TSlotDescriptor _desc;
-};
-
 class TTypeDescBuilder {
 public:
     explicit TTypeDescBuilder() : _desc() {
@@ -386,31 +283,6 @@ private:
     TTypeNode _desc;
 };
 
-class TDescriptorTableBuilder {
-public:
-    explicit TDescriptorTableBuilder() : _desc() {}
-
-    TDescriptorTableBuilder& append_slotDescriptors(TSlotDescriptor& desc) {
-        _desc.slotDescriptors.push_back(desc);
-        return *this;
-    }
-    TDescriptorTableBuilder& append_tupleDescriptors(TTupleDescriptor& desc) {
-        _desc.tupleDescriptors.push_back(desc);
-        return *this;
-    }
-    TDescriptorTableBuilder& append_tableDescriptors(TTableDescriptor& desc) {
-        _desc.tableDescriptors.push_back(desc);
-        return *this;
-    }
-
-    TDescriptorTable& build() { return _desc; }
-    TDescriptorTableBuilder(const TDescriptorTableBuilder&) = delete;
-    void operator=(const TDescriptorTableBuilder&) = delete;
-
-private:
-    TDescriptorTable _desc;
-};
-
 class TDataPartitionBuilder {
 public:
     explicit TDataPartitionBuilder(TPartitionType::type type) : _partition() {
@@ -575,4 +447,89 @@ private:
     TRuntimeFilterParams _params;
 };
 
-} // namespace doris::pipeline
+class TRuntimeFilterDescBuilder {
+public:
+    static TExpr get_default_expr() {
+        return TExprBuilder()
+                .append_nodes(
+                        TExprNodeBuilder(
+                                TExprNodeType::SLOT_REF,
+                                TTypeDescBuilder()
+                                        .set_types(TTypeNodeBuilder()
+                                                           
.set_type(TTypeNodeType::SCALAR)
+                                                           
.set_scalar_type(TPrimitiveType::INT)
+                                                           .build())
+                                        .build(),
+                                0)
+                                .set_slot_ref(TSlotRefBuilder(1, 1).build())
+                                .build())
+                .build();
+    }
+
+    explicit TRuntimeFilterDescBuilder(
+            int filter_id, TExpr& src_expr, int expr_order,
+            std::map<TPlanNodeId, TExpr> planId_to_target_expr, bool 
is_broadcast_join = false,
+            bool has_local_targets = true, bool has_remote_targets = false,
+            TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM) {
+        _desc.__set_filter_id(filter_id);
+        _desc.__set_src_expr(src_expr);
+        _desc.__set_expr_order(expr_order);
+        _desc.__set_planId_to_target_expr(planId_to_target_expr);
+        _desc.__set_is_broadcast_join(is_broadcast_join);
+        _desc.__set_has_local_targets(has_local_targets);
+        _desc.__set_has_remote_targets(has_remote_targets);
+        _desc.__set_type(type);
+    }
+    explicit TRuntimeFilterDescBuilder(
+            int filter_id = 0, TExpr src_expr = get_default_expr(), int 
expr_order = 0,
+            std::map<TPlanNodeId, TExpr> planId_to_target_expr = 
std::map<TPlanNodeId, TExpr> {}) {
+        _desc.__set_filter_id(filter_id);
+        _desc.__set_src_expr(src_expr);
+        _desc.__set_expr_order(expr_order);
+        _desc.__set_planId_to_target_expr(planId_to_target_expr);
+        _desc.__set_is_broadcast_join(false);
+        _desc.__set_has_local_targets(true);
+        _desc.__set_has_remote_targets(false);
+        _desc.__set_type(TRuntimeFilterType::IN_OR_BLOOM);
+    }
+
+    TRuntimeFilterDescBuilder& set_bloom_filter_size_bytes(int64_t 
bloom_filter_size_bytes) {
+        _desc.__set_bloom_filter_size_bytes(bloom_filter_size_bytes);
+        return *this;
+    }
+
+    TRuntimeFilterDescBuilder& set_is_broadcast_join(bool is_broadcast_join) {
+        _desc.__set_is_broadcast_join(is_broadcast_join);
+        return *this;
+    }
+
+    TRuntimeFilterDescBuilder& set_mode(bool local) {
+        _desc.__set_has_local_targets(local);
+        _desc.__set_has_remote_targets(!local);
+        return *this;
+    }
+
+    TRuntimeFilterDescBuilder& add_planId_to_target_expr(int node_id,
+                                                         TExpr expr = 
get_default_expr()) {
+        _desc.planId_to_target_expr[node_id] = expr;
+        return *this;
+    }
+
+    TRuntimeFilterDescBuilder& set_type(TRuntimeFilterType::type type) {
+        _desc.__set_type(type);
+        return *this;
+    }
+
+    TRuntimeFilterDescBuilder& set_build_bf_by_runtime_size(bool 
build_bf_by_runtime_size) {
+        _desc.__set_build_bf_by_runtime_size(build_bf_by_runtime_size);
+        return *this;
+    }
+    TRuntimeFilterDesc& build() { return _desc; }
+    TRuntimeFilterDescBuilder(const TRuntimeFilterDescBuilder&) = delete;
+    void operator=(const TRuntimeFilterDescBuilder&) = delete;
+
+private:
+    TRuntimeFilterDesc _desc;
+};
+
+} // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
new file mode 100644
index 00000000000..617ce3fdbb0
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_consumer.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "runtime_filter/runtime_filter_producer.h"
+#include "runtime_filter/runtime_filter_test_utils.h"
+
+namespace doris {
+
+class RuntimeFilterConsumerTest : public RuntimeFilterTest {};
+
+TEST_F(RuntimeFilterConsumerTest, basic) {
+    std::shared_ptr<RuntimeFilterConsumer> consumer;
+    auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+
+    std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
+    
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
+            desc, true, 0, &registed_consumer, &_profile));
+}
+
+TEST_F(RuntimeFilterConsumerTest, signal_aquire) {
+    std::shared_ptr<RuntimeFilterConsumer> consumer;
+    auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
&producer, &_profile));
+    
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
+
+    consumer->signal(producer.get());
+
+    try {
+        consumer->signal(producer.get());
+        ASSERT_TRUE(false);
+    } catch (const Exception& e) {
+        ASSERT_EQ(e.code(), ErrorCode::INTERNAL_ERROR);
+    }
+
+    std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs));
+    ASSERT_EQ(push_exprs.size(), 1);
+    ASSERT_TRUE(consumer->is_applied());
+}
+
+} // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_merger_test.cpp b/be/test/runtime_filter/runtime_filter_merger_test.cpp
new file mode 100644
index 00000000000..768284879bc
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_merger_test.cpp
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_merger.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "runtime_filter/runtime_filter_producer.h"
+#include "runtime_filter/runtime_filter_test_utils.h"
+
+namespace doris {
+
+class RuntimeFilterMergerTest : public RuntimeFilterTest {
+public:
+    void test_merge_from(RuntimeFilterWrapper::State first_product_state,
+                         RuntimeFilterWrapper::State first_expected_state,
+                         RuntimeFilterWrapper::State second_product_state,
+                         RuntimeFilterWrapper::State second_expected_state) {
+        std::shared_ptr<RuntimeFilterMerger> merger;
+        auto desc = TRuntimeFilterDescBuilder().build();
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(
+                RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
&merger, &_profile));
+        merger->set_expected_producer_num(2);
+        ASSERT_FALSE(merger->ready());
+        ASSERT_EQ(merger->_wrapper->_state, 
RuntimeFilterWrapper::State::IGNORED);
+
+        std::shared_ptr<RuntimeFilterProducer> producer;
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+                _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer, &_profile));
+        producer->set_wrapper_state_and_ready_to_publish(first_product_state);
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
+        ASSERT_FALSE(merger->ready());
+        ASSERT_EQ(merger->_wrapper->_state, first_expected_state);
+
+        std::shared_ptr<RuntimeFilterProducer> producer2;
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+                _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2, &_profile));
+        
producer2->set_wrapper_state_and_ready_to_publish(second_product_state);
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get()));
+        ASSERT_TRUE(merger->ready());
+        ASSERT_EQ(merger->_wrapper->_state, second_expected_state);
+    }
+
+    void test_serialize(RuntimeFilterWrapper::State state) {
+        std::shared_ptr<RuntimeFilterMerger> merger;
+        auto desc = TRuntimeFilterDescBuilder().build();
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(
+                RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
&merger, &_profile));
+        merger->set_expected_producer_num(1);
+        ASSERT_FALSE(merger->ready());
+
+        std::shared_ptr<RuntimeFilterProducer> producer;
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+                _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer, &_profile));
+        producer->set_wrapper_state_and_ready_to_publish(state);
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
+        ASSERT_TRUE(merger->ready());
+
+        PMergeFilterRequest request;
+        void* data = nullptr;
+        int len = 0;
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->serialize(&request, &data, 
&len));
+
+        std::shared_ptr<RuntimeFilterMerger> deserialized_merger;
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+                
RuntimeFilterMerger::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
+                                            &desc, &deserialized_merger, 
&_profile));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(deserialized_merger->assign(request, 
nullptr));
+        ASSERT_EQ(merger->_wrapper->_state, state);
+    }
+};
+
+TEST_F(RuntimeFilterMergerTest, basic) {
+    test_merge_from(RuntimeFilterWrapper::State::READY, 
RuntimeFilterWrapper::State::READY,
+                    RuntimeFilterWrapper::State::READY, 
RuntimeFilterWrapper::State::READY);
+}
+
+// Exercises add_rf_size() on a merger that expects two producers: the first report is
+// expected to return false, the second true (sum 124), and a third report is expected
+// to throw an Exception carrying INTERNAL_ERROR.
+TEST_F(RuntimeFilterMergerTest, add_rf_size) {
+    auto desc = TRuntimeFilterDescBuilder().build();
+    std::shared_ptr<RuntimeFilterMerger> merger;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &merger, &_profile));
+    merger->set_expected_producer_num(2);
+
+    // Two size reports, one per expected producer.
+    ASSERT_FALSE(merger->add_rf_size(123));
+    ASSERT_TRUE(merger->add_rf_size(1));
+    ASSERT_EQ(merger->get_received_sum_size(), 124);
+    // Receiving sizes does not make the merger ready.
+    ASSERT_FALSE(merger->ready());
+
+    // One report too many: reaching the line after the call means nothing was thrown.
+    try {
+        ASSERT_TRUE(merger->add_rf_size(1));
+        ASSERT_TRUE(false);
+    } catch (const Exception& e) {
+        ASSERT_EQ(e.code(), ErrorCode::INTERNAL_ERROR);
+    }
+}
+
+// A merger that expects a single producer takes the first merge_from() and is expected
+// to answer a second merge_from() with INTERNAL_ERROR.
+TEST_F(RuntimeFilterMergerTest, invalid_merge) {
+    using State = RuntimeFilterWrapper::State;
+
+    auto desc = TRuntimeFilterDescBuilder().build();
+    std::shared_ptr<RuntimeFilterMerger> merger;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &merger, &_profile));
+    merger->set_expected_producer_num(1);
+    ASSERT_FALSE(merger->ready());
+    // Before anything is merged the wrapper is asserted to be IGNORED.
+    ASSERT_EQ(merger->_wrapper->_state, State::IGNORED);
+
+    // First producer: merged successfully, wrapper becomes READY.
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile));
+    producer->set_wrapper_state_and_ready_to_publish(State::READY);
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
+    ASSERT_EQ(merger->_wrapper->_state, State::READY);
+
+    // Second producer: one more than the merger was told to expect.
+    std::shared_ptr<RuntimeFilterProducer> producer2;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile));
+    producer2->set_wrapper_state_and_ready_to_publish(State::READY);
+    auto second_merge_status = merger->merge_from(producer2.get());
+    ASSERT_EQ(second_merge_status.code(), ErrorCode::INTERNAL_ERROR);
+}
+
+// READY followed by IGNORED; the last expected state passed is READY.
+// NOTE(review): argument order is presumably (first state, expected after first,
+// second state, expected after second) -- confirm against test_merge_from.
+TEST_F(RuntimeFilterMergerTest, merge_from_ready_and_ignored) {
+    using State = RuntimeFilterWrapper::State;
+    test_merge_from(State::READY, State::READY, State::IGNORED, State::READY);
+}
+
+// IGNORED followed by READY; the last expected state passed is READY.
+// NOTE(review): argument order is presumably (first state, expected after first,
+// second state, expected after second) -- confirm against test_merge_from.
+TEST_F(RuntimeFilterMergerTest, merge_from_ignored_and_ready) {
+    using State = RuntimeFilterWrapper::State;
+    test_merge_from(State::IGNORED, State::IGNORED, State::READY, State::READY);
+}
+
+// READY followed by DISABLED; the last expected state passed is DISABLED.
+// NOTE(review): argument order is presumably (first state, expected after first,
+// second state, expected after second) -- confirm against test_merge_from.
+TEST_F(RuntimeFilterMergerTest, merge_from_ready_and_disabled) {
+    using State = RuntimeFilterWrapper::State;
+    test_merge_from(State::READY, State::READY, State::DISABLED, State::DISABLED);
+}
+
+// DISABLED followed by READY; the last expected state passed is DISABLED.
+// NOTE(review): argument order is presumably (first state, expected after first,
+// second state, expected after second) -- confirm against test_merge_from.
+TEST_F(RuntimeFilterMergerTest, merge_from_disabled_and_ready) {
+    using State = RuntimeFilterWrapper::State;
+    test_merge_from(State::DISABLED, State::DISABLED, State::READY, State::DISABLED);
+}
+
+// DISABLED followed by IGNORED; the last expected state passed is DISABLED.
+// NOTE(review): argument order is presumably (first state, expected after first,
+// second state, expected after second) -- confirm against test_merge_from.
+TEST_F(RuntimeFilterMergerTest, merge_from_disabled_and_ignored) {
+    using State = RuntimeFilterWrapper::State;
+    test_merge_from(State::DISABLED, State::DISABLED, State::IGNORED, State::DISABLED);
+}
+
+// IGNORED followed by DISABLED; the last expected state passed is DISABLED.
+// NOTE(review): argument order is presumably (first state, expected after first,
+// second state, expected after second) -- confirm against test_merge_from.
+TEST_F(RuntimeFilterMergerTest, merge_from_ignored_and_disabled) {
+    using State = RuntimeFilterWrapper::State;
+    test_merge_from(State::IGNORED, State::IGNORED, State::DISABLED, State::DISABLED);
+}
+
+// Runs the test_serialize() scenario with a READY wrapper.
+TEST_F(RuntimeFilterMergerTest, serialize_ready) {
+    const auto wrapper_state = RuntimeFilterWrapper::State::READY;
+    test_serialize(wrapper_state);
+}
+
+// Runs the test_serialize() scenario with a DISABLED wrapper.
+TEST_F(RuntimeFilterMergerTest, serialize_disabled) {
+    const auto wrapper_state = RuntimeFilterWrapper::State::DISABLED;
+    test_serialize(wrapper_state);
+}
+
+// Runs the test_serialize() scenario with an IGNORED wrapper.
+TEST_F(RuntimeFilterMergerTest, serialize_ignored) {
+    const auto wrapper_state = RuntimeFilterWrapper::State::IGNORED;
+    test_serialize(wrapper_state);
+}
+
+} // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_test.cpp
new file mode 100644
index 00000000000..ab04391fe7d
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_producer.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "runtime_filter/runtime_filter_test_utils.h"
+
+namespace doris {
+
+// Fixture for the RuntimeFilterProducer tests; it adds nothing to RuntimeFilterTest.
+class RuntimeFilterProducerTest : public RuntimeFilterTest {
+};
+
+// Smoke test: a producer can be created from a default-built descriptor without an
+// error status or exception.
+TEST_F(RuntimeFilterProducerTest, basic) {
+    auto desc = TRuntimeFilterDescBuilder().build();
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile));
+}
+
+// Two descriptor configurations for which the producer is expected to skip the size
+// synchronisation step and start directly in WAITING_FOR_DATA.
+TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) {
+    using ProducerState = RuntimeFilterProducer::State;
+
+    // Case 1: build_bf_by_runtime_size = true, is_broadcast_join = true.
+    {
+        auto desc = TRuntimeFilterDescBuilder()
+                            .set_build_bf_by_runtime_size(true)
+                            .set_is_broadcast_join(true)
+                            .build();
+        std::shared_ptr<RuntimeFilterProducer> producer;
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
+                RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile));
+        ASSERT_EQ(producer->_need_sync_filter_size, false);
+        ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_DATA);
+    }
+
+    // Case 2: build_bf_by_runtime_size = false, is_broadcast_join = false.
+    {
+        auto desc = TRuntimeFilterDescBuilder()
+                            .set_build_bf_by_runtime_size(false)
+                            .set_is_broadcast_join(false)
+                            .build();
+        std::shared_ptr<RuntimeFilterProducer> producer;
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
+                RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile));
+        ASSERT_EQ(producer->_need_sync_filter_size, false);
+        ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_DATA);
+    }
+}
+
+// With build_bf_by_runtime_size = true and is_broadcast_join = false the producer is
+// expected to require size synchronisation; after send_size() it is expected to be in
+// WAITING_FOR_DATA.
+TEST_F(RuntimeFilterProducerTest, sync_filter_size) {
+    using ProducerState = RuntimeFilterProducer::State;
+
+    auto desc = TRuntimeFilterDescBuilder()
+                        .set_build_bf_by_runtime_size(true)
+                        .set_is_broadcast_join(false)
+                        .build();
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile));
+    ASSERT_EQ(producer->_need_sync_filter_size, true);
+    ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_SEND_SIZE);
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->send_size(_runtime_states[0].get(), 100, nullptr));
+    // Local mode: a single runtime filter gets its size directly and moves on to
+    // WAITING_FOR_DATA.
+    ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_DATA);
+}
+
+// Size synchronisation for a lone producer created via RuntimeFilterProducer::create():
+// send_size() is expected to take it from WAITING_FOR_SEND_SIZE to WAITING_FOR_DATA.
+TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_no_merge) {
+    using ProducerState = RuntimeFilterProducer::State;
+
+    auto desc = TRuntimeFilterDescBuilder()
+                        .set_build_bf_by_runtime_size(true)
+                        .set_is_broadcast_join(false)
+                        .build();
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile));
+    ASSERT_EQ(producer->_need_sync_filter_size, true);
+    ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_SEND_SIZE);
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->send_size(_runtime_states[0].get(), 100, nullptr));
+    // Local mode: a single runtime filter gets its size directly and moves on to
+    // WAITING_FOR_DATA.
+    ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_DATA);
+}
+
+// Two producers registered on different runtime states report their sizes. The first
+// is expected to wait in WAITING_FOR_SYNCED_SIZE until the second has reported, after
+// which the synced size is the sum (123 + 1) and the shared dependency is ready.
+TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_merge) {
+    using ProducerState = RuntimeFilterProducer::State;
+
+    auto desc = TRuntimeFilterDescBuilder()
+                        .set_build_bf_by_runtime_size(true)
+                        .set_is_broadcast_join(false)
+                        .add_planId_to_target_expr(0)
+                        .build();
+
+    // One producer per runtime state, plus a consumer on the second state.
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile));
+    std::shared_ptr<RuntimeFilterProducer> producer2;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile));
+
+    std::shared_ptr<RuntimeFilterConsumer> consumer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
+            desc, true, 0, &consumer, &_profile));
+
+    ASSERT_EQ(producer->_need_sync_filter_size, true);
+    ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_SEND_SIZE);
+
+    auto dependency = std::make_shared<pipeline::CountedFinishDependency>(0, 0, "");
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            producer->send_size(_runtime_states[0].get(), 123, dependency));
+    // Global mode: the producer has to wait for the synced size.
+    ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_SYNCED_SIZE);
+    ASSERT_FALSE(dependency->ready());
+
+    // The second report completes the sync for the first producer as well.
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer2->send_size(_runtime_states[1].get(), 1, dependency));
+    ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_DATA);
+    ASSERT_EQ(producer->_synced_size, 124);
+    ASSERT_TRUE(dependency->ready());
+}
+
+// set_wrapper_state_and_ready_to_publish() is expected to move a producer to
+// READY_TO_PUBLISH and stamp the requested wrapper state, both for a producer that is
+// still waiting for the synced size (IGNORED) and for one that has not sent its size
+// at all (DISABLED).
+TEST_F(RuntimeFilterProducerTest, set_ignore_or_disable) {
+    using ProducerState = RuntimeFilterProducer::State;
+    using WrapperState = RuntimeFilterWrapper::State;
+
+    auto desc = TRuntimeFilterDescBuilder()
+                        .set_build_bf_by_runtime_size(true)
+                        .set_is_broadcast_join(false)
+                        .add_planId_to_target_expr(0)
+                        .build();
+
+    // One producer per runtime state, plus a consumer on the second state.
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile));
+    std::shared_ptr<RuntimeFilterProducer> producer2;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile));
+
+    std::shared_ptr<RuntimeFilterConsumer> consumer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
+            desc, true, 0, &consumer, &_profile));
+
+    ASSERT_EQ(producer->_need_sync_filter_size, true);
+    ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_SEND_SIZE);
+
+    auto dependency = std::make_shared<pipeline::CountedFinishDependency>(0, 0, "");
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            producer->send_size(_runtime_states[0].get(), 123, dependency));
+    // Global mode: the producer has to wait for the synced size.
+    ASSERT_EQ(producer->_rf_state, ProducerState::WAITING_FOR_SYNCED_SIZE);
+
+    // First producer: forced to IGNORED while still waiting for the synced size.
+    producer->set_wrapper_state_and_ready_to_publish(WrapperState::IGNORED);
+    ASSERT_EQ(producer->_rf_state, ProducerState::READY_TO_PUBLISH);
+    ASSERT_EQ(producer->_wrapper->_state, WrapperState::IGNORED);
+
+    // Second producer: forced to DISABLED without ever sending a size.
+    producer2->set_wrapper_state_and_ready_to_publish(WrapperState::DISABLED);
+    ASSERT_EQ(producer2->_rf_state, ProducerState::READY_TO_PUBLISH);
+    ASSERT_EQ(producer2->_wrapper->_state, WrapperState::DISABLED);
+}
+
+} // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_test_utils.h 
b/be/test/runtime_filter/runtime_filter_test_utils.h
new file mode 100644
index 00000000000..3141be9d848
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_test_utils.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "pipeline/thrift_builder.h"
+#include "runtime/query_context.h"
+
+namespace doris {
+
+// Shared gtest fixture for the runtime filter tests.
+//
+// SetUp() builds one QueryContext and INSTANCE_NUM RuntimeState objects, each wired to
+// its own RuntimeFilterMgr, so tests can register producers/consumers on separate
+// runtime states that belong to the same query context.
+class RuntimeFilterTest : public testing::Test {
+public:
+    RuntimeFilterTest() = default;
+    ~RuntimeFilterTest() override = default;
+
+    // Creates the query context, installs default runtime filter params on its
+    // runtime filter manager, then creates the per-instance states and managers.
+    void SetUp() override {
+        _query_options = TQueryOptionsBuilder().set_runtime_filter_max_in_num(15).build();
+        auto fe_address = TNetworkAddress();
+        fe_address.hostname = LOCALHOST;
+        fe_address.port = DUMMY_PORT;
+
+        _query_ctx =
+                QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), _query_options,
+                                     fe_address, true, fe_address, QuerySource::INTERNAL_FRONTEND);
+        _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                TRuntimeFilterParamsBuilder().build());
+
+        _runtime_states.resize(INSTANCE_NUM);
+        _local_mgrs.resize(INSTANCE_NUM);
+        for (int i = 0; i < INSTANCE_NUM; i++) {
+            _runtime_states[i] = RuntimeState::create_unique(
+                    TUniqueId(), 0, _query_options, _query_ctx->query_globals,
+                    ExecEnv::GetInstance(), _query_ctx.get());
+
+            _local_mgrs[i] = std::make_unique<RuntimeFilterMgr>(
+                    TUniqueId(), RuntimeFilterParamsContext::create(_query_ctx.get()),
+                    _query_ctx->query_mem_tracker(), false);
+            // The state only receives a raw pointer; ownership stays with _local_mgrs.
+            _runtime_states[i]->set_runtime_filter_mgr(_local_mgrs[i].get());
+        }
+    }
+    void TearDown() override {}
+
+protected:
+    RuntimeProfile _profile = RuntimeProfile("");
+    std::shared_ptr<QueryContext> _query_ctx;
+    TQueryOptions _query_options;
+    const std::string LOCALHOST = BackendOptions::get_localhost();
+    const int DUMMY_PORT = config::brpc_port;
+    const int INSTANCE_NUM = 2;
+    // Non-static members are destroyed in reverse declaration order. Each RuntimeState
+    // holds a raw pointer to a manager owned by _local_mgrs, so the managers are
+    // declared first: the states are torn down before the managers they point at, and
+    // no state is ever left holding a dangling manager pointer.
+    std::vector<std::unique_ptr<RuntimeFilterMgr>> _local_mgrs;
+    std::vector<std::unique_ptr<RuntimeState>> _runtime_states;
+};
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to