This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d6de8572fc [fix](exchange)fix exchange sink buffer does not update 
total_queue_size when EOF. (#47312)
3d6de8572fc is described below

commit 3d6de8572fc0eaf975b5c3e6a901fa59b31c805f
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Fri Jan 24 05:04:20 2025 +0800

    [fix](exchange)fix exchange sink buffer does not update total_queue_size 
when EOF. (#47312)
    
    ### What problem does this PR solve?
    
    https://github.com/apache/doris/pull/41602
    When a receiver reaches EOF, its _instance_to_package_queue is cleared, but
    total_queue_size is not decreased, so any check that relies on
    total_queue_size sees a stale, too-large value.
    
    Unit test (UT) output before the fix:
    
    ```
    mock transmit_blockv2 dest ins id :1
    mock transmit_blockv2 dest ins id :2
    mock transmit_blockv2 dest ins id :3
    queue size : 6
    each queue size :
    Instance: 2, queue size: 2
    Instance: 1, queue size: 2
    Instance: 3, queue size: 2
    
    queue size : 6 // wrong: should be 4 after instance 2's queue is cleared by EOF
    each queue size :
    Instance: 2, queue size: 0
    Instance: 1, queue size: 2
    Instance: 3, queue size: 2
    
    mock transmit_blockv2 dest ins id :1
    mock transmit_blockv2 dest ins id :1
    mock transmit_blockv2 dest ins id :3
    mock transmit_blockv2 dest ins id :3
    ```
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp   | 24 ++++++++++++++-
 be/src/pipeline/exec/exchange_sink_buffer.h     |  4 +++
 be/src/pipeline/exec/exchange_sink_operator.cpp | 13 ++++----
 be/test/vec/exec/exchange_sink_test.cpp         | 40 +++++++++++++++++++++++++
 4 files changed, 74 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 800ef615073..4acb747bbc9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -42,6 +42,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"
+#include "util/defer_op.h"
 #include "util/proto_util.h"
 #include "util/time.h"
 #include "vec/sink/vdata_stream_sender.h"
@@ -442,7 +443,8 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) 
{
     // When the receiving side reaches eof, it means the receiver has finished 
early.
     // The remaining data in the current rpc_channel does not need to be sent,
     // and the rpc_channel should be turned off immediately.
-    _turn_off_channel(id, lock);
+    Defer turn_off([&]() { _turn_off_channel(id, lock); });
+
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& 
broadcast_q =
             _instance_to_broadcast_package_queue[id];
     for (; !broadcast_q.empty(); broadcast_q.pop()) {
@@ -458,12 +460,22 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId 
id) {
 
     std::queue<TransmitInfo, std::list<TransmitInfo>>& q = 
_instance_to_package_queue[id];
     for (; !q.empty(); q.pop()) {
+        // Must update _total_queue_size here, otherwise if _total_queue_size 
> _queue_capacity at EOF,
+        // ExchangeSinkQueueDependency will be blocked and pipeline will be 
deadlocked
+        _total_queue_size--;
         if (q.front().block) {
             COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(),
                            -q.front().block->ByteSizeLong());
         }
     }
 
+    // Try to wake up pipeline after clearing the queue
+    if (_total_queue_size <= _queue_capacity) {
+        for (auto& [_, dep] : _queue_deps) {
+            dep->set_ready();
+        }
+    }
+
     {
         std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
         swap(empty, q);
@@ -575,6 +587,16 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* 
profile) {
     }
 }
 
+// Builds a diagnostic string with one line per destination instance, giving the
+// number of entries currently held in that instance's package queue.
+//
+// Each queue is read while holding the mutex stored for the same instance id in
+// _instance_to_package_queue_mutex. The queue is looked up with find() instead
+// of operator[] so that this read-only helper can never insert a new entry into
+// _instance_to_package_queue; an instance id without a queue is reported as 0.
+//
+// Returns lines of the form "Instance: <id>, queue size: <n>\n".
+std::string ExchangeSinkBuffer::debug_each_instance_queue_size() {
+    fmt::memory_buffer debug_string_buffer;
+    for (auto& [id, m] : _instance_to_package_queue_mutex) {
+        std::unique_lock<std::mutex> lock(*m);
+        const auto queue_it = _instance_to_package_queue.find(id);
+        const size_t queue_size =
+                queue_it == _instance_to_package_queue.end() ? 0 : queue_it->second.size();
+        fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, queue_size);
+    }
+    return fmt::to_string(debug_string_buffer);
+}
+
 } // namespace pipeline
 #include "common/compile_check_end.h"
 } // namespace doris
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 80e5dc42289..e6c4635aef3 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -239,6 +239,8 @@ public:
         _queue_deps[sender_ins_id] = queue_dependency;
         _parents[sender_ins_id] = local_state;
     }
+
+    std::string debug_each_instance_queue_size();
 #ifdef BE_TEST
 public:
 #else
@@ -306,6 +308,8 @@ private:
     void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
     int64_t get_sum_rpc_time();
 
+    // _total_queue_size is the sum of the sizes of all 
instance_to_package_queues.
+    // Any modification to instance_to_package_queue requires a corresponding 
modification to _total_queue_size.
     std::atomic<int> _total_queue_size = 0;
 
     // _running_sink_count is used to track how many sinks have not finished 
yet.
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index d7422e0e8f0..9c3e451b333 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -499,12 +499,13 @@ std::string ExchangeSinkLocalState::debug_string(int 
indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
     if (_sink_buffer) {
-        fmt::format_to(debug_string_buffer,
-                       ", Sink Buffer: (_is_finishing = {}, blocks in queue: 
{}, queue capacity: "
-                       "{}, queue dep: {}), _reach_limit: {}, working 
channels: {}",
-                       _sink_buffer->_is_failed.load(), 
_sink_buffer->_total_queue_size,
-                       _sink_buffer->_queue_capacity, 
(void*)_queue_dependency.get(),
-                       _reach_limit.load(), _working_channels_count.load());
+        fmt::format_to(
+                debug_string_buffer,
+                ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, 
queue capacity: "
+                "{}, queue dep: {}), _reach_limit: {}, working channels: {} , 
each queue size: {}",
+                _sink_buffer->_is_failed.load(), 
_sink_buffer->_total_queue_size,
+                _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), 
_reach_limit.load(),
+                _working_channels_count.load(), 
_sink_buffer->debug_each_instance_queue_size());
     }
     return fmt::to_string(debug_string_buffer);
 }
diff --git a/be/test/vec/exec/exchange_sink_test.cpp 
b/be/test/vec/exec/exchange_sink_test.cpp
index 9576ed71ee2..7dbd352bd3a 100644
--- a/be/test/vec/exec/exchange_sink_test.cpp
+++ b/be/test/vec/exec/exchange_sink_test.cpp
@@ -193,4 +193,44 @@ TEST_F(ExchangeSInkTest, test_error_end) {
     }
 }
 
+// Checks that a receiver EOF on one destination instance removes that instance's
+// queued blocks from _total_queue_size and turns off only that instance's channel.
+TEST_F(ExchangeSInkTest, test_queue_size) {
+    auto state = create_runtime_state();
+    auto buffer = create_buffer(state);
+    auto sink1 = create_sink(state, buffer);
+
+    // Adds three blocks for one destination instance; every add must succeed.
+    auto add_three_blocks = [&](const auto& dest_ins_id) {
+        for (int added = 0; added < 3; ++added) {
+            EXPECT_EQ(sink1.add_block(dest_ins_id, false), Status::OK());
+        }
+    };
+
+    // Prints the total, checks it, then prints the per-instance breakdown.
+    auto expect_total_queue_size = [&](int expected_total) {
+        std::cout << "queue size : " << buffer->_total_queue_size << "\n";
+
+        EXPECT_EQ(buffer->_total_queue_size, expected_total);
+
+        std::cout << "each queue size : \n" << buffer->debug_each_instance_queue_size() << "\n";
+    };
+
+    add_three_blocks(dest_ins_id_1);
+    add_three_blocks(dest_ins_id_2);
+    add_three_blocks(dest_ins_id_3);
+
+    // Nine blocks were added, but only six are expected to still be queued.
+    expect_total_queue_size(6);
+
+    // EOF from instance 2 must drop its queued blocks from the total.
+    pop_block(dest_ins_id_2, PopState::eof);
+
+    expect_total_queue_size(4);
+
+    // Only the channel that reported EOF is turned off.
+    EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], false);
+    EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true);
+    EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false);
+    clear_all_done();
+}
+
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to