This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch 2.0.10-decimal-patch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/2.0.10-decimal-patch by this push:
     new 25fc5d24826 Fix insert select missing audit log when connect follower FE (#36597)
25fc5d24826 is described below

commit 25fc5d24826481ce080c7b36ed2ea8f6cc97a883
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jun 20 15:23:24 2024 +0800

    Fix insert select missing audit log when connect follower FE (#36597)
    
    ## Proposed changes
    
    pick #36454
---
 be/src/runtime/fragment_mgr.cpp                    |   4 +-
 be/src/runtime/query_context.h                     |   7 +-
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  34 ++++--
 .../apache/doris/planner/StreamLoadPlanner.java    |   2 +
 .../java/org/apache/doris/qe/ConnectContext.java   |   9 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |  14 +++
 .../WorkloadRuntimeStatusMgr.java                  | 121 +++++++++++----------
 gensrc/thrift/PaloInternalService.thrift           |   3 +
 8 files changed, 119 insertions(+), 75 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1529d66def2..66538529c3f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -692,9 +692,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
         }
 
         query_ctx->coord_addr = params.coord;
+        query_ctx->current_connect_fe = params.current_connect_fe;
         LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, 
query_ctx->query_id.lo)
                   << " coord_addr " << query_ctx->coord_addr
-                  << " total fragment num on current host: " << 
params.fragment_num_on_host;
+                  << " total fragment num on current host: " << 
params.fragment_num_on_host
+                  << " report audit fe:" << params.current_connect_fe;
         query_ctx->query_globals = params.query_globals;
 
         if (params.__isset.resource_info) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 8746483df4c..e47e09e5921 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -182,7 +182,7 @@ public:
 
     void register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
         
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(query_id),
 qs,
-                                                                             
coord_addr);
+                                                                             
current_connect_fe);
     }
 
     std::shared_ptr<QueryStatistics> get_query_statistics() {
@@ -198,7 +198,7 @@ public:
                 if (_exec_env &&
                     _exec_env->runtime_query_statistics_mgr()) { // for ut 
FragmentMgrTest.normal
                     
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                            query_id_str, qs, coord_addr);
+                            query_id_str, qs, current_connect_fe);
                 }
             } else {
                 LOG(INFO) << " query " << query_id_str << " get memory query 
statistics failed ";
@@ -212,7 +212,7 @@ public:
             if (_exec_env &&
                 _exec_env->runtime_query_statistics_mgr()) { // for ut 
FragmentMgrTest.normal
                 
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                        print_id(query_id), _cpu_statistics, coord_addr);
+                        print_id(query_id), _cpu_statistics, 
current_connect_fe);
             }
         }
     }
@@ -226,6 +226,7 @@ public:
     std::string user;
     std::string group;
     TNetworkAddress coord_addr;
+    TNetworkAddress current_connect_fe;
     TQueryGlobals query_globals;
 
     /// In the current implementation, for multiple fragments executed by a 
query on the same BE node,
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 5c40ea61763..0ed8cbeb79c 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -83,8 +83,8 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
 
         if (!coord_status.ok()) {
             std::stringstream ss;
-            LOG(WARNING) << "could not get client " << add_str
-                         << " when report workload runtime stats, reason is "
+            LOG(WARNING) << "[report_query_statistics]could not get client " 
<< add_str
+                         << " when report workload runtime stats, reason:"
                          << coord_status.to_string();
             continue;
         }
@@ -103,26 +103,38 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
             coord->reportExecStatus(res, params);
             rpc_result[addr] = true;
         } catch (apache::thrift::TApplicationException& e) {
-            LOG(WARNING) << "fe " << add_str
-                         << " throw exception when report statistics, reason=" 
<< e.what()
+            LOG(WARNING) << "[report_query_statistics]fe " << add_str
+                         << " throw exception when report statistics, reason:" 
<< e.what()
                          << " , you can see fe log for details.";
         } catch (apache::thrift::transport::TTransportException& e) {
-            LOG(WARNING) << "report workload runtime statistics to " << add_str
-                         << " failed,  err: " << e.what();
+            LOG(WARNING) << "[report_query_statistics]report workload runtime 
statistics to "
+                         << add_str << " failed,  reason: " << e.what();
             rpc_status = coord.reopen();
             if (!rpc_status.ok()) {
-                LOG(WARNING)
-                        << "reopen thrift client failed when report workload 
runtime statistics to"
-                        << add_str;
+                LOG(WARNING) << "[report_query_statistics]reopen thrift client 
failed when report "
+                                "workload runtime statistics to"
+                             << add_str;
             } else {
                 try {
                     coord->reportExecStatus(res, params);
                     rpc_result[addr] = true;
                 } catch (apache::thrift::transport::TTransportException& e2) {
-                    LOG(WARNING) << "retry report workload runtime stats to " 
<< add_str
-                                 << " failed,  err: " << e2.what();
+                    LOG(WARNING)
+                            << "[report_query_statistics]retry report workload 
runtime stats to "
+                            << add_str << " failed,  reason: " << e2.what();
+                } catch (std::exception& e) {
+                    LOG_WARNING(
+                            "[report_query_statistics]unknow exception when 
report workload "
+                            "runtime statistics to {}, "
+                            "reason:{}. ",
+                            add_str, e.what());
                 }
             }
+        } catch (std::exception& e) {
+            LOG_WARNING(
+                    "[report_query_statistics]unknown exception when report 
workload runtime "
+                    "statistics to {}, reason:{}. ",
+                    add_str, e.what());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 16c9e8f32eb..26c0f1fae65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -283,6 +283,7 @@ public class StreamLoadPlanner {
 
         params.setDescTbl(analyzer.getDescTbl().toThrift());
         params.setCoord(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+        params.setCurrentConnectFe(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
 
         TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
         // user load id (streamLoadTask.id) as query id
@@ -501,6 +502,7 @@ public class StreamLoadPlanner {
 
         pipParams.setDescTbl(analyzer.getDescTbl().toThrift());
         pipParams.setCoord(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+        pipParams.setCurrentConnectFe(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
         pipParams.setQueryId(loadId);
         pipParams.per_exch_num_senders = Maps.newHashMap();
         pipParams.destinations = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 2840021aea9..217c9633153 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -187,6 +187,10 @@ public class ConnectContext {
 
     private String workloadGroupName = "";
 
+    // isProxy used for forward request from other FE and used in one thread
+    // it's default thread-safe
+    private boolean isProxy = false;
+
     public void setUserQueryTimeout(int queryTimeout) {
         if (queryTimeout > 0) {
             sessionVariable.setQueryTimeoutS(queryTimeout);
@@ -298,6 +302,7 @@ public class ConnectContext {
             mysqlChannel = new MysqlChannel(connection, this);
         } else if (isProxy) {
             mysqlChannel = new ProxyMysqlChannel();
+            this.isProxy = isProxy;
         } else {
             mysqlChannel = new DummyMysqlChannel();
         }
@@ -914,5 +919,9 @@ public class ConnectContext {
     public void setUserVars(Map<String, LiteralExpr> userVars) {
         this.userVars = userVars;
     }
+
+    public boolean isProxy() {
+        return isProxy;
+    }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e2889310428..e50bf77650d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -121,6 +121,7 @@ import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
@@ -178,6 +179,10 @@ public class Coordinator implements CoordInterface {
     private final TQueryGlobals queryGlobals = new TQueryGlobals();
     private TQueryOptions queryOptions;
     private TNetworkAddress coordAddress;
+    // fe audit log in connected FE,if a query is forward
+    // we should send the connected FE to be,
+    // then be report query statistics to the connected FE
+    private TNetworkAddress currentConnectFE;
 
     // protects all fields below
     private final Lock lock = new ReentrantLock();
@@ -497,6 +502,13 @@ public class Coordinator implements CoordInterface {
         }
 
         coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
+        if (ConnectContext.get() != null && ConnectContext.get().isProxy() && 
!StringUtils.isEmpty(
+                ConnectContext.get().getCurrentConnectedFEIp())) {
+            currentConnectFE = new 
TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(),
+                    Config.rpc_port);
+        } else {
+            currentConnectFE = coordAddress;
+        }
 
         this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
         if (LOG.isDebugEnabled()) {
@@ -3206,6 +3218,7 @@ public class Coordinator implements CoordInterface {
                 params.params.setSenderId(i);
                 params.params.setNumSenders(instanceExecParams.size());
                 params.setCoord(coordAddress);
+                params.setCurrentConnectFe(currentConnectFE);
                 params.setBackendNum(backendNum++);
                 params.setQueryGlobals(queryGlobals);
                 params.setQueryOptions(queryOptions);
@@ -3292,6 +3305,7 @@ public class Coordinator implements CoordInterface {
                     params.setDestinations(destinations);
                     params.setNumSenders(instanceExecParams.size());
                     params.setCoord(coordAddress);
+                    params.setCurrentConnectFe(currentConnectFE);
                     params.setQueryGlobals(queryGlobals);
                     params.setQueryOptions(queryOptions);
                     params.query_options.setEnablePipelineEngine(true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index 623c3c9aaa9..3e9f7a381c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Daemon;
 import org.apache.doris.plugin.AuditEvent;
 import org.apache.doris.thrift.TQueryStatistics;
@@ -30,22 +31,36 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+// NOTE: not using a lock for beToQueryStatsMap's update because it should 
void global lock for all be
+// this may cause in some corner case missing statistics update,for example:
+// time1: clear logic judge query 1 is timeout
+// time2: query 1 is update by report
+// time3: clear logic remove query 1
+// in this case, lost query stats is allowed. because query report time out is 
60s by default,
+// when this case happens, we should first to find why be not report for so 
long.
 public class WorkloadRuntimeStatusMgr {
 
     private static final Logger LOG = 
LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
-    private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = 
Maps.newConcurrentMap();
-    private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
-    private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
+    private Map<Long, BeReportInfo> beToQueryStatsMap = 
Maps.newConcurrentMap();
     private final ReentrantReadWriteLock queryAuditEventLock = new 
ReentrantReadWriteLock();
     private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
 
+    private class BeReportInfo {
+        volatile long beLastReportTime;
+
+        BeReportInfo(long beLastReportTime) {
+            this.beLastReportTime = beLastReportTime;
+        }
+
+        Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = 
Maps.newConcurrentMap();
+    }
+
     class WorkloadRuntimeStatsThread extends Daemon {
 
         WorkloadRuntimeStatusMgr workloadStatsMgr;
@@ -130,41 +145,65 @@ public class WorkloadRuntimeStatusMgr {
             return;
         }
         long beId = params.backend_id;
-        Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
-        beLastReportTime.put(beId, System.currentTimeMillis());
-        if (queryIdMap == null) {
-            queryIdMap = Maps.newConcurrentMap();
-            queryIdMap.putAll(params.query_statistics_map);
-            beToQueryStatsMap.put(beId, queryIdMap);
+        // NOTE(wb) one be sends update request one by one,
+        // so there is no need a global lock for beToQueryStatsMap here,
+        // just keep one be's put/remove/get is atomic operation is enough
+        long currentTime = System.currentTimeMillis();
+        BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+        if (beReportInfo == null) {
+            beReportInfo = new BeReportInfo(currentTime);
+            beToQueryStatsMap.put(beId, beReportInfo);
         } else {
-            long currentTime = System.currentTimeMillis();
-            for (Map.Entry<String, TQueryStatistics> entry : 
params.query_statistics_map.entrySet()) {
-                queryIdMap.put(entry.getKey(), entry.getValue());
-                queryLastReportTime.put(entry.getKey(), currentTime);
+            beReportInfo.beLastReportTime = currentTime;
+        }
+        for (Map.Entry<String, TQueryStatistics> entry : 
params.query_statistics_map.entrySet()) {
+            beReportInfo.queryStatsMap.put(entry.getKey(), 
Pair.of(currentTime, (TQueryStatistics) entry.getValue()));
+        }
+    }
+
+    void clearReportTimeoutBeStatistics() {
+        // 1 clear report timeout be
+        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
+        Long currentTime = System.currentTimeMillis();
+        for (Long beId : currentBeIdSet) {
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            if (currentTime - beReportInfo.beLastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
+                beToQueryStatsMap.remove(beId);
+                continue;
+            }
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
+            for (String queryId : queryIdSet) {
+                Pair<Long, TQueryStatistics> pair = 
beReportInfo.queryStatsMap.get(queryId);
+                long queryLastReportTime = pair.first;
+                if (currentTime - queryLastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
+                    beReportInfo.queryStatsMap.remove(queryId);
+                }
             }
         }
     }
 
+    // NOTE: currently getQueryStatisticsMap must be called before clear 
beToQueryStatsMap
+    // so there is no need lock or null check when visit beToQueryStatsMap
     public Map<String, TQueryStatistics> getQueryStatisticsMap() {
         // 1 merge query stats in all be
         Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
+        Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
         for (Long beId : beIdSet) {
-            Map<String, TQueryStatistics> currentQueryMap = 
beToQueryStatsMap.get(beId);
-            Set<String> queryIdSet = currentQueryMap.keySet();
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
             for (String queryId : queryIdSet) {
-                TQueryStatistics retQuery = retQueryMap.get(queryId);
+                TQueryStatistics curQueryStats = 
beReportInfo.queryStatsMap.get(queryId).second;
+
+                TQueryStatistics retQuery = resultQueryMap.get(queryId);
                 if (retQuery == null) {
                     retQuery = new TQueryStatistics();
-                    retQueryMap.put(queryId, retQuery);
+                    resultQueryMap.put(queryId, retQuery);
                 }
-
-                TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
                 mergeQueryStatistics(retQuery, curQueryStats);
             }
         }
 
-        return retQueryMap;
+        return resultQueryMap;
     }
 
     private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics 
src) {
@@ -176,44 +215,6 @@ public class WorkloadRuntimeStatusMgr {
         }
     }
 
-    void clearReportTimeoutBeStatistics() {
-        // 1 clear report timeout be
-        Set<Long> beNeedToRemove = new HashSet<>();
-        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
-        Long currentTime = System.currentTimeMillis();
-        for (Long beId : currentBeIdSet) {
-            Long lastReportTime = beLastReportTime.get(beId);
-            if (lastReportTime != null
-                    && currentTime - lastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
-                beNeedToRemove.add(beId);
-            }
-        }
-        for (Long beId : beNeedToRemove) {
-            beToQueryStatsMap.remove(beId);
-            beLastReportTime.remove(beId);
-        }
-
-        // 2 clear report timeout query
-        Set<String> queryNeedToClear = new HashSet<>();
-        Long newCurrentTime = System.currentTimeMillis();
-        Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
-        for (String queryId : queryLastReportTimeKeySet) {
-            Long lastReportTime = queryLastReportTime.get(queryId);
-            if (lastReportTime != null
-                    && newCurrentTime - lastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
-                queryNeedToClear.add(queryId);
-            }
-        }
-
-        Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        for (String queryId : queryNeedToClear) {
-            for (Long beId : beIdSet) {
-                beToQueryStatsMap.get(beId).remove(queryId);
-            }
-            queryLastReportTime.remove(queryId);
-        }
-    }
-
     private void queryAuditEventLogWriteLock() {
         queryAuditEventLock.writeLock().lock();
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 22422aeabac..4b6a19783f7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -453,6 +453,8 @@ struct TExecPlanFragmentParams {
 
   // scan node id -> scan range params, only for external file scan
   24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> 
file_scan_params
+
+  32: optional Types.TNetworkAddress current_connect_fe
 }
 
 struct TExecPlanFragmentParamsList {
@@ -667,6 +669,7 @@ struct TPipelineFragmentParams {
   28: optional string table_name
   // scan node id -> scan range params, only for external file scan
   29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> 
file_scan_params
+  43: optional Types.TNetworkAddress current_connect_fe
 }
 
 struct TPipelineFragmentParamsList {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to