This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f3ff86ce0d9 [Refactor](join) some refactor of join process (#44346) f3ff86ce0d9 is described below commit f3ff86ce0d904a5ece8624da26621c608a89d5b6 Author: Pxl <x...@selectdb.com> AuthorDate: Thu Jan 2 17:54:53 2025 +0800 [Refactor](join) some refactor of join process (#44346) ### What problem does this PR solve? remove need_judge_null variable --- be/src/pipeline/dependency.h | 1 - be/src/pipeline/exec/hashjoin_build_sink.cpp | 7 - be/src/pipeline/exec/hashjoin_build_sink.h | 4 +- be/src/pipeline/exec/hashjoin_probe_operator.cpp | 27 +-- be/src/pipeline/exec/hashjoin_probe_operator.h | 2 - .../pipeline/exec/join/process_hash_table_probe.h | 12 +- .../exec/join/process_hash_table_probe_impl.h | 167 ++++++------- be/src/vec/common/hash_table/join_hash_table.h | 265 +++++++++++---------- 8 files changed, 227 insertions(+), 258 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index ecbd49a5647..fd179cdfd0a 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -612,7 +612,6 @@ struct HashJoinSharedState : public JoinSharedState { size_t build_exprs_size = 0; std::shared_ptr<vectorized::Block> build_block; std::shared_ptr<std::vector<uint32_t>> build_indexes_null; - bool probe_ignore_null = false; }; struct PartitionedHashJoinSharedState diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 6aca4897367..016ea494062 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -613,13 +613,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* if (eos) { local_state._eos = true; local_state.init_short_circuit_for_probe(); - // Since the comparison of null values is meaningless, null aware left anti/semi join should not output null - // when the build side is not empty. - if (local_state._shared_state->build_block && - (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - _join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN)) { - local_state._shared_state->probe_ignore_null = true; - } local_state._dependency->set_ready_to_read(); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 906fc5f9bd4..91465380d70 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -201,10 +201,12 @@ struct ProcessHashTableBuild { SCOPED_TIMER(_parent->_build_table_insert_timer); hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, _batch_size, *has_null_key); + // In order to make the null keys equal when using single null eq, all null keys need to be set to default value. if (_build_raw_ptrs.size() == 1 && null_map) { _build_raw_ptrs[0]->assume_mutable()->replace_column_null_data(null_map->data()); } + hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows, null_map ? null_map->data() : nullptr, true, true, hash_table_ctx.hash_table->get_bucket_size()); @@ -213,7 +215,7 @@ struct ProcessHashTableBuild { if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && with_other_conjuncts) { - //null aware join with other conjuncts + // null aware join with other conjuncts keep_null_key = true; } else if (_parent->_shared_state->is_null_safe_eq_join.size() == 1 && _parent->_shared_state->is_null_safe_eq_join[0]) { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 37ccd6206f3..1f30a6183a2 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -39,7 +39,6 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); auto& p = _parent->cast<HashJoinProbeOperatorX>(); - _shared_state->probe_ignore_null = p._probe_ignore_null; _probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i])); @@ -287,12 +286,12 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc if (local_state._probe_index < local_state._probe_block.rows()) { DCHECK(local_state._has_set_need_null_map_for_probe); std::visit( - [&](auto&& arg, auto&& process_hashtable_ctx, auto need_judge_null) { + [&](auto&& arg, auto&& process_hashtable_ctx) { using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>; if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) { using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - st = process_hashtable_ctx.template process<need_judge_null>( + st = process_hashtable_ctx.template process( arg, local_state._null_map_column ? &local_state._null_map_column->get_data() @@ -308,9 +307,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc } }, local_state._shared_state->hash_table_variants->method_variant, - *local_state._process_hashtable_ctx_variants, - vectorized::make_bool_variant(local_state._need_null_map_for_probe && - local_state._shared_state->probe_ignore_null)); + *local_state._process_hashtable_ctx_variants); } else if (local_state._probe_eos) { if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { std::visit( @@ -493,29 +490,13 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::init(tnode, state)); DCHECK(tnode.__isset.hash_join_node); - const bool probe_dispose_null = - _match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - _join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN || _join_op == TJoinOp::LEFT_ANTI_JOIN || - _join_op == TJoinOp::LEFT_SEMI_JOIN; const std::vector<TEqJoinCondition>& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts; - std::vector<bool> probe_not_ignore_null(eq_join_conjuncts.size()); - size_t conjuncts_index = 0; for (const auto& eq_join_conjunct : eq_join_conjuncts) { vectorized::VExprContextSPtr ctx; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, ctx)); _probe_expr_ctxs.push_back(ctx); - bool null_aware = eq_join_conjunct.__isset.opcode && - eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL && - (eq_join_conjunct.right.nodes[0].is_nullable || - eq_join_conjunct.left.nodes[0].is_nullable); - probe_not_ignore_null[conjuncts_index] = - null_aware || - (_probe_expr_ctxs.back()->root()->is_nullable() && probe_dispose_null); - conjuncts_index++; - } - for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { - _probe_ignore_null |= !probe_not_ignore_null[i]; } + if (tnode.hash_join_node.__isset.other_join_conjuncts && !tnode.hash_join_node.other_join_conjuncts.empty()) { RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees( diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 1bdb9d13347..55a8835f55b 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -92,7 +92,6 @@ private: bool _ready_probe = false; bool _probe_eos = false; int _last_probe_match; - // For mark join, last probe index of null mark int _last_probe_null_mark; @@ -174,7 +173,6 @@ private: // probe expr vectorized::VExprContextSPtrs _probe_expr_ctxs; - bool _probe_ignore_null = false; vectorized::DataTypes _right_table_data_types; vectorized::DataTypes _left_table_data_types; diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h index 91fd82f0644..052ed5875ae 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe.h @@ -22,6 +22,7 @@ #include "vec/columns/column.h" #include "vec/columns/columns_number.h" #include "vec/common/arena.h" +#include "vec/common/hash_table/join_hash_table.h" namespace doris { namespace vectorized { @@ -54,7 +55,7 @@ struct ProcessHashTableProbe { int last_probe_index, bool all_match_one, bool have_other_join_conjunct); - template <bool need_judge_null, typename HashTableType> + template <typename HashTableType> Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); @@ -63,9 +64,8 @@ struct ProcessHashTableProbe { // the output block struct is same with mutable block. we can do more opt on it and simplify // the logic of probe // TODO: opt the visited here to reduce the size of hash table - template <bool need_judge_null, typename HashTableType, bool with_other_conjuncts, - bool is_mark_join> - Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, + template <typename HashTableType, bool with_other_conjuncts, bool is_mark_join> + Status do_process(HashTableType& hash_table_ctx, const uint8_t* null_map, vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, uint32_t probe_rows); // In the presence of other join conjunct, the process of join become more complicated. @@ -76,12 +76,12 @@ struct ProcessHashTableProbe { bool has_null_in_build_side); template <bool with_other_conjuncts> - Status do_mark_join_conjuncts(vectorized::Block* output_block, size_t hash_table_bucket_size); + Status do_mark_join_conjuncts(vectorized::Block* output_block, const uint8_t* null_map); template <typename HashTableType> typename HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows, bool with_other_join_conjuncts, - const uint8_t* null_map, bool need_judge_null); + const uint8_t* null_map); // Process full outer join/ right join / right semi/anti join to output the join result // in hash table diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h index 24a9a7f6743..f97a4513816 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h @@ -27,6 +27,7 @@ #include "util/simd/bits.h" #include "vec/columns/column_filter_helper.h" #include "vec/columns/column_nullable.h" +#include "vec/common/hash_table/join_hash_table.h" #include "vec/exprs/vexpr_context.h" namespace doris::pipeline { @@ -160,14 +161,14 @@ template <int JoinOpType> template <typename HashTableType> typename HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_side( HashTableType& hash_table_ctx, size_t probe_rows, bool with_other_join_conjuncts, - const uint8_t* null_map, bool need_judge_null) { + const uint8_t* null_map) { // may over batch size 1 for some outer join case _probe_indexs.resize(_batch_size + 1); _build_indexs.resize(_batch_size + 1); - if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) { + if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && + with_other_join_conjuncts) { _null_flags.resize(_batch_size + 1); - memset(_null_flags.data(), 0, _batch_size + 1); } if (!_parent->_ready_probe) { @@ -177,10 +178,10 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_sid if (_parent->_probe_columns.size() == 1 && null_map) { _parent->_probe_columns[0]->assume_mutable()->replace_column_null_data(null_map); } + hash_table_ctx.init_serialized_keys(_parent->_probe_columns, probe_rows, null_map, true, false, hash_table_ctx.hash_table->get_bucket_size()); - hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums, - need_judge_null ? null_map : nullptr); + hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums); int64_t arena_memory_usage = hash_table_ctx.serialized_keys_size(false); COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage); COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage); @@ -190,10 +191,9 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_sid } template <int JoinOpType> -template <bool need_judge_null, typename HashTableType, bool with_other_conjuncts, - bool is_mark_join> +template <typename HashTableType, bool with_other_conjuncts, bool is_mark_join> Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_ctx, - vectorized::ConstNullMapPtr null_map, + const uint8_t* null_map, vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, uint32_t probe_rows) { @@ -204,31 +204,22 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c auto& probe_index = _parent->_probe_index; auto& build_index = _parent->_build_index; auto last_probe_index = probe_index; - { SCOPED_TIMER(_init_probe_side_timer); - _init_probe_side<HashTableType>( - hash_table_ctx, probe_rows, with_other_conjuncts, - null_map ? null_map->data() : nullptr, - need_judge_null && - (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN || - JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - (is_mark_join && JoinOpType != doris::TJoinOp::RIGHT_SEMI_JOIN))); + _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, with_other_conjuncts, null_map); } auto& mcol = mutable_block.mutable_columns(); - const bool has_mark_join_conjunct = !_parent->_mark_join_conjuncts.empty(); uint32_t current_offset = 0; - if constexpr ((JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && - with_other_conjuncts) { + if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && + with_other_conjuncts) { SCOPED_TIMER(_search_hashtable_timer); /// If `_build_index_for_null_probe_key` is not zero, it means we are in progress of handling probe null key. if (_build_index_for_null_probe_key) { - DCHECK_EQ(build_index, hash_table_ctx.hash_table->get_bucket_size()); + DCHECK(null_map && null_map[probe_index]); current_offset = _process_probe_null_key(probe_index); if (!_build_index_for_null_probe_key) { probe_index++; @@ -239,13 +230,13 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c hash_table_ctx.hash_table->find_null_aware_with_other_conjuncts( hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), probe_index, build_index, probe_rows, _probe_indexs.data(), _build_indexs.data(), - _null_flags.data(), _picking_null_keys); + _null_flags.data(), _picking_null_keys, null_map); probe_index = new_probe_idx; build_index = new_build_idx; current_offset = new_current_offset; _picking_null_keys = picking_null_keys; - if (build_index == hash_table_ctx.hash_table->get_bucket_size()) { + if (null_map && null_map[probe_index]) { _build_index_for_null_probe_key = 1; if (current_offset == 0) { current_offset = _process_probe_null_key(probe_index); @@ -259,11 +250,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c } else { SCOPED_TIMER(_search_hashtable_timer); auto [new_probe_idx, new_build_idx, new_current_offset] = - hash_table_ctx.hash_table->template find_batch<JoinOpType, with_other_conjuncts, - is_mark_join, need_judge_null>( + hash_table_ctx.hash_table->template find_batch<JoinOpType>( hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), probe_index, build_index, cast_set<int32_t>(probe_rows), _probe_indexs.data(), - _probe_visited, _build_indexs.data(), has_mark_join_conjunct); + _probe_visited, _build_indexs.data(), null_map, with_other_conjuncts, + is_mark_join, !_parent->_mark_join_conjuncts.empty()); probe_index = new_probe_idx; build_index = new_build_idx; current_offset = new_current_offset; @@ -296,8 +287,13 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c output_block->swap(mutable_block.to_block()); if constexpr (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { - return do_mark_join_conjuncts<with_other_conjuncts>( - output_block, hash_table_ctx.hash_table->get_bucket_size()); + bool ignore_null_map = + (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && + hash_table_ctx.hash_table + ->empty_build_side(); // empty build side will return false to instead null + return do_mark_join_conjuncts<with_other_conjuncts>(output_block, + ignore_null_map ? nullptr : null_map); } else if constexpr (with_other_conjuncts) { return do_other_join_conjuncts(output_block, hash_table_ctx.hash_table->get_visited(), hash_table_ctx.hash_table->has_null_key()); @@ -366,7 +362,7 @@ uint32_t ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro template <int JoinOpType> template <bool with_other_conjuncts> Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Block* output_block, - size_t hash_table_bucket_size) { + const uint8_t* null_map) { DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == TJoinOp::LEFT_SEMI_JOIN || @@ -376,7 +372,6 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; constexpr bool is_null_aware_join = JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; - const auto row_count = output_block->rows(); if (!row_count) { return Status::OK(); @@ -387,49 +382,45 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo auto& mark_column = assert_cast<vectorized::ColumnNullable&>(*mark_column_mutable); vectorized::IColumn::Filter& filter = assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column()).get_data(); + RETURN_IF_ERROR( + vectorized::VExprContext::execute_conjuncts(_parent->_mark_join_conjuncts, output_block, + mark_column.get_null_map_column(), filter)); + uint8_t* mark_filter_data = filter.data(); + uint8_t* mark_null_map = mark_column.get_null_map_data().data(); - if (_parent->_mark_join_conjuncts.empty()) { + if (is_null_aware_join) { // For null aware anti/semi join, if the equal conjuncts was not matched and the build side has null value, // the result should be null. Like: // select 4 not in (2, 3, null) => null, select 4 not in (2, 3) => true // select 4 in (2, 3, null) => null, select 4 in (2, 3) => false - const bool should_be_null_if_build_side_has_null = *_has_null_in_build_side; - - mark_column.resize(row_count); - auto* filter_data = assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column()) - .get_data() - .data(); - auto* mark_null_map = mark_column.get_null_map_data().data(); - int last_probe_matched = -1; for (size_t i = 0; i != row_count; ++i) { - filter_data[i] = _build_indexs[i] != 0 && _build_indexs[i] != hash_table_bucket_size; - if constexpr (is_null_aware_join) { - if constexpr (with_other_conjuncts) { - mark_null_map[i] = _null_flags[i]; - } else { - if (filter_data[i]) { - last_probe_matched = _probe_indexs[i]; - mark_null_map[i] = false; - } else if (_build_indexs[i] == 0) { - mark_null_map[i] = should_be_null_if_build_side_has_null && - last_probe_matched != _probe_indexs[i]; - } else if (_build_indexs[i] == hash_table_bucket_size) { - mark_null_map[i] = true; - } + mark_filter_data[i] = _build_indexs[i] != 0; + } + + if constexpr (with_other_conjuncts) { + // _null_flags is true means build or probe side of the row is null + memcpy(mark_null_map, _null_flags.data(), row_count); + } else { + if (null_map) { + // probe side of the row is null, so the mark sign should also be null. + for (size_t i = 0; i != row_count; ++i) { + mark_null_map[i] |= null_map[_probe_indexs[i]]; + } + } + if (!with_other_conjuncts && *_has_null_in_build_side) { + // _has_null_in_build_side will change false to null when row not matched + for (size_t i = 0; i != row_count; ++i) { + mark_null_map[i] |= _build_indexs[i] == 0; } } - } - if constexpr (!is_null_aware_join) { - memset(mark_null_map, 0, row_count); } } else { - RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts( - _parent->_mark_join_conjuncts, output_block, mark_column.get_null_map_column(), - filter)); + // for non null aware join, build_indexs is 0 which means there is no match + // sometimes null will be returned in conjunct, but it should not actually be null. + for (size_t i = 0; i != row_count; ++i) { + mark_null_map[i] &= _build_indexs[i] != 0; + } } - auto* mark_null_map = mark_column.get_null_map_data().data(); - - auto* mark_filter_data = filter.data(); if constexpr (with_other_conjuncts) { vectorized::IColumn::Filter other_conjunct_filter(row_count, 1); @@ -453,32 +444,20 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo auto filter_column = vectorized::ColumnUInt8::create(row_count, 0); auto* __restrict filter_map = filter_column->get_data().data(); - - /** - * Here need `!with_other_conjuncts` be true, - * because null aware join with other join conjuncts will process the `mark_null_map` after the - * other join conjuncts are executed. - */ - const bool should_be_null_if_build_side_has_null = - *_has_null_in_build_side && is_null_aware_join && !with_other_conjuncts; for (size_t i = 0; i != row_count; ++i) { - bool not_matched_before = _parent->_last_probe_match != _probe_indexs[i]; + if (_parent->_last_probe_match == _probe_indexs[i]) { + continue; + } if (_build_indexs[i] == 0) { bool has_null_mark_value = _parent->_last_probe_null_mark == _probe_indexs[i]; - if (not_matched_before) { - filter_map[i] = true; - mark_null_map[i] = has_null_mark_value || should_be_null_if_build_side_has_null; - mark_filter_data[i] = false; - } - } else { - if (mark_null_map[i]) { // is null - _parent->_last_probe_null_mark = _probe_indexs[i]; - } else { - if (mark_filter_data[i] && not_matched_before) { - _parent->_last_probe_match = _probe_indexs[i]; - filter_map[i] = true; - } - } + filter_map[i] = true; + mark_filter_data[i] = false; + mark_null_map[i] |= has_null_mark_value; + } else if (mark_null_map[i]) { + _parent->_last_probe_null_mark = _probe_indexs[i]; + } else if (mark_filter_data[i]) { + filter_map[i] = true; + _parent->_last_probe_match = _probe_indexs[i]; } } @@ -677,7 +656,7 @@ Status ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_tab } template <int JoinOpType> -template <bool need_judge_null, typename HashTableType> +template <typename HashTableType> Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx, vectorized::ConstNullMapPtr null_map, vectorized::MutableBlock& mutable_block, @@ -687,9 +666,9 @@ Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx, Status res; std::visit( [&](auto is_mark_join, auto have_other_join_conjunct) { - res = do_process<need_judge_null, HashTableType, have_other_join_conjunct, - is_mark_join>(hash_table_ctx, null_map, mutable_block, - output_block, probe_rows); + res = do_process<HashTableType, have_other_join_conjunct, is_mark_join>( + hash_table_ctx, null_map ? null_map->data() : nullptr, mutable_block, + output_block, probe_rows); }, vectorized::make_bool_variant(is_mark_join), vectorized::make_bool_variant(have_other_join_conjunct)); @@ -705,11 +684,7 @@ struct ExtractType<T(U)> { }; #define INSTANTIATION(JoinOpType, T) \ - template Status ProcessHashTableProbe<JoinOpType>::process<false, ExtractType<void(T)>::Type>( \ - ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ - vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ - uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ - template Status ProcessHashTableProbe<JoinOpType>::process<true, ExtractType<void(T)>::Type>( \ + template Status ProcessHashTableProbe<JoinOpType>::process<ExtractType<void(T)>::Type>( \ ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ diff --git a/be/src/vec/common/hash_table/join_hash_table.h b/be/src/vec/common/hash_table/join_hash_table.h index 59463f0e3f4..ab501d67698 100644 --- a/be/src/vec/common/hash_table/join_hash_table.h +++ b/be/src/vec/common/hash_table/join_hash_table.h @@ -21,6 +21,8 @@ #include <limits> +#include "common/exception.h" +#include "common/status.h" #include "vec/columns/column_filter_helper.h" #include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/hash_table_allocator.h" @@ -70,6 +72,8 @@ public: std::vector<uint8_t>& get_visited() { return visited; } + bool empty_build_side() const { return _empty_build_side; } + void build(const Key* __restrict keys, const uint32_t* __restrict bucket_nums, size_t num_elem, bool keep_null_key) { build_keys = keys; @@ -81,63 +85,67 @@ public: if (!keep_null_key) { first[bucket_size] = 0; // index = bucket_size means null } + _keep_null_key = keep_null_key; } - template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join, bool need_judge_null> + template <int JoinOpType> auto find_batch(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs, bool& probe_visited, - uint32_t* __restrict build_idxs, bool has_mark_join_conjunct = false) { - if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) { - if (_empty_build_side) { - return _process_null_aware_left_half_join_for_empty_build_side<JoinOpType>( - probe_idx, probe_rows, probe_idxs, build_idxs); - } + uint32_t* __restrict build_idxs, const uint8_t* null_map, + bool with_other_conjuncts, bool is_mark_join, bool has_mark_join_conjunct) { + if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && + _empty_build_side) { + return _process_null_aware_left_half_join_for_empty_build_side<JoinOpType>( + probe_idx, probe_rows, probe_idxs, build_idxs); } - if constexpr (with_other_conjuncts || - (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN)) { - if constexpr (!with_other_conjuncts) { - constexpr bool is_null_aware_join = - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN; - constexpr bool is_left_half_join = JoinOpType == TJoinOp::LEFT_SEMI_JOIN || - JoinOpType == TJoinOp::LEFT_ANTI_JOIN; - - /// For null aware join or left half(semi/anti) join without other conjuncts and without - /// mark join conjunct. - /// If one row on probe side has one match in build side, we should stop searching the - /// hash table for this row. - if (is_null_aware_join || (is_left_half_join && !has_mark_join_conjunct)) { - return _find_batch_conjunct<JoinOpType, need_judge_null, true>( - keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, - build_idxs); - } + if (with_other_conjuncts) { + return _find_batch_conjunct<JoinOpType, false>( + keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs); + } + + if (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { + bool is_null_aware_join = JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN; + bool is_left_half_join = + JoinOpType == TJoinOp::LEFT_SEMI_JOIN || JoinOpType == TJoinOp::LEFT_ANTI_JOIN; + + /// For null aware join or left half(semi/anti) join without other conjuncts and without + /// mark join conjunct. + /// If one row on probe side has one match in build side, we should stop searching the + /// hash table for this row. + if (is_null_aware_join || (is_left_half_join && !has_mark_join_conjunct)) { + return _find_batch_conjunct<JoinOpType, true>(keys, build_idx_map, probe_idx, + build_idx, probe_rows, probe_idxs, + build_idxs); } - return _find_batch_conjunct<JoinOpType, need_judge_null, false>( + return _find_batch_conjunct<JoinOpType, false>( keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs); } - if constexpr (JoinOpType == TJoinOp::INNER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN || - JoinOpType == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { + if (JoinOpType == TJoinOp::INNER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN || + JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { return _find_batch_inner_outer_join<JoinOpType>(keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, probe_visited, build_idxs); } - if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::LEFT_SEMI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return _find_batch_left_semi_anti<JoinOpType, need_judge_null>( - keys, build_idx_map, probe_idx, probe_rows, probe_idxs); + if (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || JoinOpType == TJoinOp::LEFT_SEMI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + if (null_map) { + return _find_batch_left_semi_anti<JoinOpType, true>( + keys, build_idx_map, probe_idx, probe_rows, probe_idxs, null_map); + } else { + return _find_batch_left_semi_anti<JoinOpType, false>( + keys, build_idx_map, probe_idx, probe_rows, probe_idxs, nullptr); + } } - if constexpr (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { + if (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { return _find_batch_right_semi_anti(keys, build_idx_map, probe_idx, probe_rows); } - return std::tuple {0, 0U, 0U}; + throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid hash join input"); } /** @@ -157,67 +165,16 @@ public: uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs, uint8_t* __restrict null_flags, - bool picking_null_keys) { - uint32_t matched_cnt = 0; - const auto batch_size = max_batch_size; - - auto do_the_probe = [&]() { - /// If no any rows match the probe key, here start to handle null keys in build side. - /// The result of "Any = null" is null. - if (build_idx == 0 && !picking_null_keys) { - build_idx = first[bucket_size]; - picking_null_keys = true; // now pick null from build side - } - - while (build_idx && matched_cnt < batch_size) { - if (picking_null_keys || keys[probe_idx] == build_keys[build_idx]) { - build_idxs[matched_cnt] = build_idx; - probe_idxs[matched_cnt] = probe_idx; - null_flags[matched_cnt] = picking_null_keys; - matched_cnt++; - } - - build_idx = next[build_idx]; - - // If `build_idx` is 0, all matched keys are handled, - // now need to handle null keys in build side. - if (!build_idx && !picking_null_keys) { - build_idx = first[bucket_size]; - picking_null_keys = true; // now pick null keys from build side - } - } - - // may over batch_size when emplace 0 into build_idxs - if (!build_idx) { - probe_idxs[matched_cnt] = probe_idx; - build_idxs[matched_cnt] = 0; - picking_null_keys = false; - matched_cnt++; - } - - probe_idx++; - }; - - if (build_idx) { - do_the_probe(); - } - - while (probe_idx < probe_rows && matched_cnt < batch_size) { - build_idx = build_idx_map[probe_idx]; - - /// If the probe key is null - if (build_idx == bucket_size) { - probe_idx++; - break; - } - do_the_probe(); - if (picking_null_keys) { - break; - } + bool picking_null_keys, const uint8_t* null_map) { + if (null_map) { + return _find_null_aware_with_other_conjuncts_impl<true>( + keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs, + null_flags, picking_null_keys, null_map); + } else { + return _find_null_aware_with_other_conjuncts_impl<false>( + keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs, + null_flags, picking_null_keys, nullptr); } - - probe_idx -= (build_idx != 0); - return std::tuple {probe_idx, build_idx, matched_cnt, picking_null_keys}; } template <int JoinOpType, bool is_mark_join> @@ -250,15 +207,11 @@ public: bool has_null_key() { return _has_null_key; } - void pre_build_idxs(std::vector<uint32>& buckets, const uint8_t* null_map) const { - if (null_map) { - for (unsigned int& bucket : buckets) { - bucket = bucket == bucket_size ? bucket_size : first[bucket]; - } - } else { - for (unsigned int& bucket : buckets) { - bucket = first[bucket]; - } + bool keep_null_key() { return _keep_null_key; } + + void pre_build_idxs(std::vector<uint32>& buckets) const { + for (unsigned int& bucket : buckets) { + bucket = first[bucket]; } } @@ -267,8 +220,12 @@ private: auto _process_null_aware_left_half_join_for_empty_build_side(int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs) { - static_assert(JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN); + if (JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && + JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) { + throw Exception(ErrorCode::INTERNAL_ERROR, + "process_null_aware_left_half_join_for_empty_build_side meet invalid " + "hash join input"); + } uint32_t matched_cnt = 0; const auto batch_size = max_batch_size; @@ -298,16 +255,17 @@ private: return std::tuple {probe_idx, 0U, 0U}; } - template <int JoinOpType, bool need_judge_null> + template <int JoinOpType, bool has_null_map> auto _find_batch_left_semi_anti(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, - int probe_rows, uint32_t* __restrict probe_idxs) { + int probe_rows, uint32_t* __restrict probe_idxs, + const uint8_t* null_map) { uint32_t matched_cnt = 0; const auto batch_size = max_batch_size; while (probe_idx < probe_rows && matched_cnt < batch_size) { - if constexpr (need_judge_null) { - if (build_idx_map[probe_idx] == bucket_size) { + if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && has_null_map) { + if (null_map[probe_idx]) { probe_idx++; continue; } @@ -325,7 +283,7 @@ private: return std::tuple {probe_idx, 0U, matched_cnt}; } - template <int JoinOpType, bool need_judge_null, bool only_need_to_match_one> + template <int JoinOpType, bool only_need_to_match_one> auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs) { @@ -341,14 +299,6 @@ private: build_idxs[matched_cnt] = build_idx; matched_cnt++; } - } else if constexpr (need_judge_null) { - if (build_idx == bucket_size) { - build_idxs[matched_cnt] = build_idx; - probe_idxs[matched_cnt] = probe_idx; - build_idx = 0; - matched_cnt++; - break; - } } if (keys[probe_idx] == build_keys[build_idx]) { @@ -448,6 +398,76 @@ private: return std::tuple {probe_idx, build_idx, matched_cnt}; } + template <bool has_null_map> + auto _find_null_aware_with_other_conjuncts_impl( + const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, + uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs, + uint32_t* __restrict build_idxs, uint8_t* __restrict null_flags, bool picking_null_keys, + const uint8_t* null_map) { + uint32_t matched_cnt = 0; + const auto batch_size = max_batch_size; + + auto do_the_probe = [&]() { + /// If no any rows match the probe key, here start to handle null keys in build side. + /// The result of "Any = null" is null. + if (build_idx == 0 && !picking_null_keys) { + build_idx = first[bucket_size]; + picking_null_keys = true; // now pick null from build side + } + + while (build_idx && matched_cnt < batch_size) { + if (picking_null_keys || keys[probe_idx] == build_keys[build_idx]) { + build_idxs[matched_cnt] = build_idx; + probe_idxs[matched_cnt] = probe_idx; + null_flags[matched_cnt] = picking_null_keys; + matched_cnt++; + } + + build_idx = next[build_idx]; + + // If `build_idx` is 0, all matched keys are handled, + // now need to handle null keys in build side. + if (!build_idx && !picking_null_keys) { + build_idx = first[bucket_size]; + picking_null_keys = true; // now pick null keys from build side + } + } + + // may over batch_size when emplace 0 into build_idxs + if (!build_idx) { + probe_idxs[matched_cnt] = probe_idx; + build_idxs[matched_cnt] = 0; + picking_null_keys = false; + matched_cnt++; + } + + probe_idx++; + }; + + if (build_idx) { + do_the_probe(); + } + + while (probe_idx < probe_rows && matched_cnt < batch_size) { + build_idx = build_idx_map[probe_idx]; + + /// If the probe key is null + if constexpr (has_null_map) { + if (null_map[probe_idx]) { + probe_idx++; + break; + } + } + do_the_probe(); + if (picking_null_keys) { + break; + } + } + + probe_idx -= (build_idx != 0); + return std::tuple {probe_idx, build_idx, matched_cnt, picking_null_keys}; + } + const Key* __restrict build_keys; std::vector<uint8_t> visited; @@ -461,6 +481,7 @@ private: mutable uint32_t iter_idx = 1; vectorized::Arena* pool; bool _has_null_key = false; + bool _keep_null_key = false; bool _empty_build_side = true; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org