This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4ba4767eef2 [improvement](scan) make global runtime filter support in-list filter (#29394)
4ba4767eef2 is described below
commit 4ba4767eef28eca4d1c64fbf45a9c2ebc594d2e1
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Jan 4 09:12:30 2024 +0800
[improvement](scan) make global runtime filter support in-list filter (#29394)
---
be/src/exprs/runtime_filter.cpp | 45 ++++++++++++++--------
be/src/exprs/runtime_filter.h | 4 +-
be/src/exprs/runtime_filter_slots.h | 5 +--
.../processor/post/RuntimeFilterGenerator.java | 6 +--
4 files changed, 32 insertions(+), 28 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index f52a9574bf6..bf09adc53f8 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -453,9 +453,11 @@ public:
std::vector<vectorized::VExprSPtr>& push_exprs,
const TExpr& probe_expr);
Status merge(const RuntimePredicateWrapper* wrapper) {
- bool can_not_merge_in_or_bloom = _filter_type ==
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
- (wrapper->_filter_type !=
RuntimeFilterType::IN_FILTER &&
- wrapper->_filter_type !=
RuntimeFilterType::BLOOM_FILTER);
+ bool can_not_merge_in_or_bloom =
+ _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+ (wrapper->_filter_type != RuntimeFilterType::IN_FILTER &&
+ wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER &&
+ wrapper->_filter_type !=
RuntimeFilterType::IN_OR_BLOOM_FILTER);
bool can_not_merge_other = _filter_type !=
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
_filter_type != wrapper->_filter_type;
@@ -513,8 +515,15 @@ public:
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
auto real_filter_type = _is_bloomfilter ?
RuntimeFilterType::BLOOM_FILTER
:
RuntimeFilterType::IN_FILTER;
+
+ auto other_filter_type = wrapper->_filter_type;
+ if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ other_filter_type = wrapper->_is_bloomfilter ?
RuntimeFilterType::BLOOM_FILTER
+ :
RuntimeFilterType::IN_FILTER;
+ }
+
if (real_filter_type == RuntimeFilterType::IN_FILTER) {
- if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) {
// in merge in
+ if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in
merge in
CHECK(!wrapper->_is_ignored_in_filter)
<< " can not ignore merge runtime filter(in filter
id "
<< wrapper->_filter_id << ") when used
IN_OR_BLOOM_FILTER, ignore msg: "
@@ -526,7 +535,6 @@ public:
<< ") >= max_in_num(" << _max_in_num << ")";
change_to_bloom_filter();
}
- // in merge bloom filter
} else {
VLOG_DEBUG << " change runtime filter to bloom filter(id="
<< _filter_id
<< ") because: already exist a bloom filter";
@@ -535,8 +543,7 @@ public:
wrapper->_context.bloom_filter_func.get()));
}
} else {
- if (wrapper->_filter_type ==
- RuntimeFilterType::IN_FILTER) { // bloom filter merge in
+ if (other_filter_type == RuntimeFilterType::IN_FILTER) { //
bloom filter merge in
CHECK(!wrapper->_is_ignored_in_filter)
<< " can not ignore merge runtime filter(in filter
id "
<< wrapper->_filter_id << ") when used
IN_OR_BLOOM_FILTER, ignore msg: "
@@ -1157,6 +1164,14 @@ void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTim
_filter_timer.push_back(timer);
}
+void IRuntimeFilter::set_ignored(const std::string& msg) {
+ _is_ignored = true;
+ if (_wrapper->_filter_type == RuntimeFilterType::IN_FILTER) {
+ _wrapper->_is_ignored_in_filter = true;
+ _wrapper->_ignored_in_filter_msg = _pool->add(new std::string(msg));
+ }
+}
+
BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
return _wrapper->get_bloomfilter();
}
@@ -1344,14 +1359,14 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {
Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
if (!_is_ignored && wrapper->is_ignored_in_filter()) {
- set_ignored();
- set_ignored_msg(*(wrapper->get_ignored_in_filter_msg()));
+ std::string* msg = wrapper->get_ignored_in_filter_msg();
+ set_ignored(msg ? *msg : "");
}
auto origin_type = _wrapper->get_real_type();
Status status = _wrapper->merge(wrapper);
if (!_is_ignored && _wrapper->is_ignored_in_filter()) {
- set_ignored();
- set_ignored_msg(*(_wrapper->get_ignored_in_filter_msg()));
+ std::string* msg = _wrapper->get_ignored_in_filter_msg();
+ set_ignored(msg ? *msg : "");
}
if (origin_type != _wrapper->get_real_type()) {
update_runtime_filter_type_to_profile();
@@ -1656,10 +1671,8 @@ bool IRuntimeFilter::is_bloomfilter() {
Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
if (param->request->has_in_filter() &&
param->request->in_filter().has_ignored_msg()) {
- set_ignored();
const PInFilter in_filter = param->request->in_filter();
- auto msg = param->pool->add(new std::string(in_filter.ignored_msg()));
- set_ignored_msg(*msg);
+ set_ignored(in_filter.ignored_msg());
}
std::unique_ptr<RuntimePredicateWrapper> wrapper;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool,
&wrapper));
@@ -1677,10 +1690,8 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param,
int64_t start_apply) {
if (param->request->has_in_filter() &&
param->request->in_filter().has_ignored_msg()) {
- set_ignored();
const PInFilter in_filter = param->request->in_filter();
- auto msg = param->pool->add(new std::string(in_filter.ignored_msg()));
- set_ignored_msg(*msg);
+ set_ignored(in_filter.ignored_msg());
}
std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper;
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 2673308ae77..fc324c1c1be 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -292,9 +292,7 @@ public:
Status update_filter(const UpdateRuntimeFilterParams* param);
Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t
start_apply);
- void set_ignored() { _is_ignored = true; }
-
- void set_ignored_msg(std::string& msg) { _ignored_msg = msg; }
+ void set_ignored(const std::string& msg);
// for ut
bool is_bloomfilter();
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index d539e295ae8..495ac28e762 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -58,15 +58,14 @@ public:
filter_id);
}
for (auto filter : filters) {
- filter->set_ignored();
+ filter->set_ignored("");
filter->signal();
}
return Status::OK();
};
auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter,
std::string& msg) {
- runtime_filter->set_ignored();
- runtime_filter->set_ignored_msg(msg);
+ runtime_filter->set_ignored(msg);
RETURN_IF_ERROR(runtime_filter->publish());
return Status::OK();
};
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 45a14d3b633..12db27793d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -369,11 +369,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
List<TRuntimeFilterType> legalTypes =
Arrays.stream(TRuntimeFilterType.values())
.filter(type -> (type.getValue() &
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
- if (ctx.getSessionVariable().isIgnoreStorageDataDistribution()) {
- // If storage data distribution is ignored, we use BLOOM filter.
- legalTypes.clear();
- legalTypes.add(TRuntimeFilterType.BLOOM);
- }
+
List<EqualTo> hashJoinConjuncts = join.getEqualToConjuncts();
for (int i = 0; i < hashJoinConjuncts.size(); i++) {
EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]