This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ff4d1e7b24e [chore](profile) Minor refactor on runtime filter producer
profile (#50055)
ff4d1e7b24e is described below
commit ff4d1e7b24e7f701c7ee6e946978ef9f782350c4
Author: zhiqiang <[email protected]>
AuthorDate: Wed Apr 16 10:14:40 2025 +0800
[chore](profile) Minor refactor on runtime filter producer profile (#50055)
### What problem does this PR solve?
Do same thing from https://github.com/apache/doris/pull/49777 to
RuntimeFilterProducer
In execution profile:
```text
- RuntimeFilterInfo:
- BuildTime: 392.464us
- PublishTime: 69.942us
- RF4 Info: Producer: ([id: 4, state: [READY], type: MINMAX_FILTER,
column_type: INT], mode: LOCAL, state: PUBLISHED)
- RF5 Info: Producer: ([id: 5, state: [READY], type:
IN_OR_BLOOM_FILTER(BLOOM_FILTER), column_type: INT, bf_size: 1048576,
build_bf_by_runtime_size: true], mode: LOCAL, state: PUBLISHED)
- SkipProcess: False
```
In merged profile
```
- RuntimeFilterInfo: sum , avg , max , min
- BuildTime: avg 0ns, max 0ns, min 0ns
- PublishTime: avg 103.959us, max 103.959us, min 103.959us
```
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 5 ++-
.../exec/nested_loop_join_build_operator.cpp | 3 +-
be/src/pipeline/exec/set_sink_operator.cpp | 7 +++-
be/src/runtime/runtime_state.cpp | 9 ++---
be/src/runtime/runtime_state.h | 6 +--
be/src/runtime_filter/runtime_filter_mgr.cpp | 14 +++----
be/src/runtime_filter/runtime_filter_mgr.h | 6 +--
be/src/runtime_filter/runtime_filter_producer.h | 36 ++++++++++--------
.../runtime_filter_producer_helper.cpp | 34 ++++++++++++++---
.../runtime_filter_producer_helper.h | 26 +++++++------
.../runtime_filter_producer_helper_cross.h | 3 +-
.../runtime_filter_producer_helper_set.h | 3 +-
.../runtime_filter_consumer_helper_test.cpp | 2 +-
.../runtime_filter_consumer_test.cpp | 8 ++--
.../runtime_filter/runtime_filter_merger_test.cpp | 14 +++----
be/test/runtime_filter/runtime_filter_mgr_test.cpp | 43 ++++++++++------------
.../runtime_filter_producer_helper_cross_test.cpp | 2 +-
.../runtime_filter_producer_helper_test.cpp | 10 ++---
.../runtime_filter_producer_test.cpp | 18 ++++-----
19 files changed, 141 insertions(+), 108 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index b9ca1c6f6bd..bfc283470bf 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -88,7 +88,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
// Hash Table Init
RETURN_IF_ERROR(_hash_table_init(state));
_runtime_filter_producer_helper =
std::make_shared<RuntimeFilterProducerHelper>(
- profile(), _should_build_hash_table, p._is_broadcast_join);
+ _should_build_hash_table, p._is_broadcast_join);
RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state,
_build_expr_ctxs,
p._runtime_filter_descs));
return Status::OK();
@@ -250,6 +250,9 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
e.to_string(), _terminated, _should_build_hash_table,
_finish_dependency->debug_string(),
blocked_by_shared_hash_table_signal);
}
+ if (_runtime_filter_producer_helper) {
+ _runtime_filter_producer_helper->collect_realtime_profile(profile());
+ }
return Base::close(state, exec_status);
}
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 7b8647f2232..fd44242cb68 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -41,7 +41,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkSta
RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state,
_filter_src_expr_ctxs[i]));
}
- _runtime_filter_producer_helper =
std::make_shared<RuntimeFilterProducerHelperCross>(profile());
+ _runtime_filter_producer_helper =
std::make_shared<RuntimeFilterProducerHelperCross>();
RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state,
_filter_src_expr_ctxs,
p._runtime_filter_descs));
return Status::OK();
@@ -56,6 +56,7 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState*
state) {
Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status
exec_status) {
RETURN_IF_ERROR(_runtime_filter_producer_helper->process(state,
_shared_state->build_blocks));
+ _runtime_filter_producer_helper->collect_realtime_profile(profile());
RETURN_IF_ERROR(JoinBuildSinkLocalState::close(state, exec_status));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index 0d8f4a45eb7..6c5b4483915 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -66,6 +66,11 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState*
state, Status exec_s
e.to_string(), _terminated,
_finish_dependency->debug_string());
}
}
+
+ if (_runtime_filter_producer_helper) {
+ _runtime_filter_producer_helper->collect_realtime_profile(profile());
+ }
+
return Base::close(state, exec_status);
}
@@ -209,7 +214,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState*
state, LocalSinkState
RETURN_IF_ERROR(_shared_state->update_build_not_ignore_null(_child_exprs));
- _runtime_filter_producer_helper =
std::make_shared<RuntimeFilterProducerHelperSet>(profile());
+ _runtime_filter_producer_helper =
std::make_shared<RuntimeFilterProducerHelperSet>();
RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _child_exprs,
parent._runtime_filter_descs));
return Status::OK();
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index b8dd7f560e9..2389883afb2 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -495,14 +495,13 @@ RuntimeFilterMgr*
RuntimeState::global_runtime_filter_mgr() {
}
Status RuntimeState::register_producer_runtime_filter(
- const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer>* producer_filter,
- RuntimeProfile* parent_profile) {
+ const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer>* producer_filter) {
// Producers are created by local runtime filter mgr and shared by global
runtime filter manager.
// When RF is published, consumers in both global and local RF mgr will be
found.
- RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
- _query_ctx, desc, producer_filter, parent_profile));
+
RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(_query_ctx,
desc,
+
producer_filter));
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter(
- _query_ctx, desc, *producer_filter, &_profile));
+ _query_ctx, desc, *producer_filter));
return Status::OK();
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 4037c22d31a..e4ecf59563c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -561,9 +561,9 @@ public:
return _task_execution_context;
}
- Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc&
desc,
-
std::shared_ptr<RuntimeFilterProducer>* producer_filter,
- RuntimeProfile* parent_profile);
+ Status register_producer_runtime_filter(
+ const doris::TRuntimeFilterDesc& desc,
+ std::shared_ptr<RuntimeFilterProducer>* producer_filter);
Status register_consumer_runtime_filter(
const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int
node_id,
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index c18a448f1b1..a2072b6771b 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -74,7 +74,7 @@ Status RuntimeFilterMgr::register_consumer_filter(
Status RuntimeFilterMgr::register_local_merger_producer_filter(
const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
- std::shared_ptr<RuntimeFilterProducer> producer, RuntimeProfile*
parent_profile) {
+ std::shared_ptr<RuntimeFilterProducer> producer) {
if (!_is_global) [[unlikely]] {
return Status::InternalError(
"A local merge filter can not be registered in Local
RuntimeFilterMgr");
@@ -129,10 +129,9 @@ Status
RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
return Status::OK();
}
-Status RuntimeFilterMgr::register_producer_filter(const QueryContext*
query_ctx,
- const TRuntimeFilterDesc&
desc,
-
std::shared_ptr<RuntimeFilterProducer>* producer,
- RuntimeProfile*
parent_profile) {
+Status RuntimeFilterMgr::register_producer_filter(
+ const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
+ std::shared_ptr<RuntimeFilterProducer>* producer) {
if (_is_global) [[unlikely]] {
return Status::InternalError(
"A local producer filter should not be registered in Global
RuntimeFilterMgr");
@@ -144,7 +143,7 @@ Status RuntimeFilterMgr::register_producer_filter(const
QueryContext* query_ctx,
if (_producer_id_set.contains(key)) {
return Status::InvalidArgument("filter {} has been registered", key);
}
- RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer,
parent_profile));
+ RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer));
_producer_id_set.insert(key);
return Status::OK();
}
@@ -312,7 +311,7 @@ Status
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
}
std::shared_ptr<RuntimeFilterProducer> tmp_filter;
RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx.get(),
&cnt_val.runtime_filter_desc,
- &tmp_filter, nullptr));
+ &tmp_filter));
RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data));
@@ -347,6 +346,7 @@ Status
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
auto ctx = query_ctx->ignore_runtime_filter_error() ?
std::weak_ptr<QueryContext> {}
: query_ctx;
std::vector<TRuntimeFilterTargetParamsV2>& targets =
cnt_val.targetv2_info;
+
for (auto& target : targets) {
auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
DummyBrpcCallback<PPublishFilterResponse>>::
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h
b/be/src/runtime_filter/runtime_filter_mgr.h
index 87e471a745b..00a91d61a2b 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -87,15 +87,13 @@ public:
Status register_local_merger_producer_filter(const QueryContext* query_ctx,
const TRuntimeFilterDesc&
desc,
-
std::shared_ptr<RuntimeFilterProducer> producer,
- RuntimeProfile*
parent_profile);
+
std::shared_ptr<RuntimeFilterProducer> producer);
Status get_local_merge_producer_filters(int filter_id, LocalMergeContext**
local_merge_filters);
// Create local producer. This producer is hold by
RuntimeFilterProducerHelper.
Status register_producer_filter(const QueryContext* query_ctx, const
TRuntimeFilterDesc& desc,
- std::shared_ptr<RuntimeFilterProducer>*
producer,
- RuntimeProfile* parent_profile);
+ std::shared_ptr<RuntimeFilterProducer>*
producer);
// update filter by remote
bool set_runtime_filter_params(const TRuntimeFilterParams&
runtime_filter_params);
diff --git a/be/src/runtime_filter/runtime_filter_producer.h
b/be/src/runtime_filter/runtime_filter_producer.h
index d688e29fe00..620262f6051 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -22,6 +22,7 @@
#include "pipeline/dependency.h"
#include "runtime/query_context.h"
#include "runtime_filter/runtime_filter.h"
+#include "util/runtime_profile.h"
namespace doris {
#include "common/compile_check_begin.h"
@@ -46,10 +47,8 @@ public:
};
static Status create(const QueryContext* query_ctx, const
TRuntimeFilterDesc* desc,
- std::shared_ptr<RuntimeFilterProducer>* res,
- RuntimeProfile* parent_profile) {
- *res = std::shared_ptr<RuntimeFilterProducer>(
- new RuntimeFilterProducer(query_ctx, desc, parent_profile));
+ std::shared_ptr<RuntimeFilterProducer>* res) {
+ *res = std::shared_ptr<RuntimeFilterProducer>(new
RuntimeFilterProducer(query_ctx, desc));
RETURN_IF_ERROR((*res)->_init_with_desc(desc,
&query_ctx->query_options()));
return Status::OK();
}
@@ -117,24 +116,33 @@ public:
return false;
}
_rf_state = state;
- _profile->add_info_string("Info", debug_string());
return true;
}
std::shared_ptr<RuntimeFilterWrapper> wrapper() const { return _wrapper; }
void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) { _wrapper
= wrapper; }
-private:
- RuntimeFilterProducer(const QueryContext* query_ctx, const
TRuntimeFilterDesc* desc,
- RuntimeProfile* parent_profile)
- : RuntimeFilter(desc),
- _is_broadcast_join(desc->is_broadcast_join),
- _profile(new RuntimeProfile(fmt::format("RF{}",
desc->filter_id))) {
- if (parent_profile) { //tmp filter for mgr has no profile
- parent_profile->add_child(_profile.get(), true, nullptr);
+ void collect_realtime_profile(RuntimeProfile* parent_operator_profile) {
+ DCHECK(parent_operator_profile != nullptr);
+ if (parent_operator_profile == nullptr) {
+ return;
+ }
+ /*
+ RuntimeFilterInfo:
+ - RF0 Info: xxxx
+ */
+ {
+ std::unique_lock<std::mutex> l(_mtx);
+ parent_operator_profile->add_description(
+ fmt::format("RF{} Info", _wrapper->filter_id()),
debug_string(),
+ "RuntimeFilterInfo");
}
}
+private:
+ RuntimeFilterProducer(const QueryContext* query_ctx, const
TRuntimeFilterDesc* desc)
+ : RuntimeFilter(desc), _is_broadcast_join(desc->is_broadcast_join)
{}
+
Status _send_to_remote_targets(RuntimeState* state, RuntimeFilter*
merger_filter);
Status _send_to_local_targets(RuntimeState* state, RuntimeFilter*
merger_filter, bool global);
@@ -150,7 +158,6 @@ private:
RETURN_IF_ERROR(RuntimeFilter::_init_with_desc(desc, options));
_need_sync_filter_size = _wrapper->build_bf_by_runtime_size() &&
!_is_broadcast_join;
_rf_state = _need_sync_filter_size ? State::WAITING_FOR_SEND_SIZE :
State::WAITING_FOR_DATA;
- _profile->add_info_string("Info", debug_string());
return Status::OK();
}
@@ -161,7 +168,6 @@ private:
std::shared_ptr<pipeline::CountedFinishDependency> _dependency;
std::atomic<State> _rf_state;
- std::unique_ptr<RuntimeProfile> _profile;
// only used to lock set_state() to make _rf_state is protected
// set_synced_size and RuntimeFilterProducerHelper::terminate may called
in different threads at the same time
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
index 9b3d5b1acdb..435585da3d3 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
@@ -17,6 +17,8 @@
#include "runtime_filter/runtime_filter_producer_helper.h"
+#include <gen_cpp/Metrics_types.h>
+
#include "pipeline/pipeline_task.h"
#include "runtime_filter/runtime_filter_wrapper.h"
@@ -36,8 +38,8 @@ Status RuntimeFilterProducerHelper::init(
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs) {
_producers.resize(runtime_filter_descs.size());
for (size_t i = 0; i < runtime_filter_descs.size(); i++) {
-
RETURN_IF_ERROR(state->register_producer_runtime_filter(runtime_filter_descs[i],
-
&_producers[i], _profile.get()));
+ RETURN_IF_ERROR(
+
state->register_producer_runtime_filter(runtime_filter_descs[i],
&_producers[i]));
}
_init_expr(build_expr_ctxs, runtime_filter_descs);
return Status::OK();
@@ -68,7 +70,7 @@ Status
RuntimeFilterProducerHelper::_init_filters(RuntimeState* state,
}
Status RuntimeFilterProducerHelper::_insert(const vectorized::Block* block,
size_t start) {
- SCOPED_TIMER(_runtime_filter_compute_timer);
+ SCOPED_TIMER(_runtime_filter_compute_timer.get());
for (int i = 0; i < _producers.size(); i++) {
auto filter = _producers[i];
int result_column_id =
_filter_expr_contexts[i]->get_last_result_column_id();
@@ -80,7 +82,7 @@ Status RuntimeFilterProducerHelper::_insert(const
vectorized::Block* block, size
}
Status RuntimeFilterProducerHelper::_publish(RuntimeState* state) {
- SCOPED_TIMER(_publish_runtime_filter_timer);
+ SCOPED_TIMER(_publish_runtime_filter_timer.get());
for (const auto& filter : _producers) {
RETURN_IF_ERROR(filter->publish(state, _should_build_hash_table));
}
@@ -153,8 +155,30 @@ Status
RuntimeFilterProducerHelper::skip_process(RuntimeState* state) {
RETURN_IF_ERROR(publish(state));
_skip_runtime_filters_process = true;
- _profile->add_info_string("SkipProcess", "True");
return Status::OK();
}
+void RuntimeFilterProducerHelper::collect_realtime_profile(
+ RuntimeProfile* parent_operator_profile) {
+ DCHECK(parent_operator_profile != nullptr);
+ if (parent_operator_profile == nullptr) {
+ return;
+ }
+
+ parent_operator_profile->add_counter_with_level("RuntimeFilterInfo",
TUnit::NONE, 1);
+ RuntimeProfile::Counter* publish_timer =
parent_operator_profile->add_counter(
+ "PublishTime", TUnit::TIME_NS, "RuntimeFilterInfo", 1);
+ RuntimeProfile::Counter* build_timer =
parent_operator_profile->add_counter(
+ "BuildTime", TUnit::TIME_NS, "RuntimeFilterInfo", 1);
+
+ parent_operator_profile->add_description(
+ "SkipProcess", _skip_runtime_filters_process ? "True" : "False",
"RuntimeFilterInfo");
+ publish_timer->set(_publish_runtime_filter_timer->value());
+ build_timer->set(_runtime_filter_compute_timer->value());
+
+ for (auto& producer : _producers) {
+ producer->collect_realtime_profile(parent_operator_profile);
+ }
+}
+
} // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h
b/be/src/runtime_filter/runtime_filter_producer_helper.h
index d3a6f23bd10..a802e9b3ba7 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.h
@@ -17,6 +17,8 @@
#pragma once
+#include <gen_cpp/Metrics_types.h>
+
#include "common/be_mock_util.h"
#include "common/status.h"
#include "runtime/runtime_state.h"
@@ -36,15 +38,9 @@ class RuntimeFilterProducerHelper {
public:
virtual ~RuntimeFilterProducerHelper() = default;
- RuntimeFilterProducerHelper(RuntimeProfile* profile, bool
should_build_hash_table,
- bool is_broadcast_join)
+ RuntimeFilterProducerHelper(bool should_build_hash_table, bool
is_broadcast_join)
: _should_build_hash_table(should_build_hash_table),
- _profile(new RuntimeProfile("RuntimeFilterProducerHelper")),
- _is_broadcast_join(is_broadcast_join) {
- profile->add_child(_profile.get(), true, nullptr);
- _publish_runtime_filter_timer = ADD_TIMER_WITH_LEVEL(_profile,
"PublishTime", 1);
- _runtime_filter_compute_timer = ADD_TIMER_WITH_LEVEL(_profile,
"BuildTime", 1);
- }
+ _is_broadcast_join(is_broadcast_join) {}
#ifdef BE_TEST
RuntimeFilterProducerHelper() : _should_build_hash_table(true),
_is_broadcast_join(false) {}
@@ -70,6 +66,8 @@ public:
// publish rf
Status publish(RuntimeState* state);
+ void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
+
protected:
virtual void _init_expr(const vectorized::VExprContextSPtrs&
build_expr_ctxs,
const std::vector<TRuntimeFilterDesc>&
runtime_filter_descs);
@@ -79,10 +77,14 @@ protected:
std::vector<std::shared_ptr<RuntimeFilterProducer>> _producers;
const bool _should_build_hash_table;
- RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
- RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
- std::unique_ptr<RuntimeProfile> _profile;
- bool _skip_runtime_filters_process = false;
+ std::unique_ptr<RuntimeProfile::Counter> _publish_runtime_filter_timer =
+ std::make_unique<RuntimeProfile::Counter>(TUnit::TIME_NS, 0);
+ std::unique_ptr<RuntimeProfile::Counter> _runtime_filter_compute_timer =
+ std::make_unique<RuntimeProfile::Counter>(TUnit::TIME_NS, 0);
+
+ // This flag is setted by skip_process
+ // and read by many methods, not sure wheather there exists data race, so
i use atomic
+ std::atomic_bool _skip_runtime_filters_process = false;
const bool _is_broadcast_join;
std::vector<std::shared_ptr<vectorized::VExprContext>>
_filter_expr_contexts;
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
index dd629cef1f4..af80750524c 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
@@ -33,8 +33,7 @@ class RuntimeFilterProducerHelperCross : public
RuntimeFilterProducerHelper {
public:
~RuntimeFilterProducerHelperCross() override = default;
- RuntimeFilterProducerHelperCross(RuntimeProfile* profile)
- : RuntimeFilterProducerHelper(profile, true, false) {}
+ RuntimeFilterProducerHelperCross() : RuntimeFilterProducerHelper(true,
false) {}
Status process(RuntimeState* state, vectorized::Blocks& blocks) {
for (auto& block : blocks) {
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_set.h
b/be/src/runtime_filter/runtime_filter_producer_helper_set.h
index 2e4e5bfe86a..0478d7b4c5c 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper_set.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper_set.h
@@ -34,8 +34,7 @@ class RuntimeFilterProducerHelperSet : public
RuntimeFilterProducerHelper {
public:
~RuntimeFilterProducerHelperSet() override = default;
- RuntimeFilterProducerHelperSet(RuntimeProfile* profile)
- : RuntimeFilterProducerHelper(profile, true, false) {}
+ RuntimeFilterProducerHelperSet() : RuntimeFilterProducerHelper(true,
false) {}
Status process(RuntimeState* state, const vectorized::Block* block,
uint64_t cardinality) {
if (_skip_runtime_filters_process) {
diff --git a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
index 05806c6a9b6..6875b7d4116 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
@@ -88,7 +88,7 @@ TEST_F(RuntimeFilterConsumerHelperTest, basic) {
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
- _query_ctx.get(), runtime_filter_descs.data(), &producer,
&_profile));
+ _query_ctx.get(), runtime_filter_descs.data(), &producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
helper._consumers[0]->signal(producer.get());
diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
index 3dcc2064c1c..4d1338c2689 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
@@ -35,7 +35,7 @@ public:
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc,
&producer, &_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc,
&producer));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
@@ -120,7 +120,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) {
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer,
&_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
@@ -156,7 +156,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) {
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer,
&_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED);
std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
@@ -221,7 +221,7 @@ TEST_F(RuntimeFilterConsumerTest,
aquire_signal_at_same_time) {
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc,
&producer, &_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc,
&producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
diff --git a/be/test/runtime_filter/runtime_filter_merger_test.cpp
b/be/test/runtime_filter/runtime_filter_merger_test.cpp
index 18b4766afc5..2c62c0de8b0 100644
--- a/be/test/runtime_filter/runtime_filter_merger_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_merger_test.cpp
@@ -41,7 +41,7 @@ public:
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[0]->register_producer_runtime_filter(desc,
&producer, &_profile));
+ _runtime_states[0]->register_producer_runtime_filter(desc,
&producer));
producer->set_wrapper_state_and_ready_to_publish(first_product_state);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
ASSERT_FALSE(merger->ready());
@@ -49,7 +49,7 @@ public:
std::shared_ptr<RuntimeFilterProducer> producer2;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[1]->register_producer_runtime_filter(desc,
&producer2, &_profile));
+ _runtime_states[1]->register_producer_runtime_filter(desc,
&producer2));
producer2->set_wrapper_state_and_ready_to_publish(second_product_state);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get()));
ASSERT_TRUE(merger->ready());
@@ -68,7 +68,7 @@ public:
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[0]->register_producer_runtime_filter(desc,
&producer, &_profile));
+ _runtime_states[0]->register_producer_runtime_filter(desc,
&producer));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123));
producer->set_wrapper_state_and_ready_to_publish(state);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
@@ -80,8 +80,8 @@ public:
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->serialize(&request, &data,
&len));
std::shared_ptr<RuntimeFilterProducer> deserialized_producer;
- FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
- _query_ctx.get(), &desc, &deserialized_producer, &_profile));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc,
&deserialized_producer));
butil::IOBuf buf;
buf.append(data, len);
butil::IOBufAsZeroCopyInputStream stream(buf);
@@ -124,14 +124,14 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) {
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[0]->register_producer_runtime_filter(desc,
&producer, &_profile));
+ _runtime_states[0]->register_producer_runtime_filter(desc,
&producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); //
ready wrapper
ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY);
std::shared_ptr<RuntimeFilterProducer> producer2;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[1]->register_producer_runtime_filter(desc,
&producer2, &_profile));
+ _runtime_states[1]->register_producer_runtime_filter(desc,
&producer2));
producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
auto st = merger->merge_from(producer2.get());
ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR);
diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp
b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
index 54b2b673402..d8222e201d9 100644
--- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
@@ -75,35 +75,32 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
std::shared_ptr<RuntimeFilterProducer> producer_filter;
// producer_filter should not be nullptr
- EXPECT_FALSE(global_runtime_filter_mgr
-
->register_local_merger_producer_filter(ctx.get(), desc,
-
producer_filter, profile.get())
- .ok());
- // local merge filter should not be registered in local mgr
- EXPECT_FALSE(local_runtime_filter_mgr
-
->register_local_merger_producer_filter(ctx.get(), desc,
-
producer_filter, profile.get())
- .ok());
- // producer should not registered in global mgr
EXPECT_FALSE(
global_runtime_filter_mgr
- ->register_producer_filter(ctx.get(), desc,
&producer_filter, profile.get())
+ ->register_local_merger_producer_filter(ctx.get(),
desc, producer_filter)
.ok());
- EXPECT_EQ(producer_filter, nullptr);
- // Register in local mgr
- EXPECT_TRUE(
+ // local merge filter should not be registered in local mgr
+ EXPECT_FALSE(
local_runtime_filter_mgr
- ->register_producer_filter(ctx.get(), desc,
&producer_filter, profile.get())
+ ->register_local_merger_producer_filter(ctx.get(),
desc, producer_filter)
.ok());
+ // producer should not registered in global mgr
+ EXPECT_FALSE(global_runtime_filter_mgr
+ ->register_producer_filter(ctx.get(), desc,
&producer_filter)
+ .ok());
+ EXPECT_EQ(producer_filter, nullptr);
+ // Register in local mgr
+ EXPECT_TRUE(local_runtime_filter_mgr
+ ->register_producer_filter(ctx.get(), desc,
&producer_filter)
+ .ok());
auto mocked_dependency =
std::make_shared<pipeline::CountedFinishDependency>(
0, 0, "MOCKED_FINISH_DEPENDENCY");
producer_filter->latch_dependency(mocked_dependency);
EXPECT_NE(producer_filter, nullptr);
// Register in local mgr twice
- EXPECT_FALSE(
- local_runtime_filter_mgr
- ->register_producer_filter(ctx.get(), desc,
&producer_filter, profile.get())
- .ok());
+ EXPECT_FALSE(local_runtime_filter_mgr
+ ->register_producer_filter(ctx.get(), desc,
&producer_filter)
+ .ok());
EXPECT_NE(producer_filter, nullptr);
LocalMergeContext* local_merge_filters = nullptr;
@@ -114,10 +111,10 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
->get_local_merge_producer_filters(filter_id,
&local_merge_filters)
.ok());
// Register local merge filter
- EXPECT_TRUE(global_runtime_filter_mgr
- ->register_local_merger_producer_filter(ctx.get(),
desc,
-
producer_filter, profile.get())
- .ok());
+ EXPECT_TRUE(
+ global_runtime_filter_mgr
+ ->register_local_merger_producer_filter(ctx.get(),
desc, producer_filter)
+ .ok());
EXPECT_TRUE(global_runtime_filter_mgr
->get_local_merge_producer_filters(filter_id,
&local_merge_filters)
.ok());
diff --git
a/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp
b/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp
index 974642d9d74..53f8e05e81d 100644
--- a/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp
@@ -56,7 +56,7 @@ class RuntimeFilterProducerHelperCrossTest : public
RuntimeFilterTest {
};
TEST_F(RuntimeFilterProducerHelperCrossTest, basic) {
- auto helper = RuntimeFilterProducerHelperCross(&_profile);
+ auto helper = RuntimeFilterProducerHelperCross();
vectorized::VExprContextSPtr ctx;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
index fffe752d9db..bf5988928dd 100644
--- a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
@@ -57,7 +57,7 @@ class RuntimeFilterProducerHelperTest : public
RuntimeFilterTest {
};
TEST_F(RuntimeFilterProducerHelperTest, basic) {
- auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+ auto helper = RuntimeFilterProducerHelper(true, false);
vectorized::VExprContextSPtr ctx;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -82,7 +82,7 @@ TEST_F(RuntimeFilterProducerHelperTest, basic) {
}
TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
- auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+ auto helper = RuntimeFilterProducerHelper(true, false);
vectorized::VExprContextSPtr ctx;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -106,7 +106,7 @@ TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
}
TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
- auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+ auto helper = RuntimeFilterProducerHelper(true, false);
vectorized::VExprContextSPtr ctx;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -137,7 +137,7 @@ TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
}
TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
- auto helper = RuntimeFilterProducerHelper(&_profile, true, true);
+ auto helper = RuntimeFilterProducerHelper(true, true);
vectorized::VExprContextSPtr ctx;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -160,7 +160,7 @@ TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
helper.build(_runtime_states[0].get(), &block, true,
runtime_filters));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.publish(_runtime_states[0].get()));
- auto helper2 = RuntimeFilterProducerHelper(&_profile, false, true);
+ auto helper2 = RuntimeFilterProducerHelper(false, true);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
helper2.init(_runtime_states[1].get(), build_expr_ctxs,
runtime_filter_descs));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp
b/be/test/runtime_filter/runtime_filter_producer_test.cpp
index 549a2d8361c..b075247759a 100644
--- a/be/test/runtime_filter/runtime_filter_producer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp
@@ -31,7 +31,7 @@ TEST_F(RuntimeFilterProducerTest, basic) {
std::shared_ptr<RuntimeFilterProducer> producer;
auto desc = TRuntimeFilterDescBuilder().build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer,
&_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
}
TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) {
@@ -42,7 +42,7 @@ TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) {
.set_is_broadcast_join(true)
.build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc,
&producer, &_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc,
&producer));
ASSERT_EQ(producer->_need_sync_filter_size, false);
ASSERT_EQ(producer->_rf_state,
RuntimeFilterProducer::State::WAITING_FOR_DATA);
}
@@ -53,7 +53,7 @@ TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) {
.set_is_broadcast_join(false)
.build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc,
&producer, &_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc,
&producer));
ASSERT_EQ(producer->_need_sync_filter_size, false);
ASSERT_EQ(producer->_rf_state,
RuntimeFilterProducer::State::WAITING_FOR_DATA);
}
@@ -66,7 +66,7 @@ TEST_F(RuntimeFilterProducerTest, sync_filter_size) {
.set_is_broadcast_join(false)
.build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer,
&_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
ASSERT_EQ(producer->_need_sync_filter_size, true);
ASSERT_EQ(producer->_rf_state,
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
@@ -85,7 +85,7 @@ TEST_F(RuntimeFilterProducerTest,
sync_filter_size_local_no_merge) {
.set_is_broadcast_join(false)
.build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer,
&_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
ASSERT_EQ(producer->_need_sync_filter_size, true);
ASSERT_EQ(producer->_rf_state,
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
@@ -106,10 +106,10 @@ TEST_F(RuntimeFilterProducerTest,
sync_filter_size_local_merge) {
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[0]->register_producer_runtime_filter(desc,
&producer, &_profile));
+ _runtime_states[0]->register_producer_runtime_filter(desc,
&producer));
std::shared_ptr<RuntimeFilterProducer> producer2;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[1]->register_producer_runtime_filter(desc,
&producer2, &_profile));
+ _runtime_states[1]->register_producer_runtime_filter(desc,
&producer2));
std::shared_ptr<RuntimeFilterConsumer> consumer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -142,10 +142,10 @@ TEST_F(RuntimeFilterProducerTest, set_disable) {
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[0]->register_producer_runtime_filter(desc,
&producer, &_profile));
+ _runtime_states[0]->register_producer_runtime_filter(desc,
&producer));
std::shared_ptr<RuntimeFilterProducer> producer2;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[1]->register_producer_runtime_filter(desc,
&producer2, &_profile));
+ _runtime_states[1]->register_producer_runtime_filter(desc,
&producer2));
std::shared_ptr<RuntimeFilterConsumer> consumer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]