This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9c39a343fc7 [fix](load) add lock for runtime_state->tablet_commit_infos (#48709) (#48732)
9c39a343fc7 is described below

commit 9c39a343fc7b0578aea1dddeed4f4d16cba7929f
Author: Kaijie Chen <chenkai...@selectdb.com>
AuthorDate: Mon Mar 10 17:58:14 2025 +0800

    [fix](load) add lock for runtime_state->tablet_commit_infos (#48709) (#48732)
    
    backport #48709
---
 be/src/io/fs/multi_table_pipe.cpp                  |  4 ++--
 be/src/runtime/fragment_mgr.cpp                    | 27 ++++++++--------------
 be/src/runtime/runtime_state.h                     | 27 +++++++++++++++++-----
 .../runtime/stream_load/stream_load_executor.cpp   |  2 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          | 21 +++++++++--------
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  5 +---
 6 files changed, 45 insertions(+), 41 deletions(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index b3af2531f15..eb601c5d6f5 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -263,9 +263,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
 
                     {
                         std::lock_guard<std::mutex> l(_tablet_commit_infos_lock);
+                        auto commit_infos = state->tablet_commit_infos();
                         _tablet_commit_infos.insert(_tablet_commit_infos.end(),
-                                                    state->tablet_commit_infos().begin(),
-                                                    state->tablet_commit_infos().end());
+                                                    commit_infos.begin(), commit_infos.end());
                     }
                     _number_total_rows += state->num_rows_load_total();
                     _number_loaded_rows += state->num_rows_load_success();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c8c012543f9..2eea1e27ce9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -530,35 +530,26 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
             }
         }
     }
-    if (!req.runtime_state->tablet_commit_infos().empty()) {
+    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
         params.__isset.commitInfos = true;
-        params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
-        for (auto& info : req.runtime_state->tablet_commit_infos()) {
-            params.commitInfos.push_back(info);
-        }
+        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
     } else if (!req.runtime_states.empty()) {
         for (auto* rs : req.runtime_states) {
-            if (!rs->tablet_commit_infos().empty()) {
+            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
                 params.__isset.commitInfos = true;
-                params.commitInfos.insert(params.commitInfos.end(),
-                                          rs->tablet_commit_infos().begin(),
-                                          rs->tablet_commit_infos().end());
+                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
             }
         }
     }
-    if (!req.runtime_state->error_tablet_infos().empty()) {
+    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
         params.__isset.errorTabletInfos = true;
-        params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
-        for (auto& info : req.runtime_state->error_tablet_infos()) {
-            params.errorTabletInfos.push_back(info);
-        }
+        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
     } else if (!req.runtime_states.empty()) {
         for (auto* rs : req.runtime_states) {
-            if (!rs->error_tablet_infos().empty()) {
+            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
                 params.__isset.errorTabletInfos = true;
-                params.errorTabletInfos.insert(params.errorTabletInfos.end(),
-                                               rs->error_tablet_infos().begin(),
-                                               rs->error_tablet_infos().end());
+                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
+                                               rs_eti.end());
             }
         }
     }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e86990ae92b..a42f4b5232a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -433,19 +433,33 @@ public:
         return _query_options.partitioned_hash_agg_rows_threshold;
     }
 
-    const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
+    std::vector<TTabletCommitInfo> tablet_commit_infos() const {
+        std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
         return _tablet_commit_infos;
     }
 
-    std::vector<TTabletCommitInfo>& tablet_commit_infos() { return _tablet_commit_infos; }
+    void add_tablet_commit_infos(std::vector<TTabletCommitInfo>& commit_infos) {
+        std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
+        _tablet_commit_infos.insert(_tablet_commit_infos.end(),
+                                    std::make_move_iterator(commit_infos.begin()),
+                                    std::make_move_iterator(commit_infos.end()));
+    }
 
-    std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; }
+    std::vector<TErrorTabletInfo> error_tablet_infos() const {
+        std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
+        return _error_tablet_infos;
+    }
 
-    std::vector<TIcebergCommitData>& iceberg_commit_datas() { return _iceberg_commit_datas; }
+    void add_error_tablet_infos(std::vector<TErrorTabletInfo>& tablet_infos) {
+        std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
+        _error_tablet_infos.insert(_error_tablet_infos.end(),
+                                   std::make_move_iterator(tablet_infos.begin()),
+                                   std::make_move_iterator(tablet_infos.end()));
+    }
 
-    const std::vector<TErrorTabletInfo>& error_tablet_infos() const { return _error_tablet_infos; }
+    std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; }
 
-    std::vector<TErrorTabletInfo>& error_tablet_infos() { return _error_tablet_infos; }
+    std::vector<TIcebergCommitData>& iceberg_commit_datas() { return _iceberg_commit_datas; }
 
     // local runtime filter mgr, the runtime filter do not have remote target or
     // not need local merge should regist here. the instance exec finish, the local
@@ -712,6 +726,7 @@ private:
     int64_t _error_row_number;
     std::string _error_log_file_path;
     std::unique_ptr<std::ofstream> _error_log_file; // error file path, absolute path
+    mutable std::mutex _tablet_infos_mutex;
     std::vector<TTabletCommitInfo> _tablet_commit_infos;
     std::vector<TErrorTabletInfo> _error_tablet_infos;
     int _max_operator_id = 0;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index ad4d22946f1..1c1fa992f50 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -78,7 +78,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
             ctx->txn_id = state->wal_id();
         }
         ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
-        ctx->commit_infos = std::move(state->tablet_commit_infos());
+        ctx->commit_infos = state->tablet_commit_infos();
         ctx->number_total_rows = state->num_rows_load_total();
         ctx->number_loaded_rows = state->num_rows_load_success();
         ctx->number_filtered_rows = state->num_rows_load_filtered();
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index ccf56f5f433..a10d5e46a26 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -194,15 +194,18 @@ Status IndexChannel::check_intolerable_failure() {
 }
 
 void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
-    std::vector<TErrorTabletInfo>& error_tablet_infos = state->error_tablet_infos();
+    std::vector<TErrorTabletInfo> error_tablet_infos;
 
-    std::lock_guard<doris::SpinLock> l(_fail_lock);
-    for (const auto& it : _failed_channels_msgs) {
-        TErrorTabletInfo error_info;
-        error_info.__set_tabletId(it.first);
-        error_info.__set_msg(it.second);
-        error_tablet_infos.emplace_back(error_info);
+    {
+        std::lock_guard<doris::SpinLock> l(_fail_lock);
+        for (const auto& it : _failed_channels_msgs) {
+            TErrorTabletInfo error_info;
+            error_info.__set_tabletId(it.first);
+            error_info.__set_msg(it.second);
+            error_tablet_infos.emplace_back(error_info);
+        }
     }
+    state->add_error_tablet_infos(error_tablet_infos);
 }
 
 void IndexChannel::set_tablets_received_rows(
@@ -967,9 +970,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
 
     if (_add_batches_finished) {
         _close_check();
-        state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
-                                            std::make_move_iterator(_tablet_commit_infos.begin()),
-                                            std::make_move_iterator(_tablet_commit_infos.end()));
+        _state->add_tablet_commit_infos(_tablet_commit_infos);
 
         _index_channel->set_error_tablet_in_state(state);
         _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id);
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index b08b5fdf328..f432595efa5 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -672,10 +672,7 @@ Status VTabletWriterV2::close(Status exec_status) {
             std::vector<TTabletCommitInfo> tablet_commit_infos;
             RETURN_IF_ERROR(
                     _create_commit_info(tablet_commit_infos, _load_stream_map, _num_replicas));
-            _state->tablet_commit_infos().insert(
-                    _state->tablet_commit_infos().end(),
-                    std::make_move_iterator(tablet_commit_infos.begin()),
-                    std::make_move_iterator(tablet_commit_infos.end()));
+            _state->add_tablet_commit_infos(tablet_commit_infos);
         }
 
         // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to