github-actions[bot] commented on code in PR #32180:
URL: https://github.com/apache/doris/pull/32180#discussion_r1528610463


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -928,27 +931,27 @@
                                           PColumnValue&, ObjectPool*)) {
         for (int i = 0; i < filter->values_size(); ++i) {
             PColumnValue column = filter->values(i);
-            assign_func(_context.hybrid_set, column, _pool);
+            assign_func(_context->hybrid_set, column, _pool);
         }
     }
 
-    size_t get_in_filter_size() const { return _context.hybrid_set->size(); }
+    size_t get_in_filter_size() const { return _context->hybrid_set->size(); }
 
     std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const {
-        return _context.bitmap_filter_func;
+        return _context->bitmap_filter_func;

Review Comment:
   warning: method 'set_filter_id' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static void set_filter_id(int id) {
   ```
   



##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -346,61 +347,66 @@ class RuntimePredicateWrapper {
         CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER)
                 << "Can not change to bloom filter because of runtime filter 
type is "
                 << IRuntimeFilter::to_string(_filter_type);
-        _is_bloomfilter = true;
-        BloomFilterFuncBase* bf = _context.bloom_filter_func.get();
+        BloomFilterFuncBase* bf = _context->bloom_filter_func.get();
         if (need_init_bf) {
             // BloomFilter may be not init
             RETURN_IF_ERROR(bf->init_with_fixed_length());
             insert_to_bloom_filter(bf);
         }
         // release in filter
-        _context.hybrid_set.reset();
+        _context->hybrid_set.reset();
         return Status::OK();
     }
 
     Status init_bloom_filter(const size_t build_bf_cardinality) {
         DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER ||
                _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);
-        return 
_context.bloom_filter_func->init_with_cardinality(build_bf_cardinality);
+        return 
_context->bloom_filter_func->init_with_cardinality(build_bf_cardinality);
+    }
+
+    bool get_build_bf_cardinality() const {
+        DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER ||

Review Comment:
   warning: method 'insert_to_bloom_filter' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) {
   ```
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1348,71 +1348,113 @@
     bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
     int64_t start_apply = MonotonicMillis();
 
+    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
+    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+
+    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+    ObjectPool* pool = nullptr;
+
     const auto& fragment_instance_ids = request->fragment_instance_ids();
-    if (!fragment_instance_ids.empty()) {
-        UniqueId fragment_instance_id = fragment_instance_ids[0];
-        TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
-        std::shared_ptr<PlanFragmentExecutor> fragment_executor;
-        std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-
-        RuntimeFilterMgr* runtime_filter_mgr = nullptr;
-        ObjectPool* pool = nullptr;
-        if (is_pipeline) {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _pipeline_map.find(tfragment_instance_id);
-            if (iter == _pipeline_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment-id:" << 
fragment_instance_id;
-                return Status::InvalidArgument("fragment-id: {}", 
fragment_instance_id.to_string());
-            }
-            pip_context = iter->second;
+    {
+        std::unique_lock<std::mutex> lock(_lock);
+        for (UniqueId fragment_instance_id : fragment_instance_ids) {
+            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
-            DCHECK(pip_context != nullptr);
-            runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
-            pool = &pip_context->get_query_ctx()->obj_pool;
-        } else {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _fragment_instance_map.find(tfragment_instance_id);
-            if (iter == _fragment_instance_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment instance id:"
-                              << print_id(tfragment_instance_id);
-                return Status::InvalidArgument("fragment instance id: {}",
-                                               
print_id(tfragment_instance_id));
-            }
-            fragment_executor = iter->second;
+            if (is_pipeline) {
+                auto iter = _pipeline_map.find(tfragment_instance_id);
+                if (iter == _pipeline_map.end()) {
+                    continue;
+                }
+                pip_context = iter->second;
 
-            DCHECK(fragment_executor != nullptr);
-            runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
-            pool = &fragment_executor->get_query_ctx()->obj_pool;
-        }
+                DCHECK(pip_context != nullptr);
+                runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
+                pool = &pip_context->get_query_ctx()->obj_pool;
+            } else {
+                auto iter = _fragment_instance_map.find(tfragment_instance_id);
+                if (iter == _fragment_instance_map.end()) {
+                    continue;
+                }
+                fragment_executor = iter->second;
 
-        // 1. get the target filters
-        std::vector<IRuntimeFilter*> filters;
-        
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
-
-        // 2. create the filter wrapper to replace or ignore the target filters
-        if (request->has_in_filter() && 
request->in_filter().has_ignored_msg()) {
-            const auto& in_filter = request->in_filter();
-
-            std::ranges::for_each(filters, [&in_filter](auto& filter) {
-                filter->set_ignored(in_filter.ignored_msg());
-                filter->signal();
-            });
-        } else if (!filters.empty()) {
-            UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
-                                                filters[0]->column_type()};
-            RuntimePredicateWrapper* filter_wrapper = nullptr;
-            RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
-
-            std::ranges::for_each(filters, [&](auto& filter) {
-                filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
-            });
+                DCHECK(fragment_executor != nullptr);
+                runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
+                pool = &fragment_executor->get_query_ctx()->obj_pool;
+            }
+            break;
         }
     }
 
+    if (runtime_filter_mgr == nullptr) {
+        // all instance finished
+        return Status::OK();
+    }
+
+    // 1. get the target filters
+    std::vector<IRuntimeFilter*> filters;
+    
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
+
+    // 2. create the filter wrapper to replace or ignore the target filters
+    if (request->has_in_filter() && request->in_filter().has_ignored_msg()) {
+        const auto& in_filter = request->in_filter();
+
+        std::ranges::for_each(filters, [&in_filter](auto& filter) {
+            filter->set_ignored(in_filter.ignored_msg());
+            filter->signal();
+        });
+    } else if (!filters.empty()) {
+        UpdateRuntimeFilterParamsV2 params {request, attach_data, pool, 
filters[0]->column_type()};
+        RuntimePredicateWrapper* filter_wrapper = nullptr;
+        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
+
+        std::ranges::for_each(filters, [&](auto& filter) {
+            filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
+        });
+    }
+
     return Status::OK();
 }
 
+Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
+    UniqueId queryid = request->query_id();
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
+    RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
+
+    std::shared_ptr<QueryContext> query_ctx;
+    {
+        TUniqueId query_id;
+        query_id.__set_hi(queryid.hi);
+        query_id.__set_lo(queryid.lo);
+        std::lock_guard<std::mutex> lock(_lock);
+        auto iter = _query_ctx_map.find(query_id);
+        if (iter == _query_ctx_map.end()) {
+            return Status::InvalidArgument("query-id: {}", 
queryid.to_string());
+        }
+
+        query_ctx = iter->second;
+    }
+    auto merge_status = filter_controller->send_filter_size(request);
+    return merge_status;
+}
+
+Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
+    UniqueId queryid = request->query_id();
+    std::shared_ptr<QueryContext> query_ctx;
+    {
+        TUniqueId query_id;
+        query_id.__set_hi(queryid.hi);

Review Comment:
   warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* 
request) {
   ```
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1348,71 +1348,113 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
     bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
     int64_t start_apply = MonotonicMillis();
 
+    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
+    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+
+    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+    ObjectPool* pool = nullptr;
+
     const auto& fragment_instance_ids = request->fragment_instance_ids();
-    if (!fragment_instance_ids.empty()) {
-        UniqueId fragment_instance_id = fragment_instance_ids[0];
-        TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
-        std::shared_ptr<PlanFragmentExecutor> fragment_executor;
-        std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-
-        RuntimeFilterMgr* runtime_filter_mgr = nullptr;
-        ObjectPool* pool = nullptr;
-        if (is_pipeline) {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _pipeline_map.find(tfragment_instance_id);
-            if (iter == _pipeline_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment-id:" << 
fragment_instance_id;
-                return Status::InvalidArgument("fragment-id: {}", 
fragment_instance_id.to_string());
-            }
-            pip_context = iter->second;
+    {
+        std::unique_lock<std::mutex> lock(_lock);
+        for (UniqueId fragment_instance_id : fragment_instance_ids) {
+            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
-            DCHECK(pip_context != nullptr);
-            runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
-            pool = &pip_context->get_query_ctx()->obj_pool;
-        } else {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _fragment_instance_map.find(tfragment_instance_id);
-            if (iter == _fragment_instance_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment instance id:"
-                              << print_id(tfragment_instance_id);
-                return Status::InvalidArgument("fragment instance id: {}",
-                                               
print_id(tfragment_instance_id));
-            }
-            fragment_executor = iter->second;
+            if (is_pipeline) {
+                auto iter = _pipeline_map.find(tfragment_instance_id);
+                if (iter == _pipeline_map.end()) {
+                    continue;
+                }
+                pip_context = iter->second;
 
-            DCHECK(fragment_executor != nullptr);
-            runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
-            pool = &fragment_executor->get_query_ctx()->obj_pool;
-        }
+                DCHECK(pip_context != nullptr);
+                runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
+                pool = &pip_context->get_query_ctx()->obj_pool;
+            } else {

Review Comment:
   warning: method 'apply_filterv2' can be made static [readability-convert-member-functions-to-static]
   
   be/src/runtime/fragment_mgr.h:137:
   ```diff
   -     Status apply_filterv2(const PPublishFilterRequestV2* request,
   +     static Status apply_filterv2(const PPublishFilterRequestV2* request,
   ```
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -37,125 +37,104 @@
             const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
             : _build_expr_context(build_expr_ctxs),
               _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
+              _need_local_merge(need_local_merge) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
 
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
+    Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, 
bool publish_local,
+                            pipeline::CountedFinishDependency* dependency) {
+        if (_runtime_filters.empty() || publish_local) {
+            return Status::OK();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
+            }
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+            }
+        }
+        return Status::OK();
+    }
 
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
+    Status ignore_filter(RuntimeState* state, IRuntimeFilter* runtime_filter,
+                         std::string reason) const {
+        runtime_filter->set_ignored(reason);
+        if (runtime_filter->has_remote_target()) {
+            RETURN_IF_ERROR(runtime_filter->publish());
+        } else {
+            auto* runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
+                                                         : 
state->local_runtime_filter_mgr();
 
             std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
+            RETURN_IF_ERROR(
+                    
runtime_filter_mgr->get_consume_filters(runtime_filter->filter_id(), filters));
             if (filters.empty()) {
                 throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
+                                runtime_filter->filter_id());
             }
             for (auto* filter : filters) {
-                filter->set_ignored("");
+                filter->set_ignored(reason);
                 filter->signal();
             }
-            return Status::OK();
-        };
+        }
+        return Status::OK();
+    }
 
-        auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored(msg);
-            RETURN_IF_ERROR(runtime_filter->publish());
-            return Status::OK();
+    Status process_filters(RuntimeState* state, uint64_t hash_table_size) {

Review Comment:
   warning: method 'process_filters' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status process_filters(RuntimeState* state, uint64_t 
hash_table_size) {
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -1170,9 +1170,36 @@ void 
PInternalService::merge_filter(::google::protobuf::RpcController* controlle
         auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
         butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
         Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
-        if (!st.ok()) {
-            LOG(WARNING) << "merge meet error" << st.to_string();
-        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        offer_failed(response, done, _light_work_pool);
+        return;
+    }

Review Comment:
   warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static void 
PInternalService::send_filter_size(::google::protobuf::RpcController* 
controller,
   ```
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -37,125 +37,104 @@ class VRuntimeFilterSlots {
             const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
             : _build_expr_context(build_expr_ctxs),
               _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
+              _need_local_merge(need_local_merge) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
 
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
+    Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, 
bool publish_local,

Review Comment:
   warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status send_filter_size(RuntimeState* state, uint64_t 
hash_table_size, bool publish_local,
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -1170,9 +1170,36 @@
         auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
         butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
         Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
-        if (!st.ok()) {
-            LOG(WARNING) << "merge meet error" << st.to_string();
-        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        offer_failed(response, done, _light_work_pool);
+        return;
+    }
+}
+
+void PInternalService::send_filter_size(::google::protobuf::RpcController* 
controller,
+                                        const ::doris::PSendFilterSizeRequest* 
request,
+                                        ::doris::PSendFilterSizeResponse* 
response,
+                                        ::google::protobuf::Closure* done) {
+    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = _exec_env->fragment_mgr()->send_filter_size(request);
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        offer_failed(response, done, _light_work_pool);
+        return;
+    }

Review Comment:
   warning: method 'sync_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static void 
PInternalService::sync_filter_size(::google::protobuf::RpcController* 
controller,
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to