This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch 2.0.10-decimal-patch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/2.0.10-decimal-patch by this push:
     new 25fc5d24826 Fix insert select missing audit log when connect follower FE (#36597)

25fc5d24826 is described below

commit 25fc5d24826481ce080c7b36ed2ea8f6cc97a883
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jun 20 15:23:24 2024 +0800

    Fix insert select missing audit log when connect follower FE (#36597)

    ## Proposed changes

    pick #36454
---
 be/src/runtime/fragment_mgr.cpp                     |   4 +-
 be/src/runtime/query_context.h                      |   7 +-
 be/src/runtime/runtime_query_statistics_mgr.cpp     |  34 ++++--
 .../apache/doris/planner/StreamLoadPlanner.java     |   2 +
 .../java/org/apache/doris/qe/ConnectContext.java    |   9 ++
 .../main/java/org/apache/doris/qe/Coordinator.java  |  14 +++
 .../WorkloadRuntimeStatusMgr.java                   | 121 +++++++++++----------
 gensrc/thrift/PaloInternalService.thrift            |   3 +
 8 files changed, 119 insertions(+), 75 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1529d66def2..66538529c3f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -692,9 +692,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
     }
 
     query_ctx->coord_addr = params.coord;
+    query_ctx->current_connect_fe = params.current_connect_fe;
     LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo)
               << " coord_addr " << query_ctx->coord_addr
-              << " total fragment num on current host: " << params.fragment_num_on_host;
+              << " total fragment num on current host: " << params.fragment_num_on_host
+              << " report audit fe:" << params.current_connect_fe;
     query_ctx->query_globals = params.query_globals;
 
     if (params.__isset.resource_info) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 8746483df4c..e47e09e5921 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -182,7 +182,7 @@ public:
 
     void register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
         _exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(query_id), qs,
-                                                                             coord_addr);
+                                                                             current_connect_fe);
     }
 
     std::shared_ptr<QueryStatistics> get_query_statistics() {
@@ -198,7 +198,7 @@ public:
         if (_exec_env && _exec_env->runtime_query_statistics_mgr()) { // for ut FragmentMgrTest.normal
             _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                    query_id_str, qs, coord_addr);
+                    query_id_str, qs, current_connect_fe);
         }
     } else {
         LOG(INFO) << " query " << query_id_str << " get memory query statistics failed ";
@@ -212,7 +212,7 @@ public:
         if (_exec_env && _exec_env->runtime_query_statistics_mgr()) { // for ut FragmentMgrTest.normal
             _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                    print_id(query_id), _cpu_statistics, coord_addr);
+                    print_id(query_id), _cpu_statistics, current_connect_fe);
         }
     }
 
@@ -226,6 +226,7 @@ public:
     std::string user;
     std::string group;
     TNetworkAddress coord_addr;
+    TNetworkAddress current_connect_fe;
     TQueryGlobals query_globals;
 
     /// In the current implementation, for multiple fragments executed by a query on the same BE node,
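The query_context.h hunk above changes the key the BE uses when registering runtime statistics: reports are now addressed to current_connect_fe (the FE the client is actually connected to) instead of coord_addr, while coord_addr stays in place for coordinator RPCs. The plain-Java sketch below illustrates the bookkeeping this implies on the reporting side; it is an illustration only — the class, method names, and the simplified long payload are invented here and are not Doris code:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustration only: every running query remembers which FE should receive its runtime
    // statistics, and a flush groups pending stats by that FE so the report loop makes one
    // RPC per frontend instead of one per query.
    class QueryStatsRegistry {
        // query id -> address of the FE that holds the audit log for this query
        private final Map<String, String> reportTarget = new ConcurrentHashMap<>();
        // query id -> accumulated stats (simplified to a single scan-bytes counter)
        private final Map<String, Long> pendingStats = new ConcurrentHashMap<>();

        void register(String queryId, String connectFeAddr) {
            reportTarget.put(queryId, connectFeAddr);
        }

        void update(String queryId, long scanBytes) {
            pendingStats.merge(queryId, scanBytes, Long::sum);
        }

        // Group pending stats by target FE, mirroring how the BE batches its report RPCs.
        Map<String, Map<String, Long>> groupByTargetFe() {
            Map<String, Map<String, Long>> grouped = new HashMap<>();
            pendingStats.forEach((queryId, stats) -> grouped
                    .computeIfAbsent(reportTarget.getOrDefault(queryId, "unknown-fe"), k -> new HashMap<>())
                    .put(queryId, stats));
            return grouped;
        }
    }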
client " << add_str - << " when report workload runtime stats, reason is " + LOG(WARNING) << "[report_query_statistics]could not get client " << add_str + << " when report workload runtime stats, reason:" << coord_status.to_string(); continue; } @@ -103,26 +103,38 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { coord->reportExecStatus(res, params); rpc_result[addr] = true; } catch (apache::thrift::TApplicationException& e) { - LOG(WARNING) << "fe " << add_str - << " throw exception when report statistics, reason=" << e.what() + LOG(WARNING) << "[report_query_statistics]fe " << add_str + << " throw exception when report statistics, reason:" << e.what() << " , you can see fe log for details."; } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "report workload runtime statistics to " << add_str - << " failed, err: " << e.what(); + LOG(WARNING) << "[report_query_statistics]report workload runtime statistics to " + << add_str << " failed, reason: " << e.what(); rpc_status = coord.reopen(); if (!rpc_status.ok()) { - LOG(WARNING) - << "reopen thrift client failed when report workload runtime statistics to" - << add_str; + LOG(WARNING) << "[report_query_statistics]reopen thrift client failed when report " + "workload runtime statistics to" + << add_str; } else { try { coord->reportExecStatus(res, params); rpc_result[addr] = true; } catch (apache::thrift::transport::TTransportException& e2) { - LOG(WARNING) << "retry report workload runtime stats to " << add_str - << " failed, err: " << e2.what(); + LOG(WARNING) + << "[report_query_statistics]retry report workload runtime stats to " + << add_str << " failed, reason: " << e2.what(); + } catch (std::exception& e) { + LOG_WARNING( + "[report_query_statistics]unknow exception when report workload " + "runtime statistics to {}, " + "reason:{}. ", + add_str, e.what()); } } + } catch (std::exception& e) { + LOG_WARNING( + "[report_query_statistics]unknown exception when report workload runtime " + "statistics to {}, reason:{}. 
", + add_str, e.what()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 16c9e8f32eb..26c0f1fae65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -283,6 +283,7 @@ public class StreamLoadPlanner { params.setDescTbl(analyzer.getDescTbl().toThrift()); params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); + params.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); TPlanFragmentExecParams execParams = new TPlanFragmentExecParams(); // user load id (streamLoadTask.id) as query id @@ -501,6 +502,7 @@ public class StreamLoadPlanner { pipParams.setDescTbl(analyzer.getDescTbl().toThrift()); pipParams.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); + pipParams.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); pipParams.setQueryId(loadId); pipParams.per_exch_num_senders = Maps.newHashMap(); pipParams.destinations = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 2840021aea9..217c9633153 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -187,6 +187,10 @@ public class ConnectContext { private String workloadGroupName = ""; + // isProxy used for forward request from other FE and used in one thread + // it's default thread-safe + private boolean isProxy = false; + public void setUserQueryTimeout(int queryTimeout) { if (queryTimeout > 0) { sessionVariable.setQueryTimeoutS(queryTimeout); @@ -298,6 +302,7 @@ public class ConnectContext { mysqlChannel = new MysqlChannel(connection, this); } else if (isProxy) { mysqlChannel = new ProxyMysqlChannel(); + this.isProxy = isProxy; } else { mysqlChannel = new DummyMysqlChannel(); } @@ -914,5 +919,9 @@ public class ConnectContext { public void setUserVars(Map<String, LiteralExpr> userVars) { this.userVars = userVars; } + + public boolean isProxy() { + return isProxy; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index e2889310428..e50bf77650d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -121,6 +121,7 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; @@ -178,6 +179,10 @@ public class Coordinator implements CoordInterface { private final TQueryGlobals queryGlobals = new TQueryGlobals(); private TQueryOptions queryOptions; private TNetworkAddress coordAddress; + // fe audit log in connected FE,if a query is forward + // we should send the connected FE to be, + // then be report query statistics to the connected FE + private TNetworkAddress currentConnectFE; // protects all fields below private final Lock lock = new ReentrantLock(); @@ 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e2889310428..e50bf77650d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -121,6 +121,7 @@ import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
@@ -178,6 +179,10 @@ public class Coordinator implements CoordInterface {
     private final TQueryGlobals queryGlobals = new TQueryGlobals();
     private TQueryOptions queryOptions;
     private TNetworkAddress coordAddress;
+    // fe audit log in connected FE,if a query is forward
+    // we should send the connected FE to be,
+    // then be report query statistics to the connected FE
+    private TNetworkAddress currentConnectFE;
 
     // protects all fields below
     private final Lock lock = new ReentrantLock();
@@ -497,6 +502,13 @@ public class Coordinator implements CoordInterface {
         }
 
         coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
+        if (ConnectContext.get() != null && ConnectContext.get().isProxy() && !StringUtils.isEmpty(
+                ConnectContext.get().getCurrentConnectedFEIp())) {
+            currentConnectFE = new TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(),
+                    Config.rpc_port);
+        } else {
+            currentConnectFE = coordAddress;
+        }
 
         this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
         if (LOG.isDebugEnabled()) {
@@ -3206,6 +3218,7 @@ public class Coordinator implements CoordInterface {
             params.params.setSenderId(i);
             params.params.setNumSenders(instanceExecParams.size());
             params.setCoord(coordAddress);
+            params.setCurrentConnectFe(currentConnectFE);
             params.setBackendNum(backendNum++);
             params.setQueryGlobals(queryGlobals);
             params.setQueryOptions(queryOptions);
@@ -3292,6 +3305,7 @@ public class Coordinator implements CoordInterface {
             params.setDestinations(destinations);
             params.setNumSenders(instanceExecParams.size());
             params.setCoord(coordAddress);
+            params.setCurrentConnectFe(currentConnectFE);
             params.setQueryGlobals(queryGlobals);
             params.setQueryOptions(queryOptions);
             params.query_options.setEnablePipelineEngine(true);
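The Coordinator hunk above is the heart of the fix: for a statement forwarded from a follower (ConnectContext.isProxy()), the report target becomes the FE the client is actually connected to; otherwise it stays the coordinator address, as before. Restated as a stand-alone helper for clarity — pickReportFe is a hypothetical name, but the calls it makes are the same ones used in the hunk:

    // Paraphrase of the selection logic above, not new behavior.
    static TNetworkAddress pickReportFe(ConnectContext ctx, TNetworkAddress coordAddress) {
        if (ctx != null && ctx.isProxy() && !StringUtils.isEmpty(ctx.getCurrentConnectedFEIp())) {
            // forwarded query: report to the follower the client is connected to,
            // which is the FE that writes the audit log for this connection
            return new TNetworkAddress(ctx.getCurrentConnectedFEIp(), Config.rpc_port);
        }
        // non-forwarded query: coordinator FE and connected FE are the same
        return coordAddress;
    }

Previously the BE reported only to coord, so when a client connected to a follower ran an insert ... select, the follower never received the statistics it needs to complete the audit log entry — the missing-audit-log symptom described in the commit title.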
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index 623c3c9aaa9..3e9f7a381c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Daemon;
 import org.apache.doris.plugin.AuditEvent;
 import org.apache.doris.thrift.TQueryStatistics;
@@ -30,22 +31,36 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+// NOTE: not using a lock for beToQueryStatsMap's update because it should void global lock for all be
+// this may cause in some corner case missing statistics update,for example:
+// time1: clear logic judge query 1 is timeout
+// time2: query 1 is update by report
+// time3: clear logic remove query 1
+// in this case, lost query stats is allowed. because query report time out is 60s by default,
+// when this case happens, we should first to find why be not report for so long.
 public class WorkloadRuntimeStatusMgr {
 
     private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
-    private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap();
-    private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
-    private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
+    private Map<Long, BeReportInfo> beToQueryStatsMap = Maps.newConcurrentMap();
     private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock();
     private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
 
+    private class BeReportInfo {
+        volatile long beLastReportTime;
+
+        BeReportInfo(long beLastReportTime) {
+            this.beLastReportTime = beLastReportTime;
+        }
+
+        Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = Maps.newConcurrentMap();
+    }
+
     class WorkloadRuntimeStatsThread extends Daemon {
 
         WorkloadRuntimeStatusMgr workloadStatsMgr;
@@ -130,41 +145,65 @@ public class WorkloadRuntimeStatusMgr {
             return;
         }
         long beId = params.backend_id;
-        Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
-        beLastReportTime.put(beId, System.currentTimeMillis());
-        if (queryIdMap == null) {
-            queryIdMap = Maps.newConcurrentMap();
-            queryIdMap.putAll(params.query_statistics_map);
-            beToQueryStatsMap.put(beId, queryIdMap);
+        // NOTE(wb) one be sends update request one by one,
+        // so there is no need a global lock for beToQueryStatsMap here,
+        // just keep one be's put/remove/get is atomic operation is enough
+        long currentTime = System.currentTimeMillis();
+        BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+        if (beReportInfo == null) {
+            beReportInfo = new BeReportInfo(currentTime);
+            beToQueryStatsMap.put(beId, beReportInfo);
         } else {
-            long currentTime = System.currentTimeMillis();
-            for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
-                queryIdMap.put(entry.getKey(), entry.getValue());
-                queryLastReportTime.put(entry.getKey(), currentTime);
+            beReportInfo.beLastReportTime = currentTime;
+        }
+        for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
+            beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, (TQueryStatistics) entry.getValue()));
+        }
+    }
+
+    void clearReportTimeoutBeStatistics() {
+        // 1 clear report timeout be
+        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
+        Long currentTime = System.currentTimeMillis();
+        for (Long beId : currentBeIdSet) {
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            if (currentTime - beReportInfo.beLastReportTime > Config.be_report_query_statistics_timeout_ms) {
+                beToQueryStatsMap.remove(beId);
+                continue;
+            }
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
+            for (String queryId : queryIdSet) {
+                Pair<Long, TQueryStatistics> pair = beReportInfo.queryStatsMap.get(queryId);
+                long queryLastReportTime = pair.first;
+                if (currentTime - queryLastReportTime > Config.be_report_query_statistics_timeout_ms) {
+                    beReportInfo.queryStatsMap.remove(queryId);
+                }
             }
         }
    }
 
+    // NOTE: currently getQueryStatisticsMap must be called before clear beToQueryStatsMap
+    // so there is no need lock or null check when visit beToQueryStatsMap
     public Map<String, TQueryStatistics> getQueryStatisticsMap() {
         // 1 merge query stats in all be
         Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
+        Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
         for (Long beId : beIdSet) {
-            Map<String, TQueryStatistics> currentQueryMap = beToQueryStatsMap.get(beId);
-            Set<String> queryIdSet = currentQueryMap.keySet();
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
             for (String queryId : queryIdSet) {
-                TQueryStatistics retQuery = retQueryMap.get(queryId);
+                TQueryStatistics curQueryStats = beReportInfo.queryStatsMap.get(queryId).second;
+
+                TQueryStatistics retQuery = resultQueryMap.get(queryId);
                 if (retQuery == null) {
                     retQuery = new TQueryStatistics();
-                    retQueryMap.put(queryId, retQuery);
+                    resultQueryMap.put(queryId, retQuery);
                 }
-
-                TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
                 mergeQueryStatistics(retQuery, curQueryStats);
             }
         }
-        return retQueryMap;
+        return resultQueryMap;
     }
 
     private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
@@ -176,44 +215,6 @@ public class WorkloadRuntimeStatusMgr {
         }
     }
 
-    void clearReportTimeoutBeStatistics() {
-        // 1 clear report timeout be
-        Set<Long> beNeedToRemove = new HashSet<>();
-        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
-        Long currentTime = System.currentTimeMillis();
-        for (Long beId : currentBeIdSet) {
-            Long lastReportTime = beLastReportTime.get(beId);
-            if (lastReportTime != null
-                    && currentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
-                beNeedToRemove.add(beId);
-            }
-        }
-        for (Long beId : beNeedToRemove) {
-            beToQueryStatsMap.remove(beId);
-            beLastReportTime.remove(beId);
-        }
-
-        // 2 clear report timeout query
-        Set<String> queryNeedToClear = new HashSet<>();
-        Long newCurrentTime = System.currentTimeMillis();
-        Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
-        for (String queryId : queryLastReportTimeKeySet) {
-            Long lastReportTime = queryLastReportTime.get(queryId);
-            if (lastReportTime != null
-                    && newCurrentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
-                queryNeedToClear.add(queryId);
-            }
-        }
-
-        Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        for (String queryId : queryNeedToClear) {
-            for (Long beId : beIdSet) {
-                beToQueryStatsMap.get(beId).remove(queryId);
-            }
-            queryLastReportTime.remove(queryId);
-        }
-    }
-
     private void queryAuditEventLogWriteLock() {
         queryAuditEventLock.writeLock().lock();
     }
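The rewritten WorkloadRuntimeStatusMgr replaces three loosely coupled maps (per-BE stats, per-BE report time, per-query report time) with one entry per BE that owns its own timestamp and its own concurrent per-query map, so a report only ever touches its own BE's entry and the periodic sweep can drop a silent BE or a single stale query independently. A self-contained sketch of the same pattern with simplified types; the names here are invented, and the real class keeps Pair<Long, TQueryStatistics> values:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustration of the per-BE bookkeeping pattern: no global lock, one concurrent map per BE,
    // timestamps on both the BE entry and each query entry so either can expire on its own.
    final class PerBeStats {
        static final long TIMEOUT_MS = 60_000; // mirrors the ~60s report timeout mentioned above

        static final class BeEntry {
            volatile long lastReportMs;
            final Map<String, long[]> queryStats = new ConcurrentHashMap<>(); // queryId -> {reportMs, value}

            BeEntry(long now) {
                this.lastReportMs = now;
            }
        }

        private final Map<Long, BeEntry> beMap = new ConcurrentHashMap<>();

        void onReport(long beId, Map<String, Long> reported) {
            long now = System.currentTimeMillis();
            BeEntry entry = beMap.computeIfAbsent(beId, id -> new BeEntry(now));
            entry.lastReportMs = now;
            reported.forEach((queryId, value) -> entry.queryStats.put(queryId, new long[] {now, value}));
        }

        void sweepTimeouts() {
            long now = System.currentTimeMillis();
            // drop BEs that have not reported at all for too long
            beMap.entrySet().removeIf(e -> now - e.getValue().lastReportMs > TIMEOUT_MS);
            // drop individual queries a still-alive BE has stopped reporting
            for (BeEntry entry : beMap.values()) {
                entry.queryStats.entrySet().removeIf(e -> now - e.getValue()[0] > TIMEOUT_MS);
            }
        }
    }

As the NOTE comment above says, a report that races with the sweep can be lost; the patch accepts that because a BE that has not reported for the whole timeout window is the real problem to investigate.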
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 22422aeabac..4b6a19783f7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -453,6 +453,8 @@ struct TExecPlanFragmentParams {
 
   // scan node id -> scan range params, only for external file scan
   24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params
+
+  32: optional Types.TNetworkAddress current_connect_fe
 }
 
 struct TExecPlanFragmentParamsList {
@@ -667,6 +669,7 @@ struct TPipelineFragmentParams {
   28: optional string table_name
   // scan node id -> scan range params, only for external file scan
   29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params
+  43: optional Types.TNetworkAddress current_connect_fe
 }
 
 struct TPipelineFragmentParamsList {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org