This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 5806dae467e [fix](move-memtable) do not retry open streams (#41550) 
(#41999)
5806dae467e is described below

commit 5806dae467ea2ba33e095e1d5abe02a6d967a3dc
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Thu Oct 17 15:56:56 2024 +0800

    [fix](move-memtable) do not retry open streams (#41550) (#41999)
    
    backport #41550
---
 be/src/vec/sink/load_stream_stub.cpp         | 60 ++++++++++++++--------------
 be/src/vec/sink/load_stream_stub.h           |  6 +--
 be/src/vec/sink/writer/vtablet_writer_v2.cpp |  2 +-
 3 files changed, 33 insertions(+), 35 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index e29d64118b9..1d13ca4b903 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -135,7 +135,7 @@ LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t 
src_id,
           _is_incremental(incremental) {};
 
 LoadStreamStub::~LoadStreamStub() {
-    if (_is_init.load() && !_is_closed.load()) {
+    if (_is_open.load() && !_is_closed.load()) {
         auto ret = brpc::StreamClose(_stream_id);
         LOG(INFO) << *this << " is deconstructed, close " << (ret == 0 ? 
"success" : "failed");
     }
@@ -149,8 +149,9 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
                             int64_t idle_timeout_ms, bool enable_profile) {
     std::unique_lock<bthread::Mutex> lock(_open_mutex);
     if (_is_init.load()) {
-        return _init_st;
+        return _status;
     }
+    _is_init.store(true);
     _dst_id = node_info.id;
     brpc::StreamOptions opt;
     opt.max_buf_size = config::load_stream_max_buf_size;
@@ -160,8 +161,8 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     brpc::Controller cntl;
     if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
         delete opt.handler;
-        _init_st = Status::Error<true>(ret, "Failed to create stream");
-        return _init_st;
+        _status = Status::Error<true>(ret, "Failed to create stream");
+        return _status;
     }
     cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
     POpenLoadStreamRequest request;
@@ -174,8 +175,8 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     } else if (total_streams > 0) {
         request.set_total_streams(total_streams);
     } else {
-        _init_st = Status::InternalError("total_streams should be greator than 
0");
-        return _init_st;
+        _status = Status::InternalError("total_streams should be greator than 
0");
+        return _status;
     }
     request.set_idle_timeout_ms(idle_timeout_ms);
     schema.to_protobuf(request.mutable_schema());
@@ -199,13 +200,13 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     }
     if (cntl.Failed()) {
         brpc::StreamClose(_stream_id);
-        _init_st = Status::InternalError("Failed to connect to backend {}: 
{}", _dst_id,
-                                         cntl.ErrorText());
-        return _init_st;
+        _status = Status::InternalError("Failed to connect to backend {}: {}", 
_dst_id,
+                                        cntl.ErrorText());
+        return _status;
     }
     LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << 
node_info.brpc_port
               << ", " << *this;
-    _is_init.store(true);
+    _is_open.store(true);
     return Status::OK();
 }
 
@@ -213,9 +214,9 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
 Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
                                    int64_t segment_id, uint64_t offset, 
std::span<const Slice> data,
                                    bool segment_eos) {
-    if (!_is_init.load()) {
-        add_failed_tablet(tablet_id, _init_st);
-        return _init_st;
+    if (!_is_open.load()) {
+        add_failed_tablet(tablet_id, _status);
+        return _status;
     }
     DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
         if (segment_id != 0) {
@@ -239,9 +240,9 @@ Status LoadStreamStub::append_data(int64_t partition_id, 
int64_t index_id, int64
 Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
                                    int64_t segment_id, const 
SegmentStatistics& segment_stat,
                                    TabletSchemaSPtr flush_schema) {
-    if (!_is_init.load()) {
-        add_failed_tablet(tablet_id, _init_st);
-        return _init_st;
+    if (!_is_open.load()) {
+        add_failed_tablet(tablet_id, _status);
+        return _status;
     }
     DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
         if (segment_id != 0) {
@@ -265,8 +266,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id, 
int64_t index_id, int64
 
 // CLOSE_LOAD
 Status LoadStreamStub::close_load(const std::vector<PTabletID>& 
tablets_to_commit) {
-    if (!_is_init.load()) {
-        return _init_st;
+    if (!_is_open.load()) {
+        return _status;
     }
     PStreamHeader header;
     *header.mutable_load_id() = _load_id;
@@ -275,10 +276,10 @@ Status LoadStreamStub::close_load(const 
std::vector<PTabletID>& tablets_to_commi
     for (const auto& tablet : tablets_to_commit) {
         *header.add_tablets() = tablet;
     }
-    _close_st = _encode_and_send(header);
-    if (!_close_st.ok()) {
-        LOG(WARNING) << "stream " << _stream_id << " close failed: " << 
_close_st;
-        return _close_st;
+    _status = _encode_and_send(header);
+    if (!_status.ok()) {
+        LOG(WARNING) << "stream " << _stream_id << " close failed: " << 
_status;
+        return _status;
     }
     _is_closing.store(true);
     return Status::OK();
@@ -286,8 +287,8 @@ Status LoadStreamStub::close_load(const 
std::vector<PTabletID>& tablets_to_commi
 
 // GET_SCHEMA
 Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
-    if (!_is_init.load()) {
-        return _init_st;
+    if (!_is_open.load()) {
+        return _status;
     }
     PStreamHeader header;
     *header.mutable_load_id() = _load_id;
@@ -309,8 +310,8 @@ Status LoadStreamStub::get_schema(const 
std::vector<PTabletID>& tablets) {
 
 Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
                                        int64_t timeout_ms) {
-    if (!_is_init.load()) {
-        return _init_st;
+    if (!_is_open.load()) {
+        return _status;
     }
     if (_tablet_schema_for_index->contains(index_id)) {
         return Status::OK();
@@ -337,11 +338,8 @@ Status LoadStreamStub::wait_for_schema(int64_t 
partition_id, int64_t index_id, i
 
 Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
     DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
-    if (!_is_init.load()) {
-        return _init_st;
-    }
     if (!_is_closing.load()) {
-        return _close_st;
+        return _status;
     }
     if (_is_closed.load()) {
         return _check_cancel();
@@ -370,7 +368,7 @@ Status LoadStreamStub::close_wait(RuntimeState* state, 
int64_t timeout_ms) {
 
 void LoadStreamStub::cancel(Status reason) {
     LOG(WARNING) << *this << " is cancelled because of " << reason;
-    if (_is_init.load()) {
+    if (_is_open.load()) {
         brpc::StreamClose(_stream_id);
     }
     {
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 048cf4dc60f..223babb42e3 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -195,7 +195,7 @@ public:
 
     int64_t dst_id() const { return _dst_id; }
 
-    bool is_inited() const { return _is_init.load(); }
+    bool is_open() const { return _is_open.load(); }
 
     bool is_incremental() const { return _is_incremental; }
 
@@ -231,6 +231,7 @@ private:
 
 protected:
     std::atomic<bool> _is_init;
+    std::atomic<bool> _is_open;
     std::atomic<bool> _is_closing;
     std::atomic<bool> _is_closed;
     std::atomic<bool> _is_cancelled;
@@ -240,8 +241,7 @@ protected:
     brpc::StreamId _stream_id;
     int64_t _src_id = -1; // source backend_id
     int64_t _dst_id = -1; // destination backend_id
-    Status _init_st = Status::InternalError<false>("Stream is not open");
-    Status _close_st;
+    Status _status = Status::InternalError<false>("Stream is not open");
     Status _cancel_st;
 
     bthread::Mutex _open_mutex;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 273430c7948..16c11b1cf42 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -389,7 +389,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, 
int64_t partition_id,
         VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, 
index_id, tablet_id);
         _tablets_for_node[node_id].emplace(tablet_id, tablet);
         auto stream = _load_stream_map->at(node_id)->at(_stream_index);
-        for (int i = 1; i < _stream_per_node && !stream->is_inited(); i++) {
+        for (int i = 1; i < _stream_per_node && !stream->is_open(); i++) {
             stream = _load_stream_map->at(node_id)->at((_stream_index + i) % 
_stream_per_node);
         }
         streams.emplace_back(std::move(stream));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to