This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6551dfd04b53da80217645e6cc295179de8d7c1e Author: Kang <kxiao.ti...@gmail.com> AuthorDate: Sun Jul 9 09:01:50 2023 +0800 [fix](sink) fix pipeline load stuck (#216361) --- be/src/vec/sink/vtablet_sink.cpp | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index be7183428a..43bf98c967 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -107,6 +107,7 @@ public: void Run() override { SCOPED_TRACK_MEMORY_TO_UNKNOWN(); + DCHECK(packet_in_flight); auto open_partition_failed = [this]() { std::stringstream ss; ss << "failed to open partition, error=" << berror(this->cntl.ErrorCode()) @@ -126,7 +127,7 @@ public: open_partition_failed(); } } - done = true; + packet_in_flight = false; } void join() { brpc::Join(cntl.call_id()); } @@ -136,7 +137,7 @@ public: VNodeChannel* vnode_channel; IndexChannel* index_channel; int64_t partition_id; - std::atomic<bool> done {false}; + std::atomic<bool> packet_in_flight {false}; }; IndexChannel::~IndexChannel() {} @@ -547,6 +548,7 @@ void VNodeChannel::open_partition(int64_t partition_id) { remain_ms = config::min_load_rpc_timeout_ms; } open_partition_closure->cntl.set_timeout_ms(remain_ms); + open_partition_closure->packet_in_flight = true; _stub->open_partition(&open_partition_closure.get()->cntl, &request, &open_partition_closure.get()->result, open_partition_closure.get()); @@ -563,7 +565,7 @@ void VNodeChannel::open_partition_wait() { bool VNodeChannel::open_partition_finished() const { for (auto& open_partition_closure : _open_partition_closures) { - if (!open_partition_closure->done) { + if (open_partition_closure->packet_in_flight) { return false; } } @@ -912,12 +914,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { } bool VNodeChannel::is_send_data_rpc_done() const { - if (_add_block_closure != nullptr) { - return _add_batches_finished || (_cancelled && !_add_block_closure->is_packet_in_flight()); - } else { - // such as, canceled before open_wait new closure. - return _add_batches_finished || _cancelled; - } + return _add_batches_finished || _cancelled; } Status VNodeChannel::close_wait(RuntimeState* state) { @@ -1563,8 +1560,9 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) { } bool VOlapTableSink::is_close_done() { - if (config::enable_lazy_open_partition && !_open_partition_done) { - return false; + // Only after try_close, need to wait rpc end. + if (!_try_close) { + return true; } bool close_done = true; for (const auto& index_channel : _channels) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org