This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit d5817710883a05e611863f43da520699ef3302ce Author: BiteTheDDDDt <x...@selectdb.com> AuthorDate: Wed Feb 26 18:18:30 2025 +0800 some fix and add ut fix ut update update format udpate --- be/src/runtime/descriptor_helper.h | 55 +++++ be/src/runtime_filter/runtime_filter_merger.h | 10 +- be/src/runtime_filter/runtime_filter_producer.cpp | 4 +- be/src/runtime_filter/runtime_filter_producer.h | 8 +- be/src/runtime_filter/runtime_filter_wrapper.cpp | 5 +- be/src/runtime_filter/runtime_filter_wrapper.h | 5 +- be/test/pipeline/pipeline_test.cpp | 32 +-- be/test/pipeline/thrift_builder.h | 245 +++++++++------------ .../runtime_filter_consumer_test.cpp | 67 ++++++ .../runtime_filter/runtime_filter_merger_test.cpp | 179 +++++++++++++++ .../runtime_filter_producer_test.cpp | 165 ++++++++++++++ be/test/runtime_filter/runtime_filter_test_utils.h | 68 ++++++ 12 files changed, 657 insertions(+), 186 deletions(-) diff --git a/be/src/runtime/descriptor_helper.h b/be/src/runtime/descriptor_helper.h index 049947478d3..d61d887a5ab 100644 --- a/be/src/runtime/descriptor_helper.h +++ b/be/src/runtime/descriptor_helper.h @@ -41,6 +41,21 @@ public: TDescriptorTable desc_tbl() { return _desc_tbl; } + TDescriptorTableBuilder& append_slotDescriptors(TSlotDescriptor& desc) { + _desc_tbl.slotDescriptors.push_back(desc); + return *this; + } + TDescriptorTableBuilder& append_tupleDescriptors(TTupleDescriptor& desc) { + _desc_tbl.tupleDescriptors.push_back(desc); + return *this; + } + TDescriptorTableBuilder& append_tableDescriptors(TTableDescriptor& desc) { + _desc_tbl.tableDescriptors.push_back(desc); + return *this; + } + + TDescriptorTable& build() { return _desc_tbl; } + private: TSlotId _next_slot_id = 0; TTupleId _next_tuple_id = 0; @@ -96,6 +111,39 @@ public: } TSlotDescriptor build() { return _slot_desc; } + TSlotDescriptorBuilder& set_id(TTupleId id) { + _slot_desc.id = id; + return *this; + } + TSlotDescriptorBuilder& set_parent(TTupleDescriptor& parent) { + _slot_desc.parent = 
parent.id; + return *this; + } + TSlotDescriptorBuilder& set_slotType(TTypeDesc& slotType) { + _slot_desc.slotType = slotType; + return *this; + } + TSlotDescriptorBuilder& set_nullIndicatorBit(int nullIndicatorBit) { + _slot_desc.nullIndicatorBit = nullIndicatorBit; + return *this; + } + TSlotDescriptorBuilder& set_byteOffset(int byteOffset) { + _slot_desc.byteOffset = byteOffset; + return *this; + } + TSlotDescriptorBuilder& set_slotIdx(int slotIdx) { + _slot_desc.slotIdx = slotIdx; + return *this; + } + TSlotDescriptorBuilder& set_isMaterialized(bool isMaterialized) { + _slot_desc.isMaterialized = isMaterialized; + return *this; + } + TSlotDescriptorBuilder& set_colName(std::string colName) { + _slot_desc.colName = colName; + return *this; + } + private: friend TTupleDescriptorBuilder; TSlotDescriptor _slot_desc; @@ -136,6 +184,13 @@ public: tb->add_tuple(_tuple_desc); } + TTupleDescriptorBuilder& set_id(TTupleId id) { + _tuple_desc.id = id; + return *this; + } + + TTupleDescriptor& build() { return _tuple_desc; } + private: TTupleId _tuple_id; std::vector<TSlotDescriptor> _slot_descs; diff --git a/be/src/runtime_filter/runtime_filter_merger.h b/be/src/runtime_filter/runtime_filter_merger.h index fbc8727f0ba..7c4f3ac639c 100644 --- a/be/src/runtime_filter/runtime_filter_merger.h +++ b/be/src/runtime_filter/runtime_filter_merger.h @@ -20,6 +20,7 @@ #include "runtime_filter/runtime_filter.h" #include "runtime_filter/runtime_filter_definitions.h" #include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" namespace doris { // The merger is divided into local merger and global merger @@ -77,17 +78,18 @@ public: } void set_expected_producer_num(int num) { + DCHECK_EQ(_received_producer_num, 0); + DCHECK_EQ(_received_rf_size_num, 0); _expected_producer_num = num; _profile->add_info_string("Info", debug_string()); } bool add_rf_size(uint64_t size) { _received_rf_size_num++; - _received_sum_size += size; if (_expected_producer_num < _received_rf_size_num) { - 
return Status::InternalError( - "runtime filter merger input product size more than expected, {}", - debug_string()); + throw Exception(ErrorCode::INTERNAL_ERROR, + "runtime filter merger input product size more than expected, {}", + debug_string()); } _received_sum_size += size; _profile->add_info_string("Info", debug_string()); diff --git a/be/src/runtime_filter/runtime_filter_producer.cpp b/be/src/runtime_filter/runtime_filter_producer.cpp index b7ddf75fb9d..3f91c000a48 100644 --- a/be/src/runtime_filter/runtime_filter_producer.cpp +++ b/be/src/runtime_filter/runtime_filter_producer.cpp @@ -113,7 +113,7 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest, return; } - wrapper->disable(cntl_->ErrorText()); + wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText()); Base::_process_if_rpc_failed(); } @@ -128,7 +128,7 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest, return; } - wrapper->disable(status.to_string()); + wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string()); } public: diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h index ba3fea90816..1bb7ac6530a 100644 --- a/be/src/runtime_filter/runtime_filter_producer.h +++ b/be/src/runtime_filter/runtime_filter_producer.h @@ -27,7 +27,7 @@ namespace doris { // Work on (hash/corss) join build sink node, RuntimeFilterProducerHelper will manage all RuntimeFilterProducer // Used to generate specific predicate and publish it to consumer/merger /** - * init -> send_size -> insert -> publish + * send_size -> init -> insert -> publish */ class RuntimeFilterProducer : public RuntimeFilter { public: @@ -148,14 +148,14 @@ private: Status _init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options) override { RETURN_IF_ERROR(RuntimeFilter::_init_with_desc(desc, options)); - bool need_sync_filter_size = _wrapper->build_bf_by_runtime_size() && 
!_is_broadcast_join; - _rf_state = need_sync_filter_size ? State::WAITING_FOR_SEND_SIZE : State::WAITING_FOR_DATA; + _need_sync_filter_size = _wrapper->build_bf_by_runtime_size() && !_is_broadcast_join; + _rf_state = _need_sync_filter_size ? State::WAITING_FOR_SEND_SIZE : State::WAITING_FOR_DATA; _profile->add_info_string("Info", debug_string()); return Status::OK(); } const bool _is_broadcast_join; - const bool _need_sync_filter_size = false; + bool _need_sync_filter_size = false; int64_t _synced_size = -1; std::shared_ptr<pipeline::CountedFinishDependency> _dependency; diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp b/be/src/runtime_filter/runtime_filter_wrapper.cpp index fa8acd7ab0b..f07934de6a7 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.cpp +++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp @@ -151,7 +151,7 @@ bool RuntimeFilterWrapper::build_bf_by_runtime_size() const { Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) { if (other->_state == State::DISABLED) { - disable(other->_disabled_reason); + set_state(State::DISABLED, other->_disabled_reason); } if (other->_state == State::IGNORED || _state == State::DISABLED) { @@ -159,6 +159,7 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) { } DCHECK(_state != State::IGNORED); + DCHECK(other->_state == State::READY); set_state(State::READY); @@ -168,7 +169,7 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) { case RuntimeFilterType::IN_FILTER: { _hybrid_set->insert(other->_hybrid_set.get()); if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) { - disable(fmt::format("reach max in num: {}", _max_in_num)); + set_state(State::DISABLED, fmt::format("reach max in num: {}", _max_in_num)); } break; } diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h index 12ef6828dda..98de97ea0c4 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.h +++ 
b/be/src/runtime_filter/runtime_filter_wrapper.h @@ -57,12 +57,12 @@ public: PFilterType filter_type = request.filter_type(); if (request.has_disabled() && request.disabled()) { - disable("get disabled from remote"); + set_state(State::DISABLED, "get disabled from remote"); return Status::OK(); } if (request.has_ignored() && request.ignored()) { - set_state(State::IGNORED); + set_state(State::IGNORED, "get ignored from remote"); return Status::OK(); } @@ -122,7 +122,6 @@ public: } _state = state; } - void disable(std::string reason) { set_state(State::DISABLED, reason); } State get_state() const { return _state; } void check_state(std::vector<State> assumed_states) const { if (!check_state_impl<RuntimeFilterWrapper>(_state, assumed_states)) { diff --git a/be/test/pipeline/pipeline_test.cpp b/be/test/pipeline/pipeline_test.cpp index e95abce1cd9..bd522621360 100644 --- a/be/test/pipeline/pipeline_test.cpp +++ b/be/test/pipeline/pipeline_test.cpp @@ -30,6 +30,7 @@ #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/pipeline_fragment_context.h" +#include "runtime/descriptor_helper.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime_filter/runtime_filter_definitions.h" @@ -772,33 +773,10 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { .set_slot_ref(TSlotRefBuilder(1, 1).build()) .build()) .build()) - .append_runtime_filters( - TRuntimeFilterDescBuilder( - 0, - TExprBuilder() - .append_nodes( - TExprNodeBuilder( - TExprNodeType::SLOT_REF, - TTypeDescBuilder() - .set_types( - TTypeNodeBuilder() - .set_type( - TTypeNodeType:: - SCALAR) - .set_scalar_type( - TPrimitiveType:: - INT) - .build()) - .build(), - 0) - .set_slot_ref( - TSlotRefBuilder(1, 1).build()) - .build()) - .build(), - 0, std::map<TPlanNodeId, TExpr> {}) - .set_bloom_filter_size_bytes(1048576) - .set_build_bf_by_runtime_size(false) - .build()) + .append_runtime_filters(TRuntimeFilterDescBuilder() + 
.set_bloom_filter_size_bytes(1048576) + .set_build_bf_by_runtime_size(false) + .build()) .build(); { diff --git a/be/test/pipeline/thrift_builder.h b/be/test/pipeline/thrift_builder.h index e7ed50d5718..b1555a18c42 100644 --- a/be/test/pipeline/thrift_builder.h +++ b/be/test/pipeline/thrift_builder.h @@ -27,7 +27,21 @@ #include "udf/udf.h" #include "vec/sink/writer/vhive_utils.h" -namespace doris::pipeline { +namespace doris { + +#define FAIL_IF_ERROR_OR_CATCH_EXCEPTION(stmt) \ + do { \ + try { \ + { \ + Status _status_ = (stmt); \ + if (UNLIKELY(!_status_.ok())) { \ + EXPECT_TRUE(false) << _status_.to_string(); \ + } \ + } \ + } catch (const doris::Exception& e) { \ + EXPECT_TRUE(false) << e.what(); \ + } \ + } while (0) class TQueryOptionsBuilder { public: @@ -148,56 +162,6 @@ public: private: TPlanNode _plan_node; }; - -class TRuntimeFilterDescBuilder { -public: - explicit TRuntimeFilterDescBuilder( - int filter_id, TExpr& src_expr, int expr_order, - std::map<TPlanNodeId, TExpr> planId_to_target_expr, bool is_broadcast_join = false, - bool has_local_targets = true, bool has_remote_targets = false, - TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM) - : _desc() { - _desc.__set_filter_id(filter_id); - _desc.__set_src_expr(src_expr); - _desc.__set_expr_order(expr_order); - _desc.__set_planId_to_target_expr(planId_to_target_expr); - _desc.__set_is_broadcast_join(is_broadcast_join); - _desc.__set_has_local_targets(has_local_targets); - _desc.__set_has_remote_targets(has_remote_targets); - _desc.__set_type(type); - } - explicit TRuntimeFilterDescBuilder( - int filter_id, TExpr&& src_expr, int expr_order, - std::map<TPlanNodeId, TExpr> planId_to_target_expr, bool is_broadcast_join = false, - bool has_local_targets = true, bool has_remote_targets = false, - TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM) - : _desc() { - _desc.__set_filter_id(filter_id); - _desc.__set_src_expr(src_expr); - _desc.__set_expr_order(expr_order); - 
_desc.__set_planId_to_target_expr(planId_to_target_expr); - _desc.__set_is_broadcast_join(is_broadcast_join); - _desc.__set_has_local_targets(has_local_targets); - _desc.__set_has_remote_targets(has_remote_targets); - _desc.__set_type(type); - } - - TRuntimeFilterDescBuilder& set_bloom_filter_size_bytes(int64_t bloom_filter_size_bytes) { - _desc.__set_bloom_filter_size_bytes(bloom_filter_size_bytes); - return *this; - } - TRuntimeFilterDescBuilder& set_build_bf_by_runtime_size(bool build_bf_by_runtime_size) { - _desc.__set_build_bf_by_runtime_size(build_bf_by_runtime_size); - return *this; - } - TRuntimeFilterDesc& build() { return _desc; } - TRuntimeFilterDescBuilder(const TRuntimeFilterDescBuilder&) = delete; - void operator=(const TRuntimeFilterDescBuilder&) = delete; - -private: - TRuntimeFilterDesc _desc; -}; - class TExchangeNodeBuilder { public: explicit TExchangeNodeBuilder() : _plan_node() {} @@ -271,73 +235,6 @@ private: TPlanFragmentDestination _dest; }; -class TTupleDescriptorBuilder { -public: - explicit TTupleDescriptorBuilder() : _desc() { - _desc.byteSize = 0; - _desc.numNullBytes = 0; - } - - TTupleDescriptorBuilder& set_id(TTupleId id) { - _desc.id = id; - return *this; - } - - TTupleDescriptor& build() { return _desc; } - TTupleDescriptorBuilder(const TTupleDescriptorBuilder&) = delete; - void operator=(const TTupleDescriptorBuilder&) = delete; - -private: - TTupleDescriptor _desc; -}; - -class TSlotDescriptorBuilder { -public: - explicit TSlotDescriptorBuilder() : _desc() { - _desc.columnPos = -1; - _desc.nullIndicatorByte = 0; - } - TSlotDescriptorBuilder& set_id(TTupleId id) { - _desc.id = id; - return *this; - } - TSlotDescriptorBuilder& set_parent(TTupleDescriptor& parent) { - _desc.parent = parent.id; - return *this; - } - TSlotDescriptorBuilder& set_slotType(TTypeDesc& slotType) { - _desc.slotType = slotType; - return *this; - } - TSlotDescriptorBuilder& set_nullIndicatorBit(int nullIndicatorBit) { - _desc.nullIndicatorBit = 
nullIndicatorBit; - return *this; - } - TSlotDescriptorBuilder& set_byteOffset(int byteOffset) { - _desc.byteOffset = byteOffset; - return *this; - } - TSlotDescriptorBuilder& set_slotIdx(int slotIdx) { - _desc.slotIdx = slotIdx; - return *this; - } - TSlotDescriptorBuilder& set_isMaterialized(bool isMaterialized) { - _desc.isMaterialized = isMaterialized; - return *this; - } - TSlotDescriptorBuilder& set_colName(std::string colName) { - _desc.colName = colName; - return *this; - } - - TSlotDescriptor& build() { return _desc; } - TSlotDescriptorBuilder(const TSlotDescriptorBuilder&) = delete; - void operator=(const TSlotDescriptorBuilder&) = delete; - -private: - TSlotDescriptor _desc; -}; - class TTypeDescBuilder { public: explicit TTypeDescBuilder() : _desc() { @@ -386,31 +283,6 @@ private: TTypeNode _desc; }; -class TDescriptorTableBuilder { -public: - explicit TDescriptorTableBuilder() : _desc() {} - - TDescriptorTableBuilder& append_slotDescriptors(TSlotDescriptor& desc) { - _desc.slotDescriptors.push_back(desc); - return *this; - } - TDescriptorTableBuilder& append_tupleDescriptors(TTupleDescriptor& desc) { - _desc.tupleDescriptors.push_back(desc); - return *this; - } - TDescriptorTableBuilder& append_tableDescriptors(TTableDescriptor& desc) { - _desc.tableDescriptors.push_back(desc); - return *this; - } - - TDescriptorTable& build() { return _desc; } - TDescriptorTableBuilder(const TDescriptorTableBuilder&) = delete; - void operator=(const TDescriptorTableBuilder&) = delete; - -private: - TDescriptorTable _desc; -}; - class TDataPartitionBuilder { public: explicit TDataPartitionBuilder(TPartitionType::type type) : _partition() { @@ -575,4 +447,89 @@ private: TRuntimeFilterParams _params; }; -} // namespace doris::pipeline +class TRuntimeFilterDescBuilder { +public: + static TExpr get_default_expr() { + return TExprBuilder() + .append_nodes( + TExprNodeBuilder( + TExprNodeType::SLOT_REF, + TTypeDescBuilder() + .set_types(TTypeNodeBuilder() + 
.set_type(TTypeNodeType::SCALAR) + .set_scalar_type(TPrimitiveType::INT) + .build()) + .build(), + 0) + .set_slot_ref(TSlotRefBuilder(1, 1).build()) + .build()) + .build(); + } + + explicit TRuntimeFilterDescBuilder( + int filter_id, TExpr& src_expr, int expr_order, + std::map<TPlanNodeId, TExpr> planId_to_target_expr, bool is_broadcast_join = false, + bool has_local_targets = true, bool has_remote_targets = false, + TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM) { + _desc.__set_filter_id(filter_id); + _desc.__set_src_expr(src_expr); + _desc.__set_expr_order(expr_order); + _desc.__set_planId_to_target_expr(planId_to_target_expr); + _desc.__set_is_broadcast_join(is_broadcast_join); + _desc.__set_has_local_targets(has_local_targets); + _desc.__set_has_remote_targets(has_remote_targets); + _desc.__set_type(type); + } + explicit TRuntimeFilterDescBuilder( + int filter_id = 0, TExpr src_expr = get_default_expr(), int expr_order = 0, + std::map<TPlanNodeId, TExpr> planId_to_target_expr = std::map<TPlanNodeId, TExpr> {}) { + _desc.__set_filter_id(filter_id); + _desc.__set_src_expr(src_expr); + _desc.__set_expr_order(expr_order); + _desc.__set_planId_to_target_expr(planId_to_target_expr); + _desc.__set_is_broadcast_join(false); + _desc.__set_has_local_targets(true); + _desc.__set_has_remote_targets(false); + _desc.__set_type(TRuntimeFilterType::IN_OR_BLOOM); + } + + TRuntimeFilterDescBuilder& set_bloom_filter_size_bytes(int64_t bloom_filter_size_bytes) { + _desc.__set_bloom_filter_size_bytes(bloom_filter_size_bytes); + return *this; + } + + TRuntimeFilterDescBuilder& set_is_broadcast_join(bool is_broadcast_join) { + _desc.__set_is_broadcast_join(is_broadcast_join); + return *this; + } + + TRuntimeFilterDescBuilder& set_mode(bool local) { + _desc.__set_has_local_targets(local); + _desc.__set_has_remote_targets(!local); + return *this; + } + + TRuntimeFilterDescBuilder& add_planId_to_target_expr(int node_id, + TExpr expr = get_default_expr()) { + 
_desc.planId_to_target_expr[node_id] = expr; + return *this; + } + + TRuntimeFilterDescBuilder& set_type(TRuntimeFilterType::type type) { + _desc.__set_type(type); + return *this; + } + + TRuntimeFilterDescBuilder& set_build_bf_by_runtime_size(bool build_bf_by_runtime_size) { + _desc.__set_build_bf_by_runtime_size(build_bf_by_runtime_size); + return *this; + } + TRuntimeFilterDesc& build() { return _desc; } + TRuntimeFilterDescBuilder(const TRuntimeFilterDescBuilder&) = delete; + void operator=(const TRuntimeFilterDescBuilder&) = delete; + +private: + TRuntimeFilterDesc _desc; +}; + +} // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp new file mode 100644 index 00000000000..617ce3fdbb0 --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime_filter/runtime_filter_consumer.h" + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "runtime_filter/runtime_filter_producer.h" +#include "runtime_filter/runtime_filter_test_utils.h" + +namespace doris { + +class RuntimeFilterConsumerTest : public RuntimeFilterTest {}; + +TEST_F(RuntimeFilterConsumerTest, basic) { + std::shared_ptr<RuntimeFilterConsumer> consumer; + auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + + std::shared_ptr<RuntimeFilterConsumer> registed_consumer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( + desc, true, 0, ®isted_consumer, &_profile)); +} + +TEST_F(RuntimeFilterConsumerTest, signal_aquire) { + std::shared_ptr<RuntimeFilterConsumer> consumer; + auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); + producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); + + consumer->signal(producer.get()); + + try { + consumer->signal(producer.get()); + ASSERT_TRUE(false); + } catch (const Exception& e) { + ASSERT_EQ(e.code(), ErrorCode::INTERNAL_ERROR); + } + + std::vector<vectorized::VRuntimeFilterPtr> push_exprs; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs)); + ASSERT_EQ(push_exprs.size(), 1); + ASSERT_TRUE(consumer->is_applied()); +} + +} // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_merger_test.cpp 
b/be/test/runtime_filter/runtime_filter_merger_test.cpp new file mode 100644 index 00000000000..768284879bc --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_merger_test.cpp @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime_filter/runtime_filter_merger.h" + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "runtime_filter/runtime_filter_producer.h" +#include "runtime_filter/runtime_filter_test_utils.h" + +namespace doris { + +class RuntimeFilterMergerTest : public RuntimeFilterTest { +public: + void test_merge_from(RuntimeFilterWrapper::State first_product_state, + RuntimeFilterWrapper::State first_expected_state, + RuntimeFilterWrapper::State second_product_state, + RuntimeFilterWrapper::State second_expected_state) { + std::shared_ptr<RuntimeFilterMerger> merger; + auto desc = TRuntimeFilterDescBuilder().build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &merger, &_profile)); + merger->set_expected_producer_num(2); + ASSERT_FALSE(merger->ready()); + ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::IGNORED); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile)); + producer->set_wrapper_state_and_ready_to_publish(first_product_state); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); + ASSERT_FALSE(merger->ready()); + ASSERT_EQ(merger->_wrapper->_state, first_expected_state); + + std::shared_ptr<RuntimeFilterProducer> producer2; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile)); + producer2->set_wrapper_state_and_ready_to_publish(second_product_state); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get())); + ASSERT_TRUE(merger->ready()); + ASSERT_EQ(merger->_wrapper->_state, second_expected_state); + } + + void test_serialize(RuntimeFilterWrapper::State state) { + std::shared_ptr<RuntimeFilterMerger> merger; + auto desc = TRuntimeFilterDescBuilder().build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create( + 
RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &merger, &_profile)); + merger->set_expected_producer_num(1); + ASSERT_FALSE(merger->ready()); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile)); + producer->set_wrapper_state_and_ready_to_publish(state); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); + ASSERT_TRUE(merger->ready()); + + PMergeFilterRequest request; + void* data = nullptr; + int len = 0; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->serialize(&request, &data, &len)); + + std::shared_ptr<RuntimeFilterMerger> deserialized_merger; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + RuntimeFilterMerger::create(RuntimeFilterParamsContext::create(_query_ctx.get()), + &desc, &deserialized_merger, &_profile)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(deserialized_merger->assign(request, nullptr)); + ASSERT_EQ(merger->_wrapper->_state, state); + } +}; + +TEST_F(RuntimeFilterMergerTest, basic) { + test_merge_from(RuntimeFilterWrapper::State::READY, RuntimeFilterWrapper::State::READY, + RuntimeFilterWrapper::State::READY, RuntimeFilterWrapper::State::READY); +} + +TEST_F(RuntimeFilterMergerTest, add_rf_size) { + std::shared_ptr<RuntimeFilterMerger> merger; + auto desc = TRuntimeFilterDescBuilder().build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &merger, &_profile)); + merger->set_expected_producer_num(2); + + ASSERT_FALSE(merger->add_rf_size(123)); + ASSERT_TRUE(merger->add_rf_size(1)); + ASSERT_EQ(merger->get_received_sum_size(), 124); + ASSERT_FALSE(merger->ready()); + + try { + ASSERT_TRUE(merger->add_rf_size(1)); + ASSERT_TRUE(false); + } catch (const Exception& e) { + ASSERT_EQ(e.code(), ErrorCode::INTERNAL_ERROR); + } +} + +TEST_F(RuntimeFilterMergerTest, invalid_merge) { + std::shared_ptr<RuntimeFilterMerger> merger; + auto desc = 
TRuntimeFilterDescBuilder().build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &merger, &_profile)); + merger->set_expected_producer_num(1); + ASSERT_FALSE(merger->ready()); + ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::IGNORED); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile)); + producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); // ready wrapper + ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY); + + std::shared_ptr<RuntimeFilterProducer> producer2; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile)); + producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); + auto st = merger->merge_from(producer2.get()); + ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR); +} + +TEST_F(RuntimeFilterMergerTest, merge_from_ready_and_ignored) { + test_merge_from(RuntimeFilterWrapper::State::READY, RuntimeFilterWrapper::State::READY, + RuntimeFilterWrapper::State::IGNORED, RuntimeFilterWrapper::State::READY); +} + +TEST_F(RuntimeFilterMergerTest, merge_from_ignored_and_ready) { + test_merge_from(RuntimeFilterWrapper::State::IGNORED, RuntimeFilterWrapper::State::IGNORED, + RuntimeFilterWrapper::State::READY, RuntimeFilterWrapper::State::READY); +} + +TEST_F(RuntimeFilterMergerTest, merge_from_ready_and_disabled) { + test_merge_from(RuntimeFilterWrapper::State::READY, RuntimeFilterWrapper::State::READY, + RuntimeFilterWrapper::State::DISABLED, RuntimeFilterWrapper::State::DISABLED); +} + +TEST_F(RuntimeFilterMergerTest, merge_from_disabled_and_ready) { + test_merge_from(RuntimeFilterWrapper::State::DISABLED, 
RuntimeFilterWrapper::State::DISABLED, + RuntimeFilterWrapper::State::READY, RuntimeFilterWrapper::State::DISABLED); +} + +TEST_F(RuntimeFilterMergerTest, merge_from_disabled_and_ignored) { + test_merge_from(RuntimeFilterWrapper::State::DISABLED, RuntimeFilterWrapper::State::DISABLED, + RuntimeFilterWrapper::State::IGNORED, RuntimeFilterWrapper::State::DISABLED); +} + +TEST_F(RuntimeFilterMergerTest, merge_from_ignored_and_disabled) { + test_merge_from(RuntimeFilterWrapper::State::IGNORED, RuntimeFilterWrapper::State::IGNORED, + RuntimeFilterWrapper::State::DISABLED, RuntimeFilterWrapper::State::DISABLED); +} + +TEST_F(RuntimeFilterMergerTest, serialize_ready) { + test_serialize(RuntimeFilterWrapper::State::READY); +} + +TEST_F(RuntimeFilterMergerTest, serialize_disabled) { + test_serialize(RuntimeFilterWrapper::State::DISABLED); +} + +TEST_F(RuntimeFilterMergerTest, serialize_ignored) { + test_serialize(RuntimeFilterWrapper::State::IGNORED); +} + +} // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp b/be/test/runtime_filter/runtime_filter_producer_test.cpp new file mode 100644 index 00000000000..ab04391fe7d --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime_filter/runtime_filter_producer.h" + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "runtime_filter/runtime_filter_test_utils.h" + +namespace doris { + +class RuntimeFilterProducerTest : public RuntimeFilterTest {}; +// basic: creating a producer from a default descriptor must succeed. +TEST_F(RuntimeFilterProducerTest, basic) { + std::shared_ptr<RuntimeFilterProducer> producer; + auto desc = TRuntimeFilterDescBuilder().build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); +} +// No size sync is needed (producer starts in WAITING_FOR_DATA) for a broadcast join, or when build_bf_by_runtime_size is false. +TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) { + { + std::shared_ptr<RuntimeFilterProducer> producer; + auto desc = TRuntimeFilterDescBuilder() + .set_build_bf_by_runtime_size(true) + .set_is_broadcast_join(true) + .build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); + ASSERT_EQ(producer->_need_sync_filter_size, false); + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_DATA); + } + { + std::shared_ptr<RuntimeFilterProducer> producer; + auto desc = TRuntimeFilterDescBuilder() + .set_build_bf_by_runtime_size(false) + .set_is_broadcast_join(false) + .build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); + ASSERT_EQ(producer->_need_sync_filter_size, false); + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_DATA); + } +} +// Non-broadcast join with build_bf_by_runtime_size: starts in WAITING_FOR_SEND_SIZE and reaches WAITING_FOR_DATA after send_size(). +TEST_F(RuntimeFilterProducerTest, sync_filter_size) { + std::shared_ptr<RuntimeFilterProducer> producer; + auto desc = TRuntimeFilterDescBuilder() + .set_build_bf_by_runtime_size(true) + .set_is_broadcast_join(false) + .build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( +
RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); + ASSERT_EQ(producer->_need_sync_filter_size, true); + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE); + + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->send_size(_runtime_states[0].get(), 100, nullptr)); + // local mode: a single producer gets its size directly and moves to WAITING_FOR_DATA + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_DATA); +} +// NOTE(review): this body is identical to sync_filter_size, so it adds no coverage for a distinct no-merge path. TODO: differentiate it or remove it. +TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_no_merge) { + std::shared_ptr<RuntimeFilterProducer> producer; + auto desc = TRuntimeFilterDescBuilder() + .set_build_bf_by_runtime_size(true) + .set_is_broadcast_join(false) + .build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); + ASSERT_EQ(producer->_need_sync_filter_size, true); + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE); + + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->send_size(_runtime_states[0].get(), 100, nullptr)); + // local mode: a single producer gets its size directly and moves to WAITING_FOR_DATA + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_DATA); +} +// Two producers on different RuntimeStates: the first waits in WAITING_FOR_SYNCED_SIZE until the second sends its size; the synced size is their sum (123 + 1). +TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_merge) { + auto desc = TRuntimeFilterDescBuilder() + .set_build_bf_by_runtime_size(true) + .set_is_broadcast_join(false) + .add_planId_to_target_expr(0) + .build(); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile)); + std::shared_ptr<RuntimeFilterProducer> producer2; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile)); + + std::shared_ptr<RuntimeFilterConsumer> consumer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( + desc, true, 0,
&consumer, &_profile)); + + ASSERT_EQ(producer->_need_sync_filter_size, true); + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE); + + auto dependency = std::make_shared<pipeline::CountedFinishDependency>(0, 0, ""); + + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + producer->send_size(_runtime_states[0].get(), 123, dependency)); + // global mode: must wait until the other producer's size is synced + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SYNCED_SIZE); + ASSERT_FALSE(dependency->ready()); + + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer2->send_size(_runtime_states[1].get(), 1, dependency)); + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_DATA); + ASSERT_EQ(producer->_synced_size, 124); + ASSERT_TRUE(dependency->ready()); +} +// Forcing the wrapper state to IGNORED/DISABLED moves a producer to READY_TO_PUBLISH, even while it is still waiting for the synced size. +TEST_F(RuntimeFilterProducerTest, set_ignore_or_disable) { + auto desc = TRuntimeFilterDescBuilder() + .set_build_bf_by_runtime_size(true) + .set_is_broadcast_join(false) + .add_planId_to_target_expr(0) + .build(); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile)); + std::shared_ptr<RuntimeFilterProducer> producer2; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile)); + + std::shared_ptr<RuntimeFilterConsumer> consumer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( + desc, true, 0, &consumer, &_profile)); + + ASSERT_EQ(producer->_need_sync_filter_size, true); + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE); + + auto dependency = std::make_shared<pipeline::CountedFinishDependency>(0, 0, ""); + + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + producer->send_size(_runtime_states[0].get(), 123, dependency)); + // global mode: must wait until the other producer's size is synced + ASSERT_EQ(producer->_rf_state,
RuntimeFilterProducer::State::WAITING_FOR_SYNCED_SIZE); + + producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::IGNORED); + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::READY_TO_PUBLISH); + ASSERT_EQ(producer->_wrapper->_state, RuntimeFilterWrapper::State::IGNORED); + + producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED); + ASSERT_EQ(producer2->_rf_state, RuntimeFilterProducer::State::READY_TO_PUBLISH); + ASSERT_EQ(producer2->_wrapper->_state, RuntimeFilterWrapper::State::DISABLED); +} + +} // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_test_utils.h b/be/test/runtime_filter/runtime_filter_test_utils.h new file mode 100644 index 00000000000..3141be9d848 --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_test_utils.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+// NOTE(review): this header has no #pragma once include guard; consider adding one. +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "pipeline/thrift_builder.h" +#include "runtime/query_context.h" + +namespace doris { +// Shared gtest fixture for runtime-filter tests: SetUp() builds one QueryContext and INSTANCE_NUM RuntimeStates, each wired to its own local RuntimeFilterMgr. +class RuntimeFilterTest : public testing::Test { +public: + RuntimeFilterTest() = default; + ~RuntimeFilterTest() override = default; + void SetUp() override { + _query_options = TQueryOptionsBuilder().set_runtime_filter_max_in_num(15).build(); + auto fe_address = TNetworkAddress(); + fe_address.hostname = LOCALHOST; + fe_address.port = DUMMY_PORT; + // NOTE(review): the same local address is passed for both FE-address arguments below (presumably coordinator and current FE; confirm against QueryContext::create). + _query_ctx = + QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), _query_options, + fe_address, true, fe_address, QuerySource::INTERNAL_FRONTEND); + _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + TRuntimeFilterParamsBuilder().build()); + + _runtime_states.resize(INSTANCE_NUM); + _local_mgrs.resize(INSTANCE_NUM); + for (int i = 0; i < INSTANCE_NUM; i++) { + _runtime_states[i] = RuntimeState::create_unique( + TUniqueId(), 0, _query_options, _query_ctx->query_globals, + ExecEnv::GetInstance(), _query_ctx.get()); + // Each instance gets its own local RuntimeFilterMgr, installed on its RuntimeState (last ctor arg false; presumably is_global, TODO confirm). + _local_mgrs[i] = std::make_unique<RuntimeFilterMgr>( + TUniqueId(), RuntimeFilterParamsContext::create(_query_ctx.get()), + _query_ctx->query_mem_tracker(), false); + _runtime_states[i]->set_runtime_filter_mgr(_local_mgrs[i].get()); + } + } + void TearDown() override {} + +protected: + RuntimeProfile _profile = RuntimeProfile(""); + std::shared_ptr<QueryContext> _query_ctx; + TQueryOptions _query_options; + const std::string LOCALHOST = BackendOptions::get_localhost(); + const int DUMMY_PORT = config::brpc_port; + const int INSTANCE_NUM = 2; // number of RuntimeStates / local RuntimeFilterMgrs created in SetUp() + std::vector<std::unique_ptr<RuntimeState>> _runtime_states; + std::vector<std::unique_ptr<RuntimeFilterMgr>> _local_mgrs; +}; + +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org