This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4649faf2a84 [fix](load) Fix wrong results for high-concurrent loading (#36841) 4649faf2a84 is described below commit 4649faf2a841d3d421ee48640770ed1f4e764dbf Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Jun 26 19:41:42 2024 +0800 [fix](load) Fix wrong results for high-concurrent loading (#36841) --- be/src/runtime/group_commit_mgr.cpp | 31 +++++++++++++++++++------------ be/src/runtime/group_commit_mgr.h | 1 + 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index ab11b795ed5..111780c9a42 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -287,18 +287,25 @@ Status GroupCommitTable::get_first_block_load_queue( if (!_is_creating_plan_fragment) { _is_creating_plan_fragment = true; create_plan_dep->block(); - RETURN_IF_ERROR( - _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] { - Defer defer {[&, dep = dep]() { - dep->set_ready(); - std::unique_lock l(_lock); - _is_creating_plan_fragment = false; - }}; - auto st = _create_group_commit_load(be_exe_version, mem_tracker); - if (!st.ok()) { - LOG(WARNING) << "create group commit load error, st=" << st.to_string(); - } - })); + RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, mem_tracker, + dep = create_plan_dep] { + Defer defer {[&, dep = dep]() { + dep->set_ready(); + std::unique_lock l(_lock); + for (auto it : _create_plan_deps) { + it->set_ready(); + } + std::vector<std::shared_ptr<pipeline::Dependency>> {}.swap(_create_plan_deps); + _is_creating_plan_fragment = false; + }}; + auto st = _create_group_commit_load(be_exe_version, mem_tracker); + if (!st.ok()) { + LOG(WARNING) << "create group commit load error, st=" << st.to_string(); + } + })); + } else { + create_plan_dep->block(); + _create_plan_deps.push_back(create_plan_dep); } return try_to_get_matched_queue(); } diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 702ebb9c746..c668197e8dc 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -187,6 +187,7 @@ private: // fragment_instance_id to load_block_queue std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues; bool _is_creating_plan_fragment = false; + std::vector<std::shared_ptr<pipeline::Dependency>> _create_plan_deps; }; class GroupCommitMgr { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org