github-actions[bot] commented on code in PR #32180:
URL: https://github.com/apache/doris/pull/32180#discussion_r1529798756


##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -37,127 +37,125 @@ class VRuntimeFilterSlots {
             const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
             : _build_expr_context(build_expr_ctxs),
               _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
+              _need_local_merge(need_local_merge) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
 
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
+    Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, 
bool publish_local,
+                            pipeline::CountedFinishDependency* dependency) {
+        if (_runtime_filters.empty() || publish_local) {
+            return Status::OK();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
+            }
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+            }
+        }
+        return Status::OK();
+    }
 
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
+    Status ignore_filter(RuntimeState* state, IRuntimeFilter* runtime_filter,
+                         std::string reason) const {
+        runtime_filter->set_ignored(reason);
+        if (runtime_filter->has_remote_target()) {
+            RETURN_IF_ERROR(runtime_filter->publish());
+        } else {
+            auto* runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
+                                                         : 
state->local_runtime_filter_mgr();
 
             std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
+            RETURN_IF_ERROR(
+                    
runtime_filter_mgr->get_consume_filters(runtime_filter->filter_id(), filters));
             if (filters.empty()) {
                 throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
+                                runtime_filter->filter_id());
             }
             for (auto* filter : filters) {
-                filter->set_ignored("");
+                filter->set_ignored(reason);
                 filter->signal();
             }
-            return Status::OK();
-        };
+        }
+        return Status::OK();
+    }
 
-        auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored(msg);
-            RETURN_IF_ERROR(runtime_filter->publish());
-            return Status::OK();
-        };
-
-        // ordered vector: IN, IN_OR_BLOOM, others.
-        // so we can ignore other filter if IN Predicate exists.
-        auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) {
-            if (d1->type() == d2->type()) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_FILTER) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return false;
-            } else {
-                return d1->type() < d2->type();
-            }
-        };
-        std::sort(_runtime_filters.begin(), _runtime_filters.end(), 
compare_desc);
+    static uint64_t get_real_size(IRuntimeFilter* runtime_filter, uint64_t 
hash_table_size) {
+        return runtime_filter->isset_global_size() ? 
runtime_filter->get_global_size()
+                                                   : hash_table_size;
+    }
 
-        // do not create 'in filter' when hash_table size over limit
-        const auto max_in_num = state->runtime_filter_max_in_num();
-        const bool over_max_in_num = (hash_table_size >= max_in_num);
+    Status ignore_filters(RuntimeState* state, uint64_t hash_table_size) {

Review Comment:
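   warning: method 'ignore_filters' can be made static [readability-convert-member-functions-to-static]
   
   Reviewer note: this looks like a false positive. `ignore_filters` iterates the member `_runtime_filters` and calls the non-static member `ignore_filter`, so marking it `static` would not compile.
   
   ```suggestion
       static Status ignore_filters(RuntimeState* state, uint64_t hash_table_size) {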
   ```
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -37,127 +37,125 @@
             const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
             : _build_expr_context(build_expr_ctxs),
               _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
+              _need_local_merge(need_local_merge) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
 
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
+    Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, 
bool publish_local,
+                            pipeline::CountedFinishDependency* dependency) {
+        if (_runtime_filters.empty() || publish_local) {
+            return Status::OK();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
+            }
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+            }
+        }
+        return Status::OK();
+    }
 
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
+    Status ignore_filter(RuntimeState* state, IRuntimeFilter* runtime_filter,
+                         std::string reason) const {
+        runtime_filter->set_ignored(reason);
+        if (runtime_filter->has_remote_target()) {
+            RETURN_IF_ERROR(runtime_filter->publish());
+        } else {
+            auto* runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
+                                                         : 
state->local_runtime_filter_mgr();
 
             std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
+            RETURN_IF_ERROR(
+                    
runtime_filter_mgr->get_consume_filters(runtime_filter->filter_id(), filters));
             if (filters.empty()) {
                 throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
+                                runtime_filter->filter_id());
             }
             for (auto* filter : filters) {
-                filter->set_ignored("");
+                filter->set_ignored(reason);
                 filter->signal();
             }
-            return Status::OK();
-        };
+        }
+        return Status::OK();
+    }
 
-        auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored(msg);
-            RETURN_IF_ERROR(runtime_filter->publish());
-            return Status::OK();
-        };
-
-        // ordered vector: IN, IN_OR_BLOOM, others.
-        // so we can ignore other filter if IN Predicate exists.
-        auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) {
-            if (d1->type() == d2->type()) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_FILTER) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return false;
-            } else {
-                return d1->type() < d2->type();
-            }
-        };
-        std::sort(_runtime_filters.begin(), _runtime_filters.end(), 
compare_desc);
+    static uint64_t get_real_size(IRuntimeFilter* runtime_filter, uint64_t 
hash_table_size) {
+        return runtime_filter->isset_global_size() ? 
runtime_filter->get_global_size()
+                                                   : hash_table_size;
+    }
 
-        // do not create 'in filter' when hash_table size over limit
-        const auto max_in_num = state->runtime_filter_max_in_num();
-        const bool over_max_in_num = (hash_table_size >= max_in_num);
+    Status ignore_filters(RuntimeState* state, uint64_t hash_table_size) {
+        // process IN_OR_BLOOM_FILTER's real type
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->type() != 
RuntimeFilterType::IN_OR_BLOOM_FILTER ||
+                get_real_size(runtime_filter, hash_table_size) <=
+                        state->runtime_filter_max_in_num()) {
+                continue;
+            }
+            RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
+        }
 
+        // process ignore oversize IN_FILTER
+        std::unordered_map<int, bool> has_in_filter;
         for (auto* runtime_filter : _runtime_filters) {
-            if (runtime_filter->expr_order() < 0 ||
-                runtime_filter->expr_order() >= _build_expr_context.size()) {
-                return Status::InternalError(
-                        "runtime_filter meet invalid expr_order, 
expr_order={}, "
-                        "_build_expr_context.size={}",
-                        runtime_filter->expr_order(), 
_build_expr_context.size());
-            }
-
-            bool is_in_filter = (runtime_filter->type() == 
RuntimeFilterType::IN_FILTER);
-
-            if (over_max_in_num &&
-                runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
-            }
-
-            if (runtime_filter->is_bloomfilter()) {
-                
RETURN_IF_ERROR(runtime_filter->init_bloom_filter(hash_table_size));
-            }
-
-            // Note:
-            // In the case that exist *remote target* and in filter and other 
filter,
-            // we must merge other filter whatever in filter is over the max 
num in current node,
-            // because:
-            // case 1: (in filter >= max num) in current node, so in filter 
will be ignored,
-            //         and then other filter can be used
-            // case 2: (in filter < max num) in current node, we don't know 
whether the in filter
-            //         will be ignored in merge node, so we must transfer 
other filter to merge node
-            if (!runtime_filter->has_remote_target()) {
-                bool exists_in_filter = 
has_in_filter[runtime_filter->expr_order()];
-                if (is_in_filter && over_max_in_num) {
-                    VLOG_DEBUG << "fragment instance " << 
print_id(state->fragment_instance_id())
-                               << " ignore runtime filter(in filter id "
-                               << runtime_filter->filter_id() << ") because: 
in_num("
-                               << hash_table_size << ") >= max_in_num(" << 
max_in_num << ")";
-                    
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
-                    continue;
-                } else if (!is_in_filter && exists_in_filter) {
-                    // do not create 'bloom filter' and 'minmax filter' when 
'in filter' has created
-                    // because in filter is exactly filter, so it is enough to 
filter data
-                    VLOG_DEBUG << "fragment instance " << 
print_id(state->fragment_instance_id())
-                               << " ignore runtime filter("
-                               << 
IRuntimeFilter::to_string(runtime_filter->type()) << " id "
-                               << runtime_filter->filter_id()
-                               << ") because: already exists in filter";
-                    
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
-                    continue;
-                }
-            } else if (is_in_filter && over_max_in_num) {
-                std::string msg = fmt::format(
-                        "fragment instance {} ignore runtime filter(in filter 
id {}) because: "
-                        "in_num({}) >= max_in_num({})",
-                        print_id(state->fragment_instance_id()), 
runtime_filter->filter_id(),
-                        hash_table_size, max_in_num);
-                RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg));
+            if (runtime_filter->get_real_type() != 
RuntimeFilterType::IN_FILTER) {
                 continue;
             }
 
-            if ((runtime_filter->type() == RuntimeFilterType::IN_FILTER) ||
-                (runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
-                 !over_max_in_num)) {
+            if (get_real_size(runtime_filter, hash_table_size) >
+                state->runtime_filter_max_in_num()) {
+                RETURN_IF_ERROR(
+                        ignore_filter(state, runtime_filter,
+                                      fmt::format("in_num({}) >= 
max_in_num({})", hash_table_size,
+                                                  
state->runtime_filter_max_in_num())));
+            } else {
                 has_in_filter[runtime_filter->expr_order()] = true;
             }
-            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
         }
 
+        // process ignore filter when it has IN_FILTER on same expr, and init 
bloom filter size
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->get_real_type() == 
RuntimeFilterType::IN_FILTER) {
+                continue;
+            }
+
+            if (has_in_filter[runtime_filter->expr_order()]) {
+                RETURN_IF_ERROR(ignore_filter(state, runtime_filter, "has 
IN_FILTER on same expr"));
+                continue;
+            }
+
+            if (runtime_filter->get_real_type() == 
RuntimeFilterType::BLOOM_FILTER) {
+                if (runtime_filter->isset_global_size()) {
+                    LOG(WARNING) << "mytest hash_table_size:" << 
hash_table_size
+                                 << " global_size:" << 
runtime_filter->get_global_size();
+                }
+                RETURN_IF_ERROR(runtime_filter->init_bloom_filter(
+                        get_real_size(runtime_filter, hash_table_size)));
+            }
+        }
+        return Status::OK();
+    }
+
+    Status init_filters(RuntimeState* state, uint64_t hash_table_size) {

Review Comment:
   warning: method 'init_filters' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status init_filters(RuntimeState* state, uint64_t hash_table_size) {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to