This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 82c681595e2 [fix](local exchange) Fix local exchange blocked by a huge data block… (#38693) 82c681595e2 is described below commit 82c681595e2b592b58942d6e332ea5fb2fee4ac1 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Aug 1 18:04:19 2024 +0800 [fix](local exchange) Fix local exchange blocked by a huge data block… (#38693) … (#38657) If a huge block is pushed into the local exchanger, it can become blocked due to a concurrency problem. This PR uses a unique lock to resolve it. --- .../local_exchange/local_exchange_sink_operator.h | 2 + .../local_exchange_source_operator.cpp | 6 +- .../local_exchange_source_operator.h | 2 + .../pipeline_x/local_exchange/local_exchanger.cpp | 111 +++++++++------------ .../pipeline_x/local_exchange/local_exchanger.h | 14 +++ 5 files changed, 71 insertions(+), 64 deletions(-) diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index 99b88747a98..a32ecc21e00 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -49,6 +49,8 @@ private: friend class BroadcastExchanger; friend class PassToOneExchanger; friend class AdaptivePassthroughExchanger; + template <typename BlockType> + friend class Exchanger; ExchangerBase* _exchanger = nullptr; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 086a3b551fd..b3a28a6404f 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -66,11 +66,13 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c fmt::memory_buffer debug_string_buffer; 
fmt::format_to(debug_string_buffer, "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " - "_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}", + "_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}, " + "data queue info: {}", Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, _exchanger->_running_sink_operators, _exchanger->_running_source_operators, - _shared_state->mem_usage.load()); + _shared_state->mem_usage.load(), + _exchanger->data_queue_debug_string(_channel_id)); size_t i = 0; fmt::format_to(debug_string_buffer, ", MemTrackers: "); for (auto* mem_tracker : _shared_state->mem_trackers) { diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index 7cefc1ca900..193b1c553f9 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -47,6 +47,8 @@ private: friend class BroadcastExchanger; friend class PassToOneExchanger; friend class AdaptivePassthroughExchanger; + template <typename BlockType> + friend class Exchanger; ExchangerBase* _exchanger = nullptr; int _channel_id; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 7a044aaa77f..eb3875dcf7c 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -24,6 +24,37 @@ namespace doris::pipeline { +template <typename BlockType> +bool Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, + LocalExchangeSinkLocalState& local_state, + BlockType&& block) { + std::unique_lock l(_m); + if (_data_queue[channel_id].enqueue(std::move(block))) { + 
local_state._shared_state->set_ready_to_read(channel_id); + return true; + } + return false; +} + +template <typename BlockType> +bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_state, + BlockType& block, bool* eos) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[local_state._channel_id].try_dequeue(block)) { + return true; + } else if (all_finished) { + *eos = true; + } else { + std::unique_lock l(_m); + if (_data_queue[local_state._channel_id].try_dequeue(block)) { + return true; + } + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + local_state._dependency->block(); + } + return false; +} + Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) { { @@ -72,17 +103,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block return Status::OK(); }; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { + if (_dequeue_data(local_state, partitioned_block, eos)) { SCOPED_TIMER(local_state._copy_data_timer); mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); RETURN_IF_ERROR(get_data(block)); - } else if (all_finished) { - *eos = true; - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); } return Status::OK(); } @@ -90,7 +115,6 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state) { - auto& data_queue = _data_queue; const auto rows = block->rows(); auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows); { @@ -133,9 +157,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const 
uint32_t* __rest if (size > 0) { local_state._shared_state->add_mem_usage( it.second, new_block_wrapper->data_block.allocated_bytes(), false); - if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) { - local_state._shared_state->set_ready_to_read(it.second); - } else { + + if (!_enqueue_data_and_set_ready(it.second, local_state, + {new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->sub_mem_usage( it.second, new_block_wrapper->data_block.allocated_bytes(), false); new_block_wrapper->unref(local_state._shared_state); @@ -152,10 +176,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (size > 0) { local_state._shared_state->add_mem_usage( i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false); - if (data_queue[i % _num_sources].enqueue( - {new_block_wrapper, {row_idx, start, size}})) { - local_state._shared_state->set_ready_to_read(i % _num_sources); - } else { + if (!_enqueue_data_and_set_ready(i % _num_sources, local_state, + {new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->sub_mem_usage( i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false); @@ -175,9 +197,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (size > 0) { local_state._shared_state->add_mem_usage( map[i], new_block_wrapper->data_block.allocated_bytes(), false); - if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) { - local_state._shared_state->set_ready_to_read(map[i]); - } else { + if (!_enqueue_data_and_set_ready(map[i], local_state, + {new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->sub_mem_usage( map[i], new_block_wrapper->data_block.allocated_bytes(), false); new_block_wrapper->unref(local_state._shared_state); @@ -201,9 +222,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo auto channel_id = (local_state._channel_id++) % 
_num_partitions; size_t memory_usage = new_block.allocated_bytes(); local_state._shared_state->add_mem_usage(channel_id, memory_usage); - if (_data_queue[channel_id].enqueue(std::move(new_block))) { - local_state._shared_state->set_ready_to_read(channel_id); - } else { + if (!_enqueue_data_and_set_ready(channel_id, local_state, std::move(new_block))) { local_state._shared_state->sub_mem_usage(channel_id, memory_usage); } @@ -222,19 +241,13 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + if (_dequeue_data(local_state, next_block, eos)) { block->swap(next_block); local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); if (_free_block_limit == 0 || _free_blocks.size_approx() < _free_block_limit * _num_sources) { _free_blocks.enqueue(std::move(next_block)); } - } else if (all_finished) { - *eos = true; - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); } return Status::OK(); } @@ -243,9 +256,7 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block LocalExchangeSinkLocalState& local_state) { vectorized::Block new_block(in_block->clone_empty()); new_block.swap(*in_block); - if (_data_queue[0].enqueue(std::move(new_block))) { - local_state._shared_state->set_ready_to_read(0); - } + _enqueue_data_and_set_ready(0, local_state, std::move(new_block)); return Status::OK(); } @@ -257,14 +268,8 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } vectorized::Block next_block; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[0].try_dequeue(next_block)) { + if 
(_dequeue_data(local_state, next_block, eos)) { *block = std::move(next_block); - } else if (all_finished) { - *eos = true; - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); } return Status::OK(); } @@ -274,9 +279,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block for (size_t i = 0; i < _num_partitions; i++) { auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty()); RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows())); - if (_data_queue[i].enqueue(mutable_block->to_block())) { - local_state._shared_state->set_ready_to_read(i); - } + _enqueue_data_and_set_ready(i, local_state, mutable_block->to_block()); } return Status::OK(); @@ -293,14 +296,8 @@ void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) { Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + if (_dequeue_data(local_state, next_block, eos)) { *block = std::move(next_block); - } else if (all_finished) { - *eos = true; - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); } return Status::OK(); } @@ -316,9 +313,8 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, auto channel_id = (local_state._channel_id++) % _num_partitions; size_t memory_usage = new_block.allocated_bytes(); local_state._shared_state->add_mem_usage(channel_id, memory_usage); - if (_data_queue[channel_id].enqueue(std::move(new_block))) { - local_state._shared_state->set_ready_to_read(channel_id); - } else { + + if (!_enqueue_data_and_set_ready(channel_id, local_state, std::move(new_block))) { local_state._shared_state->sub_mem_usage(channel_id, memory_usage); } 
@@ -349,7 +345,6 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state) { - auto& data_queue = _data_queue; const auto rows = block->rows(); auto row_idx = std::make_shared<std::vector<uint32_t>>(rows); { @@ -378,9 +373,7 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, size_t memory_usage = new_block.allocated_bytes(); local_state._shared_state->add_mem_usage(i, memory_usage); - if (data_queue[i].enqueue(std::move(new_block))) { - local_state._shared_state->set_ready_to_read(i); - } else { + if (!_enqueue_data_and_set_ready(i, local_state, std::move(new_block))) { local_state._shared_state->sub_mem_usage(i, memory_usage); } } @@ -404,19 +397,13 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized:: bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + if (_dequeue_data(local_state, next_block, eos)) { block->swap(next_block); if (_free_block_limit == 0 || _free_blocks.size_approx() < _free_block_limit * _num_sources) { _free_blocks.enqueue(std::move(next_block)); } local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); - } else if (all_finished) { - *eos = true; - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); } return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h index ee0b5e286de..b1d1d1f2668 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -54,6 +54,8 @@ public: virtual DependencySPtr get_local_state_dependency(int _channel_id) { 
return nullptr; } + virtual std::string data_queue_debug_string(int i) = 0; + protected: friend struct LocalExchangeSharedState; friend struct ShuffleBlockWrapper; @@ -114,9 +116,21 @@ public: : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) { } ~Exchanger() override = default; + std::string data_queue_debug_string(int i) override { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "Data Queue {}: [size approx = {}, eos = {}]", + i, _data_queue[i].data_queue.size_approx(), _data_queue[i].eos); + return fmt::to_string(debug_string_buffer); + } protected: + bool _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState& local_state, + BlockType&& block); + bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos); std::vector<BlockQueue<BlockType>> _data_queue; + +private: + std::mutex _m; }; class LocalExchangeSourceLocalState; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org