This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 5806dae467e [fix](move-memtable) do not retry open streams (#41550) (#41999) 5806dae467e is described below commit 5806dae467ea2ba33e095e1d5abe02a6d967a3dc Author: Kaijie Chen <c...@apache.org> AuthorDate: Thu Oct 17 15:56:56 2024 +0800 [fix](move-memtable) do not retry open streams (#41550) (#41999) backport #41550 --- be/src/vec/sink/load_stream_stub.cpp | 60 ++++++++++++++-------------- be/src/vec/sink/load_stream_stub.h | 6 +-- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +- 3 files changed, 33 insertions(+), 35 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index e29d64118b9..1d13ca4b903 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -135,7 +135,7 @@ LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, _is_incremental(incremental) {}; LoadStreamStub::~LoadStreamStub() { - if (_is_init.load() && !_is_closed.load()) { + if (_is_open.load() && !_is_closed.load()) { auto ret = brpc::StreamClose(_stream_id); LOG(INFO) << *this << " is deconstructed, close " << (ret == 0 ? "success" : "failed"); } @@ -149,8 +149,9 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, int64_t idle_timeout_ms, bool enable_profile) { std::unique_lock<bthread::Mutex> lock(_open_mutex); if (_is_init.load()) { - return _init_st; + return _status; } + _is_init.store(true); _dst_id = node_info.id; brpc::StreamOptions opt; opt.max_buf_size = config::load_stream_max_buf_size; @@ -160,8 +161,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, brpc::Controller cntl; if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) { delete opt.handler; - _init_st = Status::Error<true>(ret, "Failed to create stream"); - return _init_st; + _status = Status::Error<true>(ret, "Failed to create stream"); + return _status; } cntl.set_timeout_ms(config::open_load_stream_timeout_ms); POpenLoadStreamRequest request; @@ -174,8 +175,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, } else if (total_streams > 0) { request.set_total_streams(total_streams); } else { - _init_st = Status::InternalError("total_streams should be greator than 0"); - return _init_st; + _status = Status::InternalError("total_streams should be greator than 0"); + return _status; } request.set_idle_timeout_ms(idle_timeout_ms); schema.to_protobuf(request.mutable_schema()); @@ -199,13 +200,13 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, } if (cntl.Failed()) { brpc::StreamClose(_stream_id); - _init_st = Status::InternalError("Failed to connect to backend {}: {}", _dst_id, - cntl.ErrorText()); - return _init_st; + _status = Status::InternalError("Failed to connect to backend {}: {}", _dst_id, + cntl.ErrorText()); + return _status; } LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port << ", " << *this; - _is_init.store(true); + _is_open.store(true); return Status::OK(); } @@ -213,9 +214,9 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, uint64_t offset, std::span<const Slice> data, bool segment_eos) { - if (!_is_init.load()) { - add_failed_tablet(tablet_id, _init_st); - return _init_st; + if (!_is_open.load()) { + add_failed_tablet(tablet_id, _status); + return _status; } DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { if (segment_id != 0) { @@ -239,9 +240,9 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, const SegmentStatistics& segment_stat, TabletSchemaSPtr flush_schema) { - if (!_is_init.load()) { - add_failed_tablet(tablet_id, _init_st); - return _init_st; + if (!_is_open.load()) { + add_failed_tablet(tablet_id, _status); + return _status; } DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { if (segment_id != 0) { @@ -265,8 +266,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 // CLOSE_LOAD Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) { - if (!_is_init.load()) { - return _init_st; + if (!_is_open.load()) { + return _status; } PStreamHeader header; *header.mutable_load_id() = _load_id; @@ -275,10 +276,10 @@ Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commi for (const auto& tablet : tablets_to_commit) { *header.add_tablets() = tablet; } - _close_st = _encode_and_send(header); - if (!_close_st.ok()) { - LOG(WARNING) << "stream " << _stream_id << " close failed: " << _close_st; - return _close_st; + _status = _encode_and_send(header); + if (!_status.ok()) { + LOG(WARNING) << "stream " << _stream_id << " close failed: " << _status; + return _status; } _is_closing.store(true); return Status::OK(); @@ -286,8 +287,8 @@ Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commi // GET_SCHEMA Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) { - if (!_is_init.load()) { - return _init_st; + if (!_is_open.load()) { + return _status; } PStreamHeader header; *header.mutable_load_id() = _load_id; @@ -309,8 +310,8 @@ Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) { Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t timeout_ms) { - if (!_is_init.load()) { - return _init_st; + if (!_is_open.load()) { + return _status; } if (_tablet_schema_for_index->contains(index_id)) { return Status::OK(); @@ -337,11 +338,8 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK); - if (!_is_init.load()) { - return _init_st; - } if (!_is_closing.load()) { - return _close_st; + return _status; } if (_is_closed.load()) { return _check_cancel(); @@ -370,7 +368,7 @@ Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { void LoadStreamStub::cancel(Status reason) { LOG(WARNING) << *this << " is cancelled because of " << reason; - if (_is_init.load()) { + if (_is_open.load()) { brpc::StreamClose(_stream_id); } { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 048cf4dc60f..223babb42e3 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -195,7 +195,7 @@ public: int64_t dst_id() const { return _dst_id; } - bool is_inited() const { return _is_init.load(); } + bool is_open() const { return _is_open.load(); } bool is_incremental() const { return _is_incremental; } @@ -231,6 +231,7 @@ private: protected: std::atomic<bool> _is_init; + std::atomic<bool> _is_open; std::atomic<bool> _is_closing; std::atomic<bool> _is_closed; std::atomic<bool> _is_cancelled; @@ -240,8 +241,7 @@ protected: brpc::StreamId _stream_id; int64_t _src_id = -1; // source backend_id int64_t _dst_id = -1; // destination backend_id - Status _init_st = Status::InternalError<false>("Stream is not open"); - Status _close_st; + Status _status = Status::InternalError<false>("Stream is not open"); Status _cancel_st; bthread::Mutex _open_mutex; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 273430c7948..16c11b1cf42 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -389,7 +389,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id); _tablets_for_node[node_id].emplace(tablet_id, tablet); auto stream = _load_stream_map->at(node_id)->at(_stream_index); - for (int i = 1; i < _stream_per_node && !stream->is_inited(); i++) { + for (int i = 1; i < _stream_per_node && !stream->is_open(); i++) { stream = _load_stream_map->at(node_id)->at((_stream_index + i) % _stream_per_node); } streams.emplace_back(std::move(stream)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org