This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch high-priority-column in repository https://gitbox.apache.org/repos/asf/doris.git
commit fe58392bd4729cb82efbdaeabd7dd9ab13512cfe Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Wed Mar 6 17:05:21 2024 +0800 Support follower sync query columns to master. (#31859) --- .../main/java/org/apache/doris/catalog/Env.java | 8 ++ .../apache/doris/service/FrontendServiceImpl.java | 8 ++ .../apache/doris/statistics/AnalysisManager.java | 15 +++ .../doris/statistics/FollowerColumnSender.java | 120 +++++++++++++++++++++ .../doris/statistics/HighPriorityColumn.java | 11 ++ .../doris/statistics/StatisticsAutoCollector.java | 21 +++- gensrc/thrift/FrontendService.thrift | 13 +++ 7 files changed, 194 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index ba31670cbc2..945ccd1be5e 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -240,6 +240,7 @@ import org.apache.doris.scheduler.registry.ExportTaskRegister; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.AnalysisManager; +import org.apache.doris.statistics.FollowerColumnSender; import org.apache.doris.statistics.StatisticsAutoCollector; import org.apache.doris.statistics.StatisticsCache; import org.apache.doris.statistics.StatisticsCleaner; @@ -522,6 +523,8 @@ public class Env { private StatisticsJobAppender statisticsJobAppender; + private FollowerColumnSender followerColumnSender; + private HiveTransactionMgr hiveTransactionMgr; private TopicPublisherThread topicPublisherThread; @@ -1732,6 +1735,11 @@ public class Env { if (analysisManager != null) { analysisManager.getStatisticsCache().preHeat(); } + + if (followerColumnSender == null) { + followerColumnSender = new FollowerColumnSender(); + followerColumnSender.start(); + } } // Set global variable 'lower_case_table_names' only when the cluster is 
initialized. diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 04b7ea43997..c6f85fb96aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -222,6 +222,7 @@ import org.apache.doris.thrift.TStreamLoadMultiTablePutResult; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TStreamLoadPutResult; import org.apache.doris.thrift.TStringLiteral; +import org.apache.doris.thrift.TSyncQueryColumns; import org.apache.doris.thrift.TTableIndexQueryStats; import org.apache.doris.thrift.TTableMetadataNameIds; import org.apache.doris.thrift.TTableQueryStats; @@ -3815,4 +3816,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + @Override + public TStatus syncQueryColumns(TSyncQueryColumns request) throws TException { + Env.getCurrentEnv().getAnalysisManager().mergeFollowerQueryColumns(request.highPriorityColumns, + request.midPriorityColumns); + return new TStatus(TStatusCode.OK); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index bac84996dd1..e2bd85a0f43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -64,6 +64,7 @@ import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; +import org.apache.doris.thrift.TQueryColumn; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -1191,4 +1192,18 @@ public class AnalysisManager 
implements Writable { } } } + + public void mergeFollowerQueryColumns(Collection<TQueryColumn> highColumns, + Collection<TQueryColumn> midColumns) { + for (TQueryColumn c : highColumns) { + if (!highPriorityColumns.offer(new HighPriorityColumn(c.catalogId, c.dbId, c.tblId, c.colName))) { + break; + } + } + for (TQueryColumn c : midColumns) { + if (!midPriorityColumns.offer(new HighPriorityColumn(c.catalogId, c.dbId, c.tblId, c.colName))) { + break; + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java new file mode 100644 index 00000000000..181000c1ef2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.ha.FrontendNodeType; +import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.system.Frontend; +import org.apache.doris.thrift.FrontendService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TQueryColumn; +import org.apache.doris.thrift.TSyncQueryColumns; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.stream.Collectors; + +public class FollowerColumnSender extends MasterDaemon { + + private static final Logger LOG = LogManager.getLogger(FollowerColumnSender.class); + + public static final long INTERVAL = 5000; + + public FollowerColumnSender() { + super("Follower Column Sender", INTERVAL); + } + + @Override + protected void runAfterCatalogReady() { + if (!StatisticsUtil.enableAutoAnalyze()) { + return; + } + if (Env.getCurrentEnv().isMaster()) { + return; + } + if (Env.isCheckpointThread()) { + return; + } + send(); + } + + protected void send() { + if (Env.getCurrentEnv().isMaster()) { + return; + } + Env currentEnv = Env.getCurrentEnv(); + AnalysisManager analysisManager = currentEnv.getAnalysisManager(); + if (analysisManager.highPriorityColumns.isEmpty() && analysisManager.midPriorityColumns.isEmpty()) { + return; + } + List<TQueryColumn> highPriorityColumns + = analysisManager.highPriorityColumns + .stream() + .map(HighPriorityColumn::toThrift) + .collect(Collectors.toList()); + List<TQueryColumn> midPriorityColumns + = analysisManager.midPriorityColumns + .stream() + .map(HighPriorityColumn::toThrift) + .collect(Collectors.toList()); + analysisManager.highPriorityColumns.clear(); + analysisManager.midPriorityColumns.clear(); + TSyncQueryColumns queryColumns = new TSyncQueryColumns(); + 
queryColumns.highPriorityColumns = highPriorityColumns; + queryColumns.midPriorityColumns = midPriorityColumns; + Frontend master = null; + try { + InetSocketAddress masterAddress = currentEnv.getHaProtocol().getLeader(); + for (Frontend fe : currentEnv.getFrontends(FrontendNodeType.FOLLOWER)) { + InetSocketAddress socketAddress = new InetSocketAddress(fe.getHost(), fe.getEditLogPort()); + if (socketAddress.equals(masterAddress)) { + master = fe; + break; + } + } + } catch (Exception e) { + LOG.warn("Failed to find master FE.", e); + return; + } + + if (master == null) { + LOG.warn("No master found in cluster."); + return; + } + TNetworkAddress address = new TNetworkAddress(master.getHost(), master.getRpcPort()); + FrontendService.Client client = null; + try { + client = ClientPool.frontendPool.borrowObject(address); + client.syncQueryColumns(queryColumns); + LOG.info("Send {} high priority columns and {} mid priority columns to master.", + highPriorityColumns.size(), midPriorityColumns.size()); + } catch (Throwable t) { + LOG.warn("Failed to sync stats to master: {}", address, t); + } finally { + if (client != null) { + ClientPool.frontendPool.returnObject(address, client); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java index c4bc20c399a..b2292ef725d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java @@ -17,6 +17,8 @@ package org.apache.doris.statistics; +import org.apache.doris.thrift.TQueryColumn; + import java.util.Objects; public class HighPriorityColumn { @@ -52,4 +54,13 @@ public class HighPriorityColumn { && this.tblId == otherCriticalColumn.tblId && this.colName.equals(otherCriticalColumn.colName); } + + public TQueryColumn toThrift() { + TQueryColumn tQueryColumn = new TQueryColumn(); + 
tQueryColumn.catalogId = catalogId; + tQueryColumn.dbId = dbId; + tQueryColumn.tblId = tblId; + tQueryColumn.colName = colName; + return tQueryColumn; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index c498881bfbf..e0df94b5cb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -132,6 +132,7 @@ public class StatisticsAutoCollector extends StatisticsCollector { } } + // TODO: Need refactor, hard to understand now. protected boolean needAnalyzeColumn(TableIf table, String column) { AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId()); @@ -151,11 +152,17 @@ public class StatisticsAutoCollector extends StatisticsCollector { if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) { return true; } + if (lastAnalyzeUpdateRows > currentUpdatedRows) { + // Shouldn't happen. Just in case. 
+ return true; + } OlapTable olapTable = (OlapTable) table; + long currentRowCount = olapTable.getRowCount(); + long lastAnalyzeRowCount = columnStatsMeta.rowCount; if (tableStatsStatus.newPartitionLoaded.get() && olapTable.isPartitionColumn(column)) { return true; } - if (columnStatsMeta.rowCount == 0 && olapTable.getRowCount() > 0) { + if (lastAnalyzeRowCount == 0 && currentRowCount > 0) { return true; } if (currentUpdatedRows == lastAnalyzeUpdateRows) { @@ -163,7 +170,17 @@ public class StatisticsAutoCollector extends StatisticsCollector { } double healthValue = ((double) (currentUpdatedRows - lastAnalyzeUpdateRows) / (double) currentUpdatedRows) * 100.0; - LOG.info("Column " + column + " health value is " + healthValue); + LOG.info("Column " + column + " update rows health value is " + healthValue); + if (healthValue < StatisticsUtil.getTableStatsHealthThreshold()) { + return true; + } + if (currentRowCount == 0 && lastAnalyzeRowCount != 0) { + return true; + } + if (currentRowCount == 0 && lastAnalyzeRowCount == 0) { + return false; + } + healthValue = ((double) (currentRowCount - lastAnalyzeRowCount) / (double) currentRowCount) * 100.0; return healthValue < StatisticsUtil.getTableStatsHealthThreshold(); } else { if (!(table instanceof HMSExternalTable)) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index c7bdbb4fc0e..2ddf52afd49 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1422,6 +1422,18 @@ struct TReportCommitTxnResultRequest { 4: optional binary payload } +struct TQueryColumn { + 1: optional i64 catalogId + 2: optional i64 dbId + 3: optional i64 tblId + 4: optional string colName +} + +struct TSyncQueryColumns { + 1: optional list<TQueryColumn> highPriorityColumns; + 2: optional list<TQueryColumn> midPriorityColumns; +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ 
-1509,4 +1521,5 @@ service FrontendService { TShowProcessListResult showProcessList(1: TShowProcessListRequest request) Status.TStatus reportCommitTxnResult(1: TReportCommitTxnResultRequest request) + Status.TStatus syncQueryColumns(1: TSyncQueryColumns request) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org