This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 5acd1279a9c [fix](2.1) Fix correctness in branch-2.1 (#39901)
5acd1279a9c is described below

commit 5acd1279a9cda5e1e820b7418562be94b6ca08f2
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Aug 26 14:12:59 2024 +0800

    [fix](2.1) Fix correctness in branch-2.1 (#39901)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 194b81c7bee..1353e832e24 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -183,8 +183,15 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
             uint32_t start = local_state._partition_rows_histogram[i];
             uint32_t size = local_state._partition_rows_histogram[i + 1] - start;
             if (size > 0) {
-                _enqueue_data_and_set_ready(i % _num_sources, local_state,
-                                            {new_block_wrapper, {row_idx, start, size}});
+                local_state._shared_state->add_mem_usage(
+                        i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false);
+                if (!_enqueue_data_and_set_ready(i % _num_sources, local_state,
+                                                 {new_block_wrapper, {row_idx, start, size}})) {
+                    local_state._shared_state->sub_mem_usage(
+                            i % _num_sources, new_block_wrapper->data_block.allocated_bytes(),
+                            false);
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
@@ -204,8 +211,15 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
             uint32_t start = local_state._partition_rows_histogram[it.first];
             uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start;
             if (size > 0) {
-                _enqueue_data_and_set_ready(it.second, local_state,
-                                            {new_block_wrapper, {row_idx, start, size}});
+                local_state._shared_state->add_mem_usage(
+                        it.second, new_block_wrapper->data_block.allocated_bytes(), false);
+
+                if (!_enqueue_data_and_set_ready(it.second, local_state,
+                                                 {new_block_wrapper, {row_idx, start, size}})) {
+                    local_state._shared_state->sub_mem_usage(
+                            it.second, new_block_wrapper->data_block.allocated_bytes(), false);
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to