This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 5acd1279a9c [fix](2.1) Fix correctness in branch-2.1 (#39901) 5acd1279a9c is described below commit 5acd1279a9cda5e1e820b7418562be94b6ca08f2 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Aug 26 14:12:59 2024 +0800 [fix](2.1) Fix correctness in branch-2.1 (#39901) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --- .../pipeline_x/local_exchange/local_exchanger.cpp | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 194b81c7bee..1353e832e24 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -183,8 +183,15 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest uint32_t start = local_state._partition_rows_histogram[i]; uint32_t size = local_state._partition_rows_histogram[i + 1] - start; if (size > 0) { - _enqueue_data_and_set_ready(i % _num_sources, local_state, - {new_block_wrapper, {row_idx, start, size}}); + local_state._shared_state->add_mem_usage( + i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false); + if (!_enqueue_data_and_set_ready(i % _num_sources, local_state, + {new_block_wrapper, {row_idx, start, size}})) { + local_state._shared_state->sub_mem_usage( + i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), + false); + new_block_wrapper->unref(local_state._shared_state); + } } else { new_block_wrapper->unref(local_state._shared_state); } @@ -204,8 +211,15 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest uint32_t start = local_state._partition_rows_histogram[it.first]; uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start; if (size > 0) { - _enqueue_data_and_set_ready(it.second, local_state, - {new_block_wrapper, {row_idx, start, size}}); + local_state._shared_state->add_mem_usage( + it.second, new_block_wrapper->data_block.allocated_bytes(), false); + + if (!_enqueue_data_and_set_ready(it.second, local_state, + {new_block_wrapper, {row_idx, start, size}})) { + local_state._shared_state->sub_mem_usage( + it.second, new_block_wrapper->data_block.allocated_bytes(), false); + new_block_wrapper->unref(local_state._shared_state); + } } else { new_block_wrapper->unref(local_state._shared_state); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org