github-actions[bot] commented on code in PR #32180:
URL: https://github.com/apache/doris/pull/32180#discussion_r1528039899


##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -37,125 +37,109 @@
             const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
             : _build_expr_context(build_expr_ctxs),
               _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
-
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
-
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
-
-            std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
-            if (filters.empty()) {
-                throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
+              _need_local_merge(need_local_merge) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
+
+    Status send_filter_size(RuntimeState* state, int64_t hash_table_size, bool 
publish_local,
+                            pipeline::CountedFinishDependency* dependency) {
+        if (_runtime_filters.empty() || publish_local) {
+            return Status::OK();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
             }
-            for (auto* filter : filters) {
-                filter->set_ignored("");
-                filter->signal();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
             }
-            return Status::OK();
-        };
+        }
+        return Status::OK();
+    }
 
-        auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored(msg);
-            RETURN_IF_ERROR(runtime_filter->publish());
-            return Status::OK();
+    Status process_filters(RuntimeState* state, uint64_t hash_table_size) {

Review Comment:
   warning: function 'process_filters' exceeds recommended size/complexity 
thresholds [readability-function-size]
   ```cpp
       Status process_filters(RuntimeState* state, uint64_t hash_table_size) {
              ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/exprs/runtime_filter_slots.h:63:** 82 lines including whitespace 
and comments (threshold 80)
   ```cpp
       Status process_filters(RuntimeState* state, uint64_t hash_table_size) {
              ^
   ```
   
   </details>
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -37,125 +37,109 @@ class VRuntimeFilterSlots {
             const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
             : _build_expr_context(build_expr_ctxs),
               _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
-
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
-
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
-
-            std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
-            if (filters.empty()) {
-                throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
+              _need_local_merge(need_local_merge) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
+
+    Status send_filter_size(RuntimeState* state, int64_t hash_table_size, bool 
publish_local,

Review Comment:
   warning: method 'send_filter_size' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status send_filter_size(RuntimeState* state, int64_t 
hash_table_size, bool publish_local,
   ```
   



##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -112,6 +120,37 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
+Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status 
exec_status) {

Review Comment:
   warning: method 'close' can be made const 
[readability-make-member-function-const]
   
   be/src/pipeline/exec/hashjoin_build_sink.h:74:
   ```diff
   -     Status close(RuntimeState* state, Status exec_status) override;
   +     Status close(RuntimeState* state, Status exec_status) const override;
   ```
   
   ```suggestion
   Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status 
exec_status) const {
   ```
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -37,125 +37,109 @@
             const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
             : _build_expr_context(build_expr_ctxs),
               _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
-
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
-
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
-
-            std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
-            if (filters.empty()) {
-                throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
+              _need_local_merge(need_local_merge) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
+
+    Status send_filter_size(RuntimeState* state, int64_t hash_table_size, bool 
publish_local,
+                            pipeline::CountedFinishDependency* dependency) {
+        if (_runtime_filters.empty() || publish_local) {
+            return Status::OK();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
             }
-            for (auto* filter : filters) {
-                filter->set_ignored("");
-                filter->signal();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
             }
-            return Status::OK();
-        };
+        }
+        return Status::OK();
+    }
 
-        auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored(msg);
-            RETURN_IF_ERROR(runtime_filter->publish());
-            return Status::OK();
+    Status process_filters(RuntimeState* state, uint64_t hash_table_size) {

Review Comment:
   warning: method 'process_filters' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
       Status process_filters(RuntimeState* state, uint64_t hash_table_size) 
const {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to