This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch dev_join in repository https://gitbox.apache.org/repos/asf/doris.git
commit 16a2f7d090731d7fd6ebf0d695fe933c153d41d5 Author: Pxl <pxl...@qq.com> AuthorDate: Fri Oct 20 11:50:08 2023 +0800 update dev_join to pre calculate bucket num (#25663) --- be/src/vec/common/hash_table/hash_map.h | 51 +++++++++++---------- be/src/vec/common/hash_table/hash_map_context.h | 8 ++++ be/src/vec/exec/join/process_hash_table_probe.h | 6 --- .../vec/exec/join/process_hash_table_probe_impl.h | 53 ++-------------------- be/src/vec/exec/join/vhash_join_node.h | 2 + 5 files changed, 39 insertions(+), 81 deletions(-) diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 53cb01dbfaa..89dfe7f8aac 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -216,18 +216,23 @@ public: return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1; } + void reserve(int num_elem) { + bucket_size = calc_bucket_size(num_elem + 1); + first.resize(bucket_size, 0); + next.resize(num_elem); + } + void build(const Key* __restrict keys, const size_t* __restrict hash_values, size_t num_elem, int batch_size) { - max_batch_size = batch_size; - bucket_size = calc_bucket_size(num_elem + 1); + _batch_size = batch_size; + bucket_size = calc_bucket_size(num_elem); first.resize(bucket_size, 0); next.resize(num_elem); build_keys = keys; for (size_t i = 1; i < num_elem; i++) { - uint32_t bucket_num = hash_values[i] & (bucket_size - 1); - next[i] = first[bucket_num]; - first[bucket_num] = i; + next[i] = first[hash_values[i]]; + first[hash_values[i]] = i; } } @@ -248,18 +253,16 @@ public: return std::pair {0, 0}; } + size_t get_bucket_mask() { return bucket_size - 1; } + private: template <int JoinOpType> auto _find_batch_left_semi_anti(const Key* __restrict keys, const size_t* __restrict hash_values, int probe_idx, int probe_rows, std::vector<uint32_t>& probe_idxs) { - auto matched_cnt = 0; - const auto batch_size = max_batch_size; - - while (LIKELY(probe_idx < probe_rows && matched_cnt < 
batch_size)) { - uint32_t bucket_num = hash_values[probe_idx] & (bucket_size - 1); - auto build_idx = first[bucket_num]; - + int matched_cnt = 0; + while (LIKELY(probe_idx < probe_rows && matched_cnt < _batch_size)) { + uint32_t build_idx = first[hash_values[probe_idx]]; while (build_idx) { if (keys[probe_idx] == build_keys[build_idx]) { break; @@ -279,12 +282,11 @@ private: const size_t* __restrict hash_values, int probe_idx, int probe_rows, std::vector<uint32_t>& probe_idxs, std::vector<uint32_t>& build_idxs) { - auto matched_cnt = 0; - const auto batch_size = max_batch_size; + int matched_cnt = 0; uint32_t build_idx = 0; auto do_the_probe = [&]() { - while (build_idx && LIKELY(matched_cnt < batch_size)) { + while (build_idx && LIKELY(matched_cnt < _batch_size)) { if (keys[probe_idx] == build_keys[build_idx]) { probe_idxs[matched_cnt] = probe_idx; build_idxs[matched_cnt] = build_idx; @@ -302,12 +304,7 @@ private: } } - if (matched_cnt == max_batch_size && build_idx) { - current_probe_idx = probe_idx; - current_build_idx = build_idx; - } else { - probe_idx++; - } + probe_idx++; }; // some row over the batch_size, need dispose first @@ -317,17 +314,21 @@ private: current_build_idx = 0; do_the_probe(); } - while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) { - uint32_t bucket_num = hash_values[probe_idx] & (bucket_size - 1); - build_idx = first[bucket_num]; + while (LIKELY(probe_idx < probe_rows && matched_cnt < _batch_size)) { + build_idx = first[hash_values[probe_idx]]; do_the_probe(); } + + if (matched_cnt == _batch_size && build_idx) { + current_probe_idx = probe_idx - 1; + current_build_idx = build_idx; + } return std::pair {probe_idx, matched_cnt}; } const Key* __restrict build_keys; uint32_t bucket_size = 0; - int max_batch_size = 0; + int _batch_size = 0; int current_probe_idx = -1; uint32_t current_build_idx = 0; diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index 
0b1748a9723..ec203b5c514 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -85,6 +85,7 @@ struct MethodBase { hash_values[k] = hash_table->hash(keys[k]); } } + void init_hash_values(size_t num_rows) { hash_values.resize(num_rows); for (size_t k = 0; k < num_rows; ++k) { @@ -92,6 +93,13 @@ struct MethodBase { } } + void calculate_bucket(size_t num_rows) { + size_t mask = hash_table->get_bucket_mask(); + for (int i = 0; i < num_rows; i++) { + hash_values[i] &= mask; + } + } + template <bool read> void prefetch(int current) { if (LIKELY(current + HASH_MAP_PREFETCH_DIST < hash_values.size())) { diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h index eb1f6b78df5..ebd63f9f55d 100644 --- a/be/src/vec/exec/join/process_hash_table_probe.h +++ b/be/src/vec/exec/join/process_hash_table_probe.h @@ -75,17 +75,11 @@ struct ProcessHashTableProbe { UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* output_block); - void _emplace_element(int32_t block_row, int& current_offset); - template <typename HashTableType> typename HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows, bool with_other_join_conjuncts, const uint8_t* null_map); - template <typename Mapped, bool with_other_join_conjuncts> - ForwardIterator<Mapped>& _probe_row_match(int& current_offset, int& probe_index, - size_t& probe_size, bool& all_match_one); - // Process full outer join/ right join / right semi/anti join to output the join result // in hash table template <typename HashTableType> diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 3c6ab25c21d..f4bef996911 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -137,52 +137,18 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType, Parent>::_init_p _visited_map.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); _same_to_prev.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); } - _probe_indexs.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); - _build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); + _probe_indexs.resize(_batch_size); + _build_block_rows.resize(_batch_size); if (!_parent->_ready_probe) { _parent->_ready_probe = true; hash_table_ctx.reset(); hash_table_ctx.init_serialized_keys(_parent->_probe_columns, probe_rows, null_map); + hash_table_ctx.calculate_bucket(probe_rows); } return typename HashTableType::State(_parent->_probe_columns); } -template <int JoinOpType, typename Parent> -template <typename Mapped, bool with_other_join_conjuncts> -ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, Parent>::_probe_row_match( - int& current_offset, int& probe_index, size_t& probe_size, bool& all_match_one) { - auto& probe_row_match_iter = std::get<ForwardIterator<Mapped>>(_parent->_probe_row_match_iter); - if (!probe_row_match_iter.ok()) { - return probe_row_match_iter; - } - - SCOPED_TIMER(_search_hashtable_timer); - for (; probe_row_match_iter.ok() && current_offset < _batch_size; ++probe_row_match_iter) { - _emplace_element(probe_row_match_iter->row_num, current_offset); - _probe_indexs.emplace_back(probe_index); - if constexpr (with_other_join_conjuncts) { - _visited_map.emplace_back(&probe_row_match_iter->visited); - } - } - - _row_count_from_last_probe = current_offset; - all_match_one &= (current_offset == 1); - if (!probe_row_match_iter.ok()) { - ++probe_index; - } - probe_size = 1; - - return probe_row_match_iter; -} - -template <int JoinOpType, typename Parent> -void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int32_t block_row, - int& current_offset) { - _build_block_rows.emplace_back(block_row); - current_offset++; -} - template <int JoinOpType, typename Parent> template <bool need_null_map_for_probe, bool 
ignore_null, typename HashTableType, bool with_other_conjuncts, bool is_mark_join> @@ -193,8 +159,6 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash size_t probe_rows) { auto& probe_index = _parent->_probe_index; - using Mapped = typename HashTableType::Mapped; - _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, with_other_conjuncts, need_null_map_for_probe ? null_map->data() : nullptr); @@ -206,9 +170,6 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash bool all_match_one = false; size_t probe_size = 0; - auto& probe_row_match_iter = _probe_row_match<Mapped, with_other_conjuncts>( - current_offset, probe_index, probe_size, all_match_one); - // If not(which means it excceed batch size), probe_index is not increased and // remaining matched rows for the current probe row will be // handled in the next call of this function @@ -217,14 +178,6 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash // Is the last sub block of splitted block bool is_the_last_sub_block = false; - if (with_other_conjuncts && probe_size != 0) { - is_the_last_sub_block = !probe_row_match_iter.ok(); - _same_to_prev.emplace_back(false); - for (int i = 0; i < current_offset - 1; ++i) { - _same_to_prev.emplace_back(true); - } - } - std::unique_ptr<ColumnFilterHelper> mark_column; if (is_mark_join) { mark_column = std::make_unique<ColumnFilterHelper>(*mcol[mcol.size() - 1]); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index ee941fa5c1b..c15e3674642 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -131,8 +131,10 @@ struct ProcessHashTableBuild { if (!_parent->runtime_filter_descs().empty()) { _parent->_inserted_blocks.insert(&_acquired_block); } + hash_table_ctx.hash_table->reserve(_rows); hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows, null_map ? 
null_map->data() : nullptr); + hash_table_ctx.calculate_bucket(_rows); SCOPED_TIMER(_parent->_build_table_insert_timer); hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.hash_values.data(), _rows, _state->batch_size()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org