This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a565672578c [performance](move-memtable) async close tablet streams (#41156 & #43813) (#44471)
a565672578c is described below

commit a565672578c9403f7f66fb312843732c446b49e7
Author: Kaijie Chen <chenkai...@selectdb.com>
AuthorDate: Tue Nov 26 23:37:36 2024 +0800

    [performance](move-memtable) async close tablet streams (#41156 & #43813) (#44471)

    backport #41156 and #43813
---
 be/src/runtime/load_stream.cpp        | 135 +++++++++++++++++-----------------
 be/src/runtime/load_stream.h          |   5 +-
 be/src/runtime/load_stream_writer.cpp |  12 ++-
 be/src/runtime/load_stream_writer.h   |  11 ++-
 4 files changed, 93 insertions(+), 70 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 88c64eb517c..752e2ff95b2 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -65,7 +65,6 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
           _txn_id(txn_id),
           _load_stream_mgr(load_stream_mgr) {
     load_stream_mgr->create_tokens(_flush_tokens);
-    _status = Status::OK();
     _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
     _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
     _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
@@ -74,7 +73,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
 
 inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
     ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id
-         << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status;
+         << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status();
     return ostr;
 }
 
@@ -93,19 +92,19 @@ Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t
 
     _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
     DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
-        _status = Status::Uninitialized("fault injection");
-        return _status;
+        _status.update(Status::Uninitialized("fault injection"));
+        return _status.status();
     });
-    _status = _load_stream_writer->init();
+    _status.update(_load_stream_writer->init());
     if (!_status.ok()) {
         LOG(INFO) << "failed to init rowset builder due to " << *this;
     }
-    return _status;
+    return _status.status();
 }
 
 Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
     if (!_status.ok()) {
-        return _status;
+        return _status.status();
     }
 
     // dispatch add_segment request
@@ -174,8 +173,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
         }
         DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
                         { st = Status::InternalError("fault injection"); });
-        if (!st.ok() && _status.ok()) {
-            _status = st;
+        if (!st.ok()) {
+            _status.update(st);
             LOG(WARNING) << "write data failed " << st << ", " << *this;
         }
     };
@@ -191,11 +190,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
     timer.start();
     while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
         if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
-            _status = Status::Error<true>(
-                    "wait flush token back pressure time is more than "
-                    "load_stream_max_wait_flush_token_time {}",
-                    load_stream_max_wait_flush_token_time_ms);
-            return _status;
+            _status.update(
+                    Status::Error<true>("wait flush token back pressure time is more than "
                                        "load_stream_max_wait_flush_token_time {}",
+                                        load_stream_max_wait_flush_token_time_ms));
+            return _status.status();
         }
         bthread_usleep(2 * 1000); // 2ms
     }
@@ -210,14 +209,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
         st = flush_token->submit_func(flush_func);
     }
     if (!st.ok()) {
-        _status = st;
+        _status.update(st);
     }
-    return _status;
+    return _status.status();
 }
 
 Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
     if (!_status.ok()) {
-        return _status;
+        return _status.status();
     }
 
     SCOPED_TIMER(_add_segment_timer);
@@ -236,19 +235,19 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
     {
         std::lock_guard lock_guard(_lock);
         if (!_segids_mapping.contains(src_id)) {
-            _status = Status::InternalError(
+            _status.update(Status::InternalError(
                     "add segment failed, no segment written by this src be yet, src_id={}, "
                     "segment_id={}",
-                    src_id, segid);
-            return _status;
+                    src_id, segid));
+            return _status.status();
         }
         DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
                         { segid = _segids_mapping[src_id]->size(); });
         if (segid >= _segids_mapping[src_id]->size()) {
-            _status = Status::InternalError(
+            _status.update(Status::InternalError(
                     "add segment failed, segment is never written, src_id={}, segment_id={}",
-                    src_id, segid);
-            return _status;
+                    src_id, segid));
+            return _status.status();
         }
         new_segid = _segids_mapping[src_id]->at(segid);
     }
@@ -259,8 +258,8 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
         auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
         DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
                         { st = Status::InternalError("fault injection"); });
-        if (!st.ok() && _status.ok()) {
-            _status = st;
+        if (!st.ok()) {
+            _status.update(st);
             LOG(INFO) << "add segment failed " << *this;
         }
     };
@@ -272,69 +271,69 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
         st = flush_token->submit_func(add_segment_func);
     }
     if (!st.ok()) {
-        _status = st;
+        _status.update(st);
     }
-    return _status;
+    return _status.status();
 }
 
-Status TabletStream::close() {
-    if (!_status.ok()) {
-        return _status;
-    }
-
-    SCOPED_TIMER(_close_wait_timer);
+Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
     bthread::Mutex mu;
     std::unique_lock<bthread::Mutex> lock(mu);
     bthread::ConditionVariable cv;
-    auto wait_func = [this, &mu, &cv] {
+    auto st = Status::OK();
+    auto func = [this, &mu, &cv, &st, &fn] {
         signal::set_signal_task_id(_load_id);
-        for (auto& token : _flush_tokens) {
-            token->wait();
-        }
+        st = fn();
         std::lock_guard<bthread::Mutex> lock(mu);
         cv.notify_one();
     };
-    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
-    if (ret) {
-        cv.wait(lock);
-    } else {
-        _status = Status::Error<ErrorCode::INTERNAL_ERROR>(
+    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
+    if (!ret) {
+        return Status::Error<ErrorCode::INTERNAL_ERROR>(
                 "there is not enough thread resource for close load");
-        return _status;
     }
+    cv.wait(lock);
+    return st;
+}
 
-    DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
-    if (_check_num_segments && (_next_segid.load() != _num_segments)) {
-        _status = Status::Corruption(
-                "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
-                _num_segments, _next_segid.load(), print_id(_load_id));
-        return _status;
+void TabletStream::pre_close() {
+    if (!_status.ok()) {
+        return;
     }
 
+    SCOPED_TIMER(_close_wait_timer);
+    _status.update(_run_in_heavy_work_pool([this]() {
+        for (auto& token : _flush_tokens) {
+            token->wait();
+        }
+        return Status::OK();
+    }));
     // it is necessary to check status after wait_func,
     // for create_rowset could fail during add_segment when loading to MOW table,
     // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
     if (!_status.ok()) {
-        return _status;
+        return;
     }
 
-    auto close_func = [this, &mu, &cv]() {
-        signal::set_signal_task_id(_load_id);
-        auto st = _load_stream_writer->close();
-        if (!st.ok() && _status.ok()) {
-            _status = st;
-        }
-        std::lock_guard<bthread::Mutex> lock(mu);
-        cv.notify_one();
-    };
-    ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
-    if (ret) {
-        cv.wait(lock);
-    } else {
-        _status = Status::Error<ErrorCode::INTERNAL_ERROR>(
-                "there is not enough thread resource for close load");
+    DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
+    if (_check_num_segments && (_next_segid.load() != _num_segments)) {
+        _status.update(Status::Corruption(
+                "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
+                _num_segments, _next_segid.load(), print_id(_load_id)));
+        return;
     }
-    return _status;
+
+    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
+}
+
+Status TabletStream::close() {
+    if (!_status.ok()) {
+        return _status.status();
+    }
+
+    SCOPED_TIMER(_close_wait_timer);
+    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); }));
+    return _status.status();
 }
 
 IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
@@ -402,6 +401,10 @@ void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
         }
     }
 
+    for (auto& [_, tablet_stream] : _tablet_streams_map) {
+        tablet_stream->pre_close();
+    }
+
     for (auto& [_, tablet_stream] : _tablet_streams_map) {
         auto st = tablet_stream->close();
         if (st.ok()) {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 3b649c68835..1f4ef2b3c4c 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -54,12 +54,15 @@ public:
     Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
     void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
     void disable_num_segments_check() { _check_num_segments = false; }
+    void pre_close();
     Status close();
     int64_t id() const { return _id; }
 
     friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream);
 
 private:
+    Status _run_in_heavy_work_pool(std::function<Status()> fn);
+
     int64_t _id;
     LoadStreamWriterSharedPtr _load_stream_writer;
     std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
@@ -68,7 +71,7 @@ private:
     int64_t _num_segments = 0;
     bool _check_num_segments = true;
     bthread::Mutex _lock;
-    Status _status;
+    AtomicStatus _status;
     PUniqueId _load_id;
     int64_t _txn_id;
     RuntimeProfile* _profile = nullptr;
diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp
index 2e987edc7bd..377b27e6e45 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -245,8 +245,7 @@ Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, siz
     return Status::OK();
 }
 
-Status LoadStreamWriter::close() {
-    std::lock_guard<std::mutex> l(_lock);
+Status LoadStreamWriter::_pre_close() {
     SCOPED_ATTACH_TASK(_query_thread_context);
     if (!_is_init) {
         // if this delta writer is not initialized, but close() is called.
@@ -306,6 +305,15 @@ Status LoadStreamWriter::close() {
 
     RETURN_IF_ERROR(_rowset_builder->build_rowset());
     RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
+    _pre_closed = true;
+    return Status::OK();
+}
+
+Status LoadStreamWriter::close() {
+    std::lock_guard<std::mutex> l(_lock);
+    if (!_pre_closed) {
+        RETURN_IF_ERROR(_pre_close());
+    }
     RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
     // FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions
     RETURN_IF_ERROR(static_cast<RowsetBuilder*>(_rowset_builder.get())->commit_txn());
diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h
index b22817cb85c..8815b0f0e3e 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -70,14 +70,23 @@ public:
     Status add_segment(uint32_t segid, const SegmentStatistics& stat,
                        TabletSchemaSPtr flush_chema);
 
-    Status _calc_file_size(uint32_t segid, FileType file_type, size_t* file_size);
+    Status pre_close() {
+        std::lock_guard<std::mutex> l(_lock);
+        return _pre_close();
+    }
 
     // wait for all memtables to be flushed.
     Status close();
 
 private:
+    Status _calc_file_size(uint32_t segid, FileType file_type, size_t* file_size);
+
+    // without lock
+    Status _pre_close();
+
    bool _is_init = false;
     bool _is_canceled = false;
+    bool _pre_closed = false;
     WriteRequest _req;
     std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
     std::shared_ptr<RowsetWriter> _rowset_writer;
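Editor's note on the patch above: the change splits tablet close into two phases. TabletStream::pre_close() waits for the flush tokens, then builds the rowset and submits the delete-bitmap calculation via LoadStreamWriter::pre_close(), while TabletStream::close() waits for the delete bitmap and commits the transaction via LoadStreamWriter::close(). Because IndexStream::close() now pre_closes every tablet stream before closing any of them, the background work submitted in pre_close() runs concurrently across tablets, and each close() only waits for its own result. Below is a minimal standalone sketch of that two-phase pattern under stated assumptions: FakeTabletStream is a hypothetical stand-in, and std::async replaces the submitted background tasks and heavy work pool of the real patch; this is an illustration, not Doris code.

// Standalone sketch of the pre_close()/close() split (hypothetical names, not Doris code).
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

struct FakeTabletStream {
    int id = 0;
    std::future<bool> pending;

    // Phase 1: submit the slow background work
    // (in the real patch: flush wait, build_rowset, submit_calc_delete_bitmap_task).
    void pre_close() {
        pending = std::async(std::launch::async, [] {
            std::this_thread::sleep_for(std::chrono::milliseconds(100)); // simulated background work
            return true;
        });
    }

    // Phase 2: wait for the result and finish
    // (in the real patch: wait_calc_delete_bitmap, commit_txn).
    bool close() { return pending.valid() && pending.get(); }
};

int main() {
    std::vector<FakeTabletStream> tablets(8);
    for (int i = 0; i < 8; ++i) {
        tablets[i].id = i;
    }

    auto t0 = std::chrono::steady_clock::now();
    // Mirrors the new IndexStream::close(): pre_close() every tablet first,
    // then close() each one, so the 100 ms waits overlap instead of running back to back.
    for (auto& t : tablets) {
        t.pre_close();
    }
    for (auto& t : tablets) {
        std::cout << "tablet " << t.id << " closed: " << std::boolalpha << t.close() << "\n";
    }
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                      std::chrono::steady_clock::now() - t0)
                      .count();
    std::cout << "total: " << ms << " ms (closing one tablet at a time would take ~800 ms here)\n";
    return 0;
}

Note also that LoadStreamWriter::close() in the patch still calls _pre_close() itself when pre_close() was never invoked (the _pre_closed flag), so a caller that only calls close() keeps the old single-phase behavior.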