This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4649faf2a84 [fix](load) Fix wrong results for high-concurrent loading (#36841)
4649faf2a84 is described below

commit 4649faf2a841d3d421ee48640770ed1f4e764dbf
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Jun 26 19:41:42 2024 +0800

    [fix](load) Fix wrong results for high-concurrent loading (#36841)
---
 be/src/runtime/group_commit_mgr.cpp | 31 +++++++++++++++++++------------
 be/src/runtime/group_commit_mgr.h   |  1 +
 2 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index ab11b795ed5..111780c9a42 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -287,18 +287,25 @@ Status GroupCommitTable::get_first_block_load_queue(
     if (!_is_creating_plan_fragment) {
         _is_creating_plan_fragment = true;
         create_plan_dep->block();
-        RETURN_IF_ERROR(
-                _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] {
-                    Defer defer {[&, dep = dep]() {
-                        dep->set_ready();
-                        std::unique_lock l(_lock);
-                        _is_creating_plan_fragment = false;
-                    }};
-                    auto st = _create_group_commit_load(be_exe_version, mem_tracker);
-                    if (!st.ok()) {
-                        LOG(WARNING) << "create group commit load error, st=" << st.to_string();
-                    }
-                }));
+        RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, mem_tracker,
+                                                   dep = create_plan_dep] {
+            Defer defer {[&, dep = dep]() {
+                dep->set_ready();
+                std::unique_lock l(_lock);
+                for (auto it : _create_plan_deps) {
+                    it->set_ready();
+                }
+                std::vector<std::shared_ptr<pipeline::Dependency>> {}.swap(_create_plan_deps);
+                _is_creating_plan_fragment = false;
+            }};
+            auto st = _create_group_commit_load(be_exe_version, mem_tracker);
+            if (!st.ok()) {
+                LOG(WARNING) << "create group commit load error, st=" << st.to_string();
+            }
+        }));
+    } else {
+        create_plan_dep->block();
+        _create_plan_deps.push_back(create_plan_dep);
     }
     return try_to_get_matched_queue();
 }
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 702ebb9c746..c668197e8dc 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -187,6 +187,7 @@ private:
     // fragment_instance_id to load_block_queue
     std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
     bool _is_creating_plan_fragment = false;
+    std::vector<std::shared_ptr<pipeline::Dependency>> _create_plan_deps;
 };
 
 class GroupCommitMgr {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to