This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6551dfd04b53da80217645e6cc295179de8d7c1e
Author: Kang <kxiao.ti...@gmail.com>
AuthorDate: Sun Jul 9 09:01:50 2023 +0800

        [fix](sink) fix pipeline load stuck (#216361)
---
 be/src/vec/sink/vtablet_sink.cpp | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index be7183428a..43bf98c967 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -107,6 +107,7 @@ public:
 
     void Run() override {
         SCOPED_TRACK_MEMORY_TO_UNKNOWN();
+        DCHECK(packet_in_flight);
         auto open_partition_failed = [this]() {
             std::stringstream ss;
             ss << "failed to open partition, error=" << berror(this->cntl.ErrorCode())
@@ -126,7 +127,7 @@ public:
                 open_partition_failed();
             }
         }
-        done = true;
+        packet_in_flight = false;
     }
 
     void join() { brpc::Join(cntl.call_id()); }
@@ -136,7 +137,7 @@ public:
     VNodeChannel* vnode_channel;
     IndexChannel* index_channel;
     int64_t partition_id;
-    std::atomic<bool> done {false};
+    std::atomic<bool> packet_in_flight {false};
 };
 
 IndexChannel::~IndexChannel() {}
@@ -547,6 +548,7 @@ void VNodeChannel::open_partition(int64_t partition_id) {
         remain_ms = config::min_load_rpc_timeout_ms;
     }
     open_partition_closure->cntl.set_timeout_ms(remain_ms);
+    open_partition_closure->packet_in_flight = true;
     _stub->open_partition(&open_partition_closure.get()->cntl, &request,
                           &open_partition_closure.get()->result, open_partition_closure.get());
 
@@ -563,7 +565,7 @@ void VNodeChannel::open_partition_wait() {
 
 bool VNodeChannel::open_partition_finished() const {
     for (auto& open_partition_closure : _open_partition_closures) {
-        if (!open_partition_closure->done) {
+        if (open_partition_closure->packet_in_flight) {
             return false;
         }
     }
@@ -912,12 +914,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
 }
 
 bool VNodeChannel::is_send_data_rpc_done() const {
-    if (_add_block_closure != nullptr) {
-        return _add_batches_finished || (_cancelled && !_add_block_closure->is_packet_in_flight());
-    } else {
-        // such as, canceled before open_wait new closure.
-        return _add_batches_finished || _cancelled;
-    }
+    return _add_batches_finished || _cancelled;
 }
 
 Status VNodeChannel::close_wait(RuntimeState* state) {
@@ -1563,8 +1560,9 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
 }
 
 bool VOlapTableSink::is_close_done() {
-    if (config::enable_lazy_open_partition && !_open_partition_done) {
-        return false;
+    // Only after try_close, need to wait rpc end.
+    if (!_try_close) {
+        return true;
     }
     bool close_done = true;
     for (const auto& index_channel : _channels) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to