This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c071c327e7 [fix](load) fix add broken tablet core dump (#17104) c071c327e7 is described below commit c071c327e715750913425c243cfba811087a917f Author: Xin Liao <liaoxin...@126.com> AuthorDate: Fri Feb 24 23:59:03 2023 +0800 [fix](load) fix add broken tablet core dump (#17104) --- be/src/runtime/tablets_channel.cpp | 19 ++++++++++++++----- be/src/runtime/tablets_channel.h | 5 +++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index c692ca03d8..833b4d995c 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -125,7 +125,7 @@ Status TabletsChannel::close( // to make sure tablet writer in `_broken_tablets` won't call `close_wait` method. // `close_wait` might create the rowset and commit txn directly, and the subsequent // publish version task will success, which can cause the replica inconsistency. - if (_broken_tablets.find(it.second->tablet_id()) != _broken_tablets.end()) { + if (_is_broken_tablet(it.second->tablet_id())) { LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled" << ", tablet_id=" << it.first << ", transaction_id=" << _txn_id; continue; @@ -312,7 +312,7 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */> tablet_to_rowidxs; for (int i = 0; i < request.tablet_ids_size(); ++i) { int64_t tablet_id = request.tablet_ids(i); - if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) { + if (_is_broken_tablet(tablet_id)) { // skip broken tablets VLOG_PROGRESS << "skip broken tablet tablet=" << tablet_id; continue; @@ -347,7 +347,7 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, error->set_tablet_id(tablet_to_rowidxs_it.first); error->set_msg(err_msg); tablet_writer_it->second->cancel_with_status(st); - _broken_tablets.insert(tablet_to_rowidxs_it.first); + _add_broken_tablet(tablet_to_rowidxs_it.first); // continue write to other tablet. // the error will return back to sender. } @@ -387,7 +387,7 @@ void TabletsChannel::flush_memtable_async(int64_t tablet_id) { tablet_id, _txn_id, st.to_string()); LOG(WARNING) << err_msg; iter->second->cancel_with_status(st); - _broken_tablets.insert(iter->second->tablet_id()); + _add_broken_tablet(tablet_id); } } @@ -415,7 +415,7 @@ void TabletsChannel::wait_flush(int64_t tablet_id) { tablet_id, _txn_id, st.to_string()); LOG(WARNING) << err_msg; iter->second->cancel_with_status(st); - _broken_tablets.insert(iter->second->tablet_id()); + _add_broken_tablet(tablet_id); } { @@ -423,6 +423,15 @@ void TabletsChannel::wait_flush(int64_t tablet_id) { _reducing_tablets.erase(tablet_id); } } +void TabletsChannel::_add_broken_tablet(int64_t tablet_id) { + std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock); + _broken_tablets.insert(tablet_id); +} + +bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) { + std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock); + return _broken_tablets.find(tablet_id) != _broken_tablets.end(); +} template Status TabletsChannel::add_batch<PTabletWriterAddBlockRequest, PTabletWriterAddBlockResult>( diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index e649852a47..3f9fdce635 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -110,6 +110,9 @@ private: google::protobuf::RepeatedPtrField<PTabletError>* tablet_error, PSlaveTabletNodes slave_tablet_nodes, const bool write_single_replica); + void _add_broken_tablet(int64_t tablet_id); + bool _is_broken_tablet(int64_t tablet_id); + // id of this load channel TabletsChannelKey _key; @@ -151,6 +154,8 @@ private: // So that following batch will not handle this tablet anymore. std::unordered_set<int64_t> _broken_tablets; + std::shared_mutex _broken_tablets_lock; + std::unordered_set<int64_t> _reducing_tablets; std::unordered_set<int64_t> _partition_ids; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org