This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3006b258b0 [Improvement](bloomfilter) allocate memory for BF in open phase (#13494) 3006b258b0 is described below commit 3006b258b058982626d57521948ed2ac98458af1 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Oct 21 17:37:26 2022 +0800 [Improvement](bloomfilter) allocate memory for BF in open phase (#13494) --- be/src/exec/hash_join_node.cpp | 36 ++++++++++++++--------------- be/src/exec/hash_join_node.h | 6 ++--- be/src/exec/olap_scan_node.cpp | 3 +++ be/src/exprs/bloomfilter_predicate.h | 37 ++++++++++++++++++++++-------- be/src/exprs/runtime_filter.cpp | 29 ++++++++---------------- be/src/exprs/runtime_filter.h | 2 +- be/src/runtime/fragment_mgr.cpp | 39 -------------------------------- be/src/runtime/runtime_filter_mgr.cpp | 4 ++++ be/src/vec/exec/join/vhash_join_node.cpp | 22 ++++++++++++------ be/src/vec/exec/join/vhash_join_node.h | 5 +++- be/src/vec/exec/scan/vscan_node.cpp | 3 +++ be/src/vec/exec/volap_scan_node.cpp | 3 +++ be/test/exprs/runtime_filter_test.cpp | 6 +++++ 13 files changed, 96 insertions(+), 99 deletions(-) diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 53efed1113..7e9a2bf989 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -24,6 +24,7 @@ #include "common/utils.h" #include "exec/hash_table.h" +#include "exprs/bloomfilter_predicate.h" #include "exprs/expr.h" #include "exprs/expr_context.h" #include "exprs/runtime_filter.h" @@ -88,9 +89,13 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _build_unique = false; } - for (const auto& filter_desc : _runtime_filter_descs) { + _runtime_filters.resize(_runtime_filter_descs.size()); + + for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( - RuntimeFilterRole::PRODUCER, filter_desc, state->query_options())); + RuntimeFilterRole::PRODUCER, 
_runtime_filter_descs[i], state->query_options())); + RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( + _runtime_filter_descs[i].filter_id, &_runtime_filters[i])); } return Status::OK(); @@ -177,10 +182,10 @@ Status HashJoinNode::close(RuntimeState* state) { return ExecNode::close(state); } -void HashJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) { +void HashJoinNode::probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) { SCOPED_ATTACH_TASK(state); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - status->set_value(construct_hash_table(state)); + status->set_value(child(0)->open(state)); } Status HashJoinNode::construct_hash_table(RuntimeState* state) { @@ -218,6 +223,11 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) { } Status HashJoinNode::open(RuntimeState* state) { + for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { + if (auto bf = _runtime_filters[i]->get_bloomfilter()) { + RETURN_IF_ERROR(bf->init_with_fixed_length()); + } + } RETURN_IF_ERROR(ExecNode::open(state)); SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); @@ -235,13 +245,13 @@ Status HashJoinNode::open(RuntimeState* state) { // main thread std::promise<Status> thread_status; add_runtime_exec_option("Hash Table Built Asynchronously"); - std::thread(bind(&HashJoinNode::build_side_thread, this, state, &thread_status)).detach(); + std::thread(bind(&HashJoinNode::probe_side_open_thread, this, state, &thread_status)).detach(); if (!_runtime_filter_descs.empty()) { RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs, _runtime_filter_descs); - RETURN_IF_ERROR(thread_status.get_future().get()); + RETURN_IF_ERROR(construct_hash_table(state)); RETURN_IF_ERROR(runtime_filter_slots.init(state, _hash_tbl->size())); { SCOPED_TIMER(_push_compute_timer); @@ -253,23 +263,13 @@ Status HashJoinNode::open(RuntimeState* state) { 
SCOPED_TIMER(_push_down_timer); runtime_filter_slots.publish(); } - Status open_status = child(0)->open(state); - RETURN_IF_ERROR(open_status); + RETURN_IF_ERROR(thread_status.get_future().get()); } else { - // Open the probe-side child so that it may perform any initialisation in parallel. - // Don't exit even if we see an error, we still need to wait for the build thread - // to finish. - Status open_status = child(0)->open(state); - // Blocks until ConstructHashTable has returned, after which // the hash table is fully constructed and we can start the probe // phase. RETURN_IF_ERROR(thread_status.get_future().get()); - - // ISSUE-1247, check open_status after buildThread execute. - // If this return first, build thread will use 'thread_status' - // which is already destructor and then coredump. - RETURN_IF_ERROR(open_status); + RETURN_IF_ERROR(construct_hash_table(state)); } // seed probe batch and _current_probe_row, etc. diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h index 506ac07b9e..afb898b991 100644 --- a/be/src/exec/hash_join_node.h +++ b/be/src/exec/hash_join_node.h @@ -34,6 +34,7 @@ namespace doris { class MemPool; class RowBatch; class TupleRow; +class IRuntimeFilter; // Node for in-memory hash joins: // - builds up a hash table with the rows produced by our right input @@ -139,9 +140,7 @@ private: RuntimeProfile::Counter* _hash_table_list_min_size; RuntimeProfile::Counter* _hash_table_list_max_size; - // Supervises ConstructHashTable in a separate thread, and - // returns its status in the promise parameter. - void build_side_thread(RuntimeState* state, std::promise<Status>* status); + void probe_side_open_thread(RuntimeState* state, std::promise<Status>* status); // We parallelise building the build-side with Open'ing the // probe-side. 
If, for example, the probe-side child is another @@ -177,6 +176,7 @@ private: std::string get_probe_row_output_string(TupleRow* probe_row); std::vector<TRuntimeFilterDesc> _runtime_filter_descs; + std::vector<IRuntimeFilter*> _runtime_filters; }; } // namespace doris diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index d3b3a3aabd..ef4f2236c5 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -244,6 +244,9 @@ Status OlapScanNode::open(RuntimeState* state) { IRuntimeFilter* runtime_filter = nullptr; state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter); DCHECK(runtime_filter != nullptr); + if (auto bf = runtime_filter->get_bloomfilter()) { + RETURN_IF_ERROR(bf->init_with_fixed_length()); + } if (runtime_filter == nullptr) { continue; } diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h index cd1e89f465..ab07fd8e1b 100644 --- a/be/src/exprs/bloomfilter_predicate.h +++ b/be/src/exprs/bloomfilter_predicate.h @@ -20,6 +20,7 @@ #include <algorithm> #include <cmath> #include <cstdint> +#include <future> #include <memory> #include <string> #include <type_traits> @@ -100,6 +101,10 @@ public: return init_with_fixed_length(filter_size); } + void set_length(int64_t bloom_filter_length) { _bloom_filter_length = bloom_filter_length; } + + Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); } + Status init_with_fixed_length(int64_t bloom_filter_length) { if (_inited) { return Status::OK(); @@ -118,17 +123,28 @@ public: } Status merge(BloomFilterFuncBase* bloomfilter_func) { - auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); - if (bloomfilter_func == nullptr) { - _bloom_filter.reset(BloomFilterAdaptor::create()); - } - if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { - LOG(WARNING) << "bloom filter size not the same: already allocated bytes = " - << _bloom_filter_alloced - << 
", expected allocated bytes = " << other_func->_bloom_filter_alloced; - return Status::InvalidArgument("bloom filter size invalid"); + // If `_inited` is false, there is no memory allocated in bloom filter and this is the first + // call for `merge` function. So we just reuse this bloom filter, and we don't need to + // allocate memory again. + if (!_inited) { + auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); + DCHECK(_bloom_filter == nullptr); + DCHECK(bloomfilter_func != nullptr); + _bloom_filter = bloomfilter_func->_bloom_filter; + _bloom_filter_alloced = other_func->_bloom_filter_alloced; + _inited = true; + return Status::OK(); + } else { + DCHECK(bloomfilter_func != nullptr); + auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); + if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { + LOG(WARNING) << "bloom filter size not the same: already allocated bytes = " + << _bloom_filter_alloced << ", expected allocated bytes = " + << other_func->_bloom_filter_alloced; + return Status::InvalidArgument("bloom filter size invalid"); + } + return _bloom_filter->merge(other_func->_bloom_filter.get()); } - return _bloom_filter->merge(other_func->_bloom_filter.get()); } Status assign(const char* data, int len) { @@ -175,6 +191,7 @@ protected: std::shared_ptr<BloomFilterAdaptor> _bloom_filter; bool _inited; std::mutex _lock; + int64_t _bloom_filter_length; }; template <class T> diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 540e814633..1b5ca935e5 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -418,7 +418,7 @@ public: _filter_id(filter_id) {} // init runtime filter wrapper // alloc memory to init runtime filter function - Status init(const RuntimeFilterParams* params, bool init_bloom_filter) { + Status init(const RuntimeFilterParams* params) { _max_in_num = params->max_in_num; switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { @@ 
-432,20 +432,14 @@ public: case RuntimeFilterType::BLOOM_FILTER: { _is_bloomfilter = true; _bloomfilter_func.reset(create_bloom_filter(_column_return_type)); - if (init_bloom_filter) { - return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size); - } else { - return Status::OK(); - } + _bloomfilter_func->set_length(params->bloom_filter_size); + return Status::OK(); } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { _hybrid_set.reset(create_set(_column_return_type)); _bloomfilter_func.reset(create_bloom_filter(_column_return_type)); - if (init_bloom_filter) { - return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size); - } else { - return Status::OK(); - } + _bloomfilter_func->set_length(params->bloom_filter_size); + return Status::OK(); } default: return Status::InvalidArgument("Unknown Filter type"); @@ -469,11 +463,7 @@ public: } } - BloomFilterFuncBase* get_bloomfilter() const { - DCHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER || - _filter_type == RuntimeFilterType::BLOOM_FILTER); - return _bloomfilter_func.get(); - } + BloomFilterFuncBase* get_bloomfilter() const { return _bloomfilter_func.get(); } void insert(const void* data) { switch (_filter_type) { @@ -1073,7 +1063,7 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt *res = pool->add(new IRuntimeFilter(state, pool)); (*res)->set_role(role); UniqueId fragment_instance_id(state->fragment_instance_id()); - return (*res)->init_with_desc(desc, query_options, fragment_instance_id, true, node_id); + return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id); } void IRuntimeFilter::insert(const void* data) { @@ -1205,8 +1195,7 @@ BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - UniqueId fragment_instance_id, bool init_bloom_filter, - int node_id) { + UniqueId fragment_instance_id, int node_id) { // 
if node_id == -1 , it shouldn't be a consumer DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer())); @@ -1254,7 +1243,7 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue } _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params)); - return _wrapper->init(&params, init_bloom_filter); + return _wrapper->init(&params); } Status IRuntimeFilter::serialize(PMergeFilterRequest* request, void** data, int* len) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 6756e2e70f..677321ea4d 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -191,7 +191,7 @@ public: // init filter with desc Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - UniqueId fragment_id, bool init_bloom_filter = false, int node_id = -1); + UniqueId fragment_id, int node_id = -1); BloomFilterFuncBase* get_bloomfilter() const; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index b18e80df3e..7b5c3af432 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -27,7 +27,6 @@ #include "common/object_pool.h" #include "common/resource_tls.h" #include "common/signal_handler.h" -#include "exprs/bloomfilter_predicate.h" #include "gen_cpp/DataSinks_types.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/HeartbeatService.h" @@ -426,7 +425,6 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _exec_env(exec_env), _fragment_map(), _fragments_ctx_map(), - _bf_size_map(), _stop_background_threads_latch(1) { _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr"); INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count); @@ -469,7 +467,6 @@ FragmentMgr::~FragmentMgr() { std::lock_guard<std::mutex> lock(_lock); _fragment_map.clear(); _fragments_ctx_map.clear(); - _bf_size_map.clear(); } } @@ -511,7 +508,6 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState>
exec_state, Fi _fragment_map.erase(exec_state->fragment_instance_id()); if (all_done && fragments_ctx) { _fragments_ctx_map.erase(fragments_ctx->query_id); - _bf_size_map.erase(fragments_ctx->query_id); } } @@ -673,29 +669,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi RETURN_IF_ERROR(exec_state->prepare(params)); { std::lock_guard<std::mutex> lock(_lock); - auto& runtime_filter_params = params.params.runtime_filter_params; - if (!runtime_filter_params.rid_to_runtime_filter.empty()) { - auto bf_size_for_cur_query = _bf_size_map.find(fragments_ctx->query_id); - if (bf_size_for_cur_query == _bf_size_map.end()) { - _bf_size_map.insert({fragments_ctx->query_id, {}}); - } - for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { - int filter_id = filterid_to_desc.first; - const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id); - if (target_iter == runtime_filter_params.rid_to_target_param.end()) { - continue; - } - const auto& build_iter = - runtime_filter_params.runtime_filter_builder_num.find(filter_id); - if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { - continue; - } - if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) { - _bf_size_map[fragments_ctx->query_id].insert( - {filter_id, filterid_to_desc.second.bloom_filter_size_bytes}); - } - } - } _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state)); _cv.notify_all(); } @@ -710,7 +683,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi // Remove the exec state added std::lock_guard<std::mutex> lock(_lock); _fragment_map.erase(params.params.fragment_instance_id); - _bf_size_map.erase(fragments_ctx->query_id); } exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "push plan fragment to thread pool failed"); @@ -982,17 +954,6 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char* UniqueId 
queryid = request->query_id(); std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); - { - std::lock_guard<std::mutex> lock(_lock); - auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift()); - if (bf_size_for_cur_query != _bf_size_map.end()) { - for (auto& iter : bf_size_for_cur_query->second) { - auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter(); - DCHECK(bf != nullptr); - bf->init_with_fixed_length(iter.second); - } - } - } RETURN_IF_ERROR(filter_controller->merge(request, attach_data)); return Status::OK(); } diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 73a9115a23..5175ed580c 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -20,6 +20,7 @@ #include <string> #include "client_cache.h" +#include "exprs/bloomfilter_predicate.h" #include "exprs/runtime_filter.h" #include "gen_cpp/internal_service.pb.h" #include "runtime/exec_env.h" @@ -197,6 +198,9 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ return Status::InvalidArgument("unknown filter id"); } cntVal = iter->second; + if (auto bf = cntVal->filter->get_bloomfilter()) { + RETURN_IF_ERROR(bf->init_with_fixed_length()); + } MergeRuntimeFilterParams params; params.data = data; params.request = request; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index fad0eb68cf..760db91927 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -898,9 +898,12 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _output_expr_ctxs.push_back(ctx); } - for (const auto& filter_desc : _runtime_filter_descs) { + _runtime_filters.resize(_runtime_filter_descs.size()); + for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { 
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( - RuntimeFilterRole::PRODUCER, filter_desc, state->query_options())); + RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options())); + RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( + _runtime_filter_descs[i].filter_id, &_runtime_filters[i])); } // init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need @@ -1198,6 +1201,11 @@ void HashJoinNode::_construct_mutable_join_block() { Status HashJoinNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); + for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { + if (auto bf = _runtime_filters[i]->get_bloomfilter()) { + RETURN_IF_ERROR(bf->init_with_fixed_length()); + } + } RETURN_IF_ERROR(ExecNode::open(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); @@ -1213,7 +1221,7 @@ Status HashJoinNode::open(RuntimeState* state) { std::thread([this, state, thread_status_p = &thread_status, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] { OpentelemetryScope scope {parent_span}; - this->_hash_table_build_thread(state, thread_status_p); + this->_probe_side_open_thread(state, thread_status_p); }).detach(); // Open the probe-side child so that it may perform any initialisation in parallel. @@ -1222,16 +1230,16 @@ Status HashJoinNode::open(RuntimeState* state) { // ISSUE-1247, check open_status after buildThread execute. // If this return first, build thread will use 'thread_status' // which is already destructor and then coredump. 
- Status open_status = child(0)->open(state); + Status status = _hash_table_build(state); RETURN_IF_ERROR(thread_status.get_future().get()); - return open_status; + return status; } -void HashJoinNode::_hash_table_build_thread(RuntimeState* state, std::promise<Status>* status) { +void HashJoinNode::_probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::_hash_table_build_thread"); SCOPED_ATTACH_TASK(state); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - status->set_value(_hash_table_build(state)); + status->set_value(child(0)->open(state)); } Status HashJoinNode::_hash_table_build(RuntimeState* state) { diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 5de6b0be9c..5f84211110 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -389,7 +389,8 @@ private: MutableColumnPtr _tuple_is_null_left_flag_column; MutableColumnPtr _tuple_is_null_right_flag_column; - void _hash_table_build_thread(RuntimeState* state, std::promise<Status>* status); +private: + void _probe_side_open_thread(RuntimeState* state, std::promise<Status>* status); Status _hash_table_build(RuntimeState* state); @@ -435,6 +436,8 @@ private: std::vector<TRuntimeFilterDesc> _runtime_filter_descs; std::unordered_map<const Block*, std::vector<int>> _inserted_rows; + + std::vector<IRuntimeFilter*> _runtime_filters; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 81a1c80d54..af193ddcf3 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -202,6 +202,9 @@ Status VScanNode::_acquire_runtime_filter() { std::vector<VExpr*> vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; + if (auto bf = 
runtime_filter->get_bloomfilter()) { + RETURN_IF_ERROR(bf->init_with_fixed_length()); + } bool ready = runtime_filter->is_ready(); if (!ready) { ready = runtime_filter->await(); diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 8551a872da..f2f276cc70 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -253,6 +253,9 @@ Status VOlapScanNode::open(RuntimeState* state) { std::vector<VExpr*> vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter; + if (auto bf = runtime_filter->get_bloomfilter()) { + RETURN_IF_ERROR(bf->init_with_fixed_length()); + } bool ready = runtime_filter->is_ready(); if (!ready) { ready = runtime_filter->await(); diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp index 6e69eb1d37..d4b66f295c 100644 --- a/be/test/exprs/runtime_filter_test.cpp +++ b/be/test/exprs/runtime_filter_test.cpp @@ -20,6 +20,7 @@ #include <array> #include <memory> +#include "exprs/bloomfilter_predicate.h" #include "exprs/expr_context.h" #include "exprs/slot_ref.h" #include "gen_cpp/Planner_types.h" @@ -109,6 +110,11 @@ IRuntimeFilter* create_runtime_filter(TRuntimeFilterType::type type, TQueryOptio EXPECT_TRUE(status.ok()) << status.to_string(); + if (auto bf = runtime_filter->get_bloomfilter()) { + status = bf->init_with_fixed_length(); + EXPECT_TRUE(status.ok()) << status.to_string(); + } + return status.ok() ? runtime_filter : nullptr; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org