This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e956872d426 [Improvement](join) support all match one logic at join 
lazy materialize (#50375)
e956872d426 is described below

commit e956872d4267bd900b2be9464dc051e1dacbdf54
Author: Pxl <[email protected]>
AuthorDate: Wed May 7 14:51:11 2025 +0800

    [Improvement](join) support all match one logic at join lazy materialize 
(#50375)
    
    ### What problem does this PR solve?
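    Support the all-match-one logic in join lazy materialization: when the
    selected row indexes are consecutive, the column is copied with a single
    range insert instead of per-index inserts.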
    ```
    select max(a.k4) from t10000000 a, t10000000 b where a.k1 = b.k1 and a.k2>=b.k2 and b.k2>=a.k2;
    
    before
    -  NonEqualJoinConjunctEvaluationTime:  71.582ms
    after
    -  NonEqualJoinConjunctEvaluationTime:  41.480ms
    ```
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [x] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [x] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
        https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../pipeline/exec/join/process_hash_table_probe.h  |  7 +-
 .../exec/join/process_hash_table_probe_impl.h      | 90 +++++++++++-----------
 2 files changed, 49 insertions(+), 48 deletions(-)

diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h 
b/be/src/pipeline/exec/join/process_hash_table_probe.h
index c9a3e62f3aa..4fde7ed5fea 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -46,9 +46,9 @@ struct ProcessHashTableProbe {
     ~ProcessHashTableProbe() = default;
 
     // output build side result column
-    void build_side_output_column(vectorized::MutableColumns& mcol, int size, 
bool is_mark_join);
+    void build_side_output_column(vectorized::MutableColumns& mcol, bool 
is_mark_join);
 
-    void probe_side_output_column(vectorized::MutableColumns& mcol, int size, 
bool all_match_one);
+    void probe_side_output_column(vectorized::MutableColumns& mcol);
 
     // Only process the join with no other join conjunct, because of no other 
join conjunt
     // the output block struct is same with mutable block. we can do more opt 
on it and simplify
@@ -62,8 +62,7 @@ struct ProcessHashTableProbe {
     // each matching join column need to be processed by other join conjunct. 
so the struct of mutable block
     // and output block may be different
     // The output result is determined by the other join conjunct result and 
same_to_prev struct
-    Status do_other_join_conjuncts(vectorized::Block* output_block, 
DorisVector<uint8_t>& visited,
-                                   bool has_null_in_build_side);
+    Status do_other_join_conjuncts(vectorized::Block* output_block, 
DorisVector<uint8_t>& visited);
 
     Status do_mark_join_conjuncts(vectorized::Block* output_block, const 
uint8_t* null_map);
 
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h 
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 40ab0c73768..df7ad9456bb 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -33,6 +33,28 @@
 
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
+
+static bool check_all_match_one(const auto& vecs) {
+    size_t size = vecs.size();
+    if (!size || vecs[size - 1] != vecs[0] + size - 1) {
+        return false;
+    }
+    for (size_t i = 1; i < size; i++) {
+        if (vecs[i] == vecs[i - 1]) {
+            return false;
+        }
+    }
+    return true;
+}
+
+static void insert_with_indexs(auto& dst, const auto& src, const auto& indexs, 
bool all_match_one) {
+    if (all_match_one) {
+        dst->insert_range_from(*src, indexs[0], indexs.size());
+    } else {
+        dst->insert_indices_from(*src, indexs.data(), indexs.data() + 
indexs.size());
+    }
+}
+
 template <int JoinOpType>
 
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState*
 parent,
                                                          int batch_size)
@@ -55,13 +77,14 @@ 
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState
 
 template <int JoinOpType>
 void 
ProcessHashTableProbe<JoinOpType>::build_side_output_column(vectorized::MutableColumns&
 mcol,
-                                                                 int size, 
bool is_mark_join) {
+                                                                 bool 
is_mark_join) {
     SCOPED_TIMER(_build_side_output_timer);
 
     // indicates whether build_indexs contain 0
     bool build_index_has_zero =
             (JoinOpType != TJoinOp::INNER_JOIN && JoinOpType != 
TJoinOp::RIGHT_OUTER_JOIN) ||
             _have_other_join_conjunct || is_mark_join;
+    size_t size = _build_indexs.size();
     if (!size) {
         return;
     }
@@ -115,10 +138,10 @@ void 
ProcessHashTableProbe<JoinOpType>::build_side_output_column(vectorized::Mut
 }
 
 template <int JoinOpType>
-void 
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(vectorized::MutableColumns&
 mcol,
-                                                                 int size, 
bool all_match_one) {
+void 
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(vectorized::MutableColumns&
 mcol) {
     SCOPED_TIMER(_probe_side_output_timer);
     auto& probe_block = _parent->_probe_block;
+    bool all_match_one = check_all_match_one(_probe_indexs.get_data());
 
     for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
         if (_left_output_slot_flags[i]) {
@@ -129,15 +152,10 @@ void 
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(vectorized::Mut
 
         if (_left_output_slot_flags[i] && 
!_parent_operator->is_lazy_materialized_column(i)) {
             auto& column = probe_block.get_by_position(i).column;
-            if (all_match_one) {
-                mcol[i]->insert_range_from(*column, 
_probe_indexs.get_element(0), size);
-            } else {
-                mcol[i]->insert_indices_from(*column, 
_probe_indexs.get_data().data(),
-                                             _probe_indexs.get_data().data() + 
size);
-            }
+            insert_with_indexs(mcol[i], column, _probe_indexs.get_data(), 
all_match_one);
         } else {
             mcol[i]->insert_default();
-            mcol[i] = vectorized::ColumnConst::create(std::move(mcol[i]), 
size);
+            mcol[i] = vectorized::ColumnConst::create(std::move(mcol[i]), 
_probe_indexs.size());
         }
     }
 }
@@ -248,24 +266,15 @@ Status 
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
         current_offset = new_current_offset;
     }
 
-    build_side_output_column(mcol, current_offset, is_mark_join);
+    // input row_indexs's size may bigger than current_offset coz 
_init_probe_side
+    _probe_indexs.resize(current_offset);
+    _build_indexs.resize(current_offset);
+
+    build_side_output_column(mcol, is_mark_join);
 
     if (_have_other_join_conjunct ||
         (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType != 
TJoinOp::RIGHT_ANTI_JOIN)) {
-        auto check_all_match_one = [](const auto& vecs, int size) {
-            if (!size || vecs[size - 1] != vecs[0] + size - 1) {
-                return false;
-            }
-            for (int i = 1; i < size; i++) {
-                if (vecs[i] == vecs[i - 1]) {
-                    return false;
-                }
-            }
-            return true;
-        };
-
-        probe_side_output_column(mcol, current_offset,
-                                 check_all_match_one(_probe_indexs.get_data(), 
current_offset));
+        probe_side_output_column(mcol);
     }
 
     output_block->swap(mutable_block.to_block());
@@ -280,8 +289,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
                         ->empty_build_side(); // empty build side will return 
false to instead null
         return do_mark_join_conjuncts(output_block, ignore_null_map ? nullptr 
: null_map);
     } else if (_have_other_join_conjunct) {
-        return do_other_join_conjuncts(output_block, 
hash_table_ctx.hash_table->get_visited(),
-                                       
hash_table_ctx.hash_table->has_null_key());
+        return do_other_join_conjuncts(output_block, 
hash_table_ctx.hash_table->get_visited());
     }
 
     return Status::OK();
@@ -323,7 +331,8 @@ Status 
ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter(
 
     auto do_lazy_materialize = [&](const std::vector<bool>& output_slot_flags,
                                    vectorized::ColumnVector<unsigned int>& 
row_indexs,
-                                   int column_offset, vectorized::Block* 
source_block) {
+                                   int column_offset, vectorized::Block* 
source_block,
+                                   bool try_all_match_one) {
         std::vector<int> column_ids;
         for (int i = 0; i < output_slot_flags.size(); ++i) {
             if (output_slot_flags[i] &&
@@ -334,23 +343,16 @@ Status 
ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter(
         if (column_ids.empty()) {
             return;
         }
-        size_t row_count = filter_ptr->size();
-        // input row_indexs's size may bigger than row_count coz 
_init_probe_side
-        row_indexs.resize(row_count);
-
+        const auto& column_filter =
+                assert_cast<const 
vectorized::ColumnUInt8*>(filter_ptr.get())->get_data();
         bool need_filter =
-                simd::count_zero_num(
-                        (int8_t*)assert_cast<const 
vectorized::ColumnUInt8*>(filter_ptr.get())
-                                ->get_data()
-                                .data(),
-                        row_count) != 0;
+                simd::count_zero_num((int8_t*)column_filter.data(), 
column_filter.size()) != 0;
         if (need_filter) {
-            const auto& column_filter =
-                    assert_cast<const 
vectorized::ColumnUInt8*>(filter_ptr.get())->get_data();
             row_indexs.filter(column_filter);
         }
 
         const auto& container = row_indexs.get_data();
+        bool all_match_one = try_all_match_one && 
check_all_match_one(container);
         for (int column_id : column_ids) {
             int output_column_id = column_id + column_offset;
             output_block->get_by_position(output_column_id).column =
@@ -361,12 +363,13 @@ Status 
ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter(
             auto& src = source_block->get_by_position(column_id).column;
             auto dst = 
output_block->get_by_position(output_column_id).column->assume_mutable();
             dst->clear();
-            dst->insert_indices_from(*src, container.data(), container.data() 
+ container.size());
+            insert_with_indexs(dst, src, container, all_match_one);
         }
     };
     do_lazy_materialize(_right_output_slot_flags, _build_indexs, 
(int)_right_col_idx,
-                        _build_block.get());
-    do_lazy_materialize(_left_output_slot_flags, _probe_indexs, 0, 
&_parent->_probe_block);
+                        _build_block.get(), false);
+    // probe side indexs must be incremental so set try_all_match_one to true
+    do_lazy_materialize(_left_output_slot_flags, _probe_indexs, 0, 
&_parent->_probe_block, true);
     return Status::OK();
 }
 
@@ -520,8 +523,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
 
 template <int JoinOpType>
 Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Block* 
output_block,
-                                                                  
DorisVector<uint8_t>& visited,
-                                                                  bool 
has_null_in_build_side) {
+                                                                  
DorisVector<uint8_t>& visited) {
     // dispose the other join conjunct exec
     auto row_count = output_block->rows();
     if (!row_count) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to