This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 16a2f7d090731d7fd6ebf0d695fe933c153d41d5
Author: Pxl <pxl...@qq.com>
AuthorDate: Fri Oct 20 11:50:08 2023 +0800

    update dev_join to pre calculate bucket num (#25663)
---
 be/src/vec/common/hash_table/hash_map.h            | 51 +++++++++++----------
 be/src/vec/common/hash_table/hash_map_context.h    |  8 ++++
 be/src/vec/exec/join/process_hash_table_probe.h    |  6 ---
 .../vec/exec/join/process_hash_table_probe_impl.h  | 53 ++--------------------
 be/src/vec/exec/join/vhash_join_node.h             |  2 +
 5 files changed, 39 insertions(+), 81 deletions(-)

diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h
index 53cb01dbfaa..89dfe7f8aac 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -216,18 +216,23 @@ public:
         return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1;
     }
 
+    void reserve(int num_elem) {
+        bucket_size = calc_bucket_size(num_elem + 1);
+        first.resize(bucket_size, 0);
+        next.resize(num_elem);
+    }
+
     void build(const Key* __restrict keys, const size_t* __restrict hash_values, size_t num_elem,
                int batch_size) {
-        max_batch_size = batch_size;
-        bucket_size = calc_bucket_size(num_elem + 1);
+        _batch_size = batch_size;
+        bucket_size = calc_bucket_size(num_elem);
         first.resize(bucket_size, 0);
         next.resize(num_elem);
 
         build_keys = keys;
         for (size_t i = 1; i < num_elem; i++) {
-            uint32_t bucket_num = hash_values[i] & (bucket_size - 1);
-            next[i] = first[bucket_num];
-            first[bucket_num] = i;
+            next[i] = first[hash_values[i]];
+            first[hash_values[i]] = i;
         }
     }
 
@@ -248,18 +253,16 @@ public:
         return std::pair {0, 0};
     }
 
+    size_t get_bucket_mask() { return bucket_size - 1; }
+
 private:
     template <int JoinOpType>
     auto _find_batch_left_semi_anti(const Key* __restrict keys,
                                     const size_t* __restrict hash_values, int probe_idx,
                                     int probe_rows, std::vector<uint32_t>& probe_idxs) {
-        auto matched_cnt = 0;
-        const auto batch_size = max_batch_size;
-
-        while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) {
-            uint32_t bucket_num = hash_values[probe_idx] & (bucket_size - 1);
-            auto build_idx = first[bucket_num];
-
+        int matched_cnt = 0;
+        while (LIKELY(probe_idx < probe_rows && matched_cnt < _batch_size)) {
+            uint32_t build_idx = first[hash_values[probe_idx]];
             while (build_idx) {
                 if (keys[probe_idx] == build_keys[build_idx]) {
                     break;
@@ -279,12 +282,11 @@ private:
                                       const size_t* __restrict hash_values, int probe_idx,
                                       int probe_rows, std::vector<uint32_t>& probe_idxs,
                                       std::vector<uint32_t>& build_idxs) {
-        auto matched_cnt = 0;
-        const auto batch_size = max_batch_size;
+        int matched_cnt = 0;
         uint32_t build_idx = 0;
 
         auto do_the_probe = [&]() {
-            while (build_idx && LIKELY(matched_cnt < batch_size)) {
+            while (build_idx && LIKELY(matched_cnt < _batch_size)) {
                 if (keys[probe_idx] == build_keys[build_idx]) {
                     probe_idxs[matched_cnt] = probe_idx;
                     build_idxs[matched_cnt] = build_idx;
@@ -302,12 +304,7 @@ private:
                 }
             }
 
-            if (matched_cnt == max_batch_size && build_idx) {
-                current_probe_idx = probe_idx;
-                current_build_idx = build_idx;
-            } else {
-                probe_idx++;
-            }
+            probe_idx++;
         };
 
         // some row over the batch_size, need dispose first
@@ -317,17 +314,21 @@ private:
             current_build_idx = 0;
             do_the_probe();
         }
-        while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) {
-            uint32_t bucket_num = hash_values[probe_idx] & (bucket_size - 1);
-            build_idx = first[bucket_num];
+        while (LIKELY(probe_idx < probe_rows && matched_cnt < _batch_size)) {
+            build_idx = first[hash_values[probe_idx]];
             do_the_probe();
         }
+
+        if (matched_cnt == _batch_size && build_idx) {
+            current_probe_idx = probe_idx - 1;
+            current_build_idx = build_idx;
+        }
         return std::pair {probe_idx, matched_cnt};
     }
 
     const Key* __restrict build_keys;
     uint32_t bucket_size = 0;
-    int max_batch_size = 0;
+    int _batch_size = 0;
 
     int current_probe_idx = -1;
     uint32_t current_build_idx = 0;
diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h
index 0b1748a9723..ec203b5c514 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -85,6 +85,7 @@ struct MethodBase {
             hash_values[k] = hash_table->hash(keys[k]);
         }
     }
+
     void init_hash_values(size_t num_rows) {
         hash_values.resize(num_rows);
         for (size_t k = 0; k < num_rows; ++k) {
@@ -92,6 +93,13 @@ struct MethodBase {
         }
     }
 
+    void calculate_bucket(size_t num_rows) {
+        size_t mask = hash_table->get_bucket_mask();
+        for (int i = 0; i < num_rows; i++) {
+            hash_values[i] &= mask;
+        }
+    }
+
     template <bool read>
     void prefetch(int current) {
         if (LIKELY(current + HASH_MAP_PREFETCH_DIST < hash_values.size())) {
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h
index eb1f6b78df5..ebd63f9f55d 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -75,17 +75,11 @@ struct ProcessHashTableProbe {
                                                UInt8* __restrict null_map_data,
                                                UInt8* __restrict filter_map, Block* output_block);
 
-    void _emplace_element(int32_t block_row, int& current_offset);
-
     template <typename HashTableType>
     typename HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows,
                                                    bool with_other_join_conjuncts,
                                                    const uint8_t* null_map);
 
-    template <typename Mapped, bool with_other_join_conjuncts>
-    ForwardIterator<Mapped>& _probe_row_match(int& current_offset, int& probe_index,
-                                              size_t& probe_size, bool& all_match_one);
-
     // Process full outer join/ right join / right semi/anti join to output the join result
     // in hash table
     template <typename HashTableType>
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 3c6ab25c21d..f4bef996911 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -137,52 +137,18 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_p
         _visited_map.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
         _same_to_prev.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
     }
-    _probe_indexs.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
-    _build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
+    _probe_indexs.resize(_batch_size);
+    _build_block_rows.resize(_batch_size);
 
     if (!_parent->_ready_probe) {
         _parent->_ready_probe = true;
         hash_table_ctx.reset();
         hash_table_ctx.init_serialized_keys(_parent->_probe_columns, probe_rows, null_map);
+        hash_table_ctx.calculate_bucket(probe_rows);
     }
     return typename HashTableType::State(_parent->_probe_columns);
 }
 
-template <int JoinOpType, typename Parent>
-template <typename Mapped, bool with_other_join_conjuncts>
-ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, Parent>::_probe_row_match(
-        int& current_offset, int& probe_index, size_t& probe_size, bool& all_match_one) {
-    auto& probe_row_match_iter = std::get<ForwardIterator<Mapped>>(_parent->_probe_row_match_iter);
-    if (!probe_row_match_iter.ok()) {
-        return probe_row_match_iter;
-    }
-
-    SCOPED_TIMER(_search_hashtable_timer);
-    for (; probe_row_match_iter.ok() && current_offset < _batch_size; ++probe_row_match_iter) {
-        _emplace_element(probe_row_match_iter->row_num, current_offset);
-        _probe_indexs.emplace_back(probe_index);
-        if constexpr (with_other_join_conjuncts) {
-            _visited_map.emplace_back(&probe_row_match_iter->visited);
-        }
-    }
-
-    _row_count_from_last_probe = current_offset;
-    all_match_one &= (current_offset == 1);
-    if (!probe_row_match_iter.ok()) {
-        ++probe_index;
-    }
-    probe_size = 1;
-
-    return probe_row_match_iter;
-}
-
-template <int JoinOpType, typename Parent>
-void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int32_t block_row,
-                                                                 int& current_offset) {
-    _build_block_rows.emplace_back(block_row);
-    current_offset++;
-}
-
 template <int JoinOpType, typename Parent>
 template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType,
           bool with_other_conjuncts, bool is_mark_join>
@@ -193,8 +159,6 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
                                                              size_t probe_rows) {
     auto& probe_index = _parent->_probe_index;
 
-    using Mapped = typename HashTableType::Mapped;
-
     _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, with_other_conjuncts,
                                     need_null_map_for_probe ? null_map->data() : nullptr);
 
@@ -206,9 +170,6 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
     bool all_match_one = false;
     size_t probe_size = 0;
 
-    auto& probe_row_match_iter = _probe_row_match<Mapped, with_other_conjuncts>(
-            current_offset, probe_index, probe_size, all_match_one);
-
     // If not(which means it excceed batch size), probe_index is not increased and
     // remaining matched rows for the current probe row will be
     // handled in the next call of this function
@@ -217,14 +178,6 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
     // Is the last sub block of splitted block
     bool is_the_last_sub_block = false;
 
-    if (with_other_conjuncts && probe_size != 0) {
-        is_the_last_sub_block = !probe_row_match_iter.ok();
-        _same_to_prev.emplace_back(false);
-        for (int i = 0; i < current_offset - 1; ++i) {
-            _same_to_prev.emplace_back(true);
-        }
-    }
-
     std::unique_ptr<ColumnFilterHelper> mark_column;
     if (is_mark_join) {
         mark_column = std::make_unique<ColumnFilterHelper>(*mcol[mcol.size() - 1]);
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index ee941fa5c1b..c15e3674642 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -131,8 +131,10 @@ struct ProcessHashTableBuild {
         if (!_parent->runtime_filter_descs().empty()) {
             _parent->_inserted_blocks.insert(&_acquired_block);
         }
+        hash_table_ctx.hash_table->reserve(_rows);
         hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
                                             null_map ? null_map->data() : nullptr);
+        hash_table_ctx.calculate_bucket(_rows);
         SCOPED_TIMER(_parent->_build_table_insert_timer);
         hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.hash_values.data(),
                                          _rows, _state->batch_size());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to