This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bb1a9c98d42 [fix](local exchange) Fix local exchange blocked by a huge data block (#38657)
bb1a9c98d42 is described below

commit bb1a9c98d42c224fefb1140158a7c52ec651c1c3
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Aug 1 16:05:53 2024 +0800

    [fix](local exchange) Fix local exchange blocked by a huge data block (#38657)
    
    If a huge block is pushed into the local exchanger, it can become blocked
    due to a concurrency problem. This PR uses a unique lock to resolve it.
---
 .../local_exchange/local_exchange_sink_operator.h  |   2 +
 .../local_exchange_source_operator.cpp             |   6 +-
 .../local_exchange_source_operator.h               |   2 +
 be/src/pipeline/local_exchange/local_exchanger.cpp | 115 +++++++++------------
 be/src/pipeline/local_exchange/local_exchanger.h   |  14 +++
 5 files changed, 72 insertions(+), 67 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 0ff1df26001..faa48d209f4 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -56,6 +56,8 @@ private:
     friend class PassToOneExchanger;
     friend class LocalMergeSortExchanger;
     friend class AdaptivePassthroughExchanger;
+    template <typename BlockType>
+    friend class Exchanger;
 
     ExchangerBase* _exchanger = nullptr;
 
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 56f0a157cde..6b0cca2d71a 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -75,11 +75,13 @@ std::string LocalExchangeSourceLocalState::debug_string(int 
indentation_level) c
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
                    "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}, "
-                   "_running_sink_operators: {}, _running_source_operators: 
{}, mem_usage: {}",
+                   "_running_sink_operators: {}, _running_source_operators: 
{}, mem_usage: {}, "
+                   "data queue info: {}",
                    Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
                    _exchanger->_num_senders, _exchanger->_num_sources,
                    _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators,
-                   _shared_state->mem_usage.load());
+                   _shared_state->mem_usage.load(),
+                   _exchanger->data_queue_debug_string(_channel_id));
     size_t i = 0;
     fmt::format_to(debug_string_buffer, ", MemTrackers: ");
     for (auto* mem_tracker : _shared_state->mem_trackers) {
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index f9fa4cfa4ed..d2f68d4ebac 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -51,6 +51,8 @@ private:
     friend class PassToOneExchanger;
     friend class LocalMergeSortExchanger;
     friend class AdaptivePassthroughExchanger;
+    template <typename BlockType>
+    friend class Exchanger;
 
     ExchangerBase* _exchanger = nullptr;
     int _channel_id;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 27b7fc7e7fd..647ddcfba2d 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -26,6 +26,37 @@
 
 namespace doris::pipeline {
 
+template <typename BlockType>
+bool Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
+                                                       
LocalExchangeSinkLocalState& local_state,
+                                                       BlockType&& block) {
+    std::unique_lock l(_m);
+    if (_data_queue[channel_id].enqueue(std::move(block))) {
+        local_state._shared_state->set_ready_to_read(channel_id);
+        return true;
+    }
+    return false;
+}
+
+template <typename BlockType>
+bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& 
local_state,
+                                         BlockType& block, bool* eos) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(block)) {
+        return true;
+    } else if (all_finished) {
+        *eos = true;
+    } else {
+        std::unique_lock l(_m);
+        if (_data_queue[local_state._channel_id].try_dequeue(block)) {
+            return true;
+        }
+        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+        local_state._dependency->block();
+    }
+    return false;
+}
+
 Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
                               LocalExchangeSinkLocalState& local_state) {
     {
@@ -74,17 +105,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
         return Status::OK();
     };
 
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+    if (_dequeue_data(local_state, partitioned_block, eos)) {
         SCOPED_TIMER(local_state._copy_data_timer);
         mutable_block = 
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
                 block, partitioned_block.first->data_block);
         RETURN_IF_ERROR(get_data(block));
-    } else if (all_finished) {
-        *eos = true;
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
     }
     return Status::OK();
 }
@@ -92,7 +117,6 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
 Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* 
__restrict channel_ids,
                                      vectorized::Block* block, bool eos,
                                      LocalExchangeSinkLocalState& local_state) 
{
-    auto& data_queue = _data_queue;
     const auto rows = block->rows();
     auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
     {
@@ -135,9 +159,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
-                if (data_queue[it.second].enqueue({new_block_wrapper, 
{row_idx, start, size}})) {
-                    local_state._shared_state->set_ready_to_read(it.second);
-                } else {
+
+                if (!_enqueue_data_and_set_ready(it.second, local_state,
+                                                 {new_block_wrapper, {row_idx, 
start, size}})) {
                     local_state._shared_state->sub_mem_usage(
                             it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
                     new_block_wrapper->unref(local_state._shared_state);
@@ -154,10 +178,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         i % _num_sources, 
new_block_wrapper->data_block.allocated_bytes(), false);
-                if (data_queue[i % _num_sources].enqueue(
-                            {new_block_wrapper, {row_idx, start, size}})) {
-                    local_state._shared_state->set_ready_to_read(i % 
_num_sources);
-                } else {
+                if (!_enqueue_data_and_set_ready(i % _num_sources, local_state,
+                                                 {new_block_wrapper, {row_idx, 
start, size}})) {
                     local_state._shared_state->sub_mem_usage(
                             i % _num_sources, 
new_block_wrapper->data_block.allocated_bytes(),
                             false);
@@ -177,9 +199,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
-                if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, 
start, size}})) {
-                    local_state._shared_state->set_ready_to_read(map[i]);
-                } else {
+                if (!_enqueue_data_and_set_ready(map[i], local_state,
+                                                 {new_block_wrapper, {row_idx, 
start, size}})) {
                     local_state._shared_state->sub_mem_usage(
                             map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
                     new_block_wrapper->unref(local_state._shared_state);
@@ -203,9 +224,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, 
vectorized::Block* in_blo
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     size_t memory_usage = new_block.allocated_bytes();
     local_state._shared_state->add_mem_usage(channel_id, memory_usage);
-    if (_data_queue[channel_id].enqueue(std::move(new_block))) {
-        local_state._shared_state->set_ready_to_read(channel_id);
-    } else {
+    if (!_enqueue_data_and_set_ready(channel_id, local_state, 
std::move(new_block))) {
         local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
     }
 
@@ -224,19 +243,13 @@ void 
PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                        LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    if (_dequeue_data(local_state, next_block, eos)) {
         block->swap(next_block);
         local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
-    } else if (all_finished) {
-        *eos = true;
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
     }
     return Status::OK();
 }
@@ -245,9 +258,7 @@ Status PassToOneExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block
                                 LocalExchangeSinkLocalState& local_state) {
     vectorized::Block new_block(in_block->clone_empty());
     new_block.swap(*in_block);
-    if (_data_queue[0].enqueue(std::move(new_block))) {
-        local_state._shared_state->set_ready_to_read(0);
-    }
+    _enqueue_data_and_set_ready(0, local_state, std::move(new_block));
 
     return Status::OK();
 }
@@ -259,14 +270,8 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
         return Status::OK();
     }
     vectorized::Block next_block;
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[0].try_dequeue(next_block)) {
+    if (_dequeue_data(local_state, next_block, eos)) {
         *block = std::move(next_block);
-    } else if (all_finished) {
-        *eos = true;
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
     }
     return Status::OK();
 }
@@ -283,9 +288,7 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, 
vectorized::Block* in_
     size_t memory_usage = new_block.allocated_bytes();
     add_mem_usage(local_state, memory_usage);
 
-    if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
-        local_state._shared_state->set_ready_to_read(0);
-    } else {
+    if (!_enqueue_data_and_set_ready(local_state._channel_id, local_state, 
std::move(new_block))) {
         sub_mem_usage(local_state, memory_usage);
     }
     if (eos) {
@@ -402,9 +405,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block
     for (size_t i = 0; i < _num_partitions; i++) {
         auto mutable_block = 
vectorized::MutableBlock::create_unique(in_block->clone_empty());
         RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, 
in_block->rows()));
-        if (_data_queue[i].enqueue(mutable_block->to_block())) {
-            local_state._shared_state->set_ready_to_read(i);
-        }
+        _enqueue_data_and_set_ready(i, local_state, mutable_block->to_block());
     }
 
     return Status::OK();
@@ -421,14 +422,8 @@ void 
BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                      LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    if (_dequeue_data(local_state, next_block, eos)) {
         *block = std::move(next_block);
-    } else if (all_finished) {
-        *eos = true;
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
     }
     return Status::OK();
 }
@@ -444,9 +439,8 @@ Status 
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     size_t memory_usage = new_block.allocated_bytes();
     local_state._shared_state->add_mem_usage(channel_id, memory_usage);
-    if (_data_queue[channel_id].enqueue(std::move(new_block))) {
-        local_state._shared_state->set_ready_to_read(channel_id);
-    } else {
+
+    if (!_enqueue_data_and_set_ready(channel_id, local_state, 
std::move(new_block))) {
         local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
     }
 
@@ -477,7 +471,6 @@ Status 
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
                                                  const uint32_t* __restrict 
channel_ids,
                                                  vectorized::Block* block, 
bool eos,
                                                  LocalExchangeSinkLocalState& 
local_state) {
-    auto& data_queue = _data_queue;
     const auto rows = block->rows();
     auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
     {
@@ -506,9 +499,7 @@ Status 
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
 
             size_t memory_usage = new_block.allocated_bytes();
             local_state._shared_state->add_mem_usage(i, memory_usage);
-            if (data_queue[i].enqueue(std::move(new_block))) {
-                local_state._shared_state->set_ready_to_read(i);
-            } else {
+            if (!_enqueue_data_and_set_ready(i, local_state, 
std::move(new_block))) {
                 local_state._shared_state->sub_mem_usage(i, memory_usage);
             }
         }
@@ -532,19 +523,13 @@ Status 
AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
                                                bool* eos,
                                                LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    if (_dequeue_data(local_state, next_block, eos)) {
         block->swap(next_block);
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
         local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
-    } else if (all_finished) {
-        *eos = true;
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index 2c4f8f5b785..6cd64126069 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -55,6 +55,8 @@ public:
     virtual std::vector<Dependency*> local_sink_state_dependency(int 
channel_id) { return {}; }
     virtual std::vector<Dependency*> local_state_dependency(int channel_id) { 
return {}; }
 
+    virtual std::string data_queue_debug_string(int i) = 0;
+
 protected:
     friend struct LocalExchangeSharedState;
     friend struct ShuffleBlockWrapper;
@@ -115,9 +117,21 @@ public:
             : ExchangerBase(running_sink_operators, num_sources, 
num_partitions, free_block_limit) {
     }
     ~Exchanger() override = default;
+    std::string data_queue_debug_string(int i) override {
+        fmt::memory_buffer debug_string_buffer;
+        fmt::format_to(debug_string_buffer, "Data Queue {}: [size approx = {}, 
eos = {}]",
+                       _data_queue[i].data_queue.size_approx(), 
_data_queue[i].eos);
+        return fmt::to_string(debug_string_buffer);
+    }
 
 protected:
+    bool _enqueue_data_and_set_ready(int channel_id, 
LocalExchangeSinkLocalState& local_state,
+                                     BlockType&& block);
+    bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& 
block, bool* eos);
     std::vector<BlockQueue<BlockType>> _data_queue;
+
+private:
+    std::mutex _m;
 };
 
 class LocalExchangeSourceLocalState;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to