This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/new_join by this push:
     new 986f6a74299 support tpch q21 (#26248)
986f6a74299 is described below

commit 986f6a742998193f172f65e00c49a7a5a3cb79fc
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Nov 2 10:30:37 2023 +0800

    support tpch q21 (#26248)
---
 be/src/vec/common/hash_table/hash_map.h            |  33 +-
 be/src/vec/exec/join/process_hash_table_probe.h    |  17 +-
 .../vec/exec/join/process_hash_table_probe_impl.h  | 332 +++------------------
 3 files changed, 61 insertions(+), 321 deletions(-)

diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h
index cafe01e8231..9e368bb3ff6 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -236,6 +236,8 @@ public:
 
     size_t size() const { return next.size(); }
 
+    std::vector<uint8_t>& get_visited() { return visited; }
+
     void build(const Key* __restrict keys, const uint32_t* __restrict 
bucket_nums,
                size_t num_elem) {
         build_keys = keys;
@@ -246,7 +248,7 @@ public:
         }
     }
 
-    template <int JoinOpType>
+    template <int JoinOpType, bool with_other_conjuncts>
     auto find_batch(const Key* __restrict keys, const uint32_t* __restrict 
bucket_nums,
                     int probe_idx, uint32_t build_idx, int probe_rows,
                     uint32_t* __restrict probe_idxs, uint32_t* __restrict 
build_idxs) {
@@ -254,8 +256,8 @@ public:
                       JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
                       JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) {
-            return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, 
probe_idx, build_idx,
-                                                            probe_rows, 
probe_idxs, build_idxs);
+            return _find_batch_inner_outer_join<JoinOpType, 
with_other_conjuncts>(
+                    keys, bucket_nums, probe_idx, build_idx, probe_rows, 
probe_idxs, build_idxs);
         }
         if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
                       JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN) {
@@ -264,7 +266,8 @@ public:
         }
         if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
-            return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx, 
probe_rows);
+            return _find_batch_right_semi_anti<with_other_conjuncts>(
+                    keys, bucket_nums, probe_idx, probe_rows, probe_idxs, 
build_idxs);
         }
         return std::tuple {0, 0u, 0};
     }
@@ -292,21 +295,30 @@ public:
     }
 
 private:
+    template <bool with_other_conjuncts>
     auto _find_batch_right_semi_anti(const Key* __restrict keys,
                                      const uint32_t* __restrict bucket_nums, 
int probe_idx,
-                                     int probe_rows) {
+                                     int probe_rows, uint32_t* __restrict 
probe_idxs,
+                                     uint32_t* __restrict build_idxs) {
+        auto matched_cnt = 0;
         while (probe_idx < probe_rows) {
             auto build_idx = first[bucket_nums[probe_idx]];
 
             while (build_idx) {
                 if (keys[probe_idx] == build_keys[build_idx]) {
-                    visited[build_idx] = 1;
+                    if constexpr (with_other_conjuncts) {
+                        build_idxs[matched_cnt] = build_idx;
+                        probe_idxs[matched_cnt] = probe_idx;
+                        matched_cnt++;
+                    } else {
+                        visited[build_idx] = 1;
+                    }
                 }
                 build_idx = next[build_idx];
             }
             probe_idx++;
         }
-        return std::tuple {probe_idx, 0u, 0};
+        return std::tuple {probe_idx, 0u, matched_cnt};
     }
 
     template <int JoinOpType>
@@ -334,7 +346,7 @@ private:
         return std::tuple {probe_idx, 0u, matched_cnt};
     }
 
-    template <int JoinOpType>
+    template <int JoinOpType, bool with_other_conjuncts>
     auto _find_batch_inner_outer_join(const Key* __restrict keys,
                                       const uint32_t* __restrict bucket_nums, 
int probe_idx,
                                       uint32_t build_idx, int probe_rows,
@@ -348,8 +360,9 @@ private:
                 if (keys[probe_idx] == build_keys[build_idx]) {
                     probe_idxs[matched_cnt] = probe_idx;
                     build_idxs[matched_cnt] = build_idx;
-                    if constexpr (JoinOpType == 
doris::TJoinOp::RIGHT_OUTER_JOIN ||
-                                  JoinOpType == 
doris::TJoinOp::FULL_OUTER_JOIN) {
+                    if constexpr (!with_other_conjuncts &&
+                                  (JoinOpType == 
doris::TJoinOp::RIGHT_OUTER_JOIN ||
+                                   JoinOpType == 
doris::TJoinOp::FULL_OUTER_JOIN)) {
                         visited[build_idx] = 1;
                     }
                     matched_cnt++;
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h
index 9c2fd6094b5..34b5dc3ee8d 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -68,24 +68,13 @@ struct ProcessHashTableProbe {
     // and output block may be different
     // The output result is determined by the other join conjunct result and 
same_to_prev struct
     Status do_other_join_conjuncts(Block* output_block, bool is_mark_join,
-                                   int multi_matched_output_row_count, bool 
is_the_last_sub_block);
-
-    void _process_splited_equal_matched_tuples(int start_row_idx, int 
row_count,
-                                               const UInt8* __restrict 
other_hit_column,
-                                               UInt8* __restrict null_map_data,
-                                               UInt8* __restrict filter_map, 
Block* output_block);
-
-    void _emplace_element(int32_t block_row, int& current_offset);
+                                   bool is_the_last_sub_block, 
std::vector<uint8_t>& visited);
 
     template <typename HashTableType>
     typename HashTableType::State _init_probe_side(HashTableType& 
hash_table_ctx, size_t probe_rows,
                                                    bool 
with_other_join_conjuncts,
                                                    const uint8_t* null_map);
 
-    template <typename Mapped, bool with_other_join_conjuncts>
-    ForwardIterator<Mapped>& _probe_row_match(int& current_offset, int& 
probe_index,
-                                              size_t& probe_size, bool& 
all_match_one);
-
     // Process full outer join/ right join / right semi/anti join to output 
the join result
     // in hash table
     template <typename HashTableType>
@@ -111,12 +100,8 @@ struct ProcessHashTableProbe {
     std::unique_ptr<Arena> _serialize_key_arena;
     std::vector<char> _probe_side_find_result;
 
-    std::vector<bool*> _visited_map;
-    std::vector<bool> _same_to_prev;
-
     int _right_col_idx;
     int _right_col_len;
-    int _row_count_from_last_probe;
 
     bool _have_other_join_conjunct;
     bool _is_right_semi_anti;
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 6a21086f50e..e3fadf2056f 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -68,9 +68,9 @@ void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column(
     constexpr auto probe_all =
             JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN;
 
-    if (!is_semi_anti_join || have_other_join_conjunct) {
+    if ((!is_semi_anti_join || have_other_join_conjunct) && size) {
         for (int i = 0; i < _right_col_len; i++) {
-            const auto& column = *_build_block->get_by_position(i).column;
+            const auto& column = *_build_block->safe_get_by_position(i).column;
             if (output_slot_flags[i]) {
                 mcol[i + _right_col_idx]->insert_indices_from_join(column, 
_build_indexs.data(),
                                                                    
_build_indexs.data() + size);
@@ -126,17 +126,7 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_p
                              ? 0
                              : _parent->left_table_data_types().size();
     _right_col_len = _parent->right_table_data_types().size();
-    _row_count_from_last_probe = 0;
-
-    _build_indexs.clear();
-    _probe_indexs.clear();
-    if (with_other_join_conjuncts) {
-        // use in right join to change visited state after exec the vother 
join conjunct
-        _visited_map.clear();
-        _same_to_prev.clear();
-        _visited_map.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
-        _same_to_prev.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
-    }
+
     _probe_indexs.resize(_batch_size);
     _build_indexs.resize(_batch_size);
 
@@ -149,41 +139,6 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_p
     return typename HashTableType::State(_parent->_probe_columns);
 }
 
-template <int JoinOpType, typename Parent>
-template <typename Mapped, bool with_other_join_conjuncts>
-ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, 
Parent>::_probe_row_match(
-        int& current_offset, int& probe_index, size_t& probe_size, bool& 
all_match_one) {
-    auto& probe_row_match_iter = 
std::get<ForwardIterator<Mapped>>(_parent->_probe_row_match_iter);
-    if (!probe_row_match_iter.ok()) {
-        return probe_row_match_iter;
-    }
-
-    SCOPED_TIMER(_search_hashtable_timer);
-    for (; probe_row_match_iter.ok() && current_offset < _batch_size; 
++probe_row_match_iter) {
-        _emplace_element(probe_row_match_iter->row_num, current_offset);
-        _probe_indexs.emplace_back(probe_index);
-        if constexpr (with_other_join_conjuncts) {
-            _visited_map.emplace_back(&probe_row_match_iter->visited);
-        }
-    }
-
-    _row_count_from_last_probe = current_offset;
-    all_match_one &= (current_offset == 1);
-    if (!probe_row_match_iter.ok()) {
-        ++probe_index;
-    }
-    probe_size = 1;
-
-    return probe_row_match_iter;
-}
-
-template <int JoinOpType, typename Parent>
-void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int32_t 
block_row,
-                                                                 int& 
current_offset) {
-    _build_indexs.emplace_back(block_row);
-    current_offset++;
-}
-
 template <int JoinOpType, typename Parent>
 template <bool need_null_map_for_probe, bool ignore_null, typename 
HashTableType,
           bool with_other_conjuncts, bool is_mark_join>
@@ -194,39 +149,20 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
                                                              size_t 
probe_rows) {
     auto& probe_index = _parent->_probe_index;
     auto& build_index = _parent->_build_index;
-
-    using Mapped = typename HashTableType::Mapped;
+    auto last_probe_index = probe_index;
 
     _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, 
with_other_conjuncts,
                                     need_null_map_for_probe ? null_map->data() 
: nullptr);
 
     auto& mcol = mutable_block.mutable_columns();
 
-    int last_probe_index = probe_index;
-
     int current_offset = 0;
     bool all_match_one = false;
     size_t probe_size = 0;
 
-    auto& probe_row_match_iter = _probe_row_match<Mapped, 
with_other_conjuncts>(
-            current_offset, probe_index, probe_size, all_match_one);
-
-    // If not(which means it excceed batch size), probe_index is not increased 
and
-    // remaining matched rows for the current probe row will be
-    // handled in the next call of this function
-    int multi_matched_output_row_count = 0;
-
     // Is the last sub block of splitted block
     bool is_the_last_sub_block = false;
 
-    if (with_other_conjuncts && probe_size != 0) {
-        is_the_last_sub_block = !probe_row_match_iter.ok();
-        _same_to_prev.emplace_back(false);
-        for (int i = 0; i < current_offset - 1; ++i) {
-            _same_to_prev.emplace_back(true);
-        }
-    }
-
     std::unique_ptr<ColumnFilterHelper> mark_column;
     if (is_mark_join) {
         mark_column = std::make_unique<ColumnFilterHelper>(*mcol[mcol.size() - 
1]);
@@ -235,7 +171,7 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
     {
         SCOPED_TIMER(_search_hashtable_timer);
         auto [new_probe_idx, new_build_idx, new_current_offset] =
-                hash_table_ctx.hash_table->template find_batch<JoinOpType>(
+                hash_table_ctx.hash_table->template find_batch<JoinOpType, 
with_other_conjuncts>(
                         hash_table_ctx.keys, 
hash_table_ctx.bucket_nums.data(), probe_index,
                         build_index, probe_rows, _probe_indexs.data(), 
_build_indexs.data());
         probe_index = new_probe_idx;
@@ -256,8 +192,8 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
     output_block->swap(mutable_block.to_block());
 
     if constexpr (with_other_conjuncts) {
-        return do_other_join_conjuncts(output_block, is_mark_join, 
multi_matched_output_row_count,
-                                       is_the_last_sub_block);
+        return do_other_join_conjuncts(output_block, is_mark_join, 
is_the_last_sub_block,
+                                       
hash_table_ctx.hash_table->get_visited());
     }
 
     return Status::OK();
@@ -265,8 +201,8 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
 
 template <int JoinOpType, typename Parent>
 Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
-        Block* output_block, bool is_mark_join, int 
multi_matched_output_row_count,
-        bool is_the_last_sub_block) {
+        Block* output_block, bool is_mark_join, bool is_the_last_sub_block,
+        std::vector<uint8_t>& visited) {
     // dispose the other join conjunct exec
     auto row_count = output_block->rows();
     if (!row_count) {
@@ -301,22 +237,10 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
         auto null_map_column = ColumnVector<UInt8>::create(row_count, 0);
         auto* __restrict null_map_data = null_map_column->get_data().data();
 
-        // It contains non-first sub block of splited equal-conjuncts-matched 
tuples from last probe row
-        if (_row_count_from_last_probe > 0) {
-            _process_splited_equal_matched_tuples(0, 
_row_count_from_last_probe, filter_column_ptr,
-                                                  null_map_data, filter_map, 
output_block);
-            // This is the last sub block of splitted block, and no 
equal-conjuncts-matched tuple
-            // is output in all sub blocks, need to output a tuple for this 
probe row
-            if (is_the_last_sub_block && 
!_parent->_is_any_probe_match_row_output) {
-                filter_map[0] = true;
-                null_map_data[0] = true;
-            }
-        }
-        int end_idx = row_count - multi_matched_output_row_count;
         // process equal-conjuncts-matched tuples that are newly generated
         // in this run if there are any.
-        for (int i = _row_count_from_last_probe; i < end_idx; ++i) {
-            auto join_hit = _visited_map[i] != nullptr;
+        for (int i = 0; i < row_count; ++i) {
+            auto join_hit = _build_indexs[i];
             auto other_hit = filter_column_ptr[i];
 
             if (!other_hit) {
@@ -330,89 +254,31 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
             }
             null_map_data[i] = !join_hit || !other_hit;
 
-            // For cases where one probe row matches multiple build rows for 
equal conjuncts,
-            // all the other-conjuncts-matched tuples should be output.
-            //
-            // Other-conjuncts-NOT-matched tuples fall into two categories:
-            //    1. The beginning consecutive one(s).
-            //       For these tuples, only the last one is marked to output;
-            //       If there are any following other-conjuncts-matched tuples,
-            //       the last tuple is also marked NOT to output.
-            //    2. All the remaining other-conjuncts-NOT-matched tuples.
-            //       All these tuples are marked not to output.
             if (join_hit) {
-                *_visited_map[i] |= other_hit;
-                filter_map[i] = other_hit || !_same_to_prev[i] ||
-                                (!filter_column_ptr[i] && filter_map[i - 1]);
-                // Here to keep only hit join conjunct and other join conjunt 
is true need to be output.
-                // if not, only some key must keep one row will output will 
null right table column
-                if (_same_to_prev[i] && filter_map[i] && !filter_column_ptr[i 
- 1]) {
-                    filter_map[i - 1] = false;
-                }
+                filter_map[i] = other_hit;
             } else {
                 filter_map[i] = true;
             }
         }
 
-        // It contains the first sub block of splited equal-conjuncts-matched 
tuples of the current probe row
-        if (multi_matched_output_row_count > 0) {
-            _parent->_is_any_probe_match_row_output = false;
-            _process_splited_equal_matched_tuples(row_count - 
multi_matched_output_row_count,
-                                                  
multi_matched_output_row_count, filter_column_ptr,
-                                                  null_map_data, filter_map, 
output_block);
-        }
-
         for (size_t i = 0; i < row_count; ++i) {
             if (filter_map[i]) {
                 _tuple_is_null_right_flags->emplace_back(null_map_data[i]);
+                if constexpr (JoinOpType == TJoinOp::FULL_OUTER_JOIN ||
+                              JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
+                    visited[_build_indexs[i]] = 1;
+                }
             }
         }
         output_block->get_by_position(result_column_id).column = 
std::move(new_filter_column);
     } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
-        // TODO: resize in advance
-        auto new_filter_column = ColumnVector<UInt8>::create();
+        auto new_filter_column = ColumnVector<UInt8>::create(row_count);
         auto& filter_map = new_filter_column->get_data();
 
         size_t start_row_idx = 1;
-        // We are handling euqual-conjuncts matched tuples that are splitted 
into multiple blocks
-        if (_row_count_from_last_probe > 0) {
-            if (_parent->_is_any_probe_match_row_output) {
-                // if any matched tuple for this probe row is output,
-                // ignore all the following tuples for this probe row.
-                for (int row_idx = 0; row_idx < _row_count_from_last_probe; 
++row_idx) {
-                    filter_map.emplace_back(false);
-                }
-                start_row_idx += _row_count_from_last_probe;
-                if (_row_count_from_last_probe < row_count) {
-                    
filter_map.emplace_back(filter_column_ptr[_row_count_from_last_probe]);
-                }
-            } else {
-                filter_map.emplace_back(filter_column_ptr[0]);
-            }
-        } else {
-            filter_map.emplace_back(filter_column_ptr[0]);
-        }
+        filter_map.emplace_back(filter_column_ptr[0]);
         for (size_t i = start_row_idx; i < row_count; ++i) {
-            if (filter_column_ptr[i] || (_same_to_prev[i] && filter_map[i - 
1])) {
-                // Only last same element is true, output last one
-                filter_map.push_back(true);
-                filter_map[i - 1] = !_same_to_prev[i] && filter_map[i - 1];
-            } else {
-                filter_map.push_back(false);
-            }
-        }
-        // It contains the first sub block of splited equal-conjuncts-matched 
tuples of the current probe row
-        if (multi_matched_output_row_count > 0) {
-            // If a matched row is output, all the equal-matched tuples in
-            // the following sub blocks should be ignored
-            _parent->_is_any_probe_match_row_output = filter_map[row_count - 
1];
-        } else if (_row_count_from_last_probe > 0 && 
!_parent->_is_any_probe_match_row_output) {
-            // We are handling euqual-conjuncts matched tuples that are 
splitted into multiple blocks,
-            // and no matched tuple has been output in all previous run.
-            // If a tuple is output in this run, all the following mathced 
tuples should be ignored
-            if (filter_map[_row_count_from_last_probe - 1]) {
-                _parent->_is_any_probe_match_row_output = true;
-            }
+            filter_map[i] = filter_column_ptr[i];
         }
 
         /// FIXME: incorrect result of semi mark join with other 
conjuncts(null value missed).
@@ -423,14 +289,9 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
 
             // For mark join, we only filter rows which have duplicate join 
keys.
             // And then, we set matched_map to the join result to do the mark 
join's filtering.
-            for (size_t i = 1; i < row_count; ++i) {
-                if (!_same_to_prev[i]) {
-                    helper.insert_value(filter_map[i - 1]);
-                    filter_map[i - 1] = true;
-                }
+            for (size_t i = 0; i < row_count; ++i) {
+                helper.insert_value(filter_map[i]);
             }
-            helper.insert_value(filter_map[filter_map.size() - 1]);
-            filter_map[filter_map.size() - 1] = true;
         }
 
         output_block->get_by_position(result_column_id).column = 
std::move(new_filter_column);
@@ -449,34 +310,12 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
         // if there are none, just pick a tuple and output.
 
         size_t start_row_idx = 1;
-        // We are handling euqual-conjuncts matched tuples that are splitted 
into multiple blocks
-        if (_row_count_from_last_probe > 0 && 
_parent->_is_any_probe_match_row_output) {
-            // if any matched tuple for this probe row is output,
-            // ignore all the following tuples for this probe row.
-            for (int row_idx = 0; row_idx < _row_count_from_last_probe; 
++row_idx) {
-                filter_map[row_idx] = false;
-            }
-            start_row_idx += _row_count_from_last_probe;
-            if (_row_count_from_last_probe < row_count) {
-                filter_map[_row_count_from_last_probe] =
-                        filter_column_ptr[_row_count_from_last_probe] &&
-                        _visited_map[_row_count_from_last_probe];
-            }
-        } else {
-            // Both equal conjuncts and other conjuncts are true
-            filter_map[0] = filter_column_ptr[0] && _visited_map[0];
-        }
+        // Both equal conjuncts and other conjuncts are true
+        filter_map[0] = filter_column_ptr[0] && _build_indexs[0];
 
         for (size_t i = start_row_idx; i < row_count; ++i) {
-            if ((_visited_map[i] && filter_column_ptr[i]) ||
-                (_same_to_prev[i] && filter_map[i - 1])) {
-                // When either of two conditions is meet:
-                // 1. Both equal conjuncts and other conjuncts are true or 
same_to_prev
-                // 2. This row is joined from the same build side row as the 
previous row
-                // Set filter_map[i] to true and filter_map[i - 1] to false if 
same_to_prev[i]
-                // is true.
-                filter_map[i] = true;
-                filter_map[i - 1] = !_same_to_prev[i] && filter_map[i - 1];
+            if (_build_indexs[i] && filter_column_ptr[i]) {
+                filter_map[i] = _build_indexs[i] && filter_column_ptr[i];
             } else {
                 filter_map[i] = false;
             }
@@ -487,60 +326,8 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
                                         
*(output_block->get_by_position(orig_columns - 1)
                                                   .column->assume_mutable()))
                                         .get_data();
-            for (int i = 1; i < row_count; ++i) {
-                if (!_same_to_prev[i]) {
-                    matched_map.push_back(!filter_map[i - 1]);
-                    filter_map[i - 1] = true;
-                }
-            }
-            matched_map.push_back(!filter_map[row_count - 1]);
-            filter_map[row_count - 1] = true;
-        } else {
-            int end_row_idx = 0;
-            if (_row_count_from_last_probe > 0) {
-                end_row_idx = row_count - multi_matched_output_row_count;
-                if (!_parent->_is_any_probe_match_row_output) {
-                    // We are handling euqual-conjuncts matched tuples that 
are splitted into multiple blocks,
-                    // and no matched tuple has been output in all previous 
run.
-                    // If a tuple is output in this run, all the following 
mathced tuples should be ignored
-                    if (filter_map[_row_count_from_last_probe - 1]) {
-                        _parent->_is_any_probe_match_row_output = true;
-                        filter_map[_row_count_from_last_probe - 1] = false;
-                    }
-                    if (is_the_last_sub_block && 
!_parent->_is_any_probe_match_row_output) {
-                        // This is the last sub block of splitted block, and 
no equal-conjuncts-matched tuple
-                        // is output in all sub blocks, output a tuple for 
this probe row
-                        filter_map[0] = true;
-                    }
-                }
-                if (multi_matched_output_row_count > 0) {
-                    // It contains the first sub block of splited 
equal-conjuncts-matched tuples of the current probe row
-                    // If a matched row is output, all the equal-matched 
tuples in
-                    // the following sub blocks should be ignored
-                    _parent->_is_any_probe_match_row_output = 
filter_map[row_count - 1];
-                    filter_map[row_count - 1] = false;
-                }
-            } else if (multi_matched_output_row_count > 0) {
-                end_row_idx = row_count - multi_matched_output_row_count;
-                // It contains the first sub block of splited 
equal-conjuncts-matched tuples of the current probe row
-                // If a matched row is output, all the equal-matched tuples in
-                // the following sub blocks should be ignored
-                _parent->_is_any_probe_match_row_output = filter_map[row_count 
- 1];
-                filter_map[row_count - 1] = false;
-            } else {
-                end_row_idx = row_count;
-            }
-
-            // Same to the semi join, but change the last value to opposite 
value
-            for (int i = 1 + _row_count_from_last_probe; i < end_row_idx; ++i) 
{
-                if (!_same_to_prev[i]) {
-                    filter_map[i - 1] = !filter_map[i - 1];
-                }
-            }
-            auto non_sub_blocks_matched_row_count =
-                    row_count - _row_count_from_last_probe - 
multi_matched_output_row_count;
-            if (non_sub_blocks_matched_row_count > 0) {
-                filter_map[end_row_idx - 1] = !filter_map[end_row_idx - 1];
+            for (int i = 0; i < row_count; ++i) {
+                matched_map.push_back(!filter_map[i]);
             }
         }
 
@@ -548,16 +335,13 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
     } else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN ||
                          JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) {
         for (int i = 0; i < row_count; ++i) {
-            DCHECK(_visited_map[i]);
-            *_visited_map[i] |= filter_column_ptr[i];
+            visited[_build_indexs[i]] |= filter_column_ptr[i];
         }
     } else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
         auto filter_size = 0;
         for (int i = 0; i < row_count; ++i) {
-            DCHECK(_visited_map[i]);
-            auto result = filter_column_ptr[i];
-            *_visited_map[i] |= result;
-            filter_size += result;
+            visited[_build_indexs[i]] |= filter_column_ptr[i];
+            filter_size += filter_column_ptr[i];
         }
         _tuple_is_null_left_flags->resize_fill(filter_size, 0);
     }
@@ -579,42 +363,6 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
     return Status::OK();
 }
 
-// For left or full outer join with other conjuncts.
-// If multiple equal-conjuncts-matched tuples is splitted into several
-// sub blocks, just filter out all the other-conjuncts-NOT-matched tuples at 
first,
-// and when processing the last sub block, check whether there are any
-// equal-conjuncts-matched tuple is output in all sub blocks,
-// if not, just pick a tuple and output.
-template <int JoinOpType, typename Parent>
-void ProcessHashTableProbe<JoinOpType, 
Parent>::_process_splited_equal_matched_tuples(
-        int start_row_idx, int row_count, const UInt8* __restrict 
other_hit_column,
-        UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* 
output_block) {
-    int end_row_idx = start_row_idx + row_count;
-    for (int i = start_row_idx; i < end_row_idx; ++i) {
-        auto join_hit = _visited_map[i] != nullptr;
-        auto other_hit = other_hit_column[i];
-
-        if (!other_hit) {
-            for (size_t j = 0; j < _right_col_len; ++j) {
-                typeid_cast<ColumnNullable*>(
-                        std::move(*output_block->get_by_position(j + 
_right_col_idx).column)
-                                .assume_mutable()
-                                .get())
-                        ->get_null_map_data()[i] = true;
-            }
-        }
-
-        null_map_data[i] = !join_hit || !other_hit;
-        filter_map[i] = other_hit;
-
-        if (join_hit) {
-            *_visited_map[i] |= other_hit;
-        }
-    }
-    _parent->_is_any_probe_match_row_output |=
-            simd::contain_byte(filter_map + start_row_idx, row_count, 1);
-}
-
 template <int JoinOpType, typename Parent>
 template <typename HashTableType>
 Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
@@ -624,31 +372,25 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
     auto& mcol = mutable_block.mutable_columns();
     *eos = hash_table_ctx.hash_table->template 
iterate_map<JoinOpType>(_build_indexs);
     auto block_size = _build_indexs.size();
-    int right_col_idx =
-            JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN
-                    ? _parent->left_table_data_types().size()
-                    : 0;
-    int right_col_len = _parent->right_table_data_types().size();
 
     if (block_size) {
-        for (size_t j = 0; j < right_col_len; ++j) {
-            const auto& column = *_build_block->get_by_position(j).column;
-            mcol[j + right_col_idx]->insert_indices_from_join(
-                    column, _build_indexs.data(), _build_indexs.data() + 
_build_indexs.size());
+        for (size_t j = 0; j < _right_col_len; ++j) {
+            const auto& column = *_build_block->safe_get_by_position(j).column;
+            mcol[j + _right_col_idx]->insert_indices_from_join(column, 
_build_indexs.data(),
+                                                               
_build_indexs.data() + block_size);
         }
 
         // just resize the left table column in case with other conjunct to 
make block size is not zero
         if (_is_right_semi_anti && _have_other_join_conjunct) {
-            auto target_size = mcol[right_col_idx]->size();
-            for (int i = 0; i < right_col_idx; ++i) {
-                mcol[i]->resize(target_size);
+            for (int i = 0; i < _right_col_idx; ++i) {
+                mcol[i]->resize(block_size);
             }
         }
 
         // right outer join / full join need insert data of left table
         if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN ||
                       JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
-            for (int i = 0; i < right_col_idx; ++i) {
+            for (int i = 0; i < _right_col_idx; ++i) {
                 
assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size);
             }
             _tuple_is_null_left_flags->resize_fill(block_size, 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to