This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d4249e4  [Bug] fix Runtime filter can't find fragment-id when apply_filter called early (#6923)
d4249e4 is described below

commit d4249e4f2dddd13ca6db472d772a7b2c5146be40
Author: Pxl <952130...@qq.com>
AuthorDate: Wed Oct 27 09:54:52 2021 +0800

    [Bug] fix Runtime filter can't find fragment-id when apply_filter called early (#6923)
    
    #6921
---
 be/src/runtime/fragment_mgr.cpp | 14 +++++++++++---
 be/src/runtime/fragment_mgr.h   |  2 ++
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f980e7b..a8ff209 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -610,6 +610,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
     {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
+        _cv.notify_all();
     }
 
     auto st = _thread_pool->submit_func(
@@ -747,7 +748,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
     }
 
     VLOG_QUERY << "BackendService execute open()  TQueryPlanInfo: "
-        << apache::thrift::ThriftDebugString(t_query_plan_info);
+               << apache::thrift::ThriftDebugString(t_query_plan_info);
     // assign the param used to execute PlanFragment
     TExecPlanFragmentParams exec_fragment_params;
     exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0;
@@ -811,14 +812,21 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const cha
     std::shared_ptr<FragmentExecState> fragment_state;
 
     {
-        std::lock_guard<std::mutex> lock(_lock);
+        std::unique_lock<std::mutex> lock(_lock);
+        if (!_fragment_map.count(tfragment_instance_id)) {
+            VLOG_NOTICE << "wait for fragment start execute, fragment-id:" << fragment_instance_id;
+            _cv.wait_for(lock, std::chrono::milliseconds(1000),
+                         [&] { return _fragment_map.count(tfragment_instance_id); });
+        }
+
         auto iter = _fragment_map.find(tfragment_instance_id);
         if (iter == _fragment_map.end()) {
-            LOG(WARNING) << "unknown.... fragment-id:" << fragment_instance_id;
+            VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
             return Status::InvalidArgument("fragment-id");
         }
         fragment_state = iter->second;
     }
+
     DCHECK(fragment_state != nullptr);
     RuntimeFilterMgr* runtime_filter_mgr =
             fragment_state->executor()->runtime_state()->runtime_filter_mgr();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 70233e1..ba56216 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -99,6 +99,8 @@ private:
 
     std::mutex _lock;
 
+    std::condition_variable _cv;
+
     // Make sure that remove this before no data reference FragmentExecState
     std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map;
     // query id -> QueryFragmentsCtx

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to