This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f16e31d874 [fix](load_stream) close brpc stream after load stream is 
closed (#56120)
7f16e31d874 is described below

commit 7f16e31d8746acb69da96d1cdf4c46b4af4ba97c
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Sep 18 15:42:20 2025 +0800

    [fix](load_stream) close brpc stream after load stream is closed (#56120)
    
    Otherwise, auto partition across multiple BEs may lead to a segment
    number mismatch problem.
    
    Co-authored-by: Yongqiang YANG <[email protected]>
    Co-authored-by: Xin Liao <[email protected]>
---
 be/src/runtime/load_stream.cpp           | 25 +++++++++++++++++++++----
 be/src/runtime/load_stream.h             |  4 +++-
 be/src/vec/sink/load_stream_map_pool.cpp |  6 +++++-
 be/src/vec/sink/load_stream_map_pool.h   |  1 +
 be/src/vec/sink/load_stream_stub.cpp     | 11 +++++++----
 be/src/vec/sink/load_stream_stub.h       |  4 ++--
 gensrc/proto/internal_service.proto      |  1 +
 7 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index d042e4ea752..42ac15ea51a 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -463,7 +463,7 @@ Status LoadStream::init(const POpenLoadStreamRequest* 
request) {
     return Status::OK();
 }
 
-void LoadStream::close(int64_t src_id, const std::vector<PTabletID>& 
tablets_to_commit,
+bool LoadStream::close(int64_t src_id, const std::vector<PTabletID>& 
tablets_to_commit,
                        std::vector<int64_t>* success_tablet_ids, 
FailedTablets* failed_tablets) {
     std::lock_guard<bthread::Mutex> lock_guard(_lock);
     SCOPED_TIMER(_close_wait_timer);
@@ -482,7 +482,7 @@ void LoadStream::close(int64_t src_id, const 
std::vector<PTabletID>& tablets_to_
 
     if (_close_load_cnt < _total_streams) {
         // do not return commit info if there is remaining streams.
-        return;
+        return false;
     }
 
     for (auto& [_, index_stream] : _index_streams_map) {
@@ -490,6 +490,7 @@ void LoadStream::close(int64_t src_id, const 
std::vector<PTabletID>& tablets_to_
     }
     LOG(INFO) << "close load " << *this << ", success_tablet_num=" << 
success_tablet_ids->size()
               << ", failed_tablet_num=" << failed_tablets->size();
+    return true;
 }
 
 void LoadStream::_report_result(StreamId stream, const Status& status,
@@ -680,9 +681,25 @@ void LoadStream::_dispatch(StreamId id, const 
PStreamHeader& hdr, butil::IOBuf*
         std::vector<int64_t> success_tablet_ids;
         FailedTablets failed_tablets;
         std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), 
hdr.tablets().end());
-        close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, 
&failed_tablets);
+        bool all_closed =
+                close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, 
&failed_tablets);
         _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, 
true);
-        brpc::StreamClose(id);
+        std::lock_guard<bthread::Mutex> lock_guard(_lock);
+        // if incremental stream, we need to wait for all non-incremental 
streams to be closed
+        // before closing incremental streams. We need a fencing mechanism to 
avoid use after closing
+        // across different be.
+        if (hdr.has_num_incremental_streams() && hdr.num_incremental_streams() 
> 0) {
+            _closing_stream_ids.push_back(id);
+        } else {
+            brpc::StreamClose(id);
+        }
+
+        if (all_closed) {
+            for (auto& closing_id : _closing_stream_ids) {
+                brpc::StreamClose(closing_id);
+            }
+            _closing_stream_ids.clear();
+        }
     } break;
     case PStreamHeader::GET_SCHEMA: {
         _report_schema(id, hdr);
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 6e84a3f12f8..9aefa3d9093 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -129,7 +129,8 @@ public:
         }
     }
 
-    void close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
+    // return true if all streams are closed, otherwise return false
+    bool close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
                std::vector<int64_t>* success_tablet_ids, FailedTablets* 
failed_tablet_ids);
 
     // callbacks called by brpc
@@ -177,6 +178,7 @@ private:
     RuntimeProfile::Counter* _close_wait_timer = nullptr;
     LoadStreamMgr* _load_stream_mgr = nullptr;
     std::shared_ptr<ResourceContext> _resource_ctx;
+    std::vector<int64_t> _closing_stream_ids;
     bool _is_incremental = false;
 };
 
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp 
b/be/src/vec/sink/load_stream_map_pool.cpp
index 24a1cb77a48..e68977ea64e 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -29,6 +29,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t 
src_id, int num_streams,
           _src_id(src_id),
           _num_streams(num_streams),
           _use_cnt(num_use),
+          _num_incremental_streams(0),
           _pool(pool),
           _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
           _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
@@ -42,6 +43,9 @@ std::shared_ptr<LoadStreamStubs> 
LoadStreamMap::get_or_create(int64_t dst_id, bo
     if (streams != nullptr) {
         return streams;
     }
+    if (incremental) {
+        _num_incremental_streams.fetch_add(1);
+    }
     streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id, 
_src_id,
                                                 _tablet_schema_for_index,
                                                 _enable_unique_mow_for_index, 
incremental);
@@ -118,7 +122,7 @@ void LoadStreamMap::close_load(bool incremental) {
             tablets_to_commit.push_back(tablet);
             
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
         }
-        auto st = streams->close_load(tablets_to_commit);
+        auto st = streams->close_load(tablets_to_commit, 
_num_incremental_streams.load());
         if (!st.ok()) {
             LOG(WARNING) << "close_load for " << (incremental ? "incremental" 
: "non-incremental")
                          << " streams failed: " << st << ", load_id=" << 
_load_id;
diff --git a/be/src/vec/sink/load_stream_map_pool.h 
b/be/src/vec/sink/load_stream_map_pool.h
index ab8a40d0c91..6186aa85c86 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -112,6 +112,7 @@ private:
     const int64_t _src_id;
     const int _num_streams;
     std::atomic<int> _use_cnt;
+    std::atomic<int> _num_incremental_streams;
     std::mutex _mutex;
     std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> 
_streams_for_node;
     LoadStreamMapPool* _pool = nullptr;
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 17ae49eb570..7f6ef126fd7 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -253,7 +253,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id, 
int64_t index_id, int64
 }
 
 // CLOSE_LOAD
-Status LoadStreamStub::close_load(const std::vector<PTabletID>& 
tablets_to_commit) {
+Status LoadStreamStub::close_load(const std::vector<PTabletID>& 
tablets_to_commit,
+                                  int num_incremental_streams) {
     if (!_is_open.load()) {
         return _status;
     }
@@ -264,6 +265,7 @@ Status LoadStreamStub::close_load(const 
std::vector<PTabletID>& tablets_to_commi
     for (const auto& tablet : tablets_to_commit) {
         *header.add_tablets() = tablet;
     }
+    header.set_num_incremental_streams(num_incremental_streams);
     _status = _encode_and_send(header);
     if (!_status.ok()) {
         LOG(WARNING) << "stream " << _stream_id << " close failed: " << 
_status;
@@ -541,7 +543,8 @@ Status 
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
     return status;
 }
 
-Status LoadStreamStubs::close_load(const std::vector<PTabletID>& 
tablets_to_commit) {
+Status LoadStreamStubs::close_load(const std::vector<PTabletID>& 
tablets_to_commit,
+                                   int num_incremental_streams) {
     if (!_open_success.load()) {
         return Status::InternalError("streams not open");
     }
@@ -550,10 +553,10 @@ Status LoadStreamStubs::close_load(const 
std::vector<PTabletID>& tablets_to_comm
     for (auto& stream : _streams) {
         Status st;
         if (first) {
-            st = stream->close_load(tablets_to_commit);
+            st = stream->close_load(tablets_to_commit, 
num_incremental_streams);
             first = false;
         } else {
-            st = stream->close_load({});
+            st = stream->close_load({}, num_incremental_streams);
         }
         if (!st.ok()) {
             LOG(WARNING) << "close_load failed: " << st << "; stream: " << 
*stream;
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 066805087a6..15e51cdd8e6 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -147,7 +147,7 @@ public:
                        int32_t segment_id, const SegmentStatistics& 
segment_stat);
 
     // CLOSE_LOAD
-    Status close_load(const std::vector<PTabletID>& tablets_to_commit);
+    Status close_load(const std::vector<PTabletID>& tablets_to_commit, int 
num_incremental_streams);
 
     // GET_SCHEMA
     Status get_schema(const std::vector<PTabletID>& tablets);
@@ -320,7 +320,7 @@ public:
         }
     }
 
-    Status close_load(const std::vector<PTabletID>& tablets_to_commit);
+    Status close_load(const std::vector<PTabletID>& tablets_to_commit, int 
num_incremental_streams);
 
     std::unordered_set<int64_t> success_tablets() {
         std::unordered_set<int64_t> s;
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index c07d124652c..89ec5cb6f09 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -991,6 +991,7 @@ message PStreamHeader {
     optional TabletSchemaPB flush_schema = 11;
     optional uint64 offset = 12;
     optional FileType file_type = 13;
+    optional int64 num_incremental_streams = 14;
 }
 
 message PGetWalQueueSizeRequest{


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to