This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 3310c0d3b6 [cherry-pick](memory) Fix right join overflow and high frequency load slow memtracker (#16246)
3310c0d3b6 is described below

commit 3310c0d3b6872f3039851f0d4a6cabcbe45842eb
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Feb 2 09:54:09 2023 +0800

    [cherry-pick](memory) Fix right join overflow and high frequency load slow memtracker (#16246)
    
    cherry-pick
    #15440
    #16244
---
 be/src/runtime/thread_context.h                    |  6 ------
 be/src/vec/exec/join/join_op.h                     |  4 ++--
 be/src/vec/exec/join/process_hash_table_probe.h    |  1 +
 .../vec/exec/join/process_hash_table_probe_impl.h  | 23 +++++++++++++++++++---
 be/src/vec/exec/join/vhash_join_node.h             |  3 +++
 5 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 8b7116aabc..038ed3e264 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -343,8 +343,6 @@ private:
     do {                                                                    \
         if (doris::thread_context_ptr.init) {                               \
             doris::thread_context()->thread_mem_tracker_mgr->consume(size); \
-        } else {                                                            \
-            doris::ThreadMemTrackerMgr::consume_no_attach(size);            \
         }                                                                   \
     } while (0)
 // NOTE, The LOG cannot be printed in the mem hook. If the LOG statement triggers the mem hook LOG,
@@ -359,16 +357,12 @@ private:
             } else {                                                           
            \
                 
doris::thread_context()->thread_mem_tracker_mgr->consume(size);            \
             }                                                                  
            \
-        } else {                                                               
            \
-            doris::ThreadMemTrackerMgr::consume_no_attach(size);               
            \
         }                                                                      
            \
     } while (0)
 #define RELEASE_MEM_TRACKER(size)                                            \
     do {                                                                     \
         if (doris::thread_context_ptr.init) {                                \
             doris::thread_context()->thread_mem_tracker_mgr->consume(-size); \
-        } else {                                                             \
-            doris::ThreadMemTrackerMgr::consume_no_attach(-size);            \
         }                                                                    \
     } while (0)
 #else
diff --git a/be/src/vec/exec/join/join_op.h b/be/src/vec/exec/join/join_op.h
index c760e6da9a..78d03b167d 100644
--- a/be/src/vec/exec/join/join_op.h
+++ b/be/src/vec/exec/join/join_op.h
@@ -92,6 +92,8 @@ template <typename RowRefListType>
 class ForwardIterator {
 public:
     using RowRefType = typename RowRefListType::RowRefType;
+    ForwardIterator() : root(nullptr), first(false), batch(nullptr), 
position(0) {}
+
     ForwardIterator(RowRefListType* begin)
             : root(begin), first(true), batch(root->next), position(0) {}
 
@@ -136,8 +138,6 @@ private:
     bool first;
     Batch<RowRefType>* batch;
     size_t position;
-
-    ForwardIterator() : root(nullptr), first(false), batch(nullptr), 
position(0) {}
 };
 
 struct RowRefList : RowRef {
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h 
b/be/src/vec/exec/join/process_hash_table_probe.h
index 8c1ec02b1c..5d8c8097a7 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -19,6 +19,7 @@
 
 #include <vector>
 
+#include "join_op.h"
 #include "vec/columns/column.h"
 #include "vec/columns/columns_number.h"
 #include "vec/common/arena.h"
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 54319f782c..31e621d1c1 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -657,6 +657,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
 
         auto& iter = hash_table_ctx.iter;
         auto block_size = 0;
+        auto& visited_iter = _join_node->_outer_join_pull_visited_iter;
 
         auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
             block_size++;
@@ -666,6 +667,16 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
             }
         };
 
+        if (visited_iter.ok()) {
+            DCHECK((std::is_same_v<Mapped, RowRefListWithFlag>));
+            for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
+                insert_from_hash_table(visited_iter->block_offset, 
visited_iter->row_num);
+            }
+            if (!visited_iter.ok()) {
+                ++iter;
+            }
+        }
+
         for (; iter != hash_table_ctx.hash_table.end() && block_size < 
_batch_size; ++iter) {
             auto& mapped = iter->get_second();
             if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
@@ -676,9 +687,15 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
                         }
                     }
                 } else {
-                    for (auto it = mapped.begin(); it.ok(); ++it) {
-                        if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
-                            insert_from_hash_table(it->block_offset, 
it->row_num);
+                    if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
+                        visited_iter = mapped.begin();
+                        for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
+                            insert_from_hash_table(visited_iter->block_offset,
+                                                   visited_iter->row_num);
+                        }
+                        if (visited_iter.ok()) {
+                            // block_size >= _batch_size, quit for loop
+                            break;
                         }
                     }
                 }
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index a41d3c2f7b..f6d6982128 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -262,6 +262,9 @@ private:
 
     std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants;
 
+    // for full/right outer join
+    ForwardIterator<RowRefListWithFlag> _outer_join_pull_visited_iter;
+
     std::shared_ptr<std::vector<Block>> _build_blocks;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to