This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new a91888a [BUG] fix memory limit failure and optimize memory usage in join stage (#5514) a91888a is described below commit a91888a68bb15496799c979f2eb87c69e9049f1b Author: stdpain <34912776+stdp...@users.noreply.github.com> AuthorDate: Sun Mar 21 11:32:51 2021 +0800 [BUG] fix memory limit failure and optimize memory usage in join stage (#5514) This patch works well on tpcds-1T query-24 --- be/src/exec/hash_join_node.cpp | 5 +++++ be/src/exec/hash_join_node.h | 8 ++++++++ be/src/exec/hash_join_node_ir.cpp | 6 +++++- be/test/exec/buffered_reader_test.cpp | 16 ++++++++-------- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index be1ee9a..ce22f3e 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -34,6 +34,7 @@ namespace doris { HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _join_op(tnode.hash_join_node.join_op), + _probe_counter(0), _probe_eos(false), _process_build_batch_fn(NULL), _process_probe_batch_fn(NULL), @@ -369,6 +370,10 @@ Status HashJoinNode::open(RuntimeState* state) { Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) { RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); RETURN_IF_CANCELLED(state); + // In most cases, no additional memory overhead will be applied for at this stage, + // but if the expression calculation in this node needs to apply for additional memory, + // it may cause the memory to exceed the limit. + RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while execute get_next."); SCOPED_TIMER(_runtime_profile->total_time_counter()); if (reached_limit()) { diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h index b8645ce..63e0505 100644 --- a/be/src/exec/hash_join_node.h +++ b/be/src/exec/hash_join_node.h @@ -105,6 +105,7 @@ private: // is responsible for. boost::scoped_ptr<RowBatch> _probe_batch; int _probe_batch_pos; // current scan pos in _probe_batch + int _probe_counter; bool _probe_eos; // if true, probe child has no more rows to process TupleRow* _current_probe_row; @@ -174,6 +175,13 @@ private: // This is only used for debugging and outputting the left child rows before // doing the join. std::string get_probe_row_output_string(TupleRow* probe_row); + + // RELEASE_CONTEXT_COUNTER should be power of 2 + // GCC will optimize the modulo operation to &(release_context_counter - 1) + // build_expr_context and probe_expr_context will free local alloc after this probe calculations + static constexpr int RELEASE_CONTEXT_COUNTER = 1 << 5; + static_assert((RELEASE_CONTEXT_COUNTER & (RELEASE_CONTEXT_COUNTER - 1)) == 0, + "should be power of 2"); }; } // namespace doris diff --git a/be/src/exec/hash_join_node_ir.cpp b/be/src/exec/hash_join_node_ir.cpp index d301201..3bd7f42 100644 --- a/be/src/exec/hash_join_node_ir.cpp +++ b/be/src/exec/hash_join_node_ir.cpp @@ -17,6 +17,7 @@ #include "exec/hash_join_node.h" #include "exec/hash_table.hpp" +#include "exprs/expr_context.h" #include "runtime/row_batch.h" namespace doris { @@ -118,7 +119,10 @@ int HashJoinNode::process_probe_batch(RowBatch* out_batch, RowBatch* probe_batch if (UNLIKELY(_probe_batch_pos == probe_rows)) { goto end; } - + if (++_probe_counter % RELEASE_CONTEXT_COUNTER == 0) { + ExprContext::free_local_allocations(_probe_expr_ctxs); + ExprContext::free_local_allocations(_build_expr_ctxs); + } _current_probe_row = probe_batch->get_row(_probe_batch_pos++); _hash_tbl_iterator = _hash_tbl->find(_current_probe_row); _matched_probe = false; diff --git a/be/test/exec/buffered_reader_test.cpp b/be/test/exec/buffered_reader_test.cpp index aee4975..cb89688 100644 --- a/be/test/exec/buffered_reader_test.cpp +++ b/be/test/exec/buffered_reader_test.cpp @@ -34,9 +34,9 @@ protected: TEST_F(BufferedReaderTest, normal_use) { // buffered_reader_test_file 950 bytes - LocalFileReader file_reader( + auto file_reader = new LocalFileReader( "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0); - BufferedReader reader(&file_reader, 1024); + BufferedReader reader(file_reader, 1024); auto st = reader.open(); ASSERT_TRUE(st.ok()); uint8_t buf[1024]; @@ -51,9 +51,9 @@ TEST_F(BufferedReaderTest, normal_use) { TEST_F(BufferedReaderTest, test_validity) { // buffered_reader_test_file.txt 45 bytes - LocalFileReader file_reader( + auto file_reader = new LocalFileReader( "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0); - BufferedReader reader(&file_reader, 64); + BufferedReader reader(file_reader, 64); auto st = reader.open(); ASSERT_TRUE(st.ok()); uint8_t buf[10]; @@ -92,9 +92,9 @@ TEST_F(BufferedReaderTest, test_validity) { TEST_F(BufferedReaderTest, test_seek) { // buffered_reader_test_file.txt 45 bytes - LocalFileReader file_reader( + auto file_reader = new LocalFileReader( "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0); - BufferedReader reader(&file_reader, 64); + BufferedReader reader(file_reader, 64); auto st = reader.open(); ASSERT_TRUE(st.ok()); uint8_t buf[10]; @@ -142,9 +142,9 @@ TEST_F(BufferedReaderTest, test_seek) { TEST_F(BufferedReaderTest, test_miss) { // buffered_reader_test_file.txt 45 bytes - LocalFileReader file_reader( + auto file_reader = new LocalFileReader( "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0); - BufferedReader reader(&file_reader, 64); + BufferedReader reader(file_reader, 64); auto st = reader.open(); ASSERT_TRUE(st.ok()); uint8_t buf[128]; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org