This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_join
in repository https://gitbox.apache.org/repos/asf/doris.git
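For orientation before the patch body: join and set nodes used to cut the build side into up to 128 blocks of at most 4 GB each (HASH_JOIN_MAX_BUILD_BLOCK_COUNT), addressing rows by a (block_offset, row_num) pair. This commit merges everything into a single vectorized::Block addressed by one 32-bit row index, and swaps PartitionedHashMap for a bucket-chained JoinHashMapTable. A minimal sketch of the new accumulate-and-guard step — illustrative types only, not the Doris Block/MutableBlock API:

#include <cstdint>
#include <limits>
#include <stdexcept>
#include <vector>

// Stand-in for the new build path: every input batch is merged into one
// growing buffer instead of being cut into 4 GB blocks. One column stands
// in for a whole vectorized::Block here.
struct SingleBuildBlock {
    std::vector<int64_t> rows;

    void merge(const std::vector<int64_t>& batch) {
        if (rows.empty()) {
            // The patch first merges create_same_struct_block(1, false):
            // one dummy row, so real rows start at index 1 and index 0 can
            // serve as the hash table's end-of-chain sentinel.
            rows.push_back(0);
        }
        rows.insert(rows.end(), batch.begin(), batch.end());
        if (rows.size() > std::numeric_limits<uint32_t>::max()) {
            // Row ids are now 32-bit; the patch returns Status::NotSupported.
            throw std::length_error("hash join build rows exceed uint32 max");
        }
    }
};

The uint32 row cap replaces the old 128-block limit: overflow is detected at merge time rather than when a 129th block would be appended.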
commit 2e0778a3d87f7a0af33dd11ed3e914ddfd922ed6
Author: BiteTheDDDDt <pxl...@qq.com>
AuthorDate: Mon Oct 16 18:43:26 2023 +0800

    merge block to single block on join/set node

    update
    update
    update
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 101 ++------
 be/src/pipeline/exec/hashjoin_build_sink.h         |   5 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   4 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  13 +-
 be/src/pipeline/exec/set_sink_operator.h           |   2 +-
 be/src/pipeline/exec/set_source_operator.cpp       |   6 +-
 be/src/pipeline/pipeline_x/dependency.h            |   7 +-
 be/src/vec/common/hash_table/hash_map.h            |  67 ++++++
 be/src/vec/common/hash_table/hash_map_context.h    |   6 +-
 .../vec/common/hash_table/hash_table_set_build.h   |   9 +-
 be/src/vec/exec/join/join_op.h                     |  72 +++---
 be/src/vec/exec/join/process_hash_table_probe.h    |   9 +-
 .../vec/exec/join/process_hash_table_probe_impl.h  | 257 +++------------------
 be/src/vec/exec/join/vhash_join_node.cpp           |  72 ++----
 be/src/vec/exec/join/vhash_join_node.h             | 100 ++------
 be/src/vec/exec/vset_operation_node.cpp            |  69 +++---
 be/src/vec/exec/vset_operation_node.h              |   5 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |  11 +-
 18 files changed, 256 insertions(+), 559 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp index 153882075b6..e7da32b0340 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -39,10 +39,7 @@ Overload(Callables&&... callables) -> Overload<Callables...>; HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : JoinBuildSinkLocalState(parent, state), - _build_block_idx(0), - _build_side_mem_used(0), - _build_side_last_mem_used(0) {} + : JoinBuildSinkLocalState(parent, state) {} Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info)); @@ -52,13 +49,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo auto& p = _parent->cast<HashJoinBuildSinkOperatorX>(); _shared_state->join_op_variants = p._join_op_variants; if (p._is_broadcast_join && state->enable_share_hash_table_for_broadcast_join()) { - _shared_state->build_blocks = p._shared_hash_table_context->blocks; - } else { - _shared_state->build_blocks.reset(new std::vector<vectorized::Block>()); - // avoid vector expand change block address. - // one block can store 4g data, _build_blocks can store 128*4g data. - // if probe data bigger than 512g, runtime filter maybe will core dump when insert data. - _shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT); + _shared_state->build_block = p._shared_hash_table_context->block; } _shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join; _shared_state->store_null_in_hash_table = p._store_null_in_hash_table; @@ -82,11 +73,6 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo if (!_should_build_hash_table) { _shared_hash_table_dependency->block_writing(); p._shared_hashtable_controller->append_dependency(p.id(), _shared_hash_table_dependency); - } else if (p._is_broadcast_join) { - // avoid vector expand change block address. - // one block can store 4g data, _build_blocks can store 128*4g data. - // if probe data bigger than 512g, runtime filter maybe will core dump when insert data.
- _shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT); } _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage"); @@ -156,25 +142,24 @@ void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() { _shared_state->short_circuit_for_probe = (_shared_state->_has_null_in_build_side && p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !p._is_mark_join) || - (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::INNER_JOIN && + (!_shared_state->build_block && p._join_op == TJoinOp::INNER_JOIN && !p._is_mark_join) || - (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::LEFT_SEMI_JOIN && + (!_shared_state->build_block && p._join_op == TJoinOp::LEFT_SEMI_JOIN && !p._is_mark_join) || - (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) || - (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) || - (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_ANTI_JOIN); + (!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) || + (!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) || + (!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_ANTI_JOIN); //when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN //we could get the result is probe table + null-column(if need output) _shared_state->empty_right_table_need_probe_dispose = - (_shared_state->build_blocks->empty() && !p._have_other_join_conjunct && - !p._is_mark_join) && + (!_shared_state->build_block && !p._have_other_join_conjunct && !p._is_mark_join) && (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op == TJoinOp::LEFT_ANTI_JOIN); } Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, - vectorized::Block& block, uint8_t offset) { + vectorized::Block& block) { auto& p = _parent->cast<HashJoinBuildSinkOperatorX>(); SCOPED_TIMER(_build_table_timer); size_t rows = block.rows(); @@ -220,7 +205,7 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, vectorized::ProcessHashTableBuild<HashTableCtxType, HashJoinBuildSinkLocalState> hash_table_build_process(rows, block, raw_ptrs, this, - state->batch_size(), offset, state); + state->batch_size(), state); return hash_table_build_process .template run<has_null_value, short_circuit_for_null_in_build_side>( arg, @@ -321,7 +306,7 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { } return; } - if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32, RowRefListType>( + if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32, RowRefListType>( *_shared_state->hash_table_variants, _build_expr_ctxs)) { _shared_state->hash_table_variants ->emplace<vectorized::SerializedHashTableContext<RowRefListType>>(); @@ -331,16 +316,6 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { vectorized::make_bool_variant(p._have_other_join_conjunct)); DCHECK(!std::holds_alternative<std::monostate>(*_shared_state->hash_table_variants)); - - std::visit(vectorized::Overload {[&](std::monostate& arg) { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - }, - [&](auto&& arg) { - arg.hash_table->set_partitioned_threshold( - state->partitioned_hash_join_rows_threshold()); - }}, - *_shared_state->hash_table_variants); } 
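The JoinFixedHashMap selected above is the JoinHashMapTable added later in this patch (hash_map.h): a power-of-two `first` array of bucket heads plus a per-row `next` chain over the single build block, with row 0 reserved as the end-of-chain sentinel. A self-contained sketch of that scheme — it substitutes a plain power-of-two round-up for phmap::priv::NormalizeCapacity and std::vector::push_back for the patch's preallocated output vectors, so all names are illustrative:

#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

template <typename Key>
struct ChainedJoinTable {
    // Bucket count: num_elem plus ~1/7 headroom, rounded up to a power of
    // two so `hash & (bucket_size - 1)` selects a bucket. The patch gets
    // the same effect from phmap::priv::NormalizeCapacity(...) + 1.
    static uint32_t calc_bucket_size(size_t num_elem) {
        size_t want = num_elem + (num_elem > 0 ? (num_elem - 1) / 7 : 0);
        uint32_t b = 1;
        while (b < want) b <<= 1;
        return b;
    }

    void build(const Key* keys, const size_t* hashes, uint32_t num_rows) {
        bucket_size = calc_bucket_size(num_rows + 1);
        first.assign(bucket_size, 0); // 0 == empty bucket / end of chain
        next.assign(num_rows, 0);
        build_keys = keys;
        for (uint32_t i = 1; i < num_rows; ++i) { // row 0 is the sentinel
            uint32_t bucket = uint32_t(hashes[i]) & (bucket_size - 1);
            next[i] = first[bucket]; // prepend row i to its bucket's chain
            first[bucket] = i;
        }
    }

    // Emit (probe row, build row) matches until roughly one batch is
    // collected; returns {resume probe index, matched count}.
    std::pair<int, int> find_batch(const Key* probe_keys, const size_t* hashes,
                                   int probe_idx, int probe_rows,
                                   std::vector<uint32_t>& probe_idxs,
                                   std::vector<int>& build_idxs) const {
        int matched = 0;
        while (probe_idx < probe_rows && matched < 4096) {
            uint32_t bucket = uint32_t(hashes[probe_idx]) & (bucket_size - 1);
            for (uint32_t b = first[bucket]; b != 0; b = next[b]) {
                if (probe_keys[probe_idx] == build_keys[b]) {
                    probe_idxs.push_back(uint32_t(probe_idx));
                    build_idxs.push_back(int(b));
                    ++matched;
                }
            }
            ++probe_idx;
        }
        return {probe_idx, matched};
    }

    const Key* build_keys = nullptr;
    uint32_t bucket_size = 0;
    std::vector<uint32_t> first; // per-bucket head row index (0 = empty)
    std::vector<uint32_t> next;  // per-row link to next row in same bucket
};

Because the table stores only 32-bit row indices into one block, chains need no arena allocation and probing is a tight array walk. Note the 4096 cap is checked only between probe rows, so one heavily duplicated key can overshoot a batch slightly — the patch's find_batch behaves the same way.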
HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, @@ -402,9 +377,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* SCOPED_TIMER(local_state.profile()->total_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - // make one block for each 4 gigabytes - constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; - if (local_state._shared_state->_has_null_in_build_side) { // TODO: if _has_null_in_build_side is true we should finish current pipeline task. DCHECK(state->enable_pipeline_exec()); @@ -417,52 +389,29 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* if (in_block->rows() != 0) { SCOPED_TIMER(local_state._build_side_merge_block_timer); + if (local_state._build_side_mutable_block.empty()) { + RETURN_IF_ERROR(local_state._build_side_mutable_block.merge( + *(in_block->create_same_struct_block(1, false)))); + } RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(*in_block)); - } - - if (UNLIKELY(local_state._build_side_mem_used - local_state._build_side_last_mem_used > - BUILD_BLOCK_MAX_SIZE)) { - if (local_state._shared_state->build_blocks->size() == - vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT) { - return Status::NotSupported(strings::Substitute( - "data size of right table in hash join > $0", - BUILD_BLOCK_MAX_SIZE * vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT)); + if (local_state._build_side_mutable_block.rows() > + std::numeric_limits<uint32_t>::max()) { + return Status::NotSupported( + "Hash join do not support build table rows" + " over:" + + std::to_string(std::numeric_limits<uint32_t>::max())); } - local_state._shared_state->build_blocks->emplace_back( - local_state._build_side_mutable_block.to_block()); - - COUNTER_UPDATE(local_state._build_blocks_memory_usage, - (*local_state._shared_state->build_blocks)[local_state._build_block_idx] - .bytes()); - - // TODO:: Rethink may we should do the process after we receive all build blocks ? - // which is better. 
- RETURN_IF_ERROR(local_state.process_build_block( - state, (*local_state._shared_state->build_blocks)[local_state._build_block_idx], - local_state._build_block_idx)); - - local_state._build_side_mutable_block = vectorized::MutableBlock(); - ++local_state._build_block_idx; - local_state._build_side_last_mem_used = local_state._build_side_mem_used; } } if (local_state._should_build_hash_table && source_state == SourceState::FINISHED) { if (!local_state._build_side_mutable_block.empty()) { - if (local_state._shared_state->build_blocks->size() == - vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT) { - return Status::NotSupported(strings::Substitute( - "data size of right table in hash join > $0", - BUILD_BLOCK_MAX_SIZE * vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT)); - } - local_state._shared_state->build_blocks->emplace_back( + local_state._shared_state->build_block = std::make_shared<vectorized::Block>( local_state._build_side_mutable_block.to_block()); COUNTER_UPDATE(local_state._build_blocks_memory_usage, - (*local_state._shared_state->build_blocks)[local_state._build_block_idx] - .bytes()); + (*local_state._shared_state->build_block).bytes()); RETURN_IF_ERROR(local_state.process_build_block( - state, (*local_state._shared_state->build_blocks)[local_state._build_block_idx], - local_state._build_block_idx)); + state, (*local_state._shared_state->build_block))); } auto ret = std::visit( Overload {[&](std::monostate&) -> Status { @@ -552,7 +501,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* if (source_state == SourceState::FINISHED) { // Since the comparison of null values is meaningless, null aware left anti join should not output null // when the build side is not empty. - if (!local_state._shared_state->build_blocks->empty() && + if (!local_state._shared_state->build_block && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { local_state._shared_state->probe_ignore_null = true; } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 9b43f95cd3b..9cf559588cc 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -61,11 +61,11 @@ public: ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState); using Parent = HashJoinBuildSinkOperatorX; HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state); - ~HashJoinBuildSinkLocalState() = default; + ~HashJoinBuildSinkLocalState() override = default; Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; - Status process_build_block(RuntimeState* state, vectorized::Block& block, uint8_t offset); + Status process_build_block(RuntimeState* state, vectorized::Block& block); void init_short_circuit_for_probe(); HashJoinBuildSinkOperatorX* join_build() { return (HashJoinBuildSinkOperatorX*)_parent; } @@ -94,7 +94,6 @@ protected: std::vector<IRuntimeFilter*> _runtime_filters; bool _should_build_hash_table = true; - uint8_t _build_block_idx = 0; int64_t _build_side_mem_used = 0; int64_t _build_side_last_mem_used = 0; vectorized::MutableBlock _build_side_mutable_block; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 082f45199c1..14503a90357 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -89,9 +89,7 @@ public: vectorized::DataTypes right_table_data_types(); vectorized::DataTypes left_table_data_types(); bool* 
has_null_in_build_side() { return &_shared_state->_has_null_in_build_side; } - std::shared_ptr<std::vector<vectorized::Block>> build_blocks() const { - return _shared_state->build_blocks; - } + std::shared_ptr<vectorized::Block> build_block() const { return _shared_state->build_block; } private: void _prepare_probe_block(); diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 6725deffa14..a982b079763 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -60,8 +60,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); auto& mem_used = local_state._shared_state->mem_used; - auto& build_blocks = local_state._shared_state->build_blocks; - auto& build_block_index = local_state._shared_state->build_block_index; + auto& build_block = local_state._shared_state->build_block; auto& valid_element_in_hash_tbl = local_state._shared_state->valid_element_in_hash_tbl; if (in_block->rows() != 0) { @@ -71,11 +70,9 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo if (source_state == SourceState::FINISHED || local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) { - build_blocks.emplace_back(local_state._mutable_block.to_block()); - RETURN_IF_ERROR(_process_build_block(local_state, build_blocks[build_block_index], - build_block_index, state)); + build_block = local_state._mutable_block.to_block(); + RETURN_IF_ERROR(_process_build_block(local_state, build_block, state)); local_state._mutable_block.clear(); - ++build_block_index; if (source_state == SourceState::FINISHED) { if constexpr (is_intersect) { @@ -101,7 +98,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo template <bool is_intersect> Status SetSinkOperatorX<is_intersect>::_process_build_block( - SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block, uint8_t offset, + SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block, RuntimeState* state) { size_t rows = block.rows(); if (rows == 0) { @@ -117,7 +114,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block( using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { vectorized::HashTableBuild<HashTableCtxType, is_intersect> - hash_table_build_process(&local_state, rows, raw_ptrs, offset, state); + hash_table_build_process(&local_state, rows, raw_ptrs, state); static_cast<void>(hash_table_build_process(arg, local_state._arena)); } else { LOG(FATAL) << "FATAL: uninited hash table"; diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 5383b1b3a55..1250433dacc 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -119,7 +119,7 @@ private: friend struct HashTableBuild; Status _process_build_block(SetSinkLocalState<is_intersect>& local_state, - vectorized::Block& block, uint8_t offset, RuntimeState* state); + vectorized::Block& block, RuntimeState* state); Status _extract_build_column(SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block, vectorized::ColumnRawPtrs& raw_ptrs); diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index 6baf084e94a..b9a3433c159 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ 
b/be/src/pipeline/exec/set_source_operator.cpp @@ -176,12 +176,12 @@ void SetSourceOperatorX<is_intersect>::_add_result_columns( SetSourceLocalState<is_intersect>& local_state, vectorized::RowRefListWithFlags& value, int& block_size) { auto& build_col_idx = local_state._shared_state->build_col_idx; - auto& build_blocks = local_state._shared_state->build_blocks; + auto& build_block = local_state._shared_state->build_block; auto it = value.begin(); for (auto idx = build_col_idx.begin(); idx != build_col_idx.end(); ++idx) { - auto& column = *build_blocks[it->block_offset].get_by_position(idx->first).column; - if (local_state._mutable_cols[idx->second]->is_nullable() xor column.is_nullable()) { + auto& column = *build_block.get_by_position(idx->first).column; + if (local_state._mutable_cols[idx->second]->is_nullable() ^ column.is_nullable()) { DCHECK(local_state._mutable_cols[idx->second]->is_nullable()); ((vectorized::ColumnNullable*)(local_state._mutable_cols[idx->second].get())) ->insert_from_not_nullable(column, it->row_num); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 79089c48ed9..d44692909d0 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -577,7 +577,7 @@ struct HashJoinSharedState : public JoinSharedState { std::make_shared<vectorized::HashTableVariants>(); const std::vector<TupleDescriptor*> build_side_child_desc; size_t build_exprs_size = 0; - std::shared_ptr<std::vector<vectorized::Block>> build_blocks = nullptr; + std::shared_ptr<vectorized::Block> build_block; bool probe_ignore_null = false; }; @@ -670,8 +670,7 @@ struct SetSharedState { /// default init //record memory during running int64_t mem_used = 0; - std::vector<vectorized::Block> build_blocks; // build to source - int build_block_index = 0; // build to source + vectorized::Block build_block; // build to source //record element size in hashtable int64_t valid_element_in_hash_tbl = 0; //first:column_id, could point to origin column or cast column @@ -742,7 +741,7 @@ public: return; } - if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32, + if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32, vectorized::RowRefListWithFlags>( *hash_table_variants, child_exprs_lists[0])) { hash_table_variants->emplace< diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 5b7cd6f4642..85110deba62 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -20,6 +20,8 @@ #pragma once +#include <span> + #include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/hash_table.h" #include "vec/common/hash_table/hash_table_allocator.h" @@ -193,10 +195,75 @@ public: bool has_null_key_data() const { return false; } }; +template <typename Key, typename Cell, typename Hash = DefaultHash<Key>, + typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator> +class JoinHashMapTable : public HashMapTable<Key, Cell, Hash, Grower, Allocator> { +public: + using Self = JoinHashMapTable; + using Base = HashMapTable<Key, Cell, Hash, Grower, Allocator>; + + using key_type = Key; + using value_type = typename Cell::value_type; + using mapped_type = typename Cell::Mapped; + + using LookupResult = typename Base::LookupResult; + + using HashMapTable<Key, Cell, Hash, Grower, Allocator>::HashMapTable; + + static uint32_t calc_bucket_size(size_t num_elem) { + size_t expect_bucket_size = static_cast<size_t>(num_elem) + 
(num_elem - 1) / 7; + return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1; + } + + void build(const Key* __restrict keys, const size_t* __restrict hash_values, int num_elem) { + bucket_size = calc_bucket_size(num_elem + 1); + first.resize(bucket_size, 0); + next.resize(num_elem); + + build_keys = keys; + for (size_t i = 1; i < num_elem; i++) { + uint32_t bucket_num = hash_values[i] & (bucket_size - 1); + next[i] = first[bucket_num]; + first[bucket_num] = i; + } + } + + auto find_batch(const Key* __restrict keys, const size_t* __restrict hash_values, int probe_idx, + int probe_rows, std::vector<uint32_t>& probe_idxs, + std::vector<int>& build_idxs) { + auto matched_cnt = 0; + while (probe_idx < probe_rows && matched_cnt < 4096) { + uint32_t bucket_num = hash_values[probe_idx] & (bucket_size - 1); + auto build_idx = first[bucket_num]; + while (build_idx) { + if (keys[probe_idx] == build_keys[build_idx]) { + probe_idxs[matched_cnt] = probe_idx; + build_idxs[matched_cnt] = build_idx; + matched_cnt++; + } + build_idx = next[build_idx]; + } + probe_idx++; + } + return std::pair {probe_idx, matched_cnt}; + } + +private: + const Key* __restrict build_keys; + uint32_t bucket_size = 0; + std::vector<uint32_t> first; + std::vector<uint32_t> next; + Cell cell; + doris::vectorized::Arena* pool; +}; + template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>, typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator> using HashMap = HashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, Grower, Allocator>; +template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>> +using JoinFixedHashMap = JoinHashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash>; + template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>, typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator> using HashMapWithSavedHash = diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index f40a351f9d8..0b1748a9723 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -478,14 +478,14 @@ struct MethodSingleNullableColumn : public SingleColumnMethod { }; template <typename RowRefListType> -using SerializedHashTableContext = MethodSerialized<PartitionedHashMap<StringRef, RowRefListType>>; +using SerializedHashTableContext = MethodSerialized<JoinFixedHashMap<StringRef, RowRefListType>>; template <class T, typename RowRefListType> using PrimaryTypeHashTableContext = - MethodOneNumber<T, PartitionedHashMap<T, RowRefListType, HashCRC32<T>>>; + MethodOneNumber<T, JoinFixedHashMap<T, RowRefListType, HashCRC32<T>>>; template <class Key, bool has_null, typename Value> using FixedKeyHashTableContext = - MethodKeysFixed<PartitionedHashMap<Key, Value, HashCRC32<Key>>, has_null>; + MethodKeysFixed<JoinFixedHashMap<Key, Value, HashCRC32<Key>>, has_null>; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h b/be/src/vec/common/hash_table/hash_table_set_build.h index e3c1ed27b1f..48c9e2d1673 100644 --- a/be/src/vec/common/hash_table/hash_table_set_build.h +++ b/be/src/vec/common/hash_table/hash_table_set_build.h @@ -24,11 +24,9 @@ namespace doris::vectorized { template <class HashTableContext, bool is_intersect> struct HashTableBuild { template <typename Parent> - HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset, - RuntimeState* 
state) + HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, RuntimeState* state) : _mem_used(parent->mem_used()), _rows(rows), - _offset(offset), _build_raw_ptrs(build_raw_ptrs), _state(state) {} @@ -48,9 +46,9 @@ struct HashTableBuild { size_t k = 0; auto creator = [&](const auto& ctor, auto& key, auto& origin) { HashTableContext::try_presis_key(key, origin, arena); - ctor(key, Mapped {k, _offset}); + ctor(key, Mapped {k}); }; - auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; }; + auto creator_for_null_key = [&](auto& mapped) { mapped = {k}; }; for (; k < _rows; ++k) { if (k % CHECK_FRECUENCY == 0) { @@ -64,7 +62,6 @@ struct HashTableBuild { private: int64_t* _mem_used; const int _rows; - const uint8_t _offset; ColumnRawPtrs& _build_raw_ptrs; RuntimeState* _state; }; diff --git a/be/src/vec/exec/join/join_op.h b/be/src/vec/exec/join/join_op.h index 1b8b8f2c695..858f5197b03 100644 --- a/be/src/vec/exec/join/join_op.h +++ b/be/src/vec/exec/join/join_op.h @@ -18,7 +18,6 @@ #pragma once #include "vec/common/arena.h" #include "vec/common/columns_hashing.h" -#include "vec/common/hash_table/hash_map.h" #include "vec/core/block.h" namespace doris::vectorized { @@ -45,19 +44,19 @@ namespace doris::vectorized { */ struct RowRef { uint32_t row_num = 0; - uint8_t block_offset; RowRef() = default; - RowRef(size_t row_num_count, uint8_t block_offset_) - : row_num(row_num_count), block_offset(block_offset_) {} + RowRef(size_t row_num_count) : row_num(row_num_count) {} + void clear() {}; }; struct RowRefWithFlag : public RowRef { bool visited; RowRefWithFlag() = default; - RowRefWithFlag(size_t row_num_count, uint8_t block_offset_, bool is_visited = false) - : RowRef(row_num_count, block_offset_), visited(is_visited) {} + RowRefWithFlag(size_t row_num_count, bool is_visited = false) + : RowRef(row_num_count), visited(is_visited) {} + void clear() {}; }; /// Portion of RowRefs, 16 * (MAX_SIZE + 1) bytes sized. 
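The hunk that follows rewires ForwardIterator for the same reason: with a single build block, RowRef lists no longer need arena-allocated Batch segments and become plain std::vector<RowRefType>. A trimmed sketch of the resulting list-and-iterator shape — names mirror the patch, but the Arena& parameter the patch keeps on insert() (apparently only for signature compatibility, since it goes unused) is dropped here:

#include <cstdint>
#include <vector>

struct RowRef {
    uint32_t row_num = 0; // block_offset is gone: one build block only
};

// Head row stored inline; duplicates for the same key appended to `next`.
struct RowRefList : RowRef {
    std::vector<RowRef> next;

    struct Iterator {
        RowRefList* root;
        bool first = true;   // currently on the inline head element
        size_t position = 0; // index into root->next once past the head

        RowRef& operator*() { return first ? *root : root->next[position]; }
        void operator++() {
            if (first) { first = false; return; }
            if (position < root->next.size()) ++position;
        }
        bool ok() const { return first || position < root->next.size(); }
    };

    Iterator begin() { return Iterator{this}; }
    void insert(RowRef&& ref) { next.emplace_back(ref); }
};

The head element stays inline in the list object, so a key with a single match allocates nothing beyond the hash table cell.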
@@ -93,14 +92,15 @@ public: ForwardIterator() : root(nullptr), first(false), batch(nullptr), position(0) {} ForwardIterator(RowRefListType* begin) - : root(begin), first(true), batch(root->next), position(0) {} + : root(begin), first(true), batch((&root->next)), position(0) {} RowRefType& operator*() { if (first) { return *root; } - return batch->row_refs[position]; + return batch->operator[](position); } + RowRefType* operator->() { return &(**this); } void operator++() { @@ -109,21 +109,17 @@ public: return; } - if (batch) { + if (batch && position < batch->size()) { ++position; - if (position >= batch->size) { - batch = batch->next; - position = 0; - } } } - bool ok() const { return first || batch; } + bool ok() const { return first || (batch && position < batch->size()); } private: RowRefListType* root; bool first; - Batch<RowRefType>* batch; + std::vector<RowRefType>* batch; size_t position; }; @@ -131,76 +127,60 @@ struct RowRefList : RowRef { using RowRefType = RowRef; RowRefList() = default; - RowRefList(size_t row_num_, uint8_t block_offset_) : RowRef(row_num_, block_offset_) {} + RowRefList(size_t row_num_) : RowRef(row_num_) {} ForwardIterator<RowRefList> begin() { return ForwardIterator<RowRefList>(this); } /// insert element after current one - void insert(RowRefType&& row_ref, Arena& pool) { - if (!next) { - next = pool.alloc<Batch<RowRefType>>(); - *next = Batch<RowRefType>(nullptr); - } - next = next->insert(std::move(row_ref), pool); - } + void insert(RowRefType&& row_ref, Arena& pool) { next.emplace_back(std::move(row_ref)); } + + void clear() { next.clear(); } private: friend class ForwardIterator<RowRefList>; - - Batch<RowRefType>* next = nullptr; + std::vector<RowRefType> next; }; struct RowRefListWithFlag : RowRef { using RowRefType = RowRef; RowRefListWithFlag() = default; - RowRefListWithFlag(size_t row_num_, uint8_t block_offset_) : RowRef(row_num_, block_offset_) {} + RowRefListWithFlag(size_t row_num_) : RowRef(row_num_) {} ForwardIterator<RowRefListWithFlag> const begin() { return ForwardIterator<RowRefListWithFlag>(this); } /// insert element after current one - void insert(RowRef&& row_ref, Arena& pool) { - if (!next) { - next = pool.alloc<Batch<RowRefType>>(); - *next = Batch<RowRefType>(nullptr); - } - next = next->insert(std::move(row_ref), pool); - } + void insert(RowRefType&& row_ref, Arena& pool) { next.emplace_back(row_ref); } + + void clear() { next.clear(); } bool visited = false; private: friend class ForwardIterator<RowRefListWithFlag>; - - Batch<RowRefType>* next = nullptr; + std::vector<RowRefType> next; }; struct RowRefListWithFlags : RowRefWithFlag { using RowRefType = RowRefWithFlag; RowRefListWithFlags() = default; - RowRefListWithFlags(size_t row_num_, uint8_t block_offset_) - : RowRefWithFlag(row_num_, block_offset_) {} + RowRefListWithFlags(size_t row_num_) : RowRefWithFlag(row_num_) {} ForwardIterator<RowRefListWithFlags> const begin() { return ForwardIterator<RowRefListWithFlags>(this); } /// insert element after current one - void insert(RowRefWithFlag&& row_ref, Arena& pool) { - if (!next) { - next = pool.alloc<Batch<RowRefType>>(); - *next = Batch<RowRefType>(nullptr); - } - next = next->insert(std::move(row_ref), pool); - } + void insert(RowRefType&& row_ref, Arena& pool) { next.emplace_back(row_ref); } + + void clear() { next.clear(); } private: friend class ForwardIterator<RowRefListWithFlags>; - - Batch<RowRefType>* next = nullptr; + std::vector<RowRefType> next; }; } // namespace doris::vectorized diff --git 
a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h index 435cea84186..8aea6492e77 100644 --- a/be/src/vec/exec/join/process_hash_table_probe.h +++ b/be/src/vec/exec/join/process_hash_table_probe.h @@ -75,7 +75,7 @@ struct ProcessHashTableProbe { UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* output_block); - void _emplace_element(int8_t block_offset, int32_t block_row, int& current_offset); + void _emplace_element(int32_t block_row, int& current_offset); template <typename HashTableType> typename HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows, @@ -94,14 +94,13 @@ struct ProcessHashTableProbe { Parent* _parent; const int _batch_size; - std::shared_ptr<std::vector<Block>> _build_blocks; + std::shared_ptr<Block> _build_block; std::unique_ptr<Arena> _arena; std::vector<StringRef> _probe_keys; std::vector<uint32_t> _probe_indexs; - PaddedPODArray<int8_t> _build_block_offsets; - PaddedPODArray<int32_t> _build_block_rows; - std::vector<std::pair<int8_t, int>> _build_blocks_locs; + std::vector<int> _build_block_rows; + std::vector<int> _build_blocks_locs; // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN ColumnUInt8::Container* _tuple_is_null_left_flags; // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index f4a3010c49f..dd6f79b9eb4 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -32,7 +32,7 @@ template <int JoinOpType, typename Parent> ProcessHashTableProbe<JoinOpType, Parent>::ProcessHashTableProbe(Parent* parent, int batch_size) : _parent(parent), _batch_size(batch_size), - _build_blocks(parent->build_blocks()), + _build_block(parent->build_block()), _tuple_is_null_left_flags(parent->is_outer_join() ? &(reinterpret_cast<ColumnUInt8&>( *parent->_tuple_is_null_left_flag_column) @@ -69,51 +69,13 @@ void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column( JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; if (!is_semi_anti_join || have_other_join_conjunct) { - if (_build_blocks->size() == 1) { - for (int i = 0; i < _right_col_len; i++) { - auto& column = *(*_build_blocks)[0].get_by_position(i).column; - if (output_slot_flags[i]) { - mcol[i + _right_col_idx]->insert_indices_from(column, _build_block_rows.data(), - _build_block_rows.data() + size); - } else { - mcol[i + _right_col_idx]->insert_many_defaults(size); - } - } - } else { - for (int i = 0; i < _right_col_len; i++) { - if (output_slot_flags[i]) { - for (int j = 0; j < size; j++) { - if constexpr (probe_all) { - if (_build_block_offsets[j] == -1) { - DCHECK(mcol[i + _right_col_idx]->is_nullable()); - assert_cast<ColumnNullable*>(mcol[i + _right_col_idx].get()) - ->insert_default(); - } else { - auto& column = *(*_build_blocks)[_build_block_offsets[j]] - .get_by_position(i) - .column; - mcol[i + _right_col_idx]->insert_from(column, _build_block_rows[j]); - } - } else { - if (_build_block_offsets[j] == -1) { - // the only case to reach here: - // 1. left anti join with other conjuncts, and - // 2. 
equal conjuncts does not match - // since nullptr is emplaced back to visited_map, - // the output value of the build side does not matter, - // just insert default value - mcol[i + _right_col_idx]->insert_default(); - } else { - auto& column = *(*_build_blocks)[_build_block_offsets[j]] - .get_by_position(i) - .column; - mcol[i + _right_col_idx]->insert_from(column, _build_block_rows[j]); - } - } - } - } else { - mcol[i + _right_col_idx]->insert_many_defaults(size); - } + for (int i = 0; i < _right_col_len; i++) { + const auto& column = *_build_block->get_by_position(i).column; + if (output_slot_flags[i]) { + mcol[i + _right_col_idx]->insert_indices_from(column, _build_block_rows.data(), + _build_block_rows.data() + size); + } else { + mcol[i + _right_col_idx]->insert_many_defaults(size); } } } @@ -167,7 +129,6 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_p _row_count_from_last_probe = 0; _build_block_rows.clear(); - _build_block_offsets.clear(); _probe_indexs.clear(); if (with_other_join_conjuncts) { // use in right join to change visited state after exec the vother join conjunct @@ -178,7 +139,6 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_p } _probe_indexs.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); _build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); - _build_block_offsets.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); if (!_parent->_ready_probe) { _parent->_ready_probe = true; @@ -199,8 +159,7 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, Parent>::_probe_row_m SCOPED_TIMER(_search_hashtable_timer); for (; probe_row_match_iter.ok() && current_offset < _batch_size; ++probe_row_match_iter) { - _emplace_element(probe_row_match_iter->block_offset, probe_row_match_iter->row_num, - current_offset); + _emplace_element(probe_row_match_iter->row_num, current_offset); _probe_indexs.emplace_back(probe_index); if constexpr (with_other_join_conjuncts) { _visited_map.emplace_back(&probe_row_match_iter->visited); @@ -218,10 +177,8 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, Parent>::_probe_row_m } template <int JoinOpType, typename Parent> -void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int8_t block_offset, - int32_t block_row, +void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int32_t block_row, int& current_offset) { - _build_block_offsets.emplace_back(block_offset); _build_block_rows.emplace_back(block_row); current_offset++; } @@ -236,21 +193,13 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash size_t probe_rows) { auto& probe_index = _parent->_probe_index; - using KeyGetter = typename HashTableType::State; using Mapped = typename HashTableType::Mapped; - KeyGetter key_getter = - _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, with_other_conjuncts, - need_null_map_for_probe ? null_map->data() : nullptr); + _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, with_other_conjuncts, + need_null_map_for_probe ? 
null_map->data() : nullptr); auto& mcol = mutable_block.mutable_columns(); - constexpr auto is_right_semi_anti_join = - JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == TJoinOp::RIGHT_SEMI_JOIN; - - constexpr auto probe_all = - JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - int last_probe_index = probe_index; int current_offset = 0; @@ -283,121 +232,12 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash { SCOPED_TIMER(_search_hashtable_timer); - using FindResult = KeyGetter::FindResult; - FindResult empty = {nullptr, false}; - while (current_offset < _batch_size && probe_index < probe_rows) { - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[probe_index]) { - if constexpr (probe_all) { - // only full outer / left outer need insert the data of right table - _emplace_element(-1, -1, current_offset); - _probe_indexs.emplace_back(probe_index); - - if constexpr (with_other_conjuncts) { - _same_to_prev.emplace_back(false); - _visited_map.emplace_back(nullptr); - } - } else { - all_match_one = false; - } - probe_index++; - continue; - } - } - - const auto& find_result = need_null_map_for_probe && (*null_map)[probe_index] - ? empty - : hash_table_ctx.find(key_getter, probe_index); - - auto current_probe_index = probe_index; - if constexpr (!with_other_conjuncts && - (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::LEFT_SEMI_JOIN)) { - bool need_go_ahead = - (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ find_result.is_found(); - if constexpr (is_mark_join) { - ++current_offset; - bool null_result = (need_null_map_for_probe && (*null_map)[probe_index]) || - (!need_go_ahead && *_has_null_in_build_side); - if (null_result) { - mark_column->insert_null(); - } else { - mark_column->insert_value(need_go_ahead); - } - } else { - current_offset += need_go_ahead; - } - ++probe_index; - } else { - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - auto origin_offset = current_offset; - - // For mark join, if euqual-matched tuple count for one probe row - // excceeds batch size, it's difficult to implement the logic to - // split them into multiple sub blocks and handle them, keep the original - // logic for now. - if constexpr (is_mark_join && with_other_conjuncts) { - for (auto it = mapped.begin(); it.ok(); ++it) { - _emplace_element(it->block_offset, it->row_num, current_offset); - _visited_map.emplace_back(&it->visited); - } - ++probe_index; - } else if constexpr (with_other_conjuncts || !is_right_semi_anti_join) { - auto multi_match_last_offset = current_offset; - auto it = mapped.begin(); - for (; it.ok() && current_offset < _batch_size; ++it) { - _emplace_element(it->block_offset, it->row_num, current_offset); - - if constexpr (with_other_conjuncts) { - _visited_map.emplace_back(&it->visited); - } - } - probe_row_match_iter = it; - if (!it.ok()) { - // If all matched rows for the current probe row are handled, - // advance to next probe row. 
- // If not(which means it excceed batch size), probe_index is not increased and - // remaining matched rows for the current probe row will be - // handled in the next call of this function - ++probe_index; - } else if constexpr (with_other_conjuncts) { - multi_matched_output_row_count = - current_offset - multi_match_last_offset; - } - } else { - ++probe_index; - } - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { - mapped.visited = true; - } - - if constexpr (with_other_conjuncts) { - _same_to_prev.emplace_back(false); - for (int i = 0; i < current_offset - origin_offset - 1; ++i) { - _same_to_prev.emplace_back(true); - } - } - } else if constexpr (probe_all || JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - (JoinOpType == TJoinOp::LEFT_SEMI_JOIN && is_mark_join)) { - // only full outer / left outer need insert the data of right table - _emplace_element(-1, -1, current_offset); - - if constexpr (with_other_conjuncts) { - _same_to_prev.emplace_back(false); - _visited_map.emplace_back(nullptr); - } - ++probe_index; - } else { - ++probe_index; - } - } - all_match_one &= (current_offset == _probe_indexs.size() + 1); - _probe_indexs.resize(current_offset, current_probe_index); - } - probe_size = probe_index - last_probe_index + probe_row_match_iter.ok(); + auto [new_probe_idx, new_current_offset] = hash_table_ctx.hash_table->find_batch( + hash_table_ctx.keys, hash_table_ctx.hash_values.data(), probe_index, probe_rows, + _probe_indexs, _build_block_rows); + probe_index = new_probe_idx; + current_offset = new_current_offset; + probe_size = probe_index - last_probe_index; } build_side_output_column(mcol, *_right_output_slot_flags, current_offset, with_other_conjuncts); @@ -793,24 +633,20 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable( auto& visited_iter = std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter); _build_blocks_locs.resize(_batch_size); - auto register_build_loc = [&](int8_t offset, int32_t row_nums) { - _build_blocks_locs[block_size++] = std::pair<int8_t, int>(offset, row_nums); - }; - if (visited_iter.ok()) { if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { - register_build_loc(visited_iter->block_offset, visited_iter->row_num); + _build_blocks_locs[block_size++] = visited_iter->row_num; } } else { for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { if (visited_iter->visited) { - register_build_loc(visited_iter->block_offset, visited_iter->row_num); + _build_blocks_locs[block_size++] = visited_iter->row_num; } } else { if (!visited_iter->visited) { - register_build_loc(visited_iter->block_offset, visited_iter->row_num); + _build_blocks_locs[block_size++] = visited_iter->row_num; } } } @@ -827,7 +663,7 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable( if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { visited_iter = mapped.begin(); for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { - register_build_loc(visited_iter->block_offset, visited_iter->row_num); + _build_blocks_locs[block_size++] = visited_iter->row_num; } if (visited_iter.ok()) { // block_size >= _batch_size, quit for loop @@ -838,7 +674,7 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable( if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { visited_iter = 
mapped.begin(); for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { - register_build_loc(visited_iter->block_offset, visited_iter->row_num); + _build_blocks_locs[block_size++] = visited_iter->row_num; } if (visited_iter.ok()) { // block_size >= _batch_size, quit for loop @@ -851,11 +687,11 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable( for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { if (visited_iter->visited) { - register_build_loc(visited_iter->block_offset, visited_iter->row_num); + _build_blocks_locs[block_size++] = visited_iter->row_num; } } else { if (!visited_iter->visited) { - register_build_loc(visited_iter->block_offset, visited_iter->row_num); + _build_blocks_locs[block_size++] = visited_iter->row_num; } } } @@ -867,38 +703,17 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable( } _build_blocks_locs.resize(block_size); - auto insert_build_rows = [&](int8_t offset) { - for (size_t j = 0; j < right_col_len; ++j) { - auto& column = *(*_build_blocks)[offset].get_by_position(j).column; - mcol[j + right_col_idx]->insert_indices_from( - column, _build_block_rows.data(), - _build_block_rows.data() + _build_block_rows.size()); - } - }; - if (_build_blocks->size() > 1) { - std::sort(_build_blocks_locs.begin(), _build_blocks_locs.end(), - [](const auto a, const auto b) { return a.first > b.first; }); - auto start = 0, end = 0; - while (start < _build_blocks_locs.size()) { - while (end < _build_blocks_locs.size() && - _build_blocks_locs[start].first == _build_blocks_locs[end].first) { - end++; - } - auto offset = _build_blocks_locs[start].first; - _build_block_rows.resize(end - start); - for (int i = 0; start + i < end; i++) { - _build_block_rows[i] = _build_blocks_locs[start + i].second; - } - start = end; - insert_build_rows(offset); - } - } else if (_build_blocks->size() == 1) { - const auto size = _build_blocks_locs.size(); - _build_block_rows.resize(_build_blocks_locs.size()); - for (int i = 0; i < size; i++) { - _build_block_rows[i] = _build_blocks_locs[i].second; - } - insert_build_rows(0); + const auto size = _build_blocks_locs.size(); + _build_block_rows.resize(_build_blocks_locs.size()); + for (int i = 0; i < size; i++) { + _build_block_rows[i] = _build_blocks_locs[i]; + } + + for (size_t j = 0; j < right_col_len; ++j) { + const auto& column = *_build_block->get_by_position(j).column; + mcol[j + right_col_idx]->insert_indices_from( + column, _build_block_rows.data(), + _build_block_rows.data() + _build_block_rows.size()); } // just resize the left table column in case with other conjunct to make block size is not zero diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index f0427a16bd9..10aadbe771d 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -98,12 +98,7 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _arena = std::make_shared<Arena>(); _hash_table_variants = std::make_shared<HashTableVariants>(); _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>(); - _build_blocks.reset(new std::vector<Block>()); - - // avoid vector expand change block address. - // one block can store 4g data, _build_blocks can store 128*4g data. - // if probe data bigger than 512g, runtime filter maybe will core dump when insert data. 
- _build_blocks->reserve(HASH_JOIN_MAX_BUILD_BLOCK_COUNT); + _build_block = std::make_shared<Block>(); } Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { @@ -726,9 +721,6 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { SCOPED_TIMER(_build_timer); - // make one block for each 4 gigabytes - constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; - if (_has_null_in_build_side) { // TODO: if _has_null_in_build_side is true we should finish current pipeline task. DCHECK(state->enable_pipeline_exec()); @@ -741,41 +733,25 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc if (in_block->rows() != 0) { SCOPED_TIMER(_build_side_merge_block_timer); + if (_build_side_mutable_block.empty()) { + RETURN_IF_ERROR(_build_side_mutable_block.merge( + *(in_block->create_same_struct_block(1, false)))); + } RETURN_IF_ERROR(_build_side_mutable_block.merge(*in_block)); - } - - if (UNLIKELY(_build_side_mem_used - _build_side_last_mem_used > BUILD_BLOCK_MAX_SIZE)) { - if (_build_blocks->size() == HASH_JOIN_MAX_BUILD_BLOCK_COUNT) { - return Status::NotSupported(strings::Substitute( - "data size of right table in hash join > $0", - BUILD_BLOCK_MAX_SIZE * HASH_JOIN_MAX_BUILD_BLOCK_COUNT)); + if (_build_side_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) { + return Status::NotSupported( + "Hash join do not support build table rows" + " over:" + + std::to_string(std::numeric_limits<uint32_t>::max())); } - _build_blocks->emplace_back(_build_side_mutable_block.to_block()); - - COUNTER_UPDATE(_build_blocks_memory_usage, (*_build_blocks)[_build_block_idx].bytes()); - - // TODO:: Rethink may we should do the process after we receive all build blocks ? - // which is better. - RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[_build_block_idx], - _build_block_idx)); - - _build_side_mutable_block = MutableBlock(); - ++_build_block_idx; - _build_side_last_mem_used = _build_side_mem_used; } } if (_should_build_hash_table && eos) { if (!_build_side_mutable_block.empty()) { - if (_build_blocks->size() == HASH_JOIN_MAX_BUILD_BLOCK_COUNT) { - return Status::NotSupported(strings::Substitute( - "data size of right table in hash join > $0", - BUILD_BLOCK_MAX_SIZE * HASH_JOIN_MAX_BUILD_BLOCK_COUNT)); - } - _build_blocks->emplace_back(_build_side_mutable_block.to_block()); - COUNTER_UPDATE(_build_blocks_memory_usage, (*_build_blocks)[_build_block_idx].bytes()); - RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[_build_block_idx], - _build_block_idx)); + _build_block = std::make_shared<Block>(_build_side_mutable_block.to_block()); + COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes()); + RETURN_IF_ERROR(_process_build_block(state, *_build_block)); } auto ret = std::visit(Overload {[&](std::monostate&) -> Status { LOG(FATAL) << "FATAL: uninited hash table"; @@ -797,7 +773,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc _shared_hash_table_context->status = Status::OK(); // arena will be shared with other instances. 
_shared_hash_table_context->arena = _arena; - _shared_hash_table_context->blocks = _build_blocks; + _shared_hash_table_context->block = _build_block; _shared_hash_table_context->hash_table_variants = _hash_table_variants; _shared_hash_table_context->short_circuit_for_null_in_probe_side = _has_null_in_build_side; @@ -829,7 +805,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc *_hash_table_variants, *std::static_pointer_cast<HashTableVariants>( _shared_hash_table_context->hash_table_variants)); - _build_blocks = _shared_hash_table_context->blocks; + _build_block = _shared_hash_table_context->block; if (!_shared_hash_table_context->runtime_filters.empty()) { auto ret = std::visit( @@ -862,7 +838,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc // Since the comparison of null values is meaningless, null aware left anti join should not output null // when the build side is not empty. - if (!_build_blocks->empty() && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + if (_build_block && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { _probe_ignore_null = true; } _init_short_circuit_for_probe(); @@ -961,7 +937,7 @@ void HashJoinNode::_set_build_ignore_flag(Block& block, const std::vector<int>& } } -Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uint8_t offset) { +Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) { SCOPED_TIMER(_build_table_timer); size_t rows = block.rows(); if (UNLIKELY(rows == 0)) { @@ -1004,7 +980,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin using HashTableCtxType = std::decay_t<decltype(arg)>; ProcessHashTableBuild<HashTableCtxType, HashJoinNode> hash_table_build_process(rows, block, raw_ptrs, this, - state->batch_size(), offset, state); + state->batch_size(), state); return hash_table_build_process .template run<has_null_value, short_circuit_for_null_in_build_side>( arg, @@ -1082,7 +1058,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) { return; } - if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32, RowRefListType>( + if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32, RowRefListType>( *_hash_table_variants, _build_expr_ctxs)) { _hash_table_variants->emplace<SerializedHashTableContext<RowRefListType>>(); } @@ -1090,16 +1066,6 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) { _join_op_variants, make_bool_variant(_have_other_join_conjunct)); DCHECK(!std::holds_alternative<std::monostate>(*_hash_table_variants)); - - std::visit(Overload {[&](std::monostate& arg) { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - }, - [&](auto&& arg) { - arg.hash_table->set_partitioned_threshold( - state->partitioned_hash_join_rows_threshold()); - }}, - *_hash_table_variants); } void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) { diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 50e1567f4e2..ef5a61eae17 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -106,21 +106,18 @@ using ProfileCounter = RuntimeProfile::Counter; template <class HashTableContext, typename Parent> struct ProcessHashTableBuild { ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs, - Parent* parent, int batch_size, uint8_t offset, RuntimeState* state) + Parent* parent, int batch_size, RuntimeState* state) : _rows(rows), 
- _skip_rows(0), _acquired_block(acquired_block), _build_raw_ptrs(build_raw_ptrs), _parent(parent), _batch_size(batch_size), - _offset(offset), _state(state), _build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {} template <bool ignore_null, bool short_circuit_for_null> Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) { using KeyGetter = typename HashTableContext::State; - using Mapped = typename HashTableContext::Mapped; Defer defer {[&]() { int64_t bucket_size = hash_table_ctx.hash_table->get_buffer_size_in_cells(); @@ -132,18 +129,14 @@ struct ProcessHashTableBuild { hash_table_ctx.hash_table->get_collisions()); COUNTER_SET(_parent->_build_buckets_fill_counter, filled_bucket_size); - auto hash_table_buckets = hash_table_ctx.hash_table->get_buffer_sizes_in_cells(); std::string hash_table_buckets_info; - for (auto bucket_count : hash_table_buckets) { - hash_table_buckets_info += std::to_string(bucket_count) + ", "; - } + + hash_table_buckets_info += + std::to_string(hash_table_ctx.hash_table->get_buffer_size_in_cells()) + ", "; _parent->add_hash_buckets_info(hash_table_buckets_info); - auto hash_table_sizes = hash_table_ctx.hash_table->sizes(); hash_table_buckets_info.clear(); - for (auto table_size : hash_table_sizes) { - hash_table_buckets_info += std::to_string(table_size) + ", "; - } + hash_table_buckets_info += std::to_string(hash_table_ctx.hash_table->size()) + ", "; _parent->add_hash_buckets_filled_info(hash_table_buckets_info); }}; @@ -152,17 +145,6 @@ struct ProcessHashTableBuild { SCOPED_TIMER(_parent->_build_table_insert_timer); hash_table_ctx.hash_table->reset_resize_timer(); - // only not build_unique, we need expanse hash table before insert data - // 1. There are fewer duplicate keys, reducing the number of resize hash tables - // can improve performance to a certain extent, about 2%-5% - // 2. There are many duplicate keys, and the hash table filled bucket is far less than - // the hash table build bucket, which may waste a lot of memory. 
- // TODO, use the NDV expansion of the key column in the optimizer statistics - if (!_parent->build_unique()) { - RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table->expanse_for_add_elem( - std::min<int>(_rows, config::hash_table_pre_expanse_max_rows))); - } - vector<int>& inserted_rows = _parent->_inserted_rows[&_acquired_block]; bool has_runtime_filter = !_parent->runtime_filter_descs().empty(); if (has_runtime_filter) { @@ -174,72 +156,24 @@ struct ProcessHashTableBuild { auto& arena = *_parent->arena(); auto old_build_arena_memory = arena.size(); - - size_t k = 0; - bool inserted = false; - auto creator = [&](const auto& ctor, auto& key, auto& origin) { - HashTableContext::try_presis_key(key, origin, arena); - inserted = true; - ctor(key, Mapped {k, _offset}); - }; - - bool build_unique = _parent->build_unique(); -#define EMPLACE_IMPL(stmt) \ - for (; k < _rows; ++k) { \ - if (k % CHECK_FRECUENCY == 0) { \ - RETURN_IF_CANCELLED(_state); \ - } \ - if constexpr (short_circuit_for_null) { \ - if ((*null_map)[k]) { \ - *has_null_key = true; \ - return Status::OK(); \ - } \ - } else if constexpr (ignore_null) { \ - if ((*null_map)[k]) { \ - *has_null_key = true; \ - continue; \ - } \ - } \ - inserted = false; \ - [[maybe_unused]] auto& mapped = \ - hash_table_ctx.lazy_emplace(key_getter, k, creator, nullptr); \ - stmt; \ - } - - if (has_runtime_filter && build_unique) { - EMPLACE_IMPL( - if (inserted) { inserted_rows.push_back(k); } else { _skip_rows++; }); - } else if (has_runtime_filter && !build_unique) { - EMPLACE_IMPL( - if (inserted) { inserted_rows.push_back(k); } else { - mapped.insert({k, _offset}, *_parent->arena()); - inserted_rows.push_back(k); - }); - } else if (!has_runtime_filter && build_unique) { - EMPLACE_IMPL(if (!inserted) { _skip_rows++; }); - } else { - EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset}, *_parent->arena()); }); - } + hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.hash_values.data(), + _rows); _parent->_build_rf_cardinality += inserted_rows.size(); _parent->_build_arena_memory_usage->add(arena.size() - old_build_arena_memory); COUNTER_UPDATE(_parent->_build_table_expanse_timer, hash_table_ctx.hash_table->get_resize_timer_value()); - COUNTER_UPDATE(_parent->_build_table_convert_timer, - hash_table_ctx.hash_table->get_convert_timer_value()); return Status::OK(); } private: const int _rows; - int _skip_rows; Block& _acquired_block; ColumnRawPtrs& _build_raw_ptrs; Parent* _parent; int _batch_size; - uint8_t _offset; RuntimeState* _state; ProfileCounter* _build_side_compute_hash_timer; @@ -325,8 +259,6 @@ using HashTableIteratorVariants = std::variant<std::monostate, ForwardIterator<RowRefList>, ForwardIterator<RowRefListWithFlag>, ForwardIterator<RowRefListWithFlags>>; -static constexpr auto HASH_JOIN_MAX_BUILD_BLOCK_COUNT = 128; - class HashJoinNode final : public VJoinNodeBase { public: HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); @@ -369,7 +301,7 @@ public: bool have_other_join_conjunct() const { return _have_other_join_conjunct; } bool is_right_semi_anti() const { return _is_right_semi_anti; } bool is_outer_join() const { return _is_outer_join; } - std::shared_ptr<std::vector<Block>> build_blocks() const { return _build_blocks; } + std::shared_ptr<Block> build_block() const { return _build_block; } std::vector<bool>* left_output_slot_flags() { return &_left_output_slot_flags; } std::vector<bool>* right_output_slot_flags() { return &_right_output_slot_flags; } bool* 
@@ -390,16 +322,16 @@ private:
         _short_circuit_for_probe =
                 (_has_null_in_build_side && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
                  !_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && !_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN && !_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_ANTI_JOIN);
+                (!_build_block && _join_op == TJoinOp::INNER_JOIN && !_is_mark_join) ||
+                (!_build_block && _join_op == TJoinOp::LEFT_SEMI_JOIN && !_is_mark_join) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_ANTI_JOIN);
 
         //when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
         //we could get the result is probe table + null-column(if need output)
         _empty_right_table_need_probe_dispose =
-                (_build_blocks->empty() && !_have_other_join_conjunct && !_is_mark_join) &&
+                (!_build_block && !_have_other_join_conjunct && !_is_mark_join) &&
                 (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN ||
                  _join_op == TJoinOp::LEFT_ANTI_JOIN);
     }
@@ -467,7 +399,7 @@ private:
     HashTableIteratorVariants _outer_join_pull_visited_iter;
     HashTableIteratorVariants _probe_row_match_iter;
 
-    std::shared_ptr<std::vector<Block>> _build_blocks;
+    std::shared_ptr<Block> _build_block;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     ColumnUInt8::MutablePtr _null_map_column;
@@ -501,7 +433,7 @@ private:
 
     Status _materialize_build_side(RuntimeState* state) override;
 
-    Status _process_build_block(RuntimeState* state, Block& block, uint8_t offset);
+    Status _process_build_block(RuntimeState* state, Block& block);
 
     Status _do_evaluate(Block& block, VExprContextSPtrs& exprs,
                         RuntimeProfile::Counter& expr_call_timer, std::vector<int>& res_col_ids);
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index d284385b8ed..bb866025df4 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -60,7 +60,6 @@ VSetOperationNode<is_intersect>::VSetOperationNode(ObjectPool* pool, const TPlan
         : ExecNode(pool, tnode, descs),
           _valid_element_in_hash_tbl(0),
           _mem_used(0),
-          _build_block_index(0),
           _build_finished(false) {
     _hash_table_variants = std::make_unique<HashTableVariants>();
 }
@@ -215,7 +214,7 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
         }
         return;
     }
-    if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32, RowRefListWithFlags>(
+    if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32, RowRefListWithFlags>(
                 *_hash_table_variants, _child_expr_lists[0])) {
         _hash_table_variants->emplace<SerializedHashTableContext<RowRefListWithFlags>>();
     }
 }
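hash_table_init() keeps the two-step key layout selection: try to pack the key columns into a fixed-width hash map (now JoinFixedHashMap), and fall back to serialized keys when they do not fit; sink() below then dispatches on the chosen alternative with std::visit. A compact sketch of the pattern, with illustrative stand-in context types and a simplified fits-in-a-word criterion:

    #include <cstddef>
    #include <cstdint>
    #include <type_traits>
    #include <variant>

    struct FixedKeyContext {};      // keys packed into one fixed-width integer
    struct SerializedKeyContext {}; // keys serialized row by row into a buffer

    using HashTableVariants =
            std::variant<std::monostate, FixedKeyContext, SerializedKeyContext>;

    // Prefer the fixed layout when all key columns fit into one machine word;
    // otherwise fall back to serialized keys.
    inline void init_hash_table_variant(HashTableVariants& v, size_t fixed_key_bytes) {
        if (fixed_key_bytes <= sizeof(uint64_t)) {
            v.emplace<FixedKeyContext>();
        } else {
            v.emplace<SerializedKeyContext>();
        }
    }

    // Later stages visit whichever context was chosen, skipping the empty state.
    template <typename F>
    void with_hash_table(HashTableVariants& v, F&& f) {
        std::visit(
                [&](auto&& arg) {
                    using T = std::decay_t<decltype(arg)>;
                    if constexpr (!std::is_same_v<T, std::monostate>) {
                        f(arg);
                    }
                },
                v);
    }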
@@ -223,36 +222,45 @@ template <bool is_intersect>
 Status VSetOperationNode<is_intersect>::sink(RuntimeState* state, Block* block, bool eos) {
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
-    if (block->rows() != 0) {
-        _mem_used += block->allocated_bytes();
-        RETURN_IF_ERROR(_mutable_block.merge(*block));
-    }
-
-    if (eos || _mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
-        _build_blocks.emplace_back(_mutable_block.to_block());
-        RETURN_IF_ERROR(
-                process_build_block(_build_blocks[_build_block_index], _build_block_index, state));
+    if (block->rows() != 0) {
+        if (_build_block.empty()) {
+            RETURN_IF_ERROR(_mutable_block.merge(*(block->create_same_struct_block(0, false))));
+        }
+        RETURN_IF_ERROR(_mutable_block.merge(*block));
+        if (_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
+            return Status::NotSupported(
+                    "Hash join does not support build table rows"
+                    " over:" +
+                    std::to_string(std::numeric_limits<uint32_t>::max()));
+        }
+    }
+
+    if (eos) {
+        if (!_mutable_block.empty()) {
+            _build_block = _mutable_block.to_block();
+        }
+        RETURN_IF_ERROR(process_build_block(_build_block, state));
         _mutable_block.clear();
-        ++_build_block_index;
-
-        if (eos) {
-            if constexpr (is_intersect) {
-                _valid_element_in_hash_tbl = 0;
-            } else {
-                std::visit(
-                        [&](auto&& arg) {
-                            using HashTableCtxType = std::decay_t<decltype(arg)>;
-                            if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                                _valid_element_in_hash_tbl = arg.hash_table->size();
-                            }
-                        },
-                        *_hash_table_variants);
-            }
-            _build_finished = true;
-            _can_read = _children.size() == 1;
-        }
+        if constexpr (is_intersect) {
+            _valid_element_in_hash_tbl = 0;
+        } else {
+            std::visit(
+                    [&](auto&& arg) {
+                        using HashTableCtxType = std::decay_t<decltype(arg)>;
+                        if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
+                            _valid_element_in_hash_tbl = arg.hash_table->size();
+                        }
+                    },
+                    *_hash_table_variants);
+        }
+        _build_finished = true;
+        _can_read = _children.size() == 1;
     }
     return Status::OK();
 }
@@ -304,8 +312,7 @@ Status VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) {
 }
 
 template <bool is_intersect>
-Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_t offset,
-                                                            RuntimeState* state) {
+Status VSetOperationNode<is_intersect>::process_build_block(Block& block, RuntimeState* state) {
     size_t rows = block.rows();
     if (rows == 0) {
         return Status::OK();
@@ -320,7 +327,7 @@ Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
                     HashTableBuild<HashTableCtxType, is_intersect> hash_table_build_process(
-                            this, rows, raw_ptrs, offset, state);
+                            this, rows, raw_ptrs, state);
                     st = hash_table_build_process(arg, _arena);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
@@ -336,8 +343,8 @@ void VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& va
                                                          int& block_size) {
     auto it = value.begin();
     for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); ++idx) {
-        auto& column = *_build_blocks[it->block_offset].get_by_position(idx->first).column;
-        if (_mutable_cols[idx->second]->is_nullable() xor column.is_nullable()) {
+        const auto& column = *_build_block.get_by_position(idx->first).column;
+        if (_mutable_cols[idx->second]->is_nullable() ^ column.is_nullable()) {
             DCHECK(_mutable_cols[idx->second]->is_nullable());
             ((ColumnNullable*)(_mutable_cols[idx->second].get()))
                     ->insert_from_not_nullable(column, it->row_num);
@@ -508,10 +515,6 @@ void VSetOperationNode<is_intersect>::debug_string(int indentation_level,
 
 template <bool is_intersect>
 void VSetOperationNode<is_intersect>::release_mem() {
     _hash_table_variants = nullptr;
-
-    std::vector<Block> tmp_build_blocks;
-    _build_blocks.swap(tmp_build_blocks);
-
     _probe_block.clear();
 }
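With the build side merged into one block, row ids into it are plain uint32_t values, and the new NotSupported branch in sink() above is what keeps that addressing valid: a merged block with 2^32 or more rows could no longer be referenced. A toy version of that guard, with Status and the accumulator as illustrative stand-ins for Doris's types:

    #include <cstdint>
    #include <limits>
    #include <string>

    // Minimal stand-in for doris::Status.
    struct Status {
        std::string msg;
        static Status OK() { return {}; }
        static Status NotSupported(std::string m) { return {std::move(m)}; }
        bool ok() const { return msg.empty(); }
    };

    struct BuildBlockAccumulator {
        uint64_t rows = 0; // tracked in 64 bits so the check itself cannot overflow

        // Mirrors the sink() check: reject build sides whose row count no
        // longer fits a 32-bit row id before folding them into one block.
        Status merge(uint64_t incoming_rows) {
            rows += incoming_rows;
            if (rows > std::numeric_limits<uint32_t>::max()) {
                return Status::NotSupported(
                        "build table rows over: " +
                        std::to_string(std::numeric_limits<uint32_t>::max()));
            }
            return Status::OK();
        }
    };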
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index ff016469f49..8ca04f2f71f 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -82,7 +82,7 @@ private:
     //It's time to abstract out the same methods and provide them directly to others;
     void hash_table_init();
     Status hash_table_build(RuntimeState* state);
-    Status process_build_block(Block& block, uint8_t offset, RuntimeState* state);
+    Status process_build_block(Block& block, RuntimeState* state);
     Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs);
     Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int child_id);
     void refresh_hash_table();
@@ -115,11 +115,10 @@ private:
     //record insert column id during probe
     std::vector<uint16_t> _probe_column_inserted_id;
 
-    std::vector<Block> _build_blocks;
+    Block _build_block;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     std::vector<MutableColumnPtr> _mutable_cols;
-    int _build_block_index;
     bool _build_finished;
     std::vector<bool> _probe_finished_children_index;
     MutableBlock _mutable_block;
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index 6b31cf07ec9..e1c01709042 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -53,18 +53,15 @@ struct SharedRuntimeFilterContext {
 
 struct SharedHashTableContext {
     SharedHashTableContext()
-            : hash_table_variants(nullptr),
-              blocks(new std::vector<vectorized::Block>()),
-              signaled(false),
-              short_circuit_for_null_in_probe_side(false) {}
+            : hash_table_variants(nullptr), block(std::make_shared<vectorized::Block>()) {}
 
     Status status;
     std::shared_ptr<Arena> arena;
    std::shared_ptr<void> hash_table_variants;
-    std::shared_ptr<std::vector<Block>> blocks;
+    std::shared_ptr<Block> block;
     std::map<int, SharedRuntimeFilterContext> runtime_filters;
-    bool signaled;
-    bool short_circuit_for_null_in_probe_side;
+    bool signaled {};
+    bool short_circuit_for_null_in_probe_side {};
 };
 
 using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org