yiguolei commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r897425517


##########
be/src/vec/exec/volap_scan_node.cpp:
##########
@@ -162,29 +163,63 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) 
{
         scanner->set_opened();
     }
 
-    std::vector<ExprContext*> contexts;
+    std::vector<VExpr*> vexprs;
     auto& scanner_filter_apply_marks = 
*scanner->mutable_runtime_filter_marks();
     DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
     for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
         if (!scanner_filter_apply_marks[i] && 
!_runtime_filter_ctxs[i].apply_mark) {
+            /// When runtime filters are ready during running, we should use 
them to filter data
+            /// in VOlapScanner.
+            /// New arrival rf will be processed as below:
+            /// 1. convert these runtime filters to vectorized expressions
+            /// 2. if this is the first scanner thread to receive this rf, 
construct a new
+            /// VExprContext and update `_vconjunct_ctx_ptr` in scan node. 
Notice that we use
+            /// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` 
will be updated only
+            /// once after any runtime_filters are ready.
+            /// 3. finally, just copy this new VExprContext to scanner and use 
it to filter data.
             IRuntimeFilter* runtime_filter = nullptr;
             
state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
                                                             &runtime_filter);
             DCHECK(runtime_filter != nullptr);
             bool ready = runtime_filter->is_ready();
             if (ready) {
-                runtime_filter->get_prepared_context(&contexts, row_desc(), 
_expr_mem_tracker);
+                runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), 
_expr_mem_tracker);
                 scanner_filter_apply_marks[i] = true;
+                {
+                    std::unique_lock<std::mutex> l(*(_rf_locks[i]));
+                    if (!_runtime_filter_ready_flag[i]) {
+                        // Use all conjuncts and new arrival runtime filters 
to construct a new
+                        // expression tree here.
+                        auto last_expr =
+                                _vconjunct_ctx_ptr ? 
(*_vconjunct_ctx_ptr)->root() : vexprs[0];
+                        for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < 
vexprs.size(); j++) {
+                            TExprNode texpr_node;
+                            
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+                            
texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
+                            texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
+                            VExpr* new_node = _pool->add(new 
VcompoundPred(texpr_node));
+                            new_node->add_child(last_expr);
+                            new_node->add_child(vexprs[j]);
+                            last_expr = new_node;
+                        }
+                        _vconjunct_ctx_ptr.reset(new 
doris::vectorized::VExprContext*);
+                        auto new_vconjunct_ctx_ptr = _pool->add(new 
VExprContext(last_expr));
+                        WARN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, 
row_desc(),

Review Comment:
   If prepare or open fails, `_vconjunct_ctx_ptr` has already been reset, so 
won't the result be wrong?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to