This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e956872d426 [Improvement](join) support all match one logic at join
lazy materialize (#50375)
e956872d426 is described below
commit e956872d4267bd900b2be9464dc051e1dacbdf54
Author: Pxl <[email protected]>
AuthorDate: Wed May 7 14:51:11 2025 +0800
[Improvement](join) support all match one logic at join lazy materialize
(#50375)
### What problem does this PR solve?
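Support the "all match one" fast path in join lazy materialization: when the
probe-side row indexes form a single consecutive run (checked by
`check_all_match_one`), lazily materialized probe columns are copied with
`insert_range_from` instead of `insert_indices_from`.
Example query and profile: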
```
select max(a.k4) from t10000000 a, t10000000 b where a.k1 = b.k1 and
a.k2>=b.k2 and b.k2>=a.k2;
before
- NonEqualJoinConjunctEvaluationTime: 71.582ms
after
- NonEqualJoinConjunctEvaluationTime: 41.480ms
```
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [x] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [x] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../pipeline/exec/join/process_hash_table_probe.h | 7 +-
.../exec/join/process_hash_table_probe_impl.h | 90 +++++++++++-----------
2 files changed, 49 insertions(+), 48 deletions(-)
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h
b/be/src/pipeline/exec/join/process_hash_table_probe.h
index c9a3e62f3aa..4fde7ed5fea 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -46,9 +46,9 @@ struct ProcessHashTableProbe {
~ProcessHashTableProbe() = default;
// output build side result column
- void build_side_output_column(vectorized::MutableColumns& mcol, int size,
bool is_mark_join);
+ void build_side_output_column(vectorized::MutableColumns& mcol, bool
is_mark_join);
- void probe_side_output_column(vectorized::MutableColumns& mcol, int size,
bool all_match_one);
+ void probe_side_output_column(vectorized::MutableColumns& mcol);
// Only process the join with no other join conjunct, because of no other
join conjunt
// the output block struct is same with mutable block. we can do more opt
on it and simplify
@@ -62,8 +62,7 @@ struct ProcessHashTableProbe {
// each matching join column need to be processed by other join conjunct.
so the struct of mutable block
// and output block may be different
// The output result is determined by the other join conjunct result and
same_to_prev struct
- Status do_other_join_conjuncts(vectorized::Block* output_block,
DorisVector<uint8_t>& visited,
- bool has_null_in_build_side);
+ Status do_other_join_conjuncts(vectorized::Block* output_block,
DorisVector<uint8_t>& visited);
Status do_mark_join_conjuncts(vectorized::Block* output_block, const
uint8_t* null_map);
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 40ab0c73768..df7ad9456bb 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -33,6 +33,28 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"
+
+static bool check_all_match_one(const auto& vecs) {
+ size_t size = vecs.size();
+ if (!size || vecs[size - 1] != vecs[0] + size - 1) {
+ return false;
+ }
+ for (size_t i = 1; i < size; i++) {
+ if (vecs[i] == vecs[i - 1]) {
+ return false;
+ }
+ }
+ return true;
+}
+
+static void insert_with_indexs(auto& dst, const auto& src, const auto& indexs,
bool all_match_one) {
+ if (all_match_one) {
+ dst->insert_range_from(*src, indexs[0], indexs.size());
+ } else {
+ dst->insert_indices_from(*src, indexs.data(), indexs.data() +
indexs.size());
+ }
+}
+
template <int JoinOpType>
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState*
parent,
int batch_size)
@@ -55,13 +77,14 @@
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState
template <int JoinOpType>
void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(vectorized::MutableColumns&
mcol,
- int size,
bool is_mark_join) {
+ bool
is_mark_join) {
SCOPED_TIMER(_build_side_output_timer);
// indicates whether build_indexs contain 0
bool build_index_has_zero =
(JoinOpType != TJoinOp::INNER_JOIN && JoinOpType !=
TJoinOp::RIGHT_OUTER_JOIN) ||
_have_other_join_conjunct || is_mark_join;
+ size_t size = _build_indexs.size();
if (!size) {
return;
}
@@ -115,10 +138,10 @@ void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(vectorized::Mut
}
template <int JoinOpType>
-void
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(vectorized::MutableColumns&
mcol,
- int size,
bool all_match_one) {
+void
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(vectorized::MutableColumns&
mcol) {
SCOPED_TIMER(_probe_side_output_timer);
auto& probe_block = _parent->_probe_block;
+ bool all_match_one = check_all_match_one(_probe_indexs.get_data());
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
if (_left_output_slot_flags[i]) {
@@ -129,15 +152,10 @@ void
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(vectorized::Mut
if (_left_output_slot_flags[i] &&
!_parent_operator->is_lazy_materialized_column(i)) {
auto& column = probe_block.get_by_position(i).column;
- if (all_match_one) {
- mcol[i]->insert_range_from(*column,
_probe_indexs.get_element(0), size);
- } else {
- mcol[i]->insert_indices_from(*column,
_probe_indexs.get_data().data(),
- _probe_indexs.get_data().data() +
size);
- }
+ insert_with_indexs(mcol[i], column, _probe_indexs.get_data(),
all_match_one);
} else {
mcol[i]->insert_default();
- mcol[i] = vectorized::ColumnConst::create(std::move(mcol[i]),
size);
+ mcol[i] = vectorized::ColumnConst::create(std::move(mcol[i]),
_probe_indexs.size());
}
}
}
@@ -248,24 +266,15 @@ Status
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
current_offset = new_current_offset;
}
- build_side_output_column(mcol, current_offset, is_mark_join);
+ // input row_indexs's size may bigger than current_offset coz
_init_probe_side
+ _probe_indexs.resize(current_offset);
+ _build_indexs.resize(current_offset);
+
+ build_side_output_column(mcol, is_mark_join);
if (_have_other_join_conjunct ||
(JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType !=
TJoinOp::RIGHT_ANTI_JOIN)) {
- auto check_all_match_one = [](const auto& vecs, int size) {
- if (!size || vecs[size - 1] != vecs[0] + size - 1) {
- return false;
- }
- for (int i = 1; i < size; i++) {
- if (vecs[i] == vecs[i - 1]) {
- return false;
- }
- }
- return true;
- };
-
- probe_side_output_column(mcol, current_offset,
- check_all_match_one(_probe_indexs.get_data(),
current_offset));
+ probe_side_output_column(mcol);
}
output_block->swap(mutable_block.to_block());
@@ -280,8 +289,7 @@ Status
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
->empty_build_side(); // empty build side will return
false to instead null
return do_mark_join_conjuncts(output_block, ignore_null_map ? nullptr
: null_map);
} else if (_have_other_join_conjunct) {
- return do_other_join_conjuncts(output_block,
hash_table_ctx.hash_table->get_visited(),
-
hash_table_ctx.hash_table->has_null_key());
+ return do_other_join_conjuncts(output_block,
hash_table_ctx.hash_table->get_visited());
}
return Status::OK();
@@ -323,7 +331,8 @@ Status
ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter(
auto do_lazy_materialize = [&](const std::vector<bool>& output_slot_flags,
vectorized::ColumnVector<unsigned int>&
row_indexs,
- int column_offset, vectorized::Block*
source_block) {
+ int column_offset, vectorized::Block*
source_block,
+ bool try_all_match_one) {
std::vector<int> column_ids;
for (int i = 0; i < output_slot_flags.size(); ++i) {
if (output_slot_flags[i] &&
@@ -334,23 +343,16 @@ Status
ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter(
if (column_ids.empty()) {
return;
}
- size_t row_count = filter_ptr->size();
- // input row_indexs's size may bigger than row_count coz
_init_probe_side
- row_indexs.resize(row_count);
-
+ const auto& column_filter =
+ assert_cast<const
vectorized::ColumnUInt8*>(filter_ptr.get())->get_data();
bool need_filter =
- simd::count_zero_num(
- (int8_t*)assert_cast<const
vectorized::ColumnUInt8*>(filter_ptr.get())
- ->get_data()
- .data(),
- row_count) != 0;
+ simd::count_zero_num((int8_t*)column_filter.data(),
column_filter.size()) != 0;
if (need_filter) {
- const auto& column_filter =
- assert_cast<const
vectorized::ColumnUInt8*>(filter_ptr.get())->get_data();
row_indexs.filter(column_filter);
}
const auto& container = row_indexs.get_data();
+ bool all_match_one = try_all_match_one &&
check_all_match_one(container);
for (int column_id : column_ids) {
int output_column_id = column_id + column_offset;
output_block->get_by_position(output_column_id).column =
@@ -361,12 +363,13 @@ Status
ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter(
auto& src = source_block->get_by_position(column_id).column;
auto dst =
output_block->get_by_position(output_column_id).column->assume_mutable();
dst->clear();
- dst->insert_indices_from(*src, container.data(), container.data()
+ container.size());
+ insert_with_indexs(dst, src, container, all_match_one);
}
};
do_lazy_materialize(_right_output_slot_flags, _build_indexs,
(int)_right_col_idx,
- _build_block.get());
- do_lazy_materialize(_left_output_slot_flags, _probe_indexs, 0,
&_parent->_probe_block);
+ _build_block.get(), false);
+ // probe side indexs must be incremental so set try_all_match_one to true
+ do_lazy_materialize(_left_output_slot_flags, _probe_indexs, 0,
&_parent->_probe_block, true);
return Status::OK();
}
@@ -520,8 +523,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
template <int JoinOpType>
Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Block*
output_block,
-
DorisVector<uint8_t>& visited,
- bool
has_null_in_build_side) {
+
DorisVector<uint8_t>& visited) {
// dispose the other join conjunct exec
auto row_count = output_block->rows();
if (!row_count) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]