yiguolei commented on code in PR #10103: URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r897425517
########## be/src/vec/exec/volap_scan_node.cpp: ########## @@ -162,29 +163,63 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { scanner->set_opened(); } - std::vector<ExprContext*> contexts; + std::vector<VExpr*> vexprs; auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks(); DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size()); for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) { if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) { + /// When runtime filters are ready during running, we should use them to filter data + /// in VOlapScanner. + /// New arrival rf will be processed as below: + /// 1. convert these runtime filters to vectorized expressions + /// 2. if this is the first scanner thread to receive this rf, construct a new + /// VExprContext and update `_vconjunct_ctx_ptr` in scan node. Notice that we use + /// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` will be updated only + /// once after any runtime_filters are ready. + /// 3. finally, just copy this new VExprContext to scanner and use it to filter data. IRuntimeFilter* runtime_filter = nullptr; state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id, &runtime_filter); DCHECK(runtime_filter != nullptr); bool ready = runtime_filter->is_ready(); if (ready) { - runtime_filter->get_prepared_context(&contexts, row_desc(), _expr_mem_tracker); + runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), _expr_mem_tracker); scanner_filter_apply_marks[i] = true; + { + std::unique_lock<std::mutex> l(*(_rf_locks[i])); + if (!_runtime_filter_ready_flag[i]) { + // Use all conjuncts and new arrival runtime filters to construct a new + // expression tree here. + auto last_expr = + _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0]; + for (size_t j = _vconjunct_ctx_ptr ? 
0 : 1; j < vexprs.size(); j++) { + TExprNode texpr_node; + texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED); + texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND); + VExpr* new_node = _pool->add(new VcompoundPred(texpr_node)); + new_node->add_child(last_expr); + new_node->add_child(vexprs[j]); + last_expr = new_node; + } + _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); + auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr)); + WARN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, row_desc(), Review Comment: If prepare or open fails, `_vconjunct_ctx_ptr` has already been reset at this point, so won't the result be wrong? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org