This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7f16e31d874 [fix](load_stream) close brpc stream after load stream is
closed (#56120)
7f16e31d874 is described below
commit 7f16e31d8746acb69da96d1cdf4c46b4af4ba97c
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Sep 18 15:42:20 2025 +0800
[fix](load_stream) close brpc stream after load stream is closed (#56120)
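Otherwise, with auto partition across multiple BEs, an incremental brpc stream
may be closed before all streams of the load have been closed, which can lead
to a segment-count mismatch.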
Co-authored-by: Yongqiang YANG <[email protected]>
Co-authored-by: Xin Liao <[email protected]>
---
be/src/runtime/load_stream.cpp | 25 +++++++++++++++++++++----
be/src/runtime/load_stream.h | 4 +++-
be/src/vec/sink/load_stream_map_pool.cpp | 6 +++++-
be/src/vec/sink/load_stream_map_pool.h | 1 +
be/src/vec/sink/load_stream_stub.cpp | 11 +++++++----
be/src/vec/sink/load_stream_stub.h | 4 ++--
gensrc/proto/internal_service.proto | 1 +
7 files changed, 40 insertions(+), 12 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index d042e4ea752..42ac15ea51a 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -463,7 +463,7 @@ Status LoadStream::init(const POpenLoadStreamRequest*
request) {
return Status::OK();
}
-void LoadStream::close(int64_t src_id, const std::vector<PTabletID>&
tablets_to_commit,
+bool LoadStream::close(int64_t src_id, const std::vector<PTabletID>&
tablets_to_commit,
std::vector<int64_t>* success_tablet_ids,
FailedTablets* failed_tablets) {
std::lock_guard<bthread::Mutex> lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);
@@ -482,7 +482,7 @@ void LoadStream::close(int64_t src_id, const
std::vector<PTabletID>& tablets_to_
if (_close_load_cnt < _total_streams) {
// do not return commit info if there is remaining streams.
- return;
+ return false;
}
for (auto& [_, index_stream] : _index_streams_map) {
@@ -490,6 +490,7 @@ void LoadStream::close(int64_t src_id, const
std::vector<PTabletID>& tablets_to_
}
LOG(INFO) << "close load " << *this << ", success_tablet_num=" <<
success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablets->size();
+ return true;
}
void LoadStream::_report_result(StreamId stream, const Status& status,
@@ -680,9 +681,25 @@ void LoadStream::_dispatch(StreamId id, const
PStreamHeader& hdr, butil::IOBuf*
std::vector<int64_t> success_tablet_ids;
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(),
hdr.tablets().end());
- close(hdr.src_id(), tablets_to_commit, &success_tablet_ids,
&failed_tablets);
+ bool all_closed =
+ close(hdr.src_id(), tablets_to_commit, &success_tablet_ids,
&failed_tablets);
_report_result(id, Status::OK(), success_tablet_ids, failed_tablets,
true);
- brpc::StreamClose(id);
+ std::lock_guard<bthread::Mutex> lock_guard(_lock);
+ // if incremental stream, we need to wait for all non-incremental
streams to be closed
+ // before closing incremental streams. We need a fencing mechanism to
avoid use after closing
+ // across different be.
+ if (hdr.has_num_incremental_streams() && hdr.num_incremental_streams()
> 0) {
+ _closing_stream_ids.push_back(id);
+ } else {
+ brpc::StreamClose(id);
+ }
+
+ if (all_closed) {
+ for (auto& closing_id : _closing_stream_ids) {
+ brpc::StreamClose(closing_id);
+ }
+ _closing_stream_ids.clear();
+ }
} break;
case PStreamHeader::GET_SCHEMA: {
_report_schema(id, hdr);
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 6e84a3f12f8..9aefa3d9093 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -129,7 +129,8 @@ public:
}
}
- void close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
+ // return true if all streams are closed, otherwise return false
+ bool close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets*
failed_tablet_ids);
// callbacks called by brpc
@@ -177,6 +178,7 @@ private:
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
std::shared_ptr<ResourceContext> _resource_ctx;
+ std::vector<int64_t> _closing_stream_ids;
bool _is_incremental = false;
};
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp
b/be/src/vec/sink/load_stream_map_pool.cpp
index 24a1cb77a48..e68977ea64e 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -29,6 +29,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t
src_id, int num_streams,
_src_id(src_id),
_num_streams(num_streams),
_use_cnt(num_use),
+ _num_incremental_streams(0),
_pool(pool),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
@@ -42,6 +43,9 @@ std::shared_ptr<LoadStreamStubs>
LoadStreamMap::get_or_create(int64_t dst_id, bo
if (streams != nullptr) {
return streams;
}
+ if (incremental) {
+ _num_incremental_streams.fetch_add(1);
+ }
streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id,
_src_id,
_tablet_schema_for_index,
_enable_unique_mow_for_index,
incremental);
@@ -118,7 +122,7 @@ void LoadStreamMap::close_load(bool incremental) {
tablets_to_commit.push_back(tablet);
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
}
- auto st = streams->close_load(tablets_to_commit);
+ auto st = streams->close_load(tablets_to_commit,
_num_incremental_streams.load());
if (!st.ok()) {
LOG(WARNING) << "close_load for " << (incremental ? "incremental"
: "non-incremental")
<< " streams failed: " << st << ", load_id=" <<
_load_id;
diff --git a/be/src/vec/sink/load_stream_map_pool.h
b/be/src/vec/sink/load_stream_map_pool.h
index ab8a40d0c91..6186aa85c86 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -112,6 +112,7 @@ private:
const int64_t _src_id;
const int _num_streams;
std::atomic<int> _use_cnt;
+ std::atomic<int> _num_incremental_streams;
std::mutex _mutex;
std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>
_streams_for_node;
LoadStreamMapPool* _pool = nullptr;
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 17ae49eb570..7f6ef126fd7 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -253,7 +253,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id,
int64_t index_id, int64
}
// CLOSE_LOAD
-Status LoadStreamStub::close_load(const std::vector<PTabletID>&
tablets_to_commit) {
+Status LoadStreamStub::close_load(const std::vector<PTabletID>&
tablets_to_commit,
+ int num_incremental_streams) {
if (!_is_open.load()) {
return _status;
}
@@ -264,6 +265,7 @@ Status LoadStreamStub::close_load(const
std::vector<PTabletID>& tablets_to_commi
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets() = tablet;
}
+ header.set_num_incremental_streams(num_incremental_streams);
_status = _encode_and_send(header);
if (!_status.ok()) {
LOG(WARNING) << "stream " << _stream_id << " close failed: " <<
_status;
@@ -541,7 +543,8 @@ Status
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
return status;
}
-Status LoadStreamStubs::close_load(const std::vector<PTabletID>&
tablets_to_commit) {
+Status LoadStreamStubs::close_load(const std::vector<PTabletID>&
tablets_to_commit,
+ int num_incremental_streams) {
if (!_open_success.load()) {
return Status::InternalError("streams not open");
}
@@ -550,10 +553,10 @@ Status LoadStreamStubs::close_load(const
std::vector<PTabletID>& tablets_to_comm
for (auto& stream : _streams) {
Status st;
if (first) {
- st = stream->close_load(tablets_to_commit);
+ st = stream->close_load(tablets_to_commit,
num_incremental_streams);
first = false;
} else {
- st = stream->close_load({});
+ st = stream->close_load({}, num_incremental_streams);
}
if (!st.ok()) {
LOG(WARNING) << "close_load failed: " << st << "; stream: " <<
*stream;
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 066805087a6..15e51cdd8e6 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -147,7 +147,7 @@ public:
int32_t segment_id, const SegmentStatistics&
segment_stat);
// CLOSE_LOAD
- Status close_load(const std::vector<PTabletID>& tablets_to_commit);
+ Status close_load(const std::vector<PTabletID>& tablets_to_commit, int
num_incremental_streams);
// GET_SCHEMA
Status get_schema(const std::vector<PTabletID>& tablets);
@@ -320,7 +320,7 @@ public:
}
}
- Status close_load(const std::vector<PTabletID>& tablets_to_commit);
+ Status close_load(const std::vector<PTabletID>& tablets_to_commit, int
num_incremental_streams);
std::unordered_set<int64_t> success_tablets() {
std::unordered_set<int64_t> s;
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index c07d124652c..89ec5cb6f09 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -991,6 +991,7 @@ message PStreamHeader {
optional TabletSchemaPB flush_schema = 11;
optional uint64 offset = 12;
optional FileType file_type = 13;
+ optional int64 num_incremental_streams = 14;
}
message PGetWalQueueSizeRequest{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]