This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 9c39a343fc7 [fix](load) add lock for runtime_state->tablet_commit_infos (#48709) (#48732) 9c39a343fc7 is described below commit 9c39a343fc7b0578aea1dddeed4f4d16cba7929f Author: Kaijie Chen <chenkai...@selectdb.com> AuthorDate: Mon Mar 10 17:58:14 2025 +0800 [fix](load) add lock for runtime_state->tablet_commit_infos (#48709) (#48732) backport #48709 --- be/src/io/fs/multi_table_pipe.cpp | 4 ++-- be/src/runtime/fragment_mgr.cpp | 27 ++++++++-------------- be/src/runtime/runtime_state.h | 27 +++++++++++++++++----- .../runtime/stream_load/stream_load_executor.cpp | 2 +- be/src/vec/sink/writer/vtablet_writer.cpp | 21 +++++++++-------- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 +--- 6 files changed, 45 insertions(+), 41 deletions(-) diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index b3af2531f15..eb601c5d6f5 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -263,9 +263,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para { std::lock_guard<std::mutex> l(_tablet_commit_infos_lock); + auto commit_infos = state->tablet_commit_infos(); _tablet_commit_infos.insert(_tablet_commit_infos.end(), - state->tablet_commit_infos().begin(), - state->tablet_commit_infos().end()); + commit_infos.begin(), commit_infos.end()); } _number_total_rows += state->num_rows_load_total(); _number_loaded_rows += state->num_rows_load_success(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c8c012543f9..2eea1e27ce9 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -530,35 +530,26 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { } } } - if (!req.runtime_state->tablet_commit_infos().empty()) { + if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) { params.__isset.commitInfos = true; - 
params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size()); - for (auto& info : req.runtime_state->tablet_commit_infos()) { - params.commitInfos.push_back(info); - } + params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end()); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { - if (!rs->tablet_commit_infos().empty()) { + if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) { params.__isset.commitInfos = true; - params.commitInfos.insert(params.commitInfos.end(), - rs->tablet_commit_infos().begin(), - rs->tablet_commit_infos().end()); + params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end()); } } } - if (!req.runtime_state->error_tablet_infos().empty()) { + if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) { params.__isset.errorTabletInfos = true; - params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size()); - for (auto& info : req.runtime_state->error_tablet_infos()) { - params.errorTabletInfos.push_back(info); - } + params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end()); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { - if (!rs->error_tablet_infos().empty()) { + if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) { params.__isset.errorTabletInfos = true; - params.errorTabletInfos.insert(params.errorTabletInfos.end(), - rs->error_tablet_infos().begin(), - rs->error_tablet_infos().end()); + params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(), + rs_eti.end()); } } } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index e86990ae92b..a42f4b5232a 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -433,19 +433,33 @@ public: return _query_options.partitioned_hash_agg_rows_threshold; } - const std::vector<TTabletCommitInfo>& tablet_commit_infos() const { + 
std::vector<TTabletCommitInfo> tablet_commit_infos() const { + std::lock_guard<std::mutex> lock(_tablet_infos_mutex); return _tablet_commit_infos; } - std::vector<TTabletCommitInfo>& tablet_commit_infos() { return _tablet_commit_infos; } + void add_tablet_commit_infos(std::vector<TTabletCommitInfo>& commit_infos) { + std::lock_guard<std::mutex> lock(_tablet_infos_mutex); + _tablet_commit_infos.insert(_tablet_commit_infos.end(), + std::make_move_iterator(commit_infos.begin()), + std::make_move_iterator(commit_infos.end())); + } - std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; } + std::vector<TErrorTabletInfo> error_tablet_infos() const { + std::lock_guard<std::mutex> lock(_tablet_infos_mutex); + return _error_tablet_infos; + } - std::vector<TIcebergCommitData>& iceberg_commit_datas() { return _iceberg_commit_datas; } + void add_error_tablet_infos(std::vector<TErrorTabletInfo>& tablet_infos) { + std::lock_guard<std::mutex> lock(_tablet_infos_mutex); + _error_tablet_infos.insert(_error_tablet_infos.end(), + std::make_move_iterator(tablet_infos.begin()), + std::make_move_iterator(tablet_infos.end())); + } - const std::vector<TErrorTabletInfo>& error_tablet_infos() const { return _error_tablet_infos; } + std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; } - std::vector<TErrorTabletInfo>& error_tablet_infos() { return _error_tablet_infos; } + std::vector<TIcebergCommitData>& iceberg_commit_datas() { return _iceberg_commit_datas; } // local runtime filter mgr, the runtime filter do not have remote target or // not need local merge should regist here. 
the instance exec finish, the local @@ -712,6 +726,7 @@ private: int64_t _error_row_number; std::string _error_log_file_path; std::unique_ptr<std::ofstream> _error_log_file; // error file path, absolute path + mutable std::mutex _tablet_infos_mutex; std::vector<TTabletCommitInfo> _tablet_commit_infos; std::vector<TErrorTabletInfo> _error_tablet_infos; int _max_operator_id = 0; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index ad4d22946f1..1c1fa992f50 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -78,7 +78,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte ctx->txn_id = state->wal_id(); } ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); - ctx->commit_infos = std::move(state->tablet_commit_infos()); + ctx->commit_infos = state->tablet_commit_infos(); ctx->number_total_rows = state->num_rows_load_total(); ctx->number_loaded_rows = state->num_rows_load_success(); ctx->number_filtered_rows = state->num_rows_load_filtered(); diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index ccf56f5f433..a10d5e46a26 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -194,15 +194,18 @@ Status IndexChannel::check_intolerable_failure() { } void IndexChannel::set_error_tablet_in_state(RuntimeState* state) { - std::vector<TErrorTabletInfo>& error_tablet_infos = state->error_tablet_infos(); + std::vector<TErrorTabletInfo> error_tablet_infos; - std::lock_guard<doris::SpinLock> l(_fail_lock); - for (const auto& it : _failed_channels_msgs) { - TErrorTabletInfo error_info; - error_info.__set_tabletId(it.first); - error_info.__set_msg(it.second); - error_tablet_infos.emplace_back(error_info); + { + std::lock_guard<doris::SpinLock> l(_fail_lock); + for (const auto& it : _failed_channels_msgs) { + 
TErrorTabletInfo error_info; + error_info.__set_tabletId(it.first); + error_info.__set_msg(it.second); + error_tablet_infos.emplace_back(error_info); + } } + state->add_error_tablet_infos(error_tablet_infos); } void IndexChannel::set_tablets_received_rows( @@ -967,9 +970,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) { if (_add_batches_finished) { _close_check(); - state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), - std::make_move_iterator(_tablet_commit_infos.begin()), - std::make_move_iterator(_tablet_commit_infos.end())); + _state->add_tablet_commit_infos(_tablet_commit_infos); _index_channel->set_error_tablet_in_state(state); _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index b08b5fdf328..f432595efa5 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -672,10 +672,7 @@ Status VTabletWriterV2::close(Status exec_status) { std::vector<TTabletCommitInfo> tablet_commit_infos; RETURN_IF_ERROR( _create_commit_info(tablet_commit_infos, _load_stream_map, _num_replicas)); - _state->tablet_commit_infos().insert( - _state->tablet_commit_infos().end(), - std::make_move_iterator(tablet_commit_infos.begin()), - std::make_move_iterator(tablet_commit_infos.end())); + _state->add_tablet_commit_infos(tablet_commit_infos); } // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org