Gabriel39 commented on code in PR #47524: URL: https://github.com/apache/doris/pull/47524#discussion_r1961534973
########## be/src/exprs/bloom_filter_func.h: ########## @@ -124,25 +125,26 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase { void init_params(const RuntimeFilterParams* params) { _bloom_filter_length = params->bloom_filter_size; - _build_bf_exactly = params->build_bf_exactly; + _build_bf_by_runtime_size = params->build_bf_exactly; Review Comment: ```suggestion _build_bf_by_runtime_size = params->build_bf_by_runtime_size; ``` ########## be/src/exprs/bloom_filter_func.h: ########## @@ -124,25 +125,26 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase { void init_params(const RuntimeFilterParams* params) { _bloom_filter_length = params->bloom_filter_size; - _build_bf_exactly = params->build_bf_exactly; + _build_bf_by_runtime_size = params->build_bf_exactly; _runtime_bloom_filter_min_size = params->runtime_bloom_filter_min_size; _runtime_bloom_filter_max_size = params->runtime_bloom_filter_max_size; _null_aware = params->null_aware; _bloom_filter_size_calculated_by_ndv = params->bloom_filter_size_calculated_by_ndv; + _enable_fixed_len_to_uint32_v2 = params->enable_fixed_len_to_uint32_v2; _limit_length(); } Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); } - bool get_build_bf_cardinality() const { return _build_bf_exactly; } + bool build_bf_by_runtime_size() const { return _build_bf_by_runtime_size; } - Status init_with_cardinality(const size_t build_bf_cardinality) { - if (_build_bf_exactly) { + Status init_with_cardinality(const size_t runtime_size) { Review Comment: ```suggestion Status init_with_cardinality(const size_t cardinality) { ``` ########## be/src/runtime_filter/role/producer.h: ########## @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "pipeline/dependency.h" +#include "runtime/query_context.h" +#include "runtime_filter/role/runtime_filter.h" +#include "vec/runtime/shared_hash_table_controller.h" + +namespace doris { + +class RuntimeFilterProducer : public RuntimeFilter { +public: + static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, + std::shared_ptr<RuntimeFilterProducer>* res, + RuntimeProfile* parent_profile) { + *res = std::shared_ptr<RuntimeFilterProducer>( + new RuntimeFilterProducer(state, desc, parent_profile)); + RETURN_IF_ERROR((*res)->_init_with_desc(desc, &state->get_query_ctx()->query_options())); + bool need_sync_filter_size = + (*res)->_wrapper->build_bf_by_runtime_size() && !(*res)->_is_broadcast_join; + (*res)->_rf_state = + need_sync_filter_size ? State::WAITING_FOR_SEND_SIZE : State::WAITING_FOR_DATA; + (*res)->_profile->add_info_string("Info", ((*res)->debug_string())); + return Status::OK(); + } + + // insert data to build filter + void insert_batch(vectorized::ColumnPtr column, size_t start) { + if (_rf_state == State::READY_TO_PUBLISH || _rf_state == State::PUBLISHED) { + return; + } + _check_state({State::WAITING_FOR_DATA}); + _wrapper->insert_batch(column, start); + } + + int expr_order() const { return _expr_order; } + + Status init_with_size(size_t local_size); + + Status send_filter_size(RuntimeState* state, uint64_t local_filter_size, + const std::shared_ptr<pipeline::CountedFinishDependency>& dependency); + + Status publish(RuntimeState* state, bool publish_local); + + void set_synced_size(uint64_t global_size); + + std::string debug_string() const override { + return fmt::format("Producer: ({}, state: {}, dependency: {}, synced_size: {})", + _debug_string(), to_string(_rf_state), + _dependency ? _dependency->debug_string() : "none", _synced_size); + } + + enum class State { + WAITING_FOR_SEND_SIZE = 0, + WAITING_FOR_SYNCED_SIZE = 1, + WAITING_FOR_DATA = 2, + READY_TO_PUBLISH = 3, + PUBLISHED = 4 + }; + + void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State state) { + if (_set_state(State::READY_TO_PUBLISH)) { + _wrapper->set_state(state); + } + } + + void disable_and_ready_to_publish(std::string reason) { + if (_set_state(State::READY_TO_PUBLISH)) { + _wrapper->disable(reason); + } + } + + void disable_meaningless_filters(std::unordered_set<int>& has_in_filter, + bool collect_in_filters); + + static std::string to_string(const State& state) { + switch (state) { + case State::WAITING_FOR_SEND_SIZE: + return "WAITING_FOR_SEND_SIZE"; + case State::WAITING_FOR_SYNCED_SIZE: + return "WAITING_FOR_SYNCED_SIZE"; + case State::WAITING_FOR_DATA: + return "WAITING_FOR_DATA"; + case State::READY_TO_PUBLISH: + return "READY_TO_PUBLISH"; + case State::PUBLISHED: + return "PUBLISHED"; + default: + throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid state {}", int(state)); + } + } + + void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) { + context->runtime_filters[_wrapper->filter_id()] = _wrapper; + } + + void copy_from_shared_context(vectorized::SharedHashTableContextPtr& context) { + _wrapper = context->runtime_filters[_wrapper->filter_id()]; + } + +private: + RuntimeFilterProducer(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, + RuntimeProfile* parent_profile) + : RuntimeFilter(state, desc), + _is_broadcast_join(desc->is_broadcast_join), + _expr_order(desc->expr_order), + _profile(new RuntimeProfile(fmt::format("RF{}", desc->filter_id))) { + if (parent_profile) { //tmp filter for mgr has no profile + parent_profile->add_child(_profile.get(), true, nullptr); + } + } + + Status _send_to_remote_targets(RuntimeState* state, RuntimeFilter* merger_filter); + Status _send_to_local_targets(RuntimeFilter* merger_filter, bool global); + + void _check_state(std::vector<State> assumed_states) { + if (!check_state_impl<RuntimeFilterProducer>(_rf_state, assumed_states)) { + throw Exception(ErrorCode::INTERNAL_ERROR, + "producer meet invalid state, {}, assumed_states is {}", debug_string(), + states_to_string<RuntimeFilterProducer>(assumed_states)); + } + } + + bool _set_state(State state) { + if (_rf_state == State::PUBLISHED || Review Comment: We should check if the state transition is legal ########## be/src/runtime_filter/role/producer.h: ########## @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "pipeline/dependency.h" +#include "runtime/query_context.h" +#include "runtime_filter/role/runtime_filter.h" +#include "vec/runtime/shared_hash_table_controller.h" + +namespace doris { + +class RuntimeFilterProducer : public RuntimeFilter { +public: + static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, + std::shared_ptr<RuntimeFilterProducer>* res, + RuntimeProfile* parent_profile) { + *res = std::shared_ptr<RuntimeFilterProducer>( + new RuntimeFilterProducer(state, desc, parent_profile)); + RETURN_IF_ERROR((*res)->_init_with_desc(desc, &state->get_query_ctx()->query_options())); + bool need_sync_filter_size = + (*res)->_wrapper->build_bf_by_runtime_size() && !(*res)->_is_broadcast_join; + (*res)->_rf_state = + need_sync_filter_size ? State::WAITING_FOR_SEND_SIZE : State::WAITING_FOR_DATA; + (*res)->_profile->add_info_string("Info", ((*res)->debug_string())); + return Status::OK(); + } + + // insert data to build filter + void insert_batch(vectorized::ColumnPtr column, size_t start) { + if (_rf_state == State::READY_TO_PUBLISH || _rf_state == State::PUBLISHED) { + return; + } + _check_state({State::WAITING_FOR_DATA}); + _wrapper->insert_batch(column, start); + } + + int expr_order() const { return _expr_order; } + + Status init_with_size(size_t local_size); + + Status send_filter_size(RuntimeState* state, uint64_t local_filter_size, + const std::shared_ptr<pipeline::CountedFinishDependency>& dependency); + + Status publish(RuntimeState* state, bool publish_local); + + void set_synced_size(uint64_t global_size); + + std::string debug_string() const override { + return fmt::format("Producer: ({}, state: {}, dependency: {}, synced_size: {})", + _debug_string(), to_string(_rf_state), + _dependency ? _dependency->debug_string() : "none", _synced_size); + } + + enum class State { Review Comment: Add comments to describe state transition -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org