This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0f0567eb730 Revert "[fix](move-memtable) fix initial use count of streams for auto partition (#33165) (#33236)"
0f0567eb730 is described below

commit 0f0567eb7303fe2006581c99a92d47ba2739f8ee
Author: yiguolei <yiguo...@gmail.com>
AuthorDate: Thu Apr 4 00:35:18 2024 +0800

    Revert "[fix](move-memtable) fix initial use count of streams for auto partition (#33165) (#33236)"
    
    This reverts commit df197c6a1429a2b113e23193a4c8a32b0b4726fe.
---
 be/src/vec/sink/load_stream_stub.cpp            |  33 +++-
 be/src/vec/sink/load_stream_stub.h              |  14 +-
 be/src/vec/sink/load_stream_stub_pool.cpp       | 110 ++++----------
 be/src/vec/sink/load_stream_stub_pool.h         |  50 +++---
 be/src/vec/sink/writer/vtablet_writer_v2.cpp    | 192 ++++++++++++------------
 be/src/vec/sink/writer/vtablet_writer_v2.h      |  10 +-
 be/test/io/fs/stream_sink_file_writer_test.cpp  |   4 +-
 be/test/vec/exec/load_stream_stub_pool_test.cpp |  31 ++--
 8 files changed, 188 insertions(+), 256 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 7708e3b255f..6eb91e46853 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -125,13 +125,19 @@ inline std::ostream& operator<<(std::ostream& ostr, const 
LoadStreamReplyHandler
     return ostr;
 }
 
-LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
-                               std::shared_ptr<IndexToTabletSchema> schema_map,
-                               std::shared_ptr<IndexToEnableMoW> mow_map)
-        : _load_id(load_id),
+LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
+        : _use_cnt(num_use),
+          _load_id(load_id),
           _src_id(src_id),
-          _tablet_schema_for_index(schema_map),
-          _enable_unique_mow_for_index(mow_map) {};
+          _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
+          _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) 
{};
+
+LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
+        : _use_cnt(stub._use_cnt.load()),
+          _load_id(stub._load_id),
+          _src_id(stub._src_id),
+          _tablet_schema_for_index(stub._tablet_schema_for_index),
+          _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
 
 LoadStreamStub::~LoadStreamStub() {
     if (_is_init.load() && !_is_closed.load()) {
@@ -235,12 +241,23 @@ Status LoadStreamStub::add_segment(int64_t partition_id, 
int64_t index_id, int64
 
 // CLOSE_LOAD
 Status LoadStreamStub::close_load(const std::vector<PTabletID>& 
tablets_to_commit) {
+    {
+        std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+        _tablets_to_commit.insert(_tablets_to_commit.end(), 
tablets_to_commit.begin(),
+                                  tablets_to_commit.end());
+    }
+    if (--_use_cnt > 0) {
+        return Status::OK();
+    }
     PStreamHeader header;
     *header.mutable_load_id() = _load_id;
     header.set_src_id(_src_id);
     header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
-    for (const auto& tablet : tablets_to_commit) {
-        *header.add_tablets() = tablet;
+    {
+        std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+        for (const auto& tablet : _tablets_to_commit) {
+            *header.add_tablets() = tablet;
+        }
     }
     return _encode_and_send(header);
 }
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index aa8b850760e..f20b0e6ea3d 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -109,14 +109,10 @@ class LoadStreamStub {
 
 public:
     // construct new stub
-    LoadStreamStub(PUniqueId load_id, int64_t src_id,
-                   std::shared_ptr<IndexToTabletSchema> schema_map,
-                   std::shared_ptr<IndexToEnableMoW> mow_map);
+    LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
 
-    LoadStreamStub(UniqueId load_id, int64_t src_id,
-                   std::shared_ptr<IndexToTabletSchema> schema_map,
-                   std::shared_ptr<IndexToEnableMoW> mow_map)
-            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) 
{};
+    // copy constructor, shared_ptr members are shared
+    LoadStreamStub(LoadStreamStub& stub);
 
 // for mock this class in UT
 #ifdef BE_TEST
@@ -217,6 +213,7 @@ protected:
     std::atomic<bool> _is_closed;
     std::atomic<bool> _is_cancelled;
     std::atomic<bool> _is_eos;
+    std::atomic<int> _use_cnt;
 
     PUniqueId _load_id;
     brpc::StreamId _stream_id;
@@ -229,6 +226,9 @@ protected:
     bthread::Mutex _cancel_mutex;
     bthread::ConditionVariable _close_cv;
 
+    std::mutex _tablets_to_commit_mutex;
+    std::vector<PTabletID> _tablets_to_commit;
+
     std::mutex _buffer_mutex;
     std::mutex _send_mutex;
     butil::IOBuf _buffer;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index 3eae49aff77..d76402b57d5 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -23,104 +23,50 @@
 namespace doris {
 class TExpr;
 
-LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int 
num_streams, int num_use,
-                             LoadStreamStubPool* pool)
-        : _load_id(load_id),
-          _src_id(src_id),
-          _num_streams(num_streams),
-          _use_cnt(num_use),
-          _pool(pool) {
-    DCHECK(num_streams > 0) << "stream num should be greater than 0";
-    DCHECK(num_use > 0) << "use num should be greater than 0";
-}
-
-std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
-    if (streams != nullptr) {
-        return streams;
-    }
-    streams = std::make_shared<Streams>();
-    auto schema_map = std::make_shared<IndexToTabletSchema>();
-    auto mow_map = std::make_shared<IndexToEnableMoW>();
-    for (int i = 0; i < _num_streams; i++) {
-        streams->emplace_back(new LoadStreamStub(_load_id, _src_id, 
schema_map, mow_map));
-    }
-    _streams_for_node[dst_id] = streams;
-    return streams;
-}
-
-std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    return _streams_for_node.at(dst_id);
-}
-
-bool LoadStreamMap::contains(int64_t dst_id) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    return _streams_for_node.contains(dst_id);
-}
+LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, 
LoadStreamStubPool* pool)
+        : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
 
-void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    for (auto& [dst_id, streams] : _streams_for_node) {
-        fn(dst_id, *streams);
-    }
-}
-
-Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const 
Streams&)> fn) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    for (auto& [dst_id, streams] : _streams_for_node) {
-        RETURN_IF_ERROR(fn(dst_id, *streams));
-    }
-    return Status::OK();
-}
-
-void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
-                                           const std::vector<PTabletID>& 
tablets_to_commit) {
-    std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
-    auto& tablets = _tablets_to_commit[dst_id];
-    tablets.insert(tablets.end(), tablets_to_commit.begin(), 
tablets_to_commit.end());
-}
-
-bool LoadStreamMap::release() {
+void LoadStreams::release() {
     int num_use = --_use_cnt;
+    DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
     if (num_use == 0) {
-        LOG(INFO) << "releasing streams, load_id=" << _load_id;
-        _pool->erase(_load_id);
-        return true;
+        LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" 
<< _dst_id;
+        _pool->erase(_load_id, _dst_id);
+    } else {
+        LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << 
_dst_id
+                  << ", use_cnt=" << num_use;
     }
-    LOG(INFO) << "keeping streams, load_id=" << _load_id << ", use_cnt=" << 
num_use;
-    return false;
-}
-
-Status LoadStreamMap::close_load() {
-    return for_each_st([this](int64_t dst_id, const Streams& streams) -> 
Status {
-        const auto& tablets = _tablets_to_commit[dst_id];
-        for (auto& stream : streams) {
-            RETURN_IF_ERROR(stream->close_load(tablets));
-        }
-        return Status::OK();
-    });
 }
 
 LoadStreamStubPool::LoadStreamStubPool() = default;
 
 LoadStreamStubPool::~LoadStreamStubPool() = default;
-std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId 
load_id, int64_t src_id,
-                                                                 int 
num_streams, int num_use) {
+
+std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId 
load_id, int64_t src_id,
+                                                               int64_t dst_id, 
int num_streams,
+                                                               int num_sink) {
+    auto key = std::make_pair(UniqueId(load_id), dst_id);
     std::lock_guard<std::mutex> lock(_mutex);
-    std::shared_ptr<LoadStreamMap> streams = _pool[load_id];
-    if (streams != nullptr) {
+    std::shared_ptr<LoadStreams> streams = _pool[key];
+    if (streams) {
         return streams;
     }
-    streams = std::make_shared<LoadStreamMap>(load_id, src_id, num_streams, 
num_use, this);
-    _pool[load_id] = streams;
+    DCHECK(num_streams > 0) << "stream num should be greater than 0";
+    DCHECK(num_sink > 0) << "sink num should be greater than 0";
+    auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub 
{load_id, src_id, num_sink});
+    streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
+    for (int32_t i = 0; i < num_streams; i++) {
+        // copy construct, internal tablet schema map will be shared among all 
stubs
+        streams->streams().emplace_back(new LoadStreamStub {*it->second});
+    }
+    _pool[key] = streams;
     return streams;
 }
 
-void LoadStreamStubPool::erase(UniqueId load_id) {
+void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) {
     std::lock_guard<std::mutex> lock(_mutex);
-    _pool.erase(load_id);
+    _pool.erase(std::make_pair(load_id, dst_id));
+    _template_stubs.erase(load_id);
 }
 
 } // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h
index 65f3bb66cd2..662fc5bc1a1 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -72,41 +72,20 @@ class LoadStreamStubPool;
 
 using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
 
-class LoadStreamMap {
+class LoadStreams {
 public:
-    LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int 
num_use,
-                  LoadStreamStubPool* pool);
+    LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, 
LoadStreamStubPool* pool);
 
-    std::shared_ptr<Streams> get_or_create(int64_t dst_id);
+    void release();
 
-    std::shared_ptr<Streams> at(int64_t dst_id);
-
-    bool contains(int64_t dst_id);
-
-    void for_each(std::function<void(int64_t, const Streams&)> fn);
-
-    Status for_each_st(std::function<Status(int64_t, const Streams&)> fn);
-
-    void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& 
tablets_to_commit);
-
-    // Return true if the last instance is just released.
-    bool release();
-
-    // send CLOSE_LOAD to all streams, return ERROR if any.
-    // only call this method after release() returns true.
-    Status close_load();
+    Streams& streams() { return _streams; }
 
 private:
-    const UniqueId _load_id;
-    const int64_t _src_id;
-    const int _num_streams;
+    Streams _streams;
+    UniqueId _load_id;
+    int64_t _dst_id;
     std::atomic<int> _use_cnt;
-    std::mutex _mutex;
-    std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
     LoadStreamStubPool* _pool = nullptr;
-
-    std::mutex _tablets_to_commit_mutex;
-    std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
 };
 
 class LoadStreamStubPool {
@@ -115,19 +94,26 @@ public:
 
     ~LoadStreamStubPool();
 
-    std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t 
src_id, int num_streams,
-                                                 int num_use);
+    std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t 
src_id, int64_t dst_id,
+                                               int num_streams, int num_sink);
 
-    void erase(UniqueId load_id);
+    void erase(UniqueId load_id, int64_t dst_id);
 
     size_t size() {
         std::lock_guard<std::mutex> lock(_mutex);
         return _pool.size();
     }
 
+    // for UT only
+    size_t templates_size() {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _template_stubs.size();
+    }
+
 private:
     std::mutex _mutex;
-    std::unordered_map<UniqueId, std::shared_ptr<LoadStreamMap>> _pool;
+    std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>> 
_template_stubs;
+    std::unordered_map<std::pair<UniqueId, int64_t>, 
std::shared_ptr<LoadStreams>> _pool;
 };
 
 } // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 315df063ea0..68506ca161e 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -98,7 +98,7 @@ Status VTabletWriterV2::_incremental_open_streams(
                     tablet.set_partition_id(partition->id);
                     tablet.set_index_id(index.index_id);
                     tablet.set_tablet_id(tablet_id);
-                    if (!_streams_for_node->contains(node)) {
+                    if (!_streams_for_node.contains(node)) {
                         new_backends.insert(node);
                     }
                     _tablets_for_node[node].emplace(tablet_id, tablet);
@@ -111,9 +111,11 @@ Status VTabletWriterV2::_incremental_open_streams(
             }
         }
     }
-    for (int64_t dst_id : new_backends) {
-        auto streams = _streams_for_node->get_or_create(dst_id);
-        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
+    for (int64_t node_id : new_backends) {
+        auto load_streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+                _load_id, _backend_id, node_id, _stream_per_node, 
_num_local_sink);
+        RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
+        _streams_for_node[node_id] = load_streams;
     }
     return Status::OK();
 }
@@ -240,8 +242,6 @@ Status VTabletWriterV2::_init(RuntimeState* state, 
RuntimeProfile* profile) {
     } else {
         _delta_writer_for_tablet = 
std::make_shared<DeltaWriterV2Map>(_load_id);
     }
-    _streams_for_node = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
-            _load_id, _backend_id, _stream_per_node, _num_local_sink);
     return Status::OK();
 }
 
@@ -253,21 +253,23 @@ Status VTabletWriterV2::open(RuntimeState* state, 
RuntimeProfile* profile) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     RETURN_IF_ERROR(_build_tablet_node_mapping());
-    RETURN_IF_ERROR(_open_streams());
+    RETURN_IF_ERROR(_open_streams(_backend_id));
     RETURN_IF_ERROR(_init_row_distribution());
 
     return Status::OK();
 }
 
-Status VTabletWriterV2::_open_streams() {
+Status VTabletWriterV2::_open_streams(int64_t src_id) {
     for (auto& [dst_id, _] : _tablets_for_node) {
-        auto streams = _streams_for_node->get_or_create(dst_id);
+        auto streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+                _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
         RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
+        _streams_for_node[dst_id] = streams;
     }
     return Status::OK();
 }
 
-Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& 
streams) {
+Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& 
streams) {
     const auto* node_info = _nodes_info->find_node(dst_id);
     DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
                     { node_info = nullptr; });
@@ -276,14 +278,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t 
dst_id, Streams& stream
     }
     auto idle_timeout_ms = _state->execution_timeout() * 1000;
     // get tablet schema from each backend only in the 1st stream
-    for (auto& stream : streams | std::ranges::views::take(1)) {
+    for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
         const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
         RETURN_IF_ERROR(stream->open(stream, 
_state->exec_env()->brpc_internal_client_cache(),
                                      *node_info, _txn_id, *_schema, 
tablets_for_schema,
                                      _total_streams, idle_timeout_ms, 
_state->enable_profile()));
     }
     // for the rest streams, open without getting tablet schema
-    for (auto& stream : streams | std::ranges::views::drop(1)) {
+    for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
         RETURN_IF_ERROR(stream->open(stream, 
_state->exec_env()->brpc_internal_client_cache(),
                                      *node_info, _txn_id, *_schema, {}, 
_total_streams,
                                      idle_timeout_ms, 
_state->enable_profile()));
@@ -358,7 +360,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, 
int64_t partition_id,
         tablet.set_index_id(index_id);
         tablet.set_tablet_id(tablet_id);
         _tablets_for_node[node_id].emplace(tablet_id, tablet);
-        
streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index));
+        
streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index));
         RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, 
tablet_id));
     }
     _stream_index = (_stream_index + 1) % _stream_per_node;
@@ -467,13 +469,11 @@ Status VTabletWriterV2::_cancel(Status status) {
         _delta_writer_for_tablet->cancel(status);
         _delta_writer_for_tablet.reset();
     }
-    if (_streams_for_node) {
-        _streams_for_node->for_each([status](int64_t dst_id, const Streams& 
streams) {
-            for (auto& stream : streams) {
-                stream->cancel(status);
-            }
-        });
-        _streams_for_node->release();
+    for (const auto& [_, streams] : _streams_for_node) {
+        for (const auto& stream : streams->streams()) {
+            stream->cancel(status);
+        }
+        streams->release();
     }
     return Status::OK();
 }
@@ -527,83 +527,87 @@ Status VTabletWriterV2::close(Status exec_status) {
         COUNTER_SET(_row_distribution_timer, 
(int64_t)_row_distribution_watch.elapsed_time());
         COUNTER_SET(_validate_data_timer, 
_block_convertor->validate_data_ns());
 
-        // close DeltaWriters
+        // defer stream release to prevent memory leak
+        Defer defer([&] {
+            for (const auto& [_, streams] : _streams_for_node) {
+                streams->release();
+            }
+            _streams_for_node.clear();
+        });
+
         {
             SCOPED_TIMER(_close_writer_timer);
             // close all delta writers if this is the last user
-            auto st = _delta_writer_for_tablet->close(_profile);
+            RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile));
             _delta_writer_for_tablet.reset();
-            if (!st.ok()) {
-                RETURN_IF_ERROR(_cancel(st));
-            }
         }
 
-        _calc_tablets_to_commit();
-        const bool is_last_sink = _streams_for_node->release();
-        LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << 
is_last_sink
-                  << ", load_id=" << print_id(_load_id);
+        {
+            // send CLOSE_LOAD to all streams, return ERROR if any
+            for (const auto& [_, streams] : _streams_for_node) {
+                RETURN_IF_ERROR(_close_load(streams->streams()));
+            }
+        }
 
-        // send CLOSE_LOAD and close_wait on all streams
-        if (is_last_sink) {
-            RETURN_IF_ERROR(_streams_for_node->close_load());
+        {
             SCOPED_TIMER(_close_load_timer);
-            RETURN_IF_ERROR(_streams_for_node->for_each_st(
-                    [this](int64_t dst_id, const Streams& streams) -> Status {
-                        for (auto& stream : streams) {
-                            int64_t remain_ms =
-                                    
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
-                                    _timeout_watch.elapsed_time() / 1000 / 
1000;
-                            if (remain_ms <= 0) {
-                                LOG(WARNING) << "load timed out before close 
waiting, load_id="
-                                             << print_id(_load_id);
-                                return Status::TimedOut("load timed out before 
close waiting");
-                            }
-                            RETURN_IF_ERROR(stream->close_wait(_state, 
remain_ms));
-                        }
-                        return Status::OK();
-                    }));
+            for (const auto& [_, streams] : _streams_for_node) {
+                for (const auto& stream : streams->streams()) {
+                    int64_t remain_ms = 
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+                                        _timeout_watch.elapsed_time() / 1000 / 
1000;
+                    if (remain_ms <= 0) {
+                        LOG(WARNING) << "load timed out before close waiting, 
load_id="
+                                     << print_id(_load_id);
+                        return Status::TimedOut("load timed out before close 
waiting");
+                    }
+                    RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
+                }
+            }
         }
 
-        // calculate and submit commit info
-        if (is_last_sink) {
-            std::unordered_map<int64_t, int> failed_tablets;
-            std::unordered_map<int64_t, Status> failed_reason;
-            std::vector<TTabletCommitInfo> tablet_commit_infos;
+        std::unordered_map<int64_t, int> failed_tablets;
 
-            _streams_for_node->for_each([&](int64_t dst_id, const Streams& 
streams) {
+        std::vector<TTabletCommitInfo> tablet_commit_infos;
+        for (const auto& [node_id, streams] : _streams_for_node) {
+            for (const auto& stream : streams->streams()) {
                 std::unordered_set<int64_t> known_tablets;
-                for (const auto& stream : streams) {
-                    for (auto [tablet_id, reason] : stream->failed_tablets()) {
-                        if (known_tablets.contains(tablet_id)) {
-                            continue;
-                        }
-                        known_tablets.insert(tablet_id);
-                        failed_tablets[tablet_id]++;
-                        failed_reason[tablet_id] = reason;
+                for (auto [tablet_id, _] : stream->failed_tablets()) {
+                    if (known_tablets.contains(tablet_id)) {
+                        continue;
                     }
-                    for (auto tablet_id : stream->success_tablets()) {
-                        if (known_tablets.contains(tablet_id)) {
-                            continue;
-                        }
-                        known_tablets.insert(tablet_id);
-                        TTabletCommitInfo commit_info;
-                        commit_info.tabletId = tablet_id;
-                        commit_info.backendId = dst_id;
-                        
tablet_commit_infos.emplace_back(std::move(commit_info));
+                    known_tablets.insert(tablet_id);
+                    failed_tablets[tablet_id]++;
+                }
+                for (auto tablet_id : stream->success_tablets()) {
+                    if (known_tablets.contains(tablet_id)) {
+                        continue;
                     }
+                    known_tablets.insert(tablet_id);
+                    TTabletCommitInfo commit_info;
+                    commit_info.tabletId = tablet_id;
+                    commit_info.backendId = node_id;
+                    tablet_commit_infos.emplace_back(std::move(commit_info));
                 }
-            });
-
-            for (auto [tablet_id, replicas] : failed_tablets) {
-                if (replicas > (_num_replicas - 1) / 2) {
-                    return failed_reason.at(tablet_id);
+            }
+        }
+        for (auto [tablet_id, replicas] : failed_tablets) {
+            if (replicas <= (_num_replicas - 1) / 2) {
+                continue;
+            }
+            auto backends = _location->find_tablet(tablet_id)->node_ids;
+            for (auto& backend_id : backends) {
+                for (const auto& stream : 
_streams_for_node[backend_id]->streams()) {
+                    const auto& failed_tablets = stream->failed_tablets();
+                    if (failed_tablets.contains(tablet_id)) {
+                        return failed_tablets.at(tablet_id);
+                    }
                 }
             }
-            _state->tablet_commit_infos().insert(
-                    _state->tablet_commit_infos().end(),
-                    std::make_move_iterator(tablet_commit_infos.begin()),
-                    std::make_move_iterator(tablet_commit_infos.end()));
+            DCHECK(false) << "failed tablet " << tablet_id << " should have 
failed reason";
         }
+        
_state->tablet_commit_infos().insert(_state->tablet_commit_infos().end(),
+                                             
std::make_move_iterator(tablet_commit_infos.begin()),
+                                             
std::make_move_iterator(tablet_commit_infos.end()));
 
         // _number_input_rows don't contain num_rows_load_filtered and 
num_rows_load_unselected in scan node
         int64_t num_rows_load_total = _number_input_rows + 
_state->num_rows_load_filtered() +
@@ -625,28 +629,18 @@ Status VTabletWriterV2::close(Status exec_status) {
     return status;
 }
 
-void VTabletWriterV2::_calc_tablets_to_commit() {
-    for (const auto& [dst_id, tablets] : _tablets_for_node) {
-        std::vector<PTabletID> tablets_to_commit;
-        std::vector<int64_t> partition_ids;
-        for (const auto& [tablet_id, tablet] : tablets) {
-            if 
(_tablet_finder->partition_ids().contains(tablet.partition_id())) {
-                if (VLOG_DEBUG_IS_ON) {
-                    partition_ids.push_back(tablet.partition_id());
-                }
-                tablets_to_commit.push_back(tablet);
-            }
+Status VTabletWriterV2::_close_load(const Streams& streams) {
+    auto node_id = streams[0]->dst_id();
+    std::vector<PTabletID> tablets_to_commit;
+    for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) {
+        if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
+            tablets_to_commit.push_back(tablet);
         }
-        if (VLOG_DEBUG_IS_ON) {
-            std::string msg("close load partitions: ");
-            msg.reserve(partition_ids.size() * 7);
-            for (auto v : partition_ids) {
-                msg.append(std::to_string(v) + ", ");
-            }
-            LOG(WARNING) << msg;
-        }
-        _streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit);
     }
+    for (const auto& stream : streams) {
+        RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
+    }
+    return Status::OK();
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 7785733bf4a..460b3acc33f 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -69,7 +69,7 @@
 namespace doris {
 class DeltaWriterV2;
 class LoadStreamStub;
-class LoadStreamMap;
+class LoadStreams;
 class ObjectPool;
 class RowDescriptor;
 class RuntimeState;
@@ -121,9 +121,9 @@ private:
 
     Status _init(RuntimeState* state, RuntimeProfile* profile);
 
-    Status _open_streams();
+    Status _open_streams(int64_t src_id);
 
-    Status _open_streams_to_backend(int64_t dst_id, Streams& streams);
+    Status _open_streams_to_backend(int64_t dst_id, LoadStreams& streams);
 
     Status _incremental_open_streams(const std::vector<TOlapTablePartition>& 
partitions);
 
@@ -140,7 +140,7 @@ private:
     Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t 
index_id,
                            Streams& streams);
 
-    void _calc_tablets_to_commit();
+    Status _close_load(const Streams& streams);
 
     Status _cancel(Status status);
 
@@ -217,7 +217,7 @@ private:
     std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> 
_tablets_for_node;
     std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
 
-    std::shared_ptr<LoadStreamMap> _streams_for_node;
+    std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> 
_streams_for_node;
 
     size_t _stream_index = 0;
     std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp
index ad6e496c56f..7e5bdd350f5 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -51,9 +51,7 @@ static std::atomic<int64_t> g_num_request;
 class StreamSinkFileWriterTest : public testing::Test {
     class MockStreamStub : public LoadStreamStub {
     public:
-        MockStreamStub(PUniqueId load_id, int64_t src_id)
-                : LoadStreamStub(load_id, src_id, 
std::make_shared<IndexToTabletSchema>(),
-                                 std::make_shared<IndexToEnableMoW>()) {};
+        MockStreamStub(PUniqueId load_id, int64_t src_id) : 
LoadStreamStub(load_id, src_id, 1) {};
 
         virtual ~MockStreamStub() = default;
 
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index e576db3bdaa..24da3bb6999 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -32,29 +32,20 @@ TEST_F(LoadStreamStubPoolTest, test) {
     LoadStreamStubPool pool;
     int64_t src_id = 100;
     PUniqueId load_id;
-    load_id.set_lo(1);
+    load_id.set_hi(1);
     load_id.set_hi(2);
-    PUniqueId load_id2;
-    load_id2.set_lo(2);
-    load_id2.set_hi(1);
-    auto streams_for_node1 = pool.get_or_create(load_id, src_id, 5, 2);
-    auto streams_for_node2 = pool.get_or_create(load_id, src_id, 5, 2);
-    EXPECT_EQ(1, pool.size());
-    auto streams_for_node3 = pool.get_or_create(load_id2, src_id, 8, 1);
+    auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
+    auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
+    auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
     EXPECT_EQ(2, pool.size());
-    EXPECT_EQ(streams_for_node1, streams_for_node2);
-    EXPECT_NE(streams_for_node1, streams_for_node3);
-
-    EXPECT_EQ(5, streams_for_node1->get_or_create(101)->size());
-    EXPECT_EQ(5, streams_for_node2->get_or_create(102)->size());
-    EXPECT_EQ(8, streams_for_node3->get_or_create(101)->size());
-
-    EXPECT_TRUE(streams_for_node3->release());
-    EXPECT_EQ(1, pool.size());
-    EXPECT_FALSE(streams_for_node1->release());
-    EXPECT_EQ(1, pool.size());
-    EXPECT_TRUE(streams_for_node2->release());
+    EXPECT_EQ(1, pool.templates_size());
+    EXPECT_EQ(streams1, streams3);
+    EXPECT_NE(streams1, streams2);
+    streams1->release();
+    streams2->release();
+    streams3->release();
     EXPECT_EQ(0, pool.size());
+    EXPECT_EQ(0, pool.templates_size());
 }
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to