This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 46d3bbd474b [fix](exchange)fix exchange sink buffer does not update total_queue_size when EOF. (#47312) (#47621) 46d3bbd474b is described below commit 46d3bbd474b8f72ae30bbfb46599e9fe36cbde96 Author: TengJianPing <tengjianp...@selectdb.com> AuthorDate: Sat Feb 8 10:50:41 2025 +0800 [fix](exchange)fix exchange sink buffer does not update total_queue_size when EOF. (#47312) (#47621) https://github.com/apache/doris/pull/41602 When a receiver reaches EOF, ExchangeSinkBuffer clears _instance_to_package_queue but does not decrement _total_queue_size, so every decision based on _total_queue_size is wrong. For example, if _total_queue_size exceeds _queue_capacity at EOF, ExchangeSinkQueueDependency stays blocked and the pipeline deadlocks. UT ``` mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :2 mock transmit_blockv2 dest ins id :3 queue size : 6 each queue size : Instance: 2, queue size: 2 Instance: 1, queue size: 2 Instance: 3, queue size: 2 queue size : 6 // error size each queue size : Instance: 2, queue size: 0 Instance: 1, queue size: 2 Instance: 3, queue size: 2 mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :3 mock transmit_blockv2 dest ins id :3 ``` ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. 
eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> Co-authored-by: Mryange <yanxuech...@selectdb.com> --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 24 ++++++++++++++- be/src/pipeline/exec/exchange_sink_buffer.h | 3 ++ be/src/pipeline/exec/exchange_sink_operator.cpp | 13 ++++---- be/test/vec/exec/exchange_sink_test.cpp | 40 +++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 3fb460c9cc7..a776025c676 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -42,6 +42,7 @@ #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "service/backend_options.h" +#include "util/defer_op.h" #include "util/proto_util.h" #include "util/time.h" #include "vec/sink/vdata_stream_sender.h" @@ -445,7 +446,8 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { // When the receiving side reaches eof, it means the receiver has finished early. // The remaining data in the current rpc_channel does not need to be sent, // and the rpc_channel should be turned off immediately. 
- _turn_off_channel(id, lock); + Defer turn_off([&]() { _turn_off_channel(id, lock); }); + std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q = _instance_to_broadcast_package_queue[id]; for (; !broadcast_q.empty(); broadcast_q.pop()) { @@ -461,12 +463,22 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id]; for (; !q.empty(); q.pop()) { + // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, + // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked + _total_queue_size--; if (q.front().block) { COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); } } + // Try to wake up pipeline after clearing the queue + if (_total_queue_size <= _queue_capacity) { + for (auto& [_, dep] : _queue_deps) { + dep->set_ready(); + } + } + { std::queue<TransmitInfo, std::list<TransmitInfo>> empty; swap(empty, q); @@ -578,6 +590,16 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { } } +std::string ExchangeSinkBuffer::debug_each_instance_queue_size() { + fmt::memory_buffer debug_string_buffer; + for (auto& [id, m] : _instance_to_package_queue_mutex) { + std::unique_lock<std::mutex> lock(*m); + fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, + _instance_to_package_queue[id].size()); + } + return fmt::to_string(debug_string_buffer); +} + } // namespace pipeline #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 899a2991110..51698c118cd 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -250,6 +250,7 @@ public: } void set_low_memory_mode() { _queue_capacity = 8; } + std::string debug_each_instance_queue_size(); #ifdef BE_TEST public: #else 
@@ -319,6 +320,8 @@ private: void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); int64_t get_sum_rpc_time(); + // _total_queue_size is the sum of the sizes of all instance_to_package_queues. + // Any modification to instance_to_package_queue requires a corresponding modification to _total_queue_size. std::atomic<int> _total_queue_size = 0; // _running_sink_count is used to track how many sinks have not finished yet. diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 573f4aa840e..c6ac3b80d88 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -507,12 +507,13 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); if (_sink_buffer) { - fmt::format_to(debug_string_buffer, - ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: " - "{}, queue dep: {}), _reach_limit: {}, working channels: {}", - _sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size, - _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), - _reach_limit.load(), _working_channels_count.load()); + fmt::format_to( + debug_string_buffer, + ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: " + "{}, queue dep: {}), _reach_limit: {}, working channels: {} , each queue size: {}", + _sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size, + _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), _reach_limit.load(), + _working_channels_count.load(), _sink_buffer->debug_each_instance_queue_size()); } return fmt::to_string(debug_string_buffer); } diff --git a/be/test/vec/exec/exchange_sink_test.cpp b/be/test/vec/exec/exchange_sink_test.cpp index 9576ed71ee2..7dbd352bd3a 100644 --- a/be/test/vec/exec/exchange_sink_test.cpp +++ 
b/be/test/vec/exec/exchange_sink_test.cpp @@ -193,4 +193,44 @@ TEST_F(ExchangeSInkTest, test_error_end) { } } +TEST_F(ExchangeSInkTest, test_queue_size) { + { + auto state = create_runtime_state(); + auto buffer = create_buffer(state); + + auto sink1 = create_sink(state, buffer); + + EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK()); + + EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK()); + + EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK()); + EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK()); + + std::cout << "queue size : " << buffer->_total_queue_size << "\n"; + + EXPECT_EQ(buffer->_total_queue_size, 6); + + std::cout << "each queue size : \n" << buffer->debug_each_instance_queue_size() << "\n"; + + pop_block(dest_ins_id_2, PopState::eof); + + std::cout << "queue size : " << buffer->_total_queue_size << "\n"; + + EXPECT_EQ(buffer->_total_queue_size, 4); + + std::cout << "each queue size : \n" << buffer->debug_each_instance_queue_size() << "\n"; + + EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], false); + EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true); + EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false); + clear_all_done(); + } +} + } // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org