This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new fb789f1dddd [Fix]Fix insert select missing audit log when connect follower FE (#36454) fb789f1dddd is described below commit fb789f1ddddf5719c1de6354f4ae19f5f71b8e2f Author: wangbo <wan...@apache.org> AuthorDate: Thu Jun 20 15:16:14 2024 +0800 [Fix]Fix insert select missing audit log when connect follower FE (#36454) ## Proposed changes Fix: when an ```insert select``` is executed on a Follower, the audit log could be missing query statistics. This is because the ```audit log``` is written by the connected FE, but the request is forwarded to the master FE, so the coord FE becomes the master FE and BE reports query statistics to that coord FE; as a result, the connected Follower never receives the reported query statistics, and its audit log is missing them. We add a new field to mark the FE the client is connected to, so that BE reports query statistics to the connected FE. Additionally, this refactors FE's WorkloadRuntimeStatusMgr to make the logic clearer and adds some logging in BE. 
--- be/src/runtime/fragment_mgr.cpp | 4 +- be/src/runtime/query_context.h | 7 +- be/src/runtime/runtime_query_statistics_mgr.cpp | 34 ++++-- .../apache/doris/planner/StreamLoadPlanner.java | 2 + .../java/org/apache/doris/qe/ConnectContext.java | 9 ++ .../main/java/org/apache/doris/qe/Coordinator.java | 14 +++ .../WorkloadRuntimeStatusMgr.java | 121 +++++++++++---------- gensrc/thrift/PaloInternalService.thrift | 3 + 8 files changed, 119 insertions(+), 75 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1529d66def2..66538529c3f 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -692,9 +692,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } query_ctx->coord_addr = params.coord; + query_ctx->current_connect_fe = params.current_connect_fe; LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo) << " coord_addr " << query_ctx->coord_addr - << " total fragment num on current host: " << params.fragment_num_on_host; + << " total fragment num on current host: " << params.fragment_num_on_host + << " report audit fe:" << params.current_connect_fe; query_ctx->query_globals = params.query_globals; if (params.__isset.resource_info) { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 8746483df4c..e47e09e5921 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -182,7 +182,7 @@ public: void register_query_statistics(std::shared_ptr<QueryStatistics> qs) { _exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(query_id), qs, - coord_addr); + current_connect_fe); } std::shared_ptr<QueryStatistics> get_query_statistics() { @@ -198,7 +198,7 @@ public: if (_exec_env && _exec_env->runtime_query_statistics_mgr()) { // for ut FragmentMgrTest.normal _exec_env->runtime_query_statistics_mgr()->register_query_statistics( - query_id_str, qs, coord_addr); + 
query_id_str, qs, current_connect_fe); } } else { LOG(INFO) << " query " << query_id_str << " get memory query statistics failed "; @@ -212,7 +212,7 @@ public: if (_exec_env && _exec_env->runtime_query_statistics_mgr()) { // for ut FragmentMgrTest.normal _exec_env->runtime_query_statistics_mgr()->register_query_statistics( - print_id(query_id), _cpu_statistics, coord_addr); + print_id(query_id), _cpu_statistics, current_connect_fe); } } } @@ -226,6 +226,7 @@ public: std::string user; std::string group; TNetworkAddress coord_addr; + TNetworkAddress current_connect_fe; TQueryGlobals query_globals; /// In the current implementation, for multiple fragments executed by a query on the same BE node, diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 5c40ea61763..0ed8cbeb79c 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -83,8 +83,8 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { if (!coord_status.ok()) { std::stringstream ss; - LOG(WARNING) << "could not get client " << add_str - << " when report workload runtime stats, reason is " + LOG(WARNING) << "[report_query_statistics]could not get client " << add_str + << " when report workload runtime stats, reason:" << coord_status.to_string(); continue; } @@ -103,26 +103,38 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { coord->reportExecStatus(res, params); rpc_result[addr] = true; } catch (apache::thrift::TApplicationException& e) { - LOG(WARNING) << "fe " << add_str - << " throw exception when report statistics, reason=" << e.what() + LOG(WARNING) << "[report_query_statistics]fe " << add_str + << " throw exception when report statistics, reason:" << e.what() << " , you can see fe log for details."; } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "report workload runtime statistics to " << add_str - << " failed, err: 
" << e.what(); + LOG(WARNING) << "[report_query_statistics]report workload runtime statistics to " + << add_str << " failed, reason: " << e.what(); rpc_status = coord.reopen(); if (!rpc_status.ok()) { - LOG(WARNING) - << "reopen thrift client failed when report workload runtime statistics to" - << add_str; + LOG(WARNING) << "[report_query_statistics]reopen thrift client failed when report " + "workload runtime statistics to" + << add_str; } else { try { coord->reportExecStatus(res, params); rpc_result[addr] = true; } catch (apache::thrift::transport::TTransportException& e2) { - LOG(WARNING) << "retry report workload runtime stats to " << add_str - << " failed, err: " << e2.what(); + LOG(WARNING) + << "[report_query_statistics]retry report workload runtime stats to " + << add_str << " failed, reason: " << e2.what(); + } catch (std::exception& e) { + LOG_WARNING( + "[report_query_statistics]unknow exception when report workload " + "runtime statistics to {}, " + "reason:{}. ", + add_str, e.what()); } } + } catch (std::exception& e) { + LOG_WARNING( + "[report_query_statistics]unknown exception when report workload runtime " + "statistics to {}, reason:{}. 
", + add_str, e.what()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 16c9e8f32eb..26c0f1fae65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -283,6 +283,7 @@ public class StreamLoadPlanner { params.setDescTbl(analyzer.getDescTbl().toThrift()); params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); + params.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); TPlanFragmentExecParams execParams = new TPlanFragmentExecParams(); // user load id (streamLoadTask.id) as query id @@ -501,6 +502,7 @@ public class StreamLoadPlanner { pipParams.setDescTbl(analyzer.getDescTbl().toThrift()); pipParams.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); + pipParams.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); pipParams.setQueryId(loadId); pipParams.per_exch_num_senders = Maps.newHashMap(); pipParams.destinations = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 508691f29a3..315973bf64d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -187,6 +187,10 @@ public class ConnectContext { private String workloadGroupName = ""; + // isProxy used for forward request from other FE and used in one thread + // it's default thread-safe + private boolean isProxy = false; + public void setUserQueryTimeout(int queryTimeout) { if (queryTimeout > 0) { sessionVariable.setQueryTimeoutS(queryTimeout); @@ -298,6 +302,7 @@ public class ConnectContext { mysqlChannel = 
new MysqlChannel(connection, this); } else if (isProxy) { mysqlChannel = new ProxyMysqlChannel(); + this.isProxy = isProxy; } else { mysqlChannel = new DummyMysqlChannel(); } @@ -914,5 +919,9 @@ public class ConnectContext { public void setUserVars(Map<String, LiteralExpr> userVars) { this.userVars = userVars; } + + public boolean isProxy() { + return isProxy; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index e2889310428..e50bf77650d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -121,6 +121,7 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; @@ -178,6 +179,10 @@ public class Coordinator implements CoordInterface { private final TQueryGlobals queryGlobals = new TQueryGlobals(); private TQueryOptions queryOptions; private TNetworkAddress coordAddress; + // fe audit log in connected FE,if a query is forward + // we should send the connected FE to be, + // then be report query statistics to the connected FE + private TNetworkAddress currentConnectFE; // protects all fields below private final Lock lock = new ReentrantLock(); @@ -497,6 +502,13 @@ public class Coordinator implements CoordInterface { } coordAddress = new TNetworkAddress(localIP, Config.rpc_port); + if (ConnectContext.get() != null && ConnectContext.get().isProxy() && !StringUtils.isEmpty( + ConnectContext.get().getCurrentConnectedFEIp())) { + currentConnectFE = new TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(), + Config.rpc_port); + } else { + currentConnectFE = coordAddress; + } 
this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend(); if (LOG.isDebugEnabled()) { @@ -3206,6 +3218,7 @@ public class Coordinator implements CoordInterface { params.params.setSenderId(i); params.params.setNumSenders(instanceExecParams.size()); params.setCoord(coordAddress); + params.setCurrentConnectFe(currentConnectFE); params.setBackendNum(backendNum++); params.setQueryGlobals(queryGlobals); params.setQueryOptions(queryOptions); @@ -3292,6 +3305,7 @@ public class Coordinator implements CoordInterface { params.setDestinations(destinations); params.setNumSenders(instanceExecParams.size()); params.setCoord(coordAddress); + params.setCurrentConnectFe(currentConnectFE); params.setQueryGlobals(queryGlobals); params.setQueryOptions(queryOptions); params.query_options.setEnablePipelineEngine(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index 623c3c9aaa9..3e9f7a381c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.util.Daemon; import org.apache.doris.plugin.AuditEvent; import org.apache.doris.thrift.TQueryStatistics; @@ -30,22 +31,36 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +// NOTE: not using a lock for beToQueryStatsMap's update because it should void 
global lock for all be +// this may cause in some corner case missing statistics update,for example: +// time1: clear logic judge query 1 is timeout +// time2: query 1 is update by report +// time3: clear logic remove query 1 +// in this case, lost query stats is allowed. because query report time out is 60s by default, +// when this case happens, we should first to find why be not report for so long. public class WorkloadRuntimeStatusMgr { private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class); - private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap(); - private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap(); - private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap(); + private Map<Long, BeReportInfo> beToQueryStatsMap = Maps.newConcurrentMap(); private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock(); private List<AuditEvent> queryAuditEventList = Lists.newLinkedList(); + private class BeReportInfo { + volatile long beLastReportTime; + + BeReportInfo(long beLastReportTime) { + this.beLastReportTime = beLastReportTime; + } + + Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = Maps.newConcurrentMap(); + } + class WorkloadRuntimeStatsThread extends Daemon { WorkloadRuntimeStatusMgr workloadStatsMgr; @@ -130,41 +145,65 @@ public class WorkloadRuntimeStatusMgr { return; } long beId = params.backend_id; - Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId); - beLastReportTime.put(beId, System.currentTimeMillis()); - if (queryIdMap == null) { - queryIdMap = Maps.newConcurrentMap(); - queryIdMap.putAll(params.query_statistics_map); - beToQueryStatsMap.put(beId, queryIdMap); + // NOTE(wb) one be sends update request one by one, + // so there is no need a global lock for beToQueryStatsMap here, + // just keep one be's put/remove/get is atomic operation is enough + long currentTime = System.currentTimeMillis(); + 
BeReportInfo beReportInfo = beToQueryStatsMap.get(beId); + if (beReportInfo == null) { + beReportInfo = new BeReportInfo(currentTime); + beToQueryStatsMap.put(beId, beReportInfo); } else { - long currentTime = System.currentTimeMillis(); - for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) { - queryIdMap.put(entry.getKey(), entry.getValue()); - queryLastReportTime.put(entry.getKey(), currentTime); + beReportInfo.beLastReportTime = currentTime; + } + for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) { + beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, (TQueryStatistics) entry.getValue())); + } + } + + void clearReportTimeoutBeStatistics() { + // 1 clear report timeout be + Set<Long> currentBeIdSet = beToQueryStatsMap.keySet(); + Long currentTime = System.currentTimeMillis(); + for (Long beId : currentBeIdSet) { + BeReportInfo beReportInfo = beToQueryStatsMap.get(beId); + if (currentTime - beReportInfo.beLastReportTime > Config.be_report_query_statistics_timeout_ms) { + beToQueryStatsMap.remove(beId); + continue; + } + Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet(); + for (String queryId : queryIdSet) { + Pair<Long, TQueryStatistics> pair = beReportInfo.queryStatsMap.get(queryId); + long queryLastReportTime = pair.first; + if (currentTime - queryLastReportTime > Config.be_report_query_statistics_timeout_ms) { + beReportInfo.queryStatsMap.remove(queryId); + } } } } + // NOTE: currently getQueryStatisticsMap must be called before clear beToQueryStatsMap + // so there is no need lock or null check when visit beToQueryStatsMap public Map<String, TQueryStatistics> getQueryStatisticsMap() { // 1 merge query stats in all be Set<Long> beIdSet = beToQueryStatsMap.keySet(); - Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap(); + Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap(); for (Long beId : beIdSet) { - Map<String, TQueryStatistics> 
currentQueryMap = beToQueryStatsMap.get(beId); - Set<String> queryIdSet = currentQueryMap.keySet(); + BeReportInfo beReportInfo = beToQueryStatsMap.get(beId); + Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet(); for (String queryId : queryIdSet) { - TQueryStatistics retQuery = retQueryMap.get(queryId); + TQueryStatistics curQueryStats = beReportInfo.queryStatsMap.get(queryId).second; + + TQueryStatistics retQuery = resultQueryMap.get(queryId); if (retQuery == null) { retQuery = new TQueryStatistics(); - retQueryMap.put(queryId, retQuery); + resultQueryMap.put(queryId, retQuery); } - - TQueryStatistics curQueryStats = currentQueryMap.get(queryId); mergeQueryStatistics(retQuery, curQueryStats); } } - return retQueryMap; + return resultQueryMap; } private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) { @@ -176,44 +215,6 @@ public class WorkloadRuntimeStatusMgr { } } - void clearReportTimeoutBeStatistics() { - // 1 clear report timeout be - Set<Long> beNeedToRemove = new HashSet<>(); - Set<Long> currentBeIdSet = beToQueryStatsMap.keySet(); - Long currentTime = System.currentTimeMillis(); - for (Long beId : currentBeIdSet) { - Long lastReportTime = beLastReportTime.get(beId); - if (lastReportTime != null - && currentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) { - beNeedToRemove.add(beId); - } - } - for (Long beId : beNeedToRemove) { - beToQueryStatsMap.remove(beId); - beLastReportTime.remove(beId); - } - - // 2 clear report timeout query - Set<String> queryNeedToClear = new HashSet<>(); - Long newCurrentTime = System.currentTimeMillis(); - Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet(); - for (String queryId : queryLastReportTimeKeySet) { - Long lastReportTime = queryLastReportTime.get(queryId); - if (lastReportTime != null - && newCurrentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) { - queryNeedToClear.add(queryId); - } - } - - Set<Long> beIdSet = 
beToQueryStatsMap.keySet(); - for (String queryId : queryNeedToClear) { - for (Long beId : beIdSet) { - beToQueryStatsMap.get(beId).remove(queryId); - } - queryLastReportTime.remove(queryId); - } - } - private void queryAuditEventLogWriteLock() { queryAuditEventLock.writeLock().lock(); } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 4185e8a4276..4d0e7d01b86 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -458,6 +458,8 @@ struct TExecPlanFragmentParams { // scan node id -> scan range params, only for external file scan 24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params + + 32: optional Types.TNetworkAddress current_connect_fe } struct TExecPlanFragmentParamsList { @@ -672,6 +674,7 @@ struct TPipelineFragmentParams { 28: optional string table_name // scan node id -> scan range params, only for external file scan 29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params + 43: optional Types.TNetworkAddress current_connect_fe } struct TPipelineFragmentParamsList { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org