This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a91888a  [BUG] fix memory limit failure and optimize memory usage in join stage (#5514)
a91888a is described below

commit a91888a68bb15496799c979f2eb87c69e9049f1b
Author: stdpain <34912776+stdp...@users.noreply.github.com>
AuthorDate: Sun Mar 21 11:32:51 2021 +0800

    [BUG] fix memory limit failure and optimize memory usage in join stage (#5514)
    
    This patch works well on TPC-DS query 24 at the 1T scale factor.
---
 be/src/exec/hash_join_node.cpp        |  5 +++++
 be/src/exec/hash_join_node.h          |  8 ++++++++
 be/src/exec/hash_join_node_ir.cpp     |  6 +++++-
 be/test/exec/buffered_reader_test.cpp | 16 ++++++++--------
 4 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index be1ee9a..ce22f3e 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -34,6 +34,7 @@ namespace doris {
 HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : ExecNode(pool, tnode, descs),
           _join_op(tnode.hash_join_node.join_op),
+          _probe_counter(0),
           _probe_eos(false),
           _process_build_batch_fn(NULL),
           _process_probe_batch_fn(NULL),
@@ -369,6 +370,10 @@ Status HashJoinNode::open(RuntimeState* state) {
 Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* 
eos) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
+    // In most cases, no additional memory overhead will be applied for at 
this stage, 
+    // but if the expression calculation in this node needs to apply for 
additional memory, 
+    // it may cause the memory to exceed the limit.
+    RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while execute get_next.");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     if (reached_limit()) {
diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h
index b8645ce..63e0505 100644
--- a/be/src/exec/hash_join_node.h
+++ b/be/src/exec/hash_join_node.h
@@ -105,6 +105,7 @@ private:
     // is responsible for.
     boost::scoped_ptr<RowBatch> _probe_batch;
     int _probe_batch_pos; // current scan pos in _probe_batch
+    int _probe_counter;
     bool _probe_eos;      // if true, probe child has no more rows to process
     TupleRow* _current_probe_row;
 
@@ -174,6 +175,13 @@ private:
     // This is only used for debugging and outputting the left child rows 
before
     // doing the join.
     std::string get_probe_row_output_string(TupleRow* probe_row);
+
+    // RELEASE_CONTEXT_COUNTER should be power of 2
+    // GCC will optimize the modulo operation to &(release_context_counter - 1)
+    // build_expr_context and probe_expr_context will free local alloc after 
this probe calculations
+    static constexpr int RELEASE_CONTEXT_COUNTER = 1 << 5;
+    static_assert((RELEASE_CONTEXT_COUNTER & (RELEASE_CONTEXT_COUNTER - 1)) == 
0,
+                  "should be power of 2");
 };
 
 } // namespace doris
diff --git a/be/src/exec/hash_join_node_ir.cpp 
b/be/src/exec/hash_join_node_ir.cpp
index d301201..3bd7f42 100644
--- a/be/src/exec/hash_join_node_ir.cpp
+++ b/be/src/exec/hash_join_node_ir.cpp
@@ -17,6 +17,7 @@
 
 #include "exec/hash_join_node.h"
 #include "exec/hash_table.hpp"
+#include "exprs/expr_context.h"
 #include "runtime/row_batch.h"
 
 namespace doris {
@@ -118,7 +119,10 @@ int HashJoinNode::process_probe_batch(RowBatch* out_batch, 
RowBatch* probe_batch
             if (UNLIKELY(_probe_batch_pos == probe_rows)) {
                 goto end;
             }
-
+            if (++_probe_counter % RELEASE_CONTEXT_COUNTER == 0) {
+                ExprContext::free_local_allocations(_probe_expr_ctxs);
+                ExprContext::free_local_allocations(_build_expr_ctxs);
+            }
             _current_probe_row = probe_batch->get_row(_probe_batch_pos++);
             _hash_tbl_iterator = _hash_tbl->find(_current_probe_row);
             _matched_probe = false;
diff --git a/be/test/exec/buffered_reader_test.cpp 
b/be/test/exec/buffered_reader_test.cpp
index aee4975..cb89688 100644
--- a/be/test/exec/buffered_reader_test.cpp
+++ b/be/test/exec/buffered_reader_test.cpp
@@ -34,9 +34,9 @@ protected:
 
 TEST_F(BufferedReaderTest, normal_use) {
     // buffered_reader_test_file 950 bytes
-    LocalFileReader file_reader(
+    auto file_reader = new LocalFileReader(
             
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0);
-    BufferedReader reader(&file_reader, 1024);
+    BufferedReader reader(file_reader, 1024);
     auto st = reader.open();
     ASSERT_TRUE(st.ok());
     uint8_t buf[1024];
@@ -51,9 +51,9 @@ TEST_F(BufferedReaderTest, normal_use) {
 
 TEST_F(BufferedReaderTest, test_validity) {
     // buffered_reader_test_file.txt 45 bytes
-    LocalFileReader file_reader(
+    auto file_reader = new LocalFileReader(
             
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
-    BufferedReader reader(&file_reader, 64);
+    BufferedReader reader(file_reader, 64);
     auto st = reader.open();
     ASSERT_TRUE(st.ok());
     uint8_t buf[10];
@@ -92,9 +92,9 @@ TEST_F(BufferedReaderTest, test_validity) {
 
 TEST_F(BufferedReaderTest, test_seek) {
     // buffered_reader_test_file.txt 45 bytes
-    LocalFileReader file_reader(
+    auto file_reader = new LocalFileReader(
             
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
-    BufferedReader reader(&file_reader, 64);
+    BufferedReader reader(file_reader, 64);
     auto st = reader.open();
     ASSERT_TRUE(st.ok());
     uint8_t buf[10];
@@ -142,9 +142,9 @@ TEST_F(BufferedReaderTest, test_seek) {
 
 TEST_F(BufferedReaderTest, test_miss) {
     // buffered_reader_test_file.txt 45 bytes
-    LocalFileReader file_reader(
+    auto file_reader = new LocalFileReader(
             
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
-    BufferedReader reader(&file_reader, 64);
+    BufferedReader reader(file_reader, 64);
     auto st = reader.open();
     ASSERT_TRUE(st.ok());
     uint8_t buf[128];

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to