This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 3310c0d3b6 [cherry-pick](memory) Fix right join overflow and high frequency load slow memtracker (#16246)
3310c0d3b6 is described below
commit 3310c0d3b6872f3039851f0d4a6cabcbe45842eb
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Feb 2 09:54:09 2023 +0800
[cherry-pick](memory) Fix right join overflow and high frequency load slow memtracker (#16246)
Cherry-picked from #15440 and #16244.
---
be/src/runtime/thread_context.h | 6 ------
be/src/vec/exec/join/join_op.h | 4 ++--
be/src/vec/exec/join/process_hash_table_probe.h | 1 +
.../vec/exec/join/process_hash_table_probe_impl.h | 23 +++++++++++++++++++---
be/src/vec/exec/join/vhash_join_node.h | 3 +++
5 files changed, 26 insertions(+), 11 deletions(-)
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 8b7116aabc..038ed3e264 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -343,8 +343,6 @@ private:
do { \
if (doris::thread_context_ptr.init) { \
doris::thread_context()->thread_mem_tracker_mgr->consume(size); \
- } else { \
- doris::ThreadMemTrackerMgr::consume_no_attach(size); \
} \
} while (0)
// NOTE, The LOG cannot be printed in the mem hook. If the LOG statement
triggers the mem hook LOG,
@@ -359,16 +357,12 @@ private:
} else {
\
doris::thread_context()->thread_mem_tracker_mgr->consume(size); \
}
\
- } else {
\
- doris::ThreadMemTrackerMgr::consume_no_attach(size);
\
}
\
} while (0)
#define RELEASE_MEM_TRACKER(size) \
do { \
if (doris::thread_context_ptr.init) { \
doris::thread_context()->thread_mem_tracker_mgr->consume(-size); \
- } else { \
- doris::ThreadMemTrackerMgr::consume_no_attach(-size); \
} \
} while (0)
#else
diff --git a/be/src/vec/exec/join/join_op.h b/be/src/vec/exec/join/join_op.h
index c760e6da9a..78d03b167d 100644
--- a/be/src/vec/exec/join/join_op.h
+++ b/be/src/vec/exec/join/join_op.h
@@ -92,6 +92,8 @@ template <typename RowRefListType>
class ForwardIterator {
public:
using RowRefType = typename RowRefListType::RowRefType;
+ ForwardIterator() : root(nullptr), first(false), batch(nullptr),
position(0) {}
+
ForwardIterator(RowRefListType* begin)
: root(begin), first(true), batch(root->next), position(0) {}
@@ -136,8 +138,6 @@ private:
bool first;
Batch<RowRefType>* batch;
size_t position;
-
- ForwardIterator() : root(nullptr), first(false), batch(nullptr),
position(0) {}
};
struct RowRefList : RowRef {
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h
b/be/src/vec/exec/join/process_hash_table_probe.h
index 8c1ec02b1c..5d8c8097a7 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -19,6 +19,7 @@
#include <vector>
+#include "join_op.h"
#include "vec/columns/column.h"
#include "vec/columns/columns_number.h"
#include "vec/common/arena.h"
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 54319f782c..31e621d1c1 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -657,6 +657,7 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
auto& iter = hash_table_ctx.iter;
auto block_size = 0;
+ auto& visited_iter = _join_node->_outer_join_pull_visited_iter;
auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
block_size++;
@@ -666,6 +667,16 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
};
+ if (visited_iter.ok()) {
+ DCHECK((std::is_same_v<Mapped, RowRefListWithFlag>));
+ for (; visited_iter.ok() && block_size < _batch_size;
++visited_iter) {
+ insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
+ }
+ if (!visited_iter.ok()) {
+ ++iter;
+ }
+ }
+
for (; iter != hash_table_ctx.hash_table.end() && block_size <
_batch_size; ++iter) {
auto& mapped = iter->get_second();
if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
@@ -676,9 +687,15 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
}
} else {
- for (auto it = mapped.begin(); it.ok(); ++it) {
- if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
- insert_from_hash_table(it->block_offset,
it->row_num);
+ if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
+ visited_iter = mapped.begin();
+ for (; visited_iter.ok() && block_size < _batch_size;
++visited_iter) {
+ insert_from_hash_table(visited_iter->block_offset,
+ visited_iter->row_num);
+ }
+ if (visited_iter.ok()) {
+ // block_size >= _batch_size, quit for loop
+ break;
}
}
}
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index a41d3c2f7b..f6d6982128 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -262,6 +262,9 @@ private:
std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants;
+ // for full/right outer join
+ ForwardIterator<RowRefListWithFlag> _outer_join_pull_visited_iter;
+
std::shared_ptr<std::vector<Block>> _build_blocks;
Block _probe_block;
ColumnRawPtrs _probe_columns;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]