This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new fb789f1dddd [Fix]Fix insert select missing audit log when connect 
follower FE (#36454)
fb789f1dddd is described below

commit fb789f1ddddf5719c1de6354f4ae19f5f71b8e2f
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jun 20 15:16:14 2024 +0800

    [Fix]Fix insert select missing audit log when connect follower FE (#36454)
    
    ## Proposed changes
    Fix that when an ```insert select``` is executed on a Follower FE, the
    audit log may be missing query statistics.
    This is because the ```audit log``` is written on the connected FE, but
    the request is forwarded to the master FE, so the coord FE becomes the
    master FE and BE reports query statistics to that coord FE. As a result,
    the connected Follower never receives the reported query statistics, and
    the audit log ends up missing them.
    We can add a new field to mark the FE the client is connected to, so that
    BE reports query statistics to the connected FE.
    Besides, this change refactors FE's WorkloadRuntimeStatusMgr to make the
    logic clearer and adds some logging in BE.
---
 be/src/runtime/fragment_mgr.cpp                    |   4 +-
 be/src/runtime/query_context.h                     |   7 +-
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  34 ++++--
 .../apache/doris/planner/StreamLoadPlanner.java    |   2 +
 .../java/org/apache/doris/qe/ConnectContext.java   |   9 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |  14 +++
 .../WorkloadRuntimeStatusMgr.java                  | 121 +++++++++++----------
 gensrc/thrift/PaloInternalService.thrift           |   3 +
 8 files changed, 119 insertions(+), 75 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1529d66def2..66538529c3f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -692,9 +692,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
         }
 
         query_ctx->coord_addr = params.coord;
+        query_ctx->current_connect_fe = params.current_connect_fe;
         LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, 
query_ctx->query_id.lo)
                   << " coord_addr " << query_ctx->coord_addr
-                  << " total fragment num on current host: " << 
params.fragment_num_on_host;
+                  << " total fragment num on current host: " << 
params.fragment_num_on_host
+                  << " report audit fe:" << params.current_connect_fe;
         query_ctx->query_globals = params.query_globals;
 
         if (params.__isset.resource_info) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 8746483df4c..e47e09e5921 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -182,7 +182,7 @@ public:
 
     void register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
         
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(query_id),
 qs,
-                                                                             
coord_addr);
+                                                                             
current_connect_fe);
     }
 
     std::shared_ptr<QueryStatistics> get_query_statistics() {
@@ -198,7 +198,7 @@ public:
                 if (_exec_env &&
                     _exec_env->runtime_query_statistics_mgr()) { // for ut 
FragmentMgrTest.normal
                     
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                            query_id_str, qs, coord_addr);
+                            query_id_str, qs, current_connect_fe);
                 }
             } else {
                 LOG(INFO) << " query " << query_id_str << " get memory query 
statistics failed ";
@@ -212,7 +212,7 @@ public:
             if (_exec_env &&
                 _exec_env->runtime_query_statistics_mgr()) { // for ut 
FragmentMgrTest.normal
                 
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                        print_id(query_id), _cpu_statistics, coord_addr);
+                        print_id(query_id), _cpu_statistics, 
current_connect_fe);
             }
         }
     }
@@ -226,6 +226,7 @@ public:
     std::string user;
     std::string group;
     TNetworkAddress coord_addr;
+    TNetworkAddress current_connect_fe;
     TQueryGlobals query_globals;
 
     /// In the current implementation, for multiple fragments executed by a 
query on the same BE node,
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 5c40ea61763..0ed8cbeb79c 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -83,8 +83,8 @@ void 
RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
 
         if (!coord_status.ok()) {
             std::stringstream ss;
-            LOG(WARNING) << "could not get client " << add_str
-                         << " when report workload runtime stats, reason is "
+            LOG(WARNING) << "[report_query_statistics]could not get client " 
<< add_str
+                         << " when report workload runtime stats, reason:"
                          << coord_status.to_string();
             continue;
         }
@@ -103,26 +103,38 @@ void 
RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
             coord->reportExecStatus(res, params);
             rpc_result[addr] = true;
         } catch (apache::thrift::TApplicationException& e) {
-            LOG(WARNING) << "fe " << add_str
-                         << " throw exception when report statistics, reason=" 
<< e.what()
+            LOG(WARNING) << "[report_query_statistics]fe " << add_str
+                         << " throw exception when report statistics, reason:" 
<< e.what()
                          << " , you can see fe log for details.";
         } catch (apache::thrift::transport::TTransportException& e) {
-            LOG(WARNING) << "report workload runtime statistics to " << add_str
-                         << " failed,  err: " << e.what();
+            LOG(WARNING) << "[report_query_statistics]report workload runtime 
statistics to "
+                         << add_str << " failed,  reason: " << e.what();
             rpc_status = coord.reopen();
             if (!rpc_status.ok()) {
-                LOG(WARNING)
-                        << "reopen thrift client failed when report workload 
runtime statistics to"
-                        << add_str;
+                LOG(WARNING) << "[report_query_statistics]reopen thrift client 
failed when report "
+                                "workload runtime statistics to"
+                             << add_str;
             } else {
                 try {
                     coord->reportExecStatus(res, params);
                     rpc_result[addr] = true;
                 } catch (apache::thrift::transport::TTransportException& e2) {
-                    LOG(WARNING) << "retry report workload runtime stats to " 
<< add_str
-                                 << " failed,  err: " << e2.what();
+                    LOG(WARNING)
+                            << "[report_query_statistics]retry report workload 
runtime stats to "
+                            << add_str << " failed,  reason: " << e2.what();
+                } catch (std::exception& e) {
+                    LOG_WARNING(
+                            "[report_query_statistics]unknow exception when 
report workload "
+                            "runtime statistics to {}, "
+                            "reason:{}. ",
+                            add_str, e.what());
                 }
             }
+        } catch (std::exception& e) {
+            LOG_WARNING(
+                    "[report_query_statistics]unknown exception when report 
workload runtime "
+                    "statistics to {}, reason:{}. ",
+                    add_str, e.what());
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 16c9e8f32eb..26c0f1fae65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -283,6 +283,7 @@ public class StreamLoadPlanner {
 
         params.setDescTbl(analyzer.getDescTbl().toThrift());
         params.setCoord(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+        params.setCurrentConnectFe(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
 
         TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
         // user load id (streamLoadTask.id) as query id
@@ -501,6 +502,7 @@ public class StreamLoadPlanner {
 
         pipParams.setDescTbl(analyzer.getDescTbl().toThrift());
         pipParams.setCoord(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+        pipParams.setCurrentConnectFe(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
         pipParams.setQueryId(loadId);
         pipParams.per_exch_num_senders = Maps.newHashMap();
         pipParams.destinations = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 508691f29a3..315973bf64d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -187,6 +187,10 @@ public class ConnectContext {
 
     private String workloadGroupName = "";
 
+    // isProxy used for forward request from other FE and used in one thread
+    // it's default thread-safe
+    private boolean isProxy = false;
+
     public void setUserQueryTimeout(int queryTimeout) {
         if (queryTimeout > 0) {
             sessionVariable.setQueryTimeoutS(queryTimeout);
@@ -298,6 +302,7 @@ public class ConnectContext {
             mysqlChannel = new MysqlChannel(connection, this);
         } else if (isProxy) {
             mysqlChannel = new ProxyMysqlChannel();
+            this.isProxy = isProxy;
         } else {
             mysqlChannel = new DummyMysqlChannel();
         }
@@ -914,5 +919,9 @@ public class ConnectContext {
     public void setUserVars(Map<String, LiteralExpr> userVars) {
         this.userVars = userVars;
     }
+
+    public boolean isProxy() {
+        return isProxy;
+    }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e2889310428..e50bf77650d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -121,6 +121,7 @@ import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
@@ -178,6 +179,10 @@ public class Coordinator implements CoordInterface {
     private final TQueryGlobals queryGlobals = new TQueryGlobals();
     private TQueryOptions queryOptions;
     private TNetworkAddress coordAddress;
+    // fe audit log in connected FE,if a query is forward
+    // we should send the connected FE to be,
+    // then be report query statistics to the connected FE
+    private TNetworkAddress currentConnectFE;
 
     // protects all fields below
     private final Lock lock = new ReentrantLock();
@@ -497,6 +502,13 @@ public class Coordinator implements CoordInterface {
         }
 
         coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
+        if (ConnectContext.get() != null && ConnectContext.get().isProxy() && 
!StringUtils.isEmpty(
+                ConnectContext.get().getCurrentConnectedFEIp())) {
+            currentConnectFE = new 
TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(),
+                    Config.rpc_port);
+        } else {
+            currentConnectFE = coordAddress;
+        }
 
         this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
         if (LOG.isDebugEnabled()) {
@@ -3206,6 +3218,7 @@ public class Coordinator implements CoordInterface {
                 params.params.setSenderId(i);
                 params.params.setNumSenders(instanceExecParams.size());
                 params.setCoord(coordAddress);
+                params.setCurrentConnectFe(currentConnectFE);
                 params.setBackendNum(backendNum++);
                 params.setQueryGlobals(queryGlobals);
                 params.setQueryOptions(queryOptions);
@@ -3292,6 +3305,7 @@ public class Coordinator implements CoordInterface {
                     params.setDestinations(destinations);
                     params.setNumSenders(instanceExecParams.size());
                     params.setCoord(coordAddress);
+                    params.setCurrentConnectFe(currentConnectFE);
                     params.setQueryGlobals(queryGlobals);
                     params.setQueryOptions(queryOptions);
                     params.query_options.setEnablePipelineEngine(true);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index 623c3c9aaa9..3e9f7a381c4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Daemon;
 import org.apache.doris.plugin.AuditEvent;
 import org.apache.doris.thrift.TQueryStatistics;
@@ -30,22 +31,36 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+// NOTE: not using a lock for beToQueryStatsMap's update because it should 
void global lock for all be
+// this may cause in some corner case missing statistics update,for example:
+// time1: clear logic judge query 1 is timeout
+// time2: query 1 is update by report
+// time3: clear logic remove query 1
+// in this case, lost query stats is allowed. because query report time out is 
60s by default,
+// when this case happens, we should first to find why be not report for so 
long.
 public class WorkloadRuntimeStatusMgr {
 
     private static final Logger LOG = 
LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
-    private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = 
Maps.newConcurrentMap();
-    private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
-    private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
+    private Map<Long, BeReportInfo> beToQueryStatsMap = 
Maps.newConcurrentMap();
     private final ReentrantReadWriteLock queryAuditEventLock = new 
ReentrantReadWriteLock();
     private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
 
+    private class BeReportInfo {
+        volatile long beLastReportTime;
+
+        BeReportInfo(long beLastReportTime) {
+            this.beLastReportTime = beLastReportTime;
+        }
+
+        Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = 
Maps.newConcurrentMap();
+    }
+
     class WorkloadRuntimeStatsThread extends Daemon {
 
         WorkloadRuntimeStatusMgr workloadStatsMgr;
@@ -130,41 +145,65 @@ public class WorkloadRuntimeStatusMgr {
             return;
         }
         long beId = params.backend_id;
-        Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
-        beLastReportTime.put(beId, System.currentTimeMillis());
-        if (queryIdMap == null) {
-            queryIdMap = Maps.newConcurrentMap();
-            queryIdMap.putAll(params.query_statistics_map);
-            beToQueryStatsMap.put(beId, queryIdMap);
+        // NOTE(wb) one be sends update request one by one,
+        // so there is no need a global lock for beToQueryStatsMap here,
+        // just keep one be's put/remove/get is atomic operation is enough
+        long currentTime = System.currentTimeMillis();
+        BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+        if (beReportInfo == null) {
+            beReportInfo = new BeReportInfo(currentTime);
+            beToQueryStatsMap.put(beId, beReportInfo);
         } else {
-            long currentTime = System.currentTimeMillis();
-            for (Map.Entry<String, TQueryStatistics> entry : 
params.query_statistics_map.entrySet()) {
-                queryIdMap.put(entry.getKey(), entry.getValue());
-                queryLastReportTime.put(entry.getKey(), currentTime);
+            beReportInfo.beLastReportTime = currentTime;
+        }
+        for (Map.Entry<String, TQueryStatistics> entry : 
params.query_statistics_map.entrySet()) {
+            beReportInfo.queryStatsMap.put(entry.getKey(), 
Pair.of(currentTime, (TQueryStatistics) entry.getValue()));
+        }
+    }
+
+    void clearReportTimeoutBeStatistics() {
+        // 1 clear report timeout be
+        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
+        Long currentTime = System.currentTimeMillis();
+        for (Long beId : currentBeIdSet) {
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            if (currentTime - beReportInfo.beLastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
+                beToQueryStatsMap.remove(beId);
+                continue;
+            }
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
+            for (String queryId : queryIdSet) {
+                Pair<Long, TQueryStatistics> pair = 
beReportInfo.queryStatsMap.get(queryId);
+                long queryLastReportTime = pair.first;
+                if (currentTime - queryLastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
+                    beReportInfo.queryStatsMap.remove(queryId);
+                }
             }
         }
     }
 
+    // NOTE: currently getQueryStatisticsMap must be called before clear 
beToQueryStatsMap
+    // so there is no need lock or null check when visit beToQueryStatsMap
     public Map<String, TQueryStatistics> getQueryStatisticsMap() {
         // 1 merge query stats in all be
         Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
+        Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
         for (Long beId : beIdSet) {
-            Map<String, TQueryStatistics> currentQueryMap = 
beToQueryStatsMap.get(beId);
-            Set<String> queryIdSet = currentQueryMap.keySet();
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
             for (String queryId : queryIdSet) {
-                TQueryStatistics retQuery = retQueryMap.get(queryId);
+                TQueryStatistics curQueryStats = 
beReportInfo.queryStatsMap.get(queryId).second;
+
+                TQueryStatistics retQuery = resultQueryMap.get(queryId);
                 if (retQuery == null) {
                     retQuery = new TQueryStatistics();
-                    retQueryMap.put(queryId, retQuery);
+                    resultQueryMap.put(queryId, retQuery);
                 }
-
-                TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
                 mergeQueryStatistics(retQuery, curQueryStats);
             }
         }
 
-        return retQueryMap;
+        return resultQueryMap;
     }
 
     private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics 
src) {
@@ -176,44 +215,6 @@ public class WorkloadRuntimeStatusMgr {
         }
     }
 
-    void clearReportTimeoutBeStatistics() {
-        // 1 clear report timeout be
-        Set<Long> beNeedToRemove = new HashSet<>();
-        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
-        Long currentTime = System.currentTimeMillis();
-        for (Long beId : currentBeIdSet) {
-            Long lastReportTime = beLastReportTime.get(beId);
-            if (lastReportTime != null
-                    && currentTime - lastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
-                beNeedToRemove.add(beId);
-            }
-        }
-        for (Long beId : beNeedToRemove) {
-            beToQueryStatsMap.remove(beId);
-            beLastReportTime.remove(beId);
-        }
-
-        // 2 clear report timeout query
-        Set<String> queryNeedToClear = new HashSet<>();
-        Long newCurrentTime = System.currentTimeMillis();
-        Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
-        for (String queryId : queryLastReportTimeKeySet) {
-            Long lastReportTime = queryLastReportTime.get(queryId);
-            if (lastReportTime != null
-                    && newCurrentTime - lastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
-                queryNeedToClear.add(queryId);
-            }
-        }
-
-        Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        for (String queryId : queryNeedToClear) {
-            for (Long beId : beIdSet) {
-                beToQueryStatsMap.get(beId).remove(queryId);
-            }
-            queryLastReportTime.remove(queryId);
-        }
-    }
-
     private void queryAuditEventLogWriteLock() {
         queryAuditEventLock.writeLock().lock();
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 4185e8a4276..4d0e7d01b86 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -458,6 +458,8 @@ struct TExecPlanFragmentParams {
 
   // scan node id -> scan range params, only for external file scan
   24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> 
file_scan_params
+
+  32: optional Types.TNetworkAddress current_connect_fe
 }
 
 struct TExecPlanFragmentParamsList {
@@ -672,6 +674,7 @@ struct TPipelineFragmentParams {
   28: optional string table_name
   // scan node id -> scan range params, only for external file scan
   29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> 
file_scan_params
+  43: optional Types.TNetworkAddress current_connect_fe
 }
 
 struct TPipelineFragmentParamsList {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to