This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 82c681595e2 [fix](local exchange) Fix local exchange blocked by a huge 
data block… (#38693)
82c681595e2 is described below

commit 82c681595e2b592b58942d6e332ea5fb2fee4ac1
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Aug 1 18:04:19 2024 +0800

    [fix](local exchange) Fix local exchange blocked by a huge data block… 
(#38693)
    
    … (#38657)
    
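    If a huge block is pushed into the local exchanger, the exchange can stay
    blocked because of a concurrency problem. This PR uses a unique lock to
    resolve it: enqueueing a block and marking its channel ready become atomic
    with respect to the reader re-checking its queue before it blocks.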
---
 .../local_exchange/local_exchange_sink_operator.h  |   2 +
 .../local_exchange_source_operator.cpp             |   6 +-
 .../local_exchange_source_operator.h               |   2 +
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 111 +++++++++------------
 .../pipeline_x/local_exchange/local_exchanger.h    |  14 +++
 5 files changed, 71 insertions(+), 64 deletions(-)

diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 99b88747a98..a32ecc21e00 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -49,6 +49,8 @@ private:
     friend class BroadcastExchanger;
     friend class PassToOneExchanger;
     friend class AdaptivePassthroughExchanger;
+    template <typename BlockType>
+    friend class Exchanger;
 
     ExchangerBase* _exchanger = nullptr;
 
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 086a3b551fd..b3a28a6404f 100644
--- 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -66,11 +66,13 @@ std::string LocalExchangeSourceLocalState::debug_string(int 
indentation_level) c
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
                    "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}, "
-                   "_running_sink_operators: {}, _running_source_operators: 
{}, mem_usage: {}",
+                   "_running_sink_operators: {}, _running_source_operators: 
{}, mem_usage: {}, "
+                   "data queue info: {}",
                    Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
                    _exchanger->_num_senders, _exchanger->_num_sources,
                    _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators,
-                   _shared_state->mem_usage.load());
+                   _shared_state->mem_usage.load(),
+                   _exchanger->data_queue_debug_string(_channel_id));
     size_t i = 0;
     fmt::format_to(debug_string_buffer, ", MemTrackers: ");
     for (auto* mem_tracker : _shared_state->mem_trackers) {
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index 7cefc1ca900..193b1c553f9 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -47,6 +47,8 @@ private:
     friend class BroadcastExchanger;
     friend class PassToOneExchanger;
     friend class AdaptivePassthroughExchanger;
+    template <typename BlockType>
+    friend class Exchanger;
 
     ExchangerBase* _exchanger = nullptr;
     int _channel_id;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 7a044aaa77f..eb3875dcf7c 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -24,6 +24,37 @@
 
 namespace doris::pipeline {
 
+template <typename BlockType> // Publishes `block` on queue `channel_id` and marks that channel readable; returns false if enqueue() rejected it.
+bool Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
+                                                       
LocalExchangeSinkLocalState& local_state,
+                                                       BlockType&& block) {
+    std::unique_lock l(_m); // held across enqueue + set_ready_to_read so _dequeue_data's locked re-check cannot run between them
+    if (_data_queue[channel_id].enqueue(std::move(block))) {
+        local_state._shared_state->set_ready_to_read(channel_id);
+        return true;
+    }
+    return false; // not enqueued, so no ready signal; some callers roll back their mem-usage accounting on this result
+}
+
+template <typename BlockType> // Pops one block for this source's channel; on a miss sets *eos if all sinks were done, otherwise blocks the source dependency.
+bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& 
local_state,
+                                         BlockType& block, bool* eos) {
+    bool all_finished = _running_sink_operators == 0; // snapshot taken before the fast path; not re-read under _m below
+    if (_data_queue[local_state._channel_id].try_dequeue(block)) { // lock-free fast path
+        return true;
+    } else if (all_finished) {
+        *eos = true;
+    } else {
+        std::unique_lock l(_m); // re-check under the sinks' lock: a concurrent enqueue+signal is either visible here or its set_ready_to_read runs after block()
+        if (_data_queue[local_state._channel_id].try_dequeue(block)) {
+            return true;
+        }
+        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+        local_state._dependency->block(); // NOTE(review): all_finished is not re-checked under _m; presumably the last sink's close leaves this dependency ready in a way block() cannot undo — confirm
+    }
+    return false;
+}
+
 Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
                               LocalExchangeSinkLocalState& local_state) {
     {
@@ -72,17 +103,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
         return Status::OK();
     };
 
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+    if (_dequeue_data(local_state, partitioned_block, eos)) {
         SCOPED_TIMER(local_state._copy_data_timer);
         mutable_block = 
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
                 block, partitioned_block.first->data_block);
         RETURN_IF_ERROR(get_data(block));
-    } else if (all_finished) {
-        *eos = true;
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
     }
     return Status::OK();
 }
@@ -90,7 +115,6 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
 Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* 
__restrict channel_ids,
                                      vectorized::Block* block, bool eos,
                                      LocalExchangeSinkLocalState& local_state) 
{
-    auto& data_queue = _data_queue;
     const auto rows = block->rows();
     auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
     {
@@ -133,9 +157,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
-                if (data_queue[it.second].enqueue({new_block_wrapper, 
{row_idx, start, size}})) {
-                    local_state._shared_state->set_ready_to_read(it.second);
-                } else {
+
+                if (!_enqueue_data_and_set_ready(it.second, local_state,
+                                                 {new_block_wrapper, {row_idx, 
start, size}})) {
                     local_state._shared_state->sub_mem_usage(
                             it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
                     new_block_wrapper->unref(local_state._shared_state);
@@ -152,10 +176,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         i % _num_sources, 
new_block_wrapper->data_block.allocated_bytes(), false);
-                if (data_queue[i % _num_sources].enqueue(
-                            {new_block_wrapper, {row_idx, start, size}})) {
-                    local_state._shared_state->set_ready_to_read(i % 
_num_sources);
-                } else {
+                if (!_enqueue_data_and_set_ready(i % _num_sources, local_state,
+                                                 {new_block_wrapper, {row_idx, 
start, size}})) {
                     local_state._shared_state->sub_mem_usage(
                             i % _num_sources, 
new_block_wrapper->data_block.allocated_bytes(),
                             false);
@@ -175,9 +197,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
-                if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, 
start, size}})) {
-                    local_state._shared_state->set_ready_to_read(map[i]);
-                } else {
+                if (!_enqueue_data_and_set_ready(map[i], local_state,
+                                                 {new_block_wrapper, {row_idx, 
start, size}})) {
                     local_state._shared_state->sub_mem_usage(
                             map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
                     new_block_wrapper->unref(local_state._shared_state);
@@ -201,9 +222,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, 
vectorized::Block* in_blo
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     size_t memory_usage = new_block.allocated_bytes();
     local_state._shared_state->add_mem_usage(channel_id, memory_usage);
-    if (_data_queue[channel_id].enqueue(std::move(new_block))) {
-        local_state._shared_state->set_ready_to_read(channel_id);
-    } else {
+    if (!_enqueue_data_and_set_ready(channel_id, local_state, 
std::move(new_block))) {
         local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
     }
 
@@ -222,19 +241,13 @@ void 
PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                        LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    if (_dequeue_data(local_state, next_block, eos)) {
         block->swap(next_block);
         local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
-    } else if (all_finished) {
-        *eos = true;
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
     }
     return Status::OK();
 }
@@ -243,9 +256,7 @@ Status PassToOneExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block
                                 LocalExchangeSinkLocalState& local_state) {
     vectorized::Block new_block(in_block->clone_empty());
     new_block.swap(*in_block);
-    if (_data_queue[0].enqueue(std::move(new_block))) {
-        local_state._shared_state->set_ready_to_read(0);
-    }
+    _enqueue_data_and_set_ready(0, local_state, std::move(new_block));
 
     return Status::OK();
 }
@@ -257,14 +268,8 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
         return Status::OK();
     }
     vectorized::Block next_block;
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[0].try_dequeue(next_block)) {
+    if (_dequeue_data(local_state, next_block, eos)) {
         *block = std::move(next_block);
-    } else if (all_finished) {
-        *eos = true;
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
     }
     return Status::OK();
 }
@@ -274,9 +279,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block
     for (size_t i = 0; i < _num_partitions; i++) {
         auto mutable_block = 
vectorized::MutableBlock::create_unique(in_block->clone_empty());
         RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, 
in_block->rows()));
-        if (_data_queue[i].enqueue(mutable_block->to_block())) {
-            local_state._shared_state->set_ready_to_read(i);
-        }
+        _enqueue_data_and_set_ready(i, local_state, mutable_block->to_block());
     }
 
     return Status::OK();
@@ -293,14 +296,8 @@ void 
BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                      LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    if (_dequeue_data(local_state, next_block, eos)) {
         *block = std::move(next_block);
-    } else if (all_finished) {
-        *eos = true;
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
     }
     return Status::OK();
 }
@@ -316,9 +313,8 @@ Status 
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     size_t memory_usage = new_block.allocated_bytes();
     local_state._shared_state->add_mem_usage(channel_id, memory_usage);
-    if (_data_queue[channel_id].enqueue(std::move(new_block))) {
-        local_state._shared_state->set_ready_to_read(channel_id);
-    } else {
+
+    if (!_enqueue_data_and_set_ready(channel_id, local_state, 
std::move(new_block))) {
         local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
     }
 
@@ -349,7 +345,6 @@ Status 
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
                                                  const uint32_t* __restrict 
channel_ids,
                                                  vectorized::Block* block, 
bool eos,
                                                  LocalExchangeSinkLocalState& 
local_state) {
-    auto& data_queue = _data_queue;
     const auto rows = block->rows();
     auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
     {
@@ -378,9 +373,7 @@ Status 
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
 
             size_t memory_usage = new_block.allocated_bytes();
             local_state._shared_state->add_mem_usage(i, memory_usage);
-            if (data_queue[i].enqueue(std::move(new_block))) {
-                local_state._shared_state->set_ready_to_read(i);
-            } else {
+            if (!_enqueue_data_and_set_ready(i, local_state, 
std::move(new_block))) {
                 local_state._shared_state->sub_mem_usage(i, memory_usage);
             }
         }
@@ -404,19 +397,13 @@ Status 
AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
                                                bool* eos,
                                                LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    if (_dequeue_data(local_state, next_block, eos)) {
         block->swap(next_block);
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
         local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
-    } else if (all_finished) {
-        *eos = true;
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index ee0b5e286de..b1d1d1f2668 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -54,6 +54,8 @@ public:
 
     virtual DependencySPtr get_local_state_dependency(int _channel_id) { 
return nullptr; }
 
+    virtual std::string data_queue_debug_string(int i) = 0;
+
 protected:
     friend struct LocalExchangeSharedState;
     friend struct ShuffleBlockWrapper;
@@ -114,9 +116,21 @@ public:
             : ExchangerBase(running_sink_operators, num_sources, 
num_partitions, free_block_limit) {
     }
     ~Exchanger() override = default;
+    std::string data_queue_debug_string(int i) override { // Human-readable state of data queue `i` (used by the source's debug_string).
+        fmt::memory_buffer debug_string_buffer;
+        fmt::format_to(debug_string_buffer, "Data Queue {}: [size approx = {}, 
eos = {}]",
+                       i, _data_queue[i].data_queue.size_approx(), 
_data_queue[i].eos); // one argument per "{}": queue index, approximate size, eos flag
+        return fmt::to_string(debug_string_buffer);
+    }
 
 protected:
+    bool _enqueue_data_and_set_ready(int channel_id, 
LocalExchangeSinkLocalState& local_state,
+                                     BlockType&& block);
+    bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& 
block, bool* eos);
     std::vector<BlockQueue<BlockType>> _data_queue;
+
+private:
+    std::mutex _m;
 };
 
 class LocalExchangeSourceLocalState;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to