This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a565672578c [performance](move-memtable) async close tablet streams (#41156 & #43813) (#44471)
a565672578c is described below

commit a565672578c9403f7f66fb312843732c446b49e7
Author: Kaijie Chen <chenkai...@selectdb.com>
AuthorDate: Tue Nov 26 23:37:36 2024 +0800

    [performance](move-memtable) async close tablet streams (#41156 & #43813) (#44471)
    
    backport #41156 and #43813
---
 be/src/runtime/load_stream.cpp        | 135 +++++++++++++++++-----------------
 be/src/runtime/load_stream.h          |   5 +-
 be/src/runtime/load_stream_writer.cpp |  12 ++-
 be/src/runtime/load_stream_writer.h   |  11 ++-
 4 files changed, 93 insertions(+), 70 deletions(-)
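
This backport splits the tablet close path into two phases: LoadStreamWriter gains a pre_close() covering the flush wait, rowset build, and delete-bitmap task submission, while close() then only waits for that work and commits the transaction; IndexStream::close() calls pre_close() on every tablet stream before closing any of them, so the per-tablet waits overlap instead of running back to back. Below is a minimal, hypothetical C++ sketch of that two-phase pattern; FakeTabletStream and the sleep are illustrative stand-ins, not Doris APIs.

    // Hypothetical sketch only: FakeTabletStream stands in for the real
    // TabletStream/LoadStreamWriter pair, and the sleep models the heavy
    // work (flush wait, rowset build, delete-bitmap calculation).
    #include <chrono>
    #include <future>
    #include <iostream>
    #include <thread>
    #include <vector>

    struct FakeTabletStream {
        int id = 0;
        std::future<void> pending;  // work started by pre_close()

        // Phase 1: kick off the expensive part asynchronously and return.
        void pre_close() {
            pending = std::async(std::launch::async, [] {
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
            });
        }

        // Phase 2: wait for the work started in pre_close(), then finish.
        void close() {
            pending.get();
            std::cout << "tablet " << id << " closed\n";
        }
    };

    int main() {
        std::vector<FakeTabletStream> streams(3);
        for (int i = 0; i < 3; ++i) {
            streams[i].id = i + 1;
        }
        // Start closing every tablet stream first ...
        for (auto& s : streams) {
            s.pre_close();
        }
        // ... then wait for each one; total latency is roughly the slowest
        // tablet instead of the sum over all tablets.
        for (auto& s : streams) {
            s.close();
        }
        return 0;
    }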

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 88c64eb517c..752e2ff95b2 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -65,7 +65,6 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
           _txn_id(txn_id),
           _load_stream_mgr(load_stream_mgr) {
     load_stream_mgr->create_tokens(_flush_tokens);
-    _status = Status::OK();
     _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
     _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
     _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
@@ -74,7 +73,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
 
 inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
     ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id
-         << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status;
+         << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status();
     return ostr;
 }
 
@@ -93,19 +92,19 @@ Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t
 
     _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
     DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
-        _status = Status::Uninitialized("fault injection");
-        return _status;
+        _status.update(Status::Uninitialized("fault injection"));
+        return _status.status();
     });
-    _status = _load_stream_writer->init();
+    _status.update(_load_stream_writer->init());
     if (!_status.ok()) {
         LOG(INFO) << "failed to init rowset builder due to " << *this;
     }
-    return _status;
+    return _status.status();
 }
 
 Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
     if (!_status.ok()) {
-        return _status;
+        return _status.status();
     }
 
     // dispatch add_segment request
@@ -174,8 +173,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
         }
         DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
                         { st = Status::InternalError("fault injection"); });
-        if (!st.ok() && _status.ok()) {
-            _status = st;
+        if (!st.ok()) {
+            _status.update(st);
             LOG(WARNING) << "write data failed " << st << ", " << *this;
         }
     };
@@ -191,11 +190,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
     timer.start();
     while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
         if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
-            _status = Status::Error<true>(
-                    "wait flush token back pressure time is more than "
-                    "load_stream_max_wait_flush_token_time {}",
-                    load_stream_max_wait_flush_token_time_ms);
-            return _status;
+            _status.update(
+                    Status::Error<true>("wait flush token back pressure time is more than "
+                                        "load_stream_max_wait_flush_token_time {}",
+                                        load_stream_max_wait_flush_token_time_ms));
+            return _status.status();
         }
         bthread_usleep(2 * 1000); // 2ms
     }
@@ -210,14 +209,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
         st = flush_token->submit_func(flush_func);
     }
     if (!st.ok()) {
-        _status = st;
+        _status.update(st);
     }
-    return _status;
+    return _status.status();
 }
 
 Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
     if (!_status.ok()) {
-        return _status;
+        return _status.status();
     }
 
     SCOPED_TIMER(_add_segment_timer);
@@ -236,19 +235,19 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
     {
         std::lock_guard lock_guard(_lock);
         if (!_segids_mapping.contains(src_id)) {
-            _status = Status::InternalError(
+            _status.update(Status::InternalError(
                     "add segment failed, no segment written by this src be yet, src_id={}, "
                     "segment_id={}",
-                    src_id, segid);
-            return _status;
+                    src_id, segid));
+            return _status.status();
         }
         DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
                         { segid = _segids_mapping[src_id]->size(); });
         if (segid >= _segids_mapping[src_id]->size()) {
-            _status = Status::InternalError(
+            _status.update(Status::InternalError(
                     "add segment failed, segment is never written, src_id={}, segment_id={}",
-                    src_id, segid);
-            return _status;
+                    src_id, segid));
+            return _status.status();
         }
         new_segid = _segids_mapping[src_id]->at(segid);
     }
@@ -259,8 +258,8 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
         auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
         DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
                         { st = Status::InternalError("fault injection"); });
-        if (!st.ok() && _status.ok()) {
-            _status = st;
+        if (!st.ok()) {
+            _status.update(st);
             LOG(INFO) << "add segment failed " << *this;
         }
     };
@@ -272,69 +271,69 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
         st = flush_token->submit_func(add_segment_func);
     }
     if (!st.ok()) {
-        _status = st;
+        _status.update(st);
     }
-    return _status;
+    return _status.status();
 }
 
-Status TabletStream::close() {
-    if (!_status.ok()) {
-        return _status;
-    }
-
-    SCOPED_TIMER(_close_wait_timer);
+Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
     bthread::Mutex mu;
     std::unique_lock<bthread::Mutex> lock(mu);
     bthread::ConditionVariable cv;
-    auto wait_func = [this, &mu, &cv] {
+    auto st = Status::OK();
+    auto func = [this, &mu, &cv, &st, &fn] {
         signal::set_signal_task_id(_load_id);
-        for (auto& token : _flush_tokens) {
-            token->wait();
-        }
+        st = fn();
         std::lock_guard<bthread::Mutex> lock(mu);
         cv.notify_one();
     };
-    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
-    if (ret) {
-        cv.wait(lock);
-    } else {
-        _status = Status::Error<ErrorCode::INTERNAL_ERROR>(
+    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
+    if (!ret) {
+        return Status::Error<ErrorCode::INTERNAL_ERROR>(
                 "there is not enough thread resource for close load");
-        return _status;
     }
+    cv.wait(lock);
+    return st;
+}
 
-    DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
-    if (_check_num_segments && (_next_segid.load() != _num_segments)) {
-        _status = Status::Corruption(
-                "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
-                _num_segments, _next_segid.load(), print_id(_load_id));
-        return _status;
+void TabletStream::pre_close() {
+    if (!_status.ok()) {
+        return;
     }
 
+    SCOPED_TIMER(_close_wait_timer);
+    _status.update(_run_in_heavy_work_pool([this]() {
+        for (auto& token : _flush_tokens) {
+            token->wait();
+        }
+        return Status::OK();
+    }));
     // it is necessary to check status after wait_func,
     // for create_rowset could fail during add_segment when loading to MOW table,
     // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
     if (!_status.ok()) {
-        return _status;
+        return;
     }
 
-    auto close_func = [this, &mu, &cv]() {
-        signal::set_signal_task_id(_load_id);
-        auto st = _load_stream_writer->close();
-        if (!st.ok() && _status.ok()) {
-            _status = st;
-        }
-        std::lock_guard<bthread::Mutex> lock(mu);
-        cv.notify_one();
-    };
-    ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
-    if (ret) {
-        cv.wait(lock);
-    } else {
-        _status = Status::Error<ErrorCode::INTERNAL_ERROR>(
-                "there is not enough thread resource for close load");
+    DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
+    if (_check_num_segments && (_next_segid.load() != _num_segments)) {
+        _status.update(Status::Corruption(
+                "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
+                _num_segments, _next_segid.load(), print_id(_load_id)));
+        return;
     }
-    return _status;
+
+    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
+}
+
+Status TabletStream::close() {
+    if (!_status.ok()) {
+        return _status.status();
+    }
+
+    SCOPED_TIMER(_close_wait_timer);
+    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); }));
+    return _status.status();
 }
 
 IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
@@ -402,6 +401,10 @@ void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
         }
     }
 
+    for (auto& [_, tablet_stream] : _tablet_streams_map) {
+        tablet_stream->pre_close();
+    }
+
     for (auto& [_, tablet_stream] : _tablet_streams_map) {
         auto st = tablet_stream->close();
         if (st.ok()) {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 3b649c68835..1f4ef2b3c4c 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -54,12 +54,15 @@ public:
     Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
     void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
     void disable_num_segments_check() { _check_num_segments = false; }
+    void pre_close();
     Status close();
     int64_t id() const { return _id; }
 
     friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream);
 
 private:
+    Status _run_in_heavy_work_pool(std::function<Status()> fn);
+
     int64_t _id;
     LoadStreamWriterSharedPtr _load_stream_writer;
     std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
@@ -68,7 +71,7 @@ private:
     int64_t _num_segments = 0;
     bool _check_num_segments = true;
     bthread::Mutex _lock;
-    Status _status;
+    AtomicStatus _status;
     PUniqueId _load_id;
     int64_t _txn_id;
     RuntimeProfile* _profile = nullptr;
diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp
index 2e987edc7bd..377b27e6e45 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -245,8 +245,7 @@ Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, siz
     return Status::OK();
 }
 
-Status LoadStreamWriter::close() {
-    std::lock_guard<std::mutex> l(_lock);
+Status LoadStreamWriter::_pre_close() {
     SCOPED_ATTACH_TASK(_query_thread_context);
     if (!_is_init) {
         // if this delta writer is not initialized, but close() is called.
@@ -306,6 +305,15 @@ Status LoadStreamWriter::close() {
 
     RETURN_IF_ERROR(_rowset_builder->build_rowset());
     RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
+    _pre_closed = true;
+    return Status::OK();
+}
+
+Status LoadStreamWriter::close() {
+    std::lock_guard<std::mutex> l(_lock);
+    if (!_pre_closed) {
+        RETURN_IF_ERROR(_pre_close());
+    }
     RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
     // FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions
     RETURN_IF_ERROR(static_cast<RowsetBuilder*>(_rowset_builder.get())->commit_txn());
diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h
index b22817cb85c..8815b0f0e3e 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -70,14 +70,23 @@ public:
 
     Status add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_chema);
 
-    Status _calc_file_size(uint32_t segid, FileType file_type, size_t* file_size);
+    Status pre_close() {
+        std::lock_guard<std::mutex> l(_lock);
+        return _pre_close();
+    }
 
     // wait for all memtables to be flushed.
     Status close();
 
 private:
+    Status _calc_file_size(uint32_t segid, FileType file_type, size_t* file_size);
+
+    // without lock
+    Status _pre_close();
+
     bool _is_init = false;
     bool _is_canceled = false;
+    bool _pre_closed = false;
     WriteRequest _req;
     std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
     std::shared_ptr<RowsetWriter> _rowset_writer;

