This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3d6de8572fc [fix](exchange)fix exchange sink buffer does not update total_queue_size when EOF. (#47312) 3d6de8572fc is described below commit 3d6de8572fc0eaf975b5c3e6a901fa59b31c805f Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Fri Jan 24 05:04:20 2025 +0800 [fix](exchange)fix exchange sink buffer does not update total_queue_size when EOF. (#47312) ### What problem does this PR solve? https://github.com/apache/doris/pull/41602 When a receiver reaches EOF, _set_receiver_eof clears that instance's _instance_to_package_queue but does not decrease _total_queue_size. Every check that relies on _total_queue_size then sees a stale, too-large value; in particular, if _total_queue_size stays above _queue_capacity, the queue dependency remains blocked and the pipeline can deadlock. UT output showing the bug (the total stays at 6 after instance 2's queue has been cleared): ``` mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :2 mock transmit_blockv2 dest ins id :3 queue size : 6 each queue size : Instance: 2, queue size: 2 Instance: 1, queue size: 2 Instance: 3, queue size: 2 queue size : 6 // error size each queue size : Instance: 2, queue size: 0 Instance: 1, queue size: 2 Instance: 3, queue size: 2 mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :3 mock transmit_blockv2 dest ins id :3 ``` --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 24 ++++++++++++++- be/src/pipeline/exec/exchange_sink_buffer.h | 4 +++ be/src/pipeline/exec/exchange_sink_operator.cpp | 13 ++++---- be/test/vec/exec/exchange_sink_test.cpp | 40 +++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 800ef615073..4acb747bbc9 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -42,6 +42,7 @@ #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "service/backend_options.h" +#include "util/defer_op.h" #include "util/proto_util.h" #include "util/time.h" #include "vec/sink/vdata_stream_sender.h" @@ -442,7 +443,8 @@ void 
ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { // When the receiving side reaches eof, it means the receiver has finished early. // The remaining data in the current rpc_channel does not need to be sent, // and the rpc_channel should be turned off immediately. - _turn_off_channel(id, lock); + Defer turn_off([&]() { _turn_off_channel(id, lock); }); + std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q = _instance_to_broadcast_package_queue[id]; for (; !broadcast_q.empty(); broadcast_q.pop()) { @@ -458,12 +460,22 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id]; for (; !q.empty(); q.pop()) { + // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, + // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked + _total_queue_size--; if (q.front().block) { COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); } } + // Try to wake up pipeline after clearing the queue + if (_total_queue_size <= _queue_capacity) { + for (auto& [_, dep] : _queue_deps) { + dep->set_ready(); + } + } + { std::queue<TransmitInfo, std::list<TransmitInfo>> empty; swap(empty, q); @@ -575,6 +587,16 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { } } +std::string ExchangeSinkBuffer::debug_each_instance_queue_size() { + fmt::memory_buffer debug_string_buffer; + for (auto& [id, m] : _instance_to_package_queue_mutex) { + std::unique_lock<std::mutex> lock(*m); + fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, + _instance_to_package_queue[id].size()); + } + return fmt::to_string(debug_string_buffer); +} + } // namespace pipeline #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 
80e5dc42289..e6c4635aef3 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -239,6 +239,8 @@ public: _queue_deps[sender_ins_id] = queue_dependency; _parents[sender_ins_id] = local_state; } + + std::string debug_each_instance_queue_size(); #ifdef BE_TEST public: #else @@ -306,6 +308,8 @@ private: void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); int64_t get_sum_rpc_time(); + // _total_queue_size is the sum of the sizes of all instance_to_package_queues. + // Any modification to instance_to_package_queue requires a corresponding modification to _total_queue_size. std::atomic<int> _total_queue_size = 0; // _running_sink_count is used to track how many sinks have not finished yet. diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index d7422e0e8f0..9c3e451b333 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -499,12 +499,13 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); if (_sink_buffer) { - fmt::format_to(debug_string_buffer, - ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: " - "{}, queue dep: {}), _reach_limit: {}, working channels: {}", - _sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size, - _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), - _reach_limit.load(), _working_channels_count.load()); + fmt::format_to( + debug_string_buffer, + ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: " + "{}, queue dep: {}), _reach_limit: {}, working channels: {} , each queue size: {}", + _sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size, + _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), _reach_limit.load(), + 
_working_channels_count.load(), _sink_buffer->debug_each_instance_queue_size()); } return fmt::to_string(debug_string_buffer); } diff --git a/be/test/vec/exec/exchange_sink_test.cpp b/be/test/vec/exec/exchange_sink_test.cpp index 9576ed71ee2..7dbd352bd3a 100644 --- a/be/test/vec/exec/exchange_sink_test.cpp +++ b/be/test/vec/exec/exchange_sink_test.cpp @@ -193,4 +193,44 @@ TEST_F(ExchangeSInkTest, test_error_end) { } } +TEST_F(ExchangeSInkTest, test_queue_size) { + { + auto state = create_runtime_state(); + auto buffer = create_buffer(state); + + auto sink1 = create_sink(state, buffer); + + EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK()); + + EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK()); + + EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK()); + + std::cout << "queue size : " << buffer->_total_queue_size << "\n"; + + EXPECT_EQ(buffer->_total_queue_size, 6); + + std::cout << "each queue size : \n" << buffer->debug_each_instance_queue_size() << "\n"; + + pop_block(dest_ins_id_2, PopState::eof); + + std::cout << "queue size : " << buffer->_total_queue_size << "\n"; + + EXPECT_EQ(buffer->_total_queue_size, 4); + + std::cout << "each queue size : \n" << buffer->debug_each_instance_queue_size() << "\n"; + + EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], false); + EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true); + EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false); + clear_all_done(); + } +} + } // namespace doris::vectorized --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org