This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8c59e16f811 [opt](query cancel) optimization for query cancel #28778 8c59e16f811 is described below commit 8c59e16f811cd1266f4ea7e41bf8ddcfaea38574 Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Fri Dec 22 12:48:37 2023 +0800 [opt](query cancel) optimization for query cancel #28778 --- be/src/agent/heartbeat_server.cpp | 5 +++++ be/src/runtime/exec_env.cpp | 37 +++++++++++++++---------------------- be/src/runtime/exec_env.h | 1 + be/src/runtime/fragment_mgr.cpp | 13 ++++++++++++- be/src/runtime/query_context.h | 4 +++- be/src/util/debug_util.cpp | 1 - 6 files changed, 36 insertions(+), 25 deletions(-) diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index a0714ac2c5c..b7efbe6796f 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -220,6 +220,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { if (master_info.__isset.frontend_infos) { ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos); + } else { + LOG_EVERY_N(WARNING, 2) << fmt::format( + "Heartbeat from {}:{} does not have frontend_infos, this may because we are " + "upgrading cluster", + master_info.network_address.hostname, master_info.network_address.port); } if (need_report) { diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 7115dd76f13..71aee1c5b47 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -18,11 +18,13 @@ #include "runtime/exec_env.h" #include <gen_cpp/HeartbeatService_types.h> +#include <glog/logging.h> #include <mutex> #include <utility> #include "common/config.h" +#include "common/logging.h" #include "olap/olap_define.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" @@ -119,34 +121,25 @@ std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_running_frontends() { const auto now = GetCurrentTimeMicros() / 1000; for (const auto& pair : _frontends) { - if (pair.second.info.process_uuid != 0) { - if (now - pair.second.last_reveiving_time_ms < expired_duration) { + auto& brpc_addr = pair.first; + auto& fe_info = pair.second; + + if (fe_info.info.process_uuid == 0) { + // FE is in an unknown state, regart it as alive. conservative + res[brpc_addr] = fe_info; + } else { + if (now - fe_info.last_reveiving_time_ms < expired_duration) { // If fe info has just been update in last expired_duration, regard it as running. - res[pair.first] = pair.second; + res[brpc_addr] = fe_info; } else { // Fe info has not been udpate for more than expired_duration, regard it as an abnormal. // Abnormal means this fe can not connect to master, and it is not dropped from cluster. // or fe do not have master yet. - LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info) - << " has not update its hb " - << "for more than " << config::fe_expire_duration_seconds - << " secs, regard it as abnormal."; + LOG_EVERY_N(WARNING, 50) << fmt::format( + "Frontend {}:{} has not update its hb for more than {} secs, regard it as " + "abnormal", + brpc_addr.hostname, brpc_addr.port, config::fe_expire_duration_seconds); } - - continue; - } - - if (pair.second.last_reveiving_time_ms - pair.second.first_receiving_time_ms > - expired_duration) { - // A zero process-uuid that sustains more than 60 seconds(default). - // We will regard this fe as a abnormal frontend. - LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info) - << " has not update its hb " - << "for more than " << config::fe_expire_duration_seconds - << " secs, regard it as abnormal."; - continue; - } else { - res[pair.first] = pair.second; } } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 95c4e97e1b3..4f4325a79c7 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -349,6 +349,7 @@ private: std::shared_ptr<WalManager> _wal_manager; std::mutex _frontends_lock; + // ip:brpc_port -> frontend_indo std::map<TNetworkAddress, FrontendInfo> _frontends; GroupCommitMgr* _group_commit_mgr = nullptr; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c54786e5428..65e1ba475ae 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1167,6 +1167,8 @@ void FragmentMgr::cancel_worker() { } else { for (const auto& q : _query_ctx_map) { if (q.second->get_fe_process_uuid() == 0) { + // zero means this query is from a older version fe or + // this fe is starting continue; } @@ -1175,7 +1177,16 @@ void FragmentMgr::cancel_worker() { if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid || itr->second.info.process_uuid == 0) { continue; + } else { + LOG_WARNING("Coordinator of query {} restarted, going to cancel it.", + print_id(q.second->query_id())); } + } else { + LOG_WARNING( + "Could not find target coordinator {}:{} of query {}, going to " + "cancel it.", + q.second->coord_addr.hostname, q.second->coord_addr.port, + print_id(q.second->query_id())); } // Coorninator of this query has already dead. @@ -1195,7 +1206,7 @@ void FragmentMgr::cancel_worker() { if (!queries_to_cancel.empty()) { LOG(INFO) << "There are " << queries_to_cancel.size() - << " queries need to be cancelled, coordinator dead."; + << " queries need to be cancelled, coordinator dead or restarted."; } for (const auto& qid : queries_to_cancel) { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 6d392c56175..a230fd653e8 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -184,7 +184,9 @@ public: return _query_options.be_exec_version; } - [[nodiscard]] int64_t get_fe_process_uuid() const { return _query_options.fe_process_uuid; } + [[nodiscard]] int64_t get_fe_process_uuid() const { + return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0; + } RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); } diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp index 85f2d482946..37243a6935b 100644 --- a/be/src/util/debug_util.cpp +++ b/be/src/util/debug_util.cpp @@ -125,7 +125,6 @@ std::string PrintFrontendInfos(const std::vector<TFrontendInfo>& fe_infos) { std::string PrintFrontendInfo(const TFrontendInfo& fe_info) { std::stringstream ss; fe_info.printTo(ss); - return ss.str(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org