This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 65a32f76b05 [Feature](join) Support lazy materialization of columns
that are not used in join other conjunct (#49073)
65a32f76b05 is described below
commit 65a32f76b053f1f345f8e53e413152728ac1eb90
Author: Pxl <[email protected]>
AuthorDate: Tue Mar 25 11:57:59 2025 +0800
[Feature](join) Support lazy materialization of columns that are not used
in join other conjunct (#49073)
### What problem does this PR solve?
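1. Support lazy materialization of columns that are not referenced by the
join's other conjuncts: such columns are filled in only after the other
conjuncts have filtered the block, so only surviving rows are copied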
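2. Simplify some code (remove the pass-through accessors on
HashJoinProbeLocalState and the `with_other_conjuncts` template parameter;
ProcessHashTableProbe now reads these settings from HashJoinProbeOperatorX)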
before:
```
HASH_JOIN_OPERATOR (id=6 , nereids_id=1089):(ExecTime: 10sec315ms)
- BlocksProduced: 90.294K (90294)
- ExecTime: 10sec315ms
- NonEqualJoinConjunctEvaluationTime: 2sec454ms
- ProbeRows: 1.0M (1000000)
- ProbeWhenBuildSideOutputTime: 2sec223ms
- ProbeWhenProbeSideOutputTime: 3sec33ms
- ProbeWhenSearchHashTableTime: 1sec877ms
- ProjectionTime: 155.383ms
- RowsProduced: 1.0M (1000000)
```
after:
```
HASH_JOIN_OPERATOR (id=6 , nereids_id=1108):(ExecTime: 5sec669ms)
- BlocksProduced: 90.294K (90294)
- ExecTime: 5sec669ms
- NonEqualJoinConjunctEvaluationTime: 2sec111ms
- ProbeRows: 1.0M (1000000)
- ProbeWhenBuildSideOutputTime: 689.966ms
- ProbeWhenProbeSideOutputTime: 350.553ms
- ProbeWhenSearchHashTableTime: 1sec883ms
- ProjectionTime: 150.629ms
- RowsProduced: 1.0M (1000000)
```
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [x] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [x] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 40 +---
be/src/pipeline/exec/hashjoin_probe_operator.h | 16 +-
.../pipeline/exec/join/process_hash_table_probe.h | 41 ++--
.../exec/join/process_hash_table_probe_impl.h | 250 +++++++++++++--------
be/src/vec/common/hash_table/join_hash_table.h | 4 +-
be/src/vec/exprs/vexpr.h | 6 +
be/src/vec/exprs/vslot_ref.h | 4 +
7 files changed, 196 insertions(+), 165 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 7f273680f6d..a432b59cfa2 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -93,34 +93,6 @@ void HashJoinProbeLocalState::prepare_for_next() {
_prepare_probe_block();
}
-bool HashJoinProbeLocalState::have_other_join_conjunct() const {
- return _parent->cast<HashJoinProbeOperatorX>()._have_other_join_conjunct;
-}
-
-bool HashJoinProbeLocalState::is_right_semi_anti() const {
- return _parent->cast<HashJoinProbeOperatorX>()._is_right_semi_anti;
-}
-
-bool HashJoinProbeLocalState::is_outer_join() const {
- return _parent->cast<HashJoinProbeOperatorX>()._is_outer_join;
-}
-
-std::vector<bool>* HashJoinProbeLocalState::left_output_slot_flags() {
- return &_parent->cast<HashJoinProbeOperatorX>()._left_output_slot_flags;
-}
-
-std::vector<bool>* HashJoinProbeLocalState::right_output_slot_flags() {
- return &_parent->cast<HashJoinProbeOperatorX>()._right_output_slot_flags;
-}
-
-vectorized::DataTypes HashJoinProbeLocalState::right_table_data_types() {
- return _parent->cast<HashJoinProbeOperatorX>()._right_table_data_types;
-}
-
-vectorized::DataTypes HashJoinProbeLocalState::left_table_data_types() {
- return _parent->cast<HashJoinProbeOperatorX>()._left_table_data_types;
-}
-
Status HashJoinProbeLocalState::close(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_close_timer);
@@ -133,12 +105,6 @@ Status HashJoinProbeLocalState::close(RuntimeState* state)
{
if (process_hashtable_ctx._arena)
{
process_hashtable_ctx._arena.reset();
}
-
- if
(process_hashtable_ctx._serialize_key_arena) {
-
process_hashtable_ctx._serialize_key_arena.reset();
-
process_hashtable_ctx._serialized_key_buffer_size =
- 0;
- }
}},
*_process_hashtable_ctx_variants);
}
@@ -259,7 +225,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
: nullptr,
mutable_join_block, &temp_block,
cast_set<uint32_t>(local_state._probe_block.rows()),
- _is_mark_join, _have_other_join_conjunct);
+ _is_mark_join);
} else {
st = Status::InternalError("uninited hash table");
}
@@ -522,6 +488,10 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState*
state) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
}
+ for (auto conjunct : _other_join_conjuncts) {
+
conjunct->root()->collect_slot_column_ids(_other_conjunct_refer_column_ids);
+ }
+
for (auto& conjunct : _mark_join_conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index f3f1983975e..6fdf77e250d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -61,14 +61,7 @@ public:
bool* eos, vectorized::Block*
temp_block,
bool check_rows_count = true);
- bool have_other_join_conjunct() const;
- bool is_right_semi_anti() const;
- bool is_outer_join() const;
- std::vector<bool>* left_output_slot_flags();
- std::vector<bool>* right_output_slot_flags();
- vectorized::DataTypes right_table_data_types();
- vectorized::DataTypes left_table_data_types();
- bool* has_null_in_build_side() { return
&_shared_state->_has_null_in_build_side; }
+ bool has_null_in_build_side() { return
_shared_state->_has_null_in_build_side; }
const std::shared_ptr<vectorized::Block>& build_block() const {
return _shared_state->build_block;
}
@@ -161,11 +154,17 @@ public:
bool need_finalize_variant_column() const { return
_need_finalize_variant_column; }
+ bool is_lazy_materialized_column(int column_id) const {
+ return _have_other_join_conjunct &&
!_other_conjunct_refer_column_ids.contains(column_id);
+ }
+
private:
Status _do_evaluate(vectorized::Block& block,
vectorized::VExprContextSPtrs& exprs,
RuntimeProfile::Counter& expr_call_timer,
std::vector<int>& res_col_ids) const;
friend class HashJoinProbeLocalState;
+ template <int JoinOpType>
+ friend struct ProcessHashTableProbe;
const TJoinDistributionType::type _join_distribution;
@@ -184,6 +183,7 @@ private:
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
bool _need_finalize_variant_column = false;
+ std::set<int> _other_conjunct_refer_column_ids;
std::vector<std::string> _right_table_column_names;
const std::vector<TExpr> _partition_exprs;
};
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h
b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 8cfcfc75e1b..0019b180de3 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -22,7 +22,6 @@
#include "vec/columns/column.h"
#include "vec/columns/columns_number.h"
#include "vec/common/arena.h"
-#include "vec/common/hash_table/join_hash_table.h"
namespace doris {
namespace vectorized {
@@ -33,12 +32,13 @@ struct HashJoinProbeContext;
namespace pipeline {
class HashJoinProbeLocalState;
+class HashJoinProbeOperatorX;
using MutableColumnPtr = vectorized::IColumn::MutablePtr;
using MutableColumns = std::vector<vectorized::MutableColumnPtr>;
using NullMap = vectorized::ColumnUInt8::Container;
-using ConstNullMapPtr = const vectorized::NullMap*;
+using ConstNullMapPtr = const NullMap*;
template <int JoinOpType>
struct ProcessHashTableProbe {
@@ -46,24 +46,19 @@ struct ProcessHashTableProbe {
~ProcessHashTableProbe() = default;
// output build side result column
- void build_side_output_column(vectorized::MutableColumns& mcol,
- const std::vector<bool>& output_slot_flags,
int size,
- bool have_other_join_conjunct, bool
is_mark_join);
+ void build_side_output_column(vectorized::MutableColumns& mcol, int size,
bool is_mark_join);
- void probe_side_output_column(vectorized::MutableColumns& mcol,
- const std::vector<bool>& output_slot_flags,
int size,
- bool all_match_one, bool
have_other_join_conjunct);
+ void probe_side_output_column(vectorized::MutableColumns& mcol, int size,
bool all_match_one);
template <typename HashTableType>
Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
vectorized::MutableBlock& mutable_block, vectorized::Block*
output_block,
- uint32_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct);
+ uint32_t probe_rows, bool is_mark_join);
// Only process the join with no other join conjunct, because of no other
join conjunt
// the output block struct is same with mutable block. we can do more opt
on it and simplify
// the logic of probe
- // TODO: opt the visited here to reduce the size of hash table
- template <typename HashTableType, bool with_other_conjuncts, bool
is_mark_join>
+ template <typename HashTableType, bool is_mark_join>
Status do_process(HashTableType& hash_table_ctx, const uint8_t* null_map,
vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block,
uint32_t probe_rows);
@@ -74,12 +69,13 @@ struct ProcessHashTableProbe {
Status do_other_join_conjuncts(vectorized::Block* output_block,
DorisVector<uint8_t>& visited,
bool has_null_in_build_side);
- template <bool with_other_conjuncts>
Status do_mark_join_conjuncts(vectorized::Block* output_block, const
uint8_t* null_map);
+ Status finalize_block_with_filter(vectorized::Block* output_block, size_t
filter_column_id,
+ size_t column_to_keep);
+
template <typename HashTableType>
typename HashTableType::State _init_probe_side(HashTableType&
hash_table_ctx, size_t probe_rows,
- bool
with_other_join_conjuncts,
const uint8_t* null_map);
// Process full outer join/ right join / right semi/anti join to output
the join result
@@ -93,15 +89,17 @@ struct ProcessHashTableProbe {
uint32_t _process_probe_null_key(uint32_t probe_idx);
pipeline::HashJoinProbeLocalState* _parent = nullptr;
+ pipeline::HashJoinProbeOperatorX* _parent_operator = nullptr;
+
const int _batch_size;
const std::shared_ptr<vectorized::Block>& _build_block;
std::unique_ptr<vectorized::Arena> _arena;
- std::vector<StringRef> _probe_keys;
- std::vector<uint32_t> _probe_indexs;
+ vectorized::ColumnVector<uint32_t> _probe_indexs;
+ vectorized::ColumnVector<uint32_t> _output_row_indexs;
bool _probe_visited = false;
bool _picking_null_keys = false;
- std::vector<uint32_t> _build_indexs;
+ vectorized::ColumnVector<uint32_t> _build_indexs;
std::vector<uint8_t> _null_flags;
/// If the probe key of one row on left side is null,
@@ -111,19 +109,14 @@ struct ProcessHashTableProbe {
std::vector<int> _build_blocks_locs;
- size_t _serialized_key_buffer_size {0};
- uint8_t* _serialized_key_buffer = nullptr;
- std::unique_ptr<vectorized::Arena> _serialize_key_arena;
std::vector<char> _probe_side_find_result;
- bool _have_other_join_conjunct;
- bool _is_right_semi_anti;
- std::vector<bool>* _left_output_slot_flags = nullptr;
- std::vector<bool>* _right_output_slot_flags = nullptr;
+ const bool _have_other_join_conjunct;
+ const std::vector<bool>& _left_output_slot_flags;
+ const std::vector<bool>& _right_output_slot_flags;
// nullable column but not has null except first row
std::vector<bool> _build_column_has_null;
bool _need_calculate_build_index_has_zero = true;
- bool* _has_null_in_build_side;
RuntimeProfile::Counter* _search_hashtable_timer = nullptr;
RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index b06e7c7c388..4c46d581bc4 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -25,10 +25,10 @@
#include "process_hash_table_probe.h"
#include "runtime/thread_context.h" // IWYU pragma: keep
#include "util/simd/bits.h"
+#include "vec/columns/column_const.h"
#include "vec/columns/column_filter_helper.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/columns_number.h"
-#include "vec/common/hash_table/join_hash_table.h"
#include "vec/exprs/vexpr_context.h"
namespace doris::pipeline {
@@ -37,44 +37,42 @@ template <int JoinOpType>
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState*
parent,
int batch_size)
: _parent(parent),
+ _parent_operator(&parent->_parent->template
cast<HashJoinProbeOperatorX>()),
_batch_size(batch_size),
_build_block(parent->build_block()),
- _have_other_join_conjunct(parent->have_other_join_conjunct()),
- _is_right_semi_anti(parent->is_right_semi_anti()),
- _left_output_slot_flags(parent->left_output_slot_flags()),
- _right_output_slot_flags(parent->right_output_slot_flags()),
- _has_null_in_build_side(parent->has_null_in_build_side()),
+
_have_other_join_conjunct(_parent_operator->_have_other_join_conjunct),
+ _left_output_slot_flags(_parent_operator->_left_output_slot_flags),
+ _right_output_slot_flags(_parent_operator->_right_output_slot_flags),
_search_hashtable_timer(parent->_search_hashtable_timer),
_init_probe_side_timer(parent->_init_probe_side_timer),
_build_side_output_timer(parent->_build_side_output_timer),
_probe_side_output_timer(parent->_probe_side_output_timer),
_finish_probe_phase_timer(parent->_finish_probe_phase_timer),
- _right_col_idx((_is_right_semi_anti && !_have_other_join_conjunct)
+ _right_col_idx((_parent_operator->_is_right_semi_anti &&
!_have_other_join_conjunct)
? 0
- : _parent->left_table_data_types().size()),
- _right_col_len(_parent->right_table_data_types().size()) {}
+ :
_parent_operator->_left_table_data_types.size()),
+ _right_col_len(_parent_operator->_right_table_data_types.size()) {}
template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
- vectorized::MutableColumns& mcol, const std::vector<bool>&
output_slot_flags, int size,
- bool have_other_join_conjunct, bool is_mark_join) {
+void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(vectorized::MutableColumns&
mcol,
+ int size,
bool is_mark_join) {
SCOPED_TIMER(_build_side_output_timer);
// indicates whether build_indexs contain 0
bool build_index_has_zero =
(JoinOpType != TJoinOp::INNER_JOIN && JoinOpType !=
TJoinOp::RIGHT_OUTER_JOIN) ||
- have_other_join_conjunct || is_mark_join;
+ _have_other_join_conjunct || is_mark_join;
if (!size) {
return;
}
if (!build_index_has_zero && _build_column_has_null.empty()) {
_need_calculate_build_index_has_zero = false;
- _build_column_has_null.resize(output_slot_flags.size());
+ _build_column_has_null.resize(_right_output_slot_flags.size());
for (int i = 0; i < _right_col_len; i++) {
const auto& column = *_build_block->safe_get_by_position(i).column;
_build_column_has_null[i] = false;
- if (output_slot_flags[i] && column.is_nullable()) {
+ if (_right_output_slot_flags[i] && column.is_nullable()) {
const auto& nullable = assert_cast<const
vectorized::ColumnNullable&>(column);
_build_column_has_null[i] = !simd::contain_byte(
nullable.get_null_map_data().data() + 1,
nullable.size() - 1, 1);
@@ -83,16 +81,18 @@ void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
}
}
- for (size_t i = 0; i < _right_col_len && i + _right_col_idx < mcol.size();
i++) {
+ for (int i = 0; i < _right_col_len && i + _right_col_idx < mcol.size();
i++) {
const auto& column = *_build_block->safe_get_by_position(i).column;
- if (output_slot_flags[i]) {
+ if (_right_output_slot_flags[i] &&
+ !_parent_operator->is_lazy_materialized_column(i +
(int)_right_col_idx)) {
if (!build_index_has_zero && _build_column_has_null[i]) {
assert_cast<vectorized::ColumnNullable*>(mcol[i +
_right_col_idx].get())
- ->insert_indices_from_not_has_null(column,
_build_indexs.data(),
-
_build_indexs.data() + size);
+ ->insert_indices_from_not_has_null(column,
_build_indexs.get_data().data(),
+
_build_indexs.get_data().data() + size);
} else {
- mcol[i + _right_col_idx]->insert_indices_from(column,
_build_indexs.data(),
-
_build_indexs.data() + size);
+ mcol[i + _right_col_idx]->insert_indices_from(
+ column, _build_indexs.get_data().data(),
+ _build_indexs.get_data().data() + size);
}
} else if (i + _right_col_idx != _parent->_mark_column_id) {
mcol[i + _right_col_idx]->insert_default();
@@ -115,23 +115,25 @@ void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
}
template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
- vectorized::MutableColumns& mcol, const std::vector<bool>&
output_slot_flags, int size,
- bool all_match_one, bool have_other_join_conjunct) {
+void
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(vectorized::MutableColumns&
mcol,
+ int size,
bool all_match_one) {
SCOPED_TIMER(_probe_side_output_timer);
auto& probe_block = _parent->_probe_block;
- for (int i = 0; i < output_slot_flags.size(); ++i) {
- if (output_slot_flags[i]) {
- if (auto& p = _parent->parent()->cast<HashJoinProbeOperatorX>();
- p.need_finalize_variant_column()) {
+
+ for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
+ if (_left_output_slot_flags[i]) {
+ if (_parent_operator->need_finalize_variant_column()) {
std::move(*probe_block.get_by_position(i).column).mutate()->finalize();
}
+ }
+
+ if (_left_output_slot_flags[i] &&
!_parent_operator->is_lazy_materialized_column(i)) {
auto& column = probe_block.get_by_position(i).column;
if (all_match_one) {
- mcol[i]->insert_range_from(*column, _probe_indexs[0], size);
+ mcol[i]->insert_range_from(*column,
_probe_indexs.get_element(0), size);
} else {
- mcol[i]->insert_indices_from(*column, _probe_indexs.data(),
- _probe_indexs.data() + size);
+ mcol[i]->insert_indices_from(*column,
_probe_indexs.get_data().data(),
+ _probe_indexs.get_data().data() +
size);
}
} else {
mcol[i]->insert_default();
@@ -143,14 +145,13 @@ void
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
template <int JoinOpType>
template <typename HashTableType>
typename HashTableType::State
ProcessHashTableProbe<JoinOpType>::_init_probe_side(
- HashTableType& hash_table_ctx, size_t probe_rows, bool
with_other_join_conjuncts,
- const uint8_t* null_map) {
+ HashTableType& hash_table_ctx, size_t probe_rows, const uint8_t*
null_map) {
// may over batch size 1 for some outer join case
_probe_indexs.resize(_batch_size + 1);
_build_indexs.resize(_batch_size + 1);
if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
- with_other_join_conjuncts) {
+ _have_other_join_conjunct) {
_null_flags.resize(_batch_size + 1);
}
@@ -174,7 +175,7 @@ typename HashTableType::State
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
}
template <int JoinOpType>
-template <typename HashTableType, bool with_other_conjuncts, bool is_mark_join>
+template <typename HashTableType, bool is_mark_join>
Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType&
hash_table_ctx,
const uint8_t* null_map,
vectorized::MutableBlock&
mutable_block,
@@ -188,7 +189,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
auto& build_index = _parent->_build_index;
{
SCOPED_TIMER(_init_probe_side_timer);
- _init_probe_side<HashTableType>(hash_table_ctx, probe_rows,
with_other_conjuncts, null_map);
+ _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, null_map);
}
auto& mcol = mutable_block.mutable_columns();
@@ -196,7 +197,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
uint32_t current_offset = 0;
if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
- with_other_conjuncts) {
+ _have_other_join_conjunct) {
SCOPED_TIMER(_search_hashtable_timer);
/// If `_build_index_for_null_probe_key` is not zero, it means we are
in progress of handling probe null key.
@@ -214,8 +215,9 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
auto [new_probe_idx, new_build_idx, new_current_offset,
picking_null_keys] =
hash_table_ctx.hash_table->find_null_aware_with_other_conjuncts(
hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(), probe_index,
- build_index, probe_rows, _probe_indexs.data(),
_build_indexs.data(),
- _null_flags.data(), _picking_null_keys, null_map);
+ build_index, probe_rows,
_probe_indexs.get_data().data(),
+ _build_indexs.get_data().data(),
_null_flags.data(), _picking_null_keys,
+ null_map);
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;
@@ -237,20 +239,20 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
auto [new_probe_idx, new_build_idx, new_current_offset] =
hash_table_ctx.hash_table->template find_batch<JoinOpType>(
hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(), probe_index,
- build_index, cast_set<int32_t>(probe_rows),
_probe_indexs.data(),
- _probe_visited, _build_indexs.data(), null_map,
with_other_conjuncts,
- is_mark_join, !_parent->_mark_join_conjuncts.empty());
+ build_index, cast_set<int32_t>(probe_rows),
_probe_indexs.get_data().data(),
+ _probe_visited, _build_indexs.get_data().data(),
null_map,
+ _have_other_join_conjunct, is_mark_join,
+ !_parent->_mark_join_conjuncts.empty());
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;
}
- build_side_output_column(mcol, *_right_output_slot_flags, current_offset,
with_other_conjuncts,
- is_mark_join);
+ build_side_output_column(mcol, current_offset, is_mark_join);
- if constexpr (with_other_conjuncts || (JoinOpType !=
TJoinOp::RIGHT_SEMI_JOIN &&
- JoinOpType !=
TJoinOp::RIGHT_ANTI_JOIN)) {
- auto check_all_match_one = [](const std::vector<uint32_t>& vecs, int
size) {
+ if (_have_other_join_conjunct ||
+ (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType !=
TJoinOp::RIGHT_ANTI_JOIN)) {
+ auto check_all_match_one = [](const auto& vecs, int size) {
if (!size || vecs[size - 1] != vecs[0] + size - 1) {
return false;
}
@@ -262,9 +264,8 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
return true;
};
- probe_side_output_column(mcol, *_left_output_slot_flags,
current_offset,
- check_all_match_one(_probe_indexs,
current_offset),
- with_other_conjuncts);
+ probe_side_output_column(mcol, current_offset,
+ check_all_match_one(_probe_indexs.get_data(),
current_offset));
}
output_block->swap(mutable_block.to_block());
@@ -275,9 +276,8 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
hash_table_ctx.hash_table
->empty_build_side(); // empty build side will return
false to instead null
- return do_mark_join_conjuncts<with_other_conjuncts>(output_block,
- ignore_null_map ?
nullptr : null_map);
- } else if constexpr (with_other_conjuncts) {
+ return do_mark_join_conjuncts(output_block, ignore_null_map ? nullptr
: null_map);
+ } else if (_have_other_join_conjunct) {
return do_other_join_conjuncts(output_block,
hash_table_ctx.hash_table->get_visited(),
hash_table_ctx.hash_table->has_null_key());
}
@@ -293,15 +293,15 @@ uint32_t
ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro
DCHECK_LT(0, _build_index_for_null_probe_key);
uint32_t matched_cnt = 0;
for (; _build_index_for_null_probe_key < rows && matched_cnt <
_batch_size; ++matched_cnt) {
- _probe_indexs[matched_cnt] = probe_index;
- _build_indexs[matched_cnt] = _build_index_for_null_probe_key++;
+ _probe_indexs.get_element(matched_cnt) = probe_index;
+ _build_indexs.get_element(matched_cnt) =
_build_index_for_null_probe_key++;
_null_flags[matched_cnt] = 1;
}
if (_build_index_for_null_probe_key == rows) {
_build_index_for_null_probe_key = 0;
- _probe_indexs[matched_cnt] = probe_index;
- _build_indexs[matched_cnt] = 0;
+ _probe_indexs.get_element(matched_cnt) = probe_index;
+ _build_indexs.get_element(matched_cnt) = 0;
_null_flags[matched_cnt] = 0;
matched_cnt++;
}
@@ -309,6 +309,65 @@ uint32_t
ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro
return matched_cnt;
}
+template <int JoinOpType>
+Status ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter(
+ vectorized::Block* output_block, size_t filter_column_id, size_t
column_to_keep) {
+ vectorized::ColumnPtr filter_ptr =
output_block->get_by_position(filter_column_id).column;
+ RETURN_IF_ERROR(
+ vectorized::Block::filter_block(output_block, filter_column_id,
column_to_keep));
+
+ auto do_lazy_materialize = [&](const std::vector<bool>& output_slot_flags,
+ vectorized::ColumnVector<unsigned int>&
row_indexs,
+ int column_offset, vectorized::Block*
source_block) {
+ if (!_have_other_join_conjunct) {
+ return;
+ }
+ std::vector<int> column_ids;
+ for (int i = 0; i < output_slot_flags.size(); ++i) {
+ if (output_slot_flags[i] &&
+ _parent_operator->is_lazy_materialized_column(i +
column_offset)) {
+ column_ids.push_back(i);
+ }
+ }
+ if (column_ids.empty()) {
+ return;
+ }
+ size_t row_count = filter_ptr->size();
+ // input row_indexs's size may bigger than row_count coz
_init_probe_side
+ row_indexs.resize(row_count);
+
+ bool need_filter =
+ simd::count_zero_num(
+ (int8_t*)assert_cast<const
vectorized::ColumnUInt8*>(filter_ptr.get())
+ ->get_data()
+ .data(),
+ row_count) != 0;
+ if (need_filter) {
+ const auto& column_filter =
+ assert_cast<const
vectorized::ColumnUInt8*>(filter_ptr.get())->get_data();
+ row_indexs.filter(column_filter);
+ }
+
+ const auto& container = row_indexs.get_data();
+ for (int column_id : column_ids) {
+ int output_column_id = column_id + column_offset;
+ output_block->get_by_position(output_column_id).column =
+ assert_cast<const vectorized::ColumnConst*>(
+
output_block->get_by_position(output_column_id).column.get())
+ ->get_data_column_ptr();
+
+ auto& src = source_block->get_by_position(column_id).column;
+ auto dst =
output_block->get_by_position(output_column_id).column->assume_mutable();
+ dst->clear();
+ dst->insert_indices_from(*src, container.data(), container.data()
+ container.size());
+ }
+ };
+ do_lazy_materialize(_right_output_slot_flags, _build_indexs,
(int)_right_col_idx,
+ _build_block.get());
+ do_lazy_materialize(_left_output_slot_flags, _probe_indexs, 0,
&_parent->_probe_block);
+ return Status::OK();
+}
+
/**
* Mark join: there is a column named mark column which stores the result
of mark join conjunct.
* For example:
@@ -343,7 +402,6 @@ uint32_t
ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro
* So this query will be a "null aware left anti join", which means the
equal conjunct's result should be nullable.
*/
template <int JoinOpType>
-template <bool with_other_conjuncts>
Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Block*
output_block,
const
uint8_t* null_map) {
DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
@@ -377,23 +435,23 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
// select 4 not in (2, 3, null) => null, select 4 not in (2, 3) => true
// select 4 in (2, 3, null) => null, select 4 in (2, 3) => false
for (size_t i = 0; i != row_count; ++i) {
- mark_filter_data[i] = _build_indexs[i] != 0;
+ mark_filter_data[i] = _build_indexs.get_element(i) != 0;
}
- if constexpr (with_other_conjuncts) {
+ if (_have_other_join_conjunct) {
// _null_flags is true means build or probe side of the row is null
memcpy(mark_null_map, _null_flags.data(), row_count);
} else {
if (null_map) {
// probe side of the row is null, so the mark sign should also
be null.
for (size_t i = 0; i != row_count; ++i) {
- mark_null_map[i] |= null_map[_probe_indexs[i]];
+ mark_null_map[i] |= null_map[_probe_indexs.get_element(i)];
}
}
- if (!with_other_conjuncts && *_has_null_in_build_side) {
+ if (!_have_other_join_conjunct &&
_parent->has_null_in_build_side()) {
// _has_null_in_build_side will change false to null when row
not matched
for (size_t i = 0; i != row_count; ++i) {
- mark_null_map[i] |= _build_indexs[i] == 0;
+ mark_null_map[i] |= _build_indexs.get_element(i) == 0;
}
}
}
@@ -401,11 +459,11 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
// for non null aware join, build_indexs is 0 which means there is no
match
// sometimes null will be returned in conjunct, but it should not
actually be null.
for (size_t i = 0; i != row_count; ++i) {
- mark_null_map[i] &= _build_indexs[i] != 0;
+ mark_null_map[i] &= _build_indexs.get_element(i) != 0;
}
}
- if constexpr (with_other_conjuncts) {
+ if (_have_other_join_conjunct) {
vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
{
bool can_be_filter_all = false;
@@ -428,19 +486,20 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
auto filter_column = vectorized::ColumnUInt8::create(row_count, 0);
auto* __restrict filter_map = filter_column->get_data().data();
for (size_t i = 0; i != row_count; ++i) {
- if (_parent->_last_probe_match == _probe_indexs[i]) {
+ if (_parent->_last_probe_match == _probe_indexs.get_element(i)) {
continue;
}
- if (_build_indexs[i] == 0) {
- bool has_null_mark_value = _parent->_last_probe_null_mark ==
_probe_indexs[i];
+ if (_build_indexs.get_element(i) == 0) {
+ bool has_null_mark_value =
+ _parent->_last_probe_null_mark ==
_probe_indexs.get_element(i);
filter_map[i] = true;
mark_filter_data[i] = false;
mark_null_map[i] |= has_null_mark_value;
} else if (mark_null_map[i]) {
- _parent->_last_probe_null_mark = _probe_indexs[i];
+ _parent->_last_probe_null_mark = _probe_indexs.get_element(i);
} else if (mark_filter_data[i]) {
filter_map[i] = true;
- _parent->_last_probe_match = _probe_indexs[i];
+ _parent->_last_probe_match = _probe_indexs.get_element(i);
}
}
@@ -454,7 +513,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
auto result_column_id = output_block->columns();
output_block->insert(
{std::move(filter_column),
std::make_shared<vectorized::DataTypeUInt8>(), ""});
- return vectorized::Block::filter_block(output_block, result_column_id,
result_column_id);
+ return finalize_block_with_filter(output_block, result_column_id,
result_column_id);
}
template <int JoinOpType>
@@ -495,24 +554,24 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
// process equal-conjuncts-matched tuples that are newly generated
// in this run if there are any.
- for (int i = 0; i < row_count; ++i) {
- bool join_hit = _build_indexs[i];
+ for (size_t i = 0; i < row_count; ++i) {
+ bool join_hit = _build_indexs.get_element(i);
bool other_hit = filter_column_ptr[i];
if (!join_hit) {
- filter_map[i] = _parent->_last_probe_match != _probe_indexs[i];
+ filter_map[i] = _parent->_last_probe_match !=
_probe_indexs.get_element(i);
} else {
filter_map[i] = other_hit;
}
if (filter_map[i]) {
- _parent->_last_probe_match = _probe_indexs[i];
+ _parent->_last_probe_match = _probe_indexs.get_element(i);
}
}
for (size_t i = 0; i < row_count; ++i) {
if (filter_map[i]) {
if constexpr (JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
- visited[_build_indexs[i]] = 1;
+ visited[_build_indexs.get_element(i)] = 1;
}
}
}
@@ -524,24 +583,24 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
auto* __restrict filter_map = new_filter_column->get_data().data();
for (size_t i = 0; i < row_count; ++i) {
- bool not_matched_before = _parent->_last_probe_match !=
_probe_indexs[i];
+ bool not_matched_before = _parent->_last_probe_match !=
_probe_indexs.get_element(i);
if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
- if (_build_indexs[i] == 0) {
+ if (_build_indexs.get_element(i) == 0) {
filter_map[i] = false;
} else if (filter_column_ptr[i]) {
filter_map[i] = not_matched_before;
- _parent->_last_probe_match = _probe_indexs[i];
+ _parent->_last_probe_match = _probe_indexs.get_element(i);
} else {
filter_map[i] = false;
}
} else {
- if (_build_indexs[i] == 0) {
+ if (_build_indexs.get_element(i) == 0) {
filter_map[i] = not_matched_before;
} else {
filter_map[i] = false;
if (filter_column_ptr[i]) {
- _parent->_last_probe_match = _probe_indexs[i];
+ _parent->_last_probe_match =
_probe_indexs.get_element(i);
}
}
}
@@ -551,11 +610,11 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
} else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN ||
JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) {
for (int i = 0; i < row_count; ++i) {
- visited[_build_indexs[i]] |= filter_column_ptr[i];
+ visited[_build_indexs.get_element(i)] |= filter_column_ptr[i];
}
} else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
for (int i = 0; i < row_count; ++i) {
- visited[_build_indexs[i]] |= filter_column_ptr[i];
+ visited[_build_indexs.get_element(i)] |= filter_column_ptr[i];
}
}
@@ -568,8 +627,8 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
orig_columns = _right_col_idx;
}
- RETURN_IF_ERROR(
- vectorized::Block::filter_block(output_block,
result_column_id, orig_columns));
+
+ return finalize_block_with_filter(output_block, result_column_id,
orig_columns);
}
return Status::OK();
@@ -602,17 +661,18 @@ Status
ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_tab
mcol.size(), _right_col_len, _right_col_idx);
}
for (size_t j = 0; j < _right_col_len; ++j) {
- if (_right_output_slot_flags->at(j)) {
+ if (_right_output_slot_flags[j]) {
const auto& column =
*_build_block->safe_get_by_position(j).column;
- mcol[j + _right_col_idx]->insert_indices_from(column,
_build_indexs.data(),
-
_build_indexs.data() + block_size);
+ mcol[j + _right_col_idx]->insert_indices_from(
+ column, _build_indexs.get_data().data(),
+ _build_indexs.get_data().data() + block_size);
} else {
mcol[j + _right_col_idx]->resize(block_size);
}
}
// just resize the left table column in case with other conjunct to
make block size is not zero
- if (_is_right_semi_anti && _have_other_join_conjunct) {
+ if (_parent_operator->_is_right_semi_anti &&
_have_other_join_conjunct) {
for (int i = 0; i < _right_col_idx; ++i) {
mcol[i]->resize(block_size);
}
@@ -638,17 +698,15 @@ Status
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
vectorized::ConstNullMapPtr
null_map,
vectorized::MutableBlock&
mutable_block,
vectorized::Block*
output_block,
- uint32_t probe_rows, bool
is_mark_join,
- bool
have_other_join_conjunct) {
+ uint32_t probe_rows, bool
is_mark_join) {
Status res;
std::visit(
- [&](auto is_mark_join, auto have_other_join_conjunct) {
- res = do_process<HashTableType, have_other_join_conjunct,
is_mark_join>(
+ [&](auto is_mark_join) {
+ res = do_process<HashTableType, is_mark_join>(
hash_table_ctx, null_map ? null_map->data() : nullptr,
mutable_block,
output_block, probe_rows);
},
- vectorized::make_bool_variant(is_mark_join),
- vectorized::make_bool_variant(have_other_join_conjunct));
+ vectorized::make_bool_variant(is_mark_join));
return res;
}
@@ -664,7 +722,7 @@ struct ExtractType<T(U)> {
template Status
ProcessHashTableProbe<JoinOpType>::process<ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
- uint32_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
+ uint32_t probe_rows, bool is_mark_join);
\
template Status
ProcessHashTableProbe<JoinOpType>::finish_probing<ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::MutableBlock & mutable_block, \
vectorized::Block * output_block, bool* eos, bool is_mark_join);
diff --git a/be/src/vec/common/hash_table/join_hash_table.h
b/be/src/vec/common/hash_table/join_hash_table.h
index faccb4136d3..96d2ab2fcb9 100644
--- a/be/src/vec/common/hash_table/join_hash_table.h
+++ b/be/src/vec/common/hash_table/join_hash_table.h
@@ -179,7 +179,7 @@ public:
}
template <int JoinOpType, bool is_mark_join>
- bool iterate_map(std::vector<uint32_t>& build_idxs,
+ bool iterate_map(vectorized::ColumnVector<uint32_t>& build_idxs,
vectorized::ColumnFilterHelper* mark_column_helper) const
{
const auto batch_size = max_batch_size;
const auto elem_num = visited.size();
@@ -188,7 +188,7 @@ public:
while (count < batch_size && iter_idx < elem_num) {
const auto matched = visited[iter_idx];
- build_idxs[count] = iter_idx;
+ build_idxs.get_element(count) = iter_idx;
if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
if constexpr (is_mark_join) {
mark_column_helper->insert_value(matched);
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index ea4575b7e61..3b42b4bf250 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -262,6 +262,12 @@ public:
void set_index_unique_id(uint32_t index_unique_id) { _index_unique_id =
index_unique_id; }
uint32_t index_unique_id() const { return _index_unique_id; }
+ virtual void collect_slot_column_ids(std::set<int>& column_ids) const {
+ for (auto child : _children) {
+ child->collect_slot_column_ids(column_ids);
+ }
+ }
+
protected:
/// Simple debug string that provides no expr subclass-specific information
std::string debug_string(const std::string& expr_name) const {
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index 48f08d1d5e0..145428a1cf3 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -59,6 +59,10 @@ public:
size_t estimate_memory(const size_t rows) override { return 0; }
+ void collect_slot_column_ids(std::set<int>& column_ids) const override {
+ column_ids.insert(_column_id);
+ }
+
private:
int _slot_id;
int _column_id;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]