This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c59e16f811 [opt](query cancel) optimization for query cancel #28778
8c59e16f811 is described below

commit 8c59e16f811cd1266f4ea7e41bf8ddcfaea38574
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Fri Dec 22 12:48:37 2023 +0800

    [opt](query cancel) optimization for query cancel #28778
---
 be/src/agent/heartbeat_server.cpp |  5 +++++
 be/src/runtime/exec_env.cpp       | 37 +++++++++++++++----------------------
 be/src/runtime/exec_env.h         |  1 +
 be/src/runtime/fragment_mgr.cpp   | 13 ++++++++++++-
 be/src/runtime/query_context.h    |  4 +++-
 be/src/util/debug_util.cpp        |  1 -
 6 files changed, 36 insertions(+), 25 deletions(-)

diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp
index a0714ac2c5c..b7efbe6796f 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -220,6 +220,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
 
     if (master_info.__isset.frontend_infos) {
         ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos);
+    } else {
+        LOG_EVERY_N(WARNING, 2) << fmt::format(
+                "Heartbeat from {}:{} does not have frontend_infos, this may because we are "
+                "upgrading cluster",
+                master_info.network_address.hostname, master_info.network_address.port);
     }
 
     if (need_report) {
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 7115dd76f13..71aee1c5b47 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -18,11 +18,13 @@
 #include "runtime/exec_env.h"
 
 #include <gen_cpp/HeartbeatService_types.h>
+#include <glog/logging.h>
 
 #include <mutex>
 #include <utility>
 
 #include "common/config.h"
+#include "common/logging.h"
 #include "olap/olap_define.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
@@ -119,34 +121,25 @@ std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_running_frontends() {
     const auto now = GetCurrentTimeMicros() / 1000;
 
     for (const auto& pair : _frontends) {
-        if (pair.second.info.process_uuid != 0) {
-            if (now - pair.second.last_reveiving_time_ms < expired_duration) {
+        auto& brpc_addr = pair.first;
+        auto& fe_info = pair.second;
+
+        if (fe_info.info.process_uuid == 0) {
+            // FE is in an unknown state, regart it as alive. conservative
+            res[brpc_addr] = fe_info;
+        } else {
+            if (now - fe_info.last_reveiving_time_ms < expired_duration) {
                 // If fe info has just been update in last expired_duration, regard it as running.
-                res[pair.first] = pair.second;
+                res[brpc_addr] = fe_info;
             } else {
                 // Fe info has not been udpate for more than expired_duration, regard it as an abnormal.
                 // Abnormal means this fe can not connect to master, and it is not dropped from cluster.
                 // or fe do not have master yet.
-                LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info)
-                          << " has not update its hb "
-                          << "for more than " << config::fe_expire_duration_seconds
-                          << " secs, regard it as abnormal.";
+                LOG_EVERY_N(WARNING, 50) << fmt::format(
+                        "Frontend {}:{} has not update its hb for more than {} secs, regard it as "
+                        "abnormal",
+                        brpc_addr.hostname, brpc_addr.port, config::fe_expire_duration_seconds);
             }
-
-            continue;
-        }
-
-        if (pair.second.last_reveiving_time_ms - pair.second.first_receiving_time_ms >
-            expired_duration) {
-            // A zero process-uuid that sustains more than 60 seconds(default).
-            // We will regard this fe as a abnormal frontend.
-            LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info)
-                      << " has not update its hb "
-                      << "for more than " << config::fe_expire_duration_seconds
-                      << " secs, regard it as abnormal.";
-            continue;
-        } else {
-            res[pair.first] = pair.second;
         }
     }
 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 95c4e97e1b3..4f4325a79c7 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -349,6 +349,7 @@ private:
     std::shared_ptr<WalManager> _wal_manager;
 
     std::mutex _frontends_lock;
+    // ip:brpc_port -> frontend_indo
     std::map<TNetworkAddress, FrontendInfo> _frontends;
     GroupCommitMgr* _group_commit_mgr = nullptr;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c54786e5428..65e1ba475ae 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1167,6 +1167,8 @@ void FragmentMgr::cancel_worker() {
             } else {
                 for (const auto& q : _query_ctx_map) {
                     if (q.second->get_fe_process_uuid() == 0) {
+                        // zero means this query is from a older version fe or
+                        // this fe is starting
                         continue;
                     }
 
@@ -1175,7 +1177,16 @@ void FragmentMgr::cancel_worker() {
                         if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid ||
                             itr->second.info.process_uuid == 0) {
                             continue;
+                        } else {
+                            LOG_WARNING("Coordinator of query {} restarted, going to cancel it.",
+                                        print_id(q.second->query_id()));
                         }
+                    } else {
+                        LOG_WARNING(
+                                "Could not find target coordinator {}:{} of query {}, going to "
+                                "cancel it.",
+                                q.second->coord_addr.hostname, q.second->coord_addr.port,
+                                print_id(q.second->query_id()));
                     }
 
                     // Coorninator of this query has already dead.
@@ -1195,7 +1206,7 @@ void FragmentMgr::cancel_worker() {
 
         if (!queries_to_cancel.empty()) {
             LOG(INFO) << "There are " << queries_to_cancel.size()
-                      << " queries need to be cancelled, coordinator dead.";
+                      << " queries need to be cancelled, coordinator dead or restarted.";
         }
 
         for (const auto& qid : queries_to_cancel) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 6d392c56175..a230fd653e8 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -184,7 +184,9 @@ public:
         return _query_options.be_exec_version;
     }
 
-    [[nodiscard]] int64_t get_fe_process_uuid() const { return _query_options.fe_process_uuid; }
+    [[nodiscard]] int64_t get_fe_process_uuid() const {
+        return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0;
+    }
 
     RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
 
diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp
index 85f2d482946..37243a6935b 100644
--- a/be/src/util/debug_util.cpp
+++ b/be/src/util/debug_util.cpp
@@ -125,7 +125,6 @@ std::string PrintFrontendInfos(const std::vector<TFrontendInfo>& fe_infos) {
 std::string PrintFrontendInfo(const TFrontendInfo& fe_info) {
     std::stringstream ss;
     fe_info.printTo(ss);
-
     return ss.str();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to