This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new d4249e4 [Bug] fix Runtime filter can't find fragment-id when apply_filter called early (#6923) d4249e4 is described below commit d4249e4f2dddd13ca6db472d772a7b2c5146be40 Author: Pxl <952130...@qq.com> AuthorDate: Wed Oct 27 09:54:52 2021 +0800 [Bug] fix Runtime filter can't find fragment-id when apply_filter called early (#6923) #6921 --- be/src/runtime/fragment_mgr.cpp | 14 +++++++++++--- be/src/runtime/fragment_mgr.h | 2 ++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f980e7b..a8ff209 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -610,6 +610,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi { std::lock_guard<std::mutex> lock(_lock); _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state)); + _cv.notify_all(); } auto st = _thread_pool->submit_func( @@ -747,7 +748,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, } VLOG_QUERY << "BackendService execute open() TQueryPlanInfo: " - << apache::thrift::ThriftDebugString(t_query_plan_info); + << apache::thrift::ThriftDebugString(t_query_plan_info); // assign the param used to execute PlanFragment TExecPlanFragmentParams exec_fragment_params; exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0; @@ -811,14 +812,21 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const cha std::shared_ptr<FragmentExecState> fragment_state; { - std::lock_guard<std::mutex> lock(_lock); + std::unique_lock<std::mutex> lock(_lock); + if (!_fragment_map.count(tfragment_instance_id)) { + VLOG_NOTICE << "wait for fragment start execute, fragment-id:" << fragment_instance_id; + _cv.wait_for(lock, std::chrono::milliseconds(1000), + [&] { return _fragment_map.count(tfragment_instance_id); }); + } + auto iter = _fragment_map.find(tfragment_instance_id); if (iter == _fragment_map.end()) { - LOG(WARNING) << "unknown.... fragment-id:" << fragment_instance_id; + VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id; return Status::InvalidArgument("fragment-id"); } fragment_state = iter->second; } + DCHECK(fragment_state != nullptr); RuntimeFilterMgr* runtime_filter_mgr = fragment_state->executor()->runtime_state()->runtime_filter_mgr(); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 70233e1..ba56216 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -99,6 +99,8 @@ private: std::mutex _lock; + std::condition_variable _cv; + // Make sure that remove this before no data reference FragmentExecState std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map; // query id -> QueryFragmentsCtx --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org