This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git

commit fe58392bd4729cb82efbdaeabd7dd9ab13512cfe
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Wed Mar 6 17:05:21 2024 +0800

    Support follower sync query columns to master. (#31859)
---
 .../main/java/org/apache/doris/catalog/Env.java    |   8 ++
 .../apache/doris/service/FrontendServiceImpl.java  |   8 ++
 .../apache/doris/statistics/AnalysisManager.java   |  15 +++
 .../doris/statistics/FollowerColumnSender.java     | 120 +++++++++++++++++++++
 .../doris/statistics/HighPriorityColumn.java       |  11 ++
 .../doris/statistics/StatisticsAutoCollector.java  |  21 +++-
 gensrc/thrift/FrontendService.thrift               |  13 +++
 7 files changed, 194 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index ba31670cbc2..945ccd1be5e 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -240,6 +240,7 @@ import 
org.apache.doris.scheduler.registry.ExportTaskRegister;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.AnalysisManager;
+import org.apache.doris.statistics.FollowerColumnSender;
 import org.apache.doris.statistics.StatisticsAutoCollector;
 import org.apache.doris.statistics.StatisticsCache;
 import org.apache.doris.statistics.StatisticsCleaner;
@@ -522,6 +523,8 @@ public class Env {
 
     private StatisticsJobAppender statisticsJobAppender;
 
+    private FollowerColumnSender followerColumnSender;
+
     private HiveTransactionMgr hiveTransactionMgr;
 
     private TopicPublisherThread topicPublisherThread;
@@ -1732,6 +1735,11 @@ public class Env {
         if (analysisManager != null) {
             analysisManager.getStatisticsCache().preHeat();
         }
+
+        if (followerColumnSender == null) {
+            followerColumnSender = new FollowerColumnSender();
+            followerColumnSender.start();
+        }
     }
 
     // Set global variable 'lower_case_table_names' only when the cluster is 
initialized.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 04b7ea43997..c6f85fb96aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -222,6 +222,7 @@ import 
org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TStreamLoadPutResult;
 import org.apache.doris.thrift.TStringLiteral;
+import org.apache.doris.thrift.TSyncQueryColumns;
 import org.apache.doris.thrift.TTableIndexQueryStats;
 import org.apache.doris.thrift.TTableMetadataNameIds;
 import org.apache.doris.thrift.TTableQueryStats;
@@ -3815,4 +3816,11 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         return result;
     }
 
+    /**
+     * Thrift entry point for query-column synchronization: hands the high and
+     * mid priority column lists carried by the request to
+     * {@code AnalysisManager.mergeFollowerQueryColumns}.
+     *
+     * @param request the column lists to merge
+     * @return a status with code {@code TStatusCode.OK}
+     * @throws TException declared by the thrift service interface
+     */
+    @Override
+    public TStatus syncQueryColumns(TSyncQueryColumns request) throws TException {
+        Env.getCurrentEnv()
+                .getAnalysisManager()
+                .mergeFollowerQueryColumns(request.highPriorityColumns, request.midPriorityColumns);
+        return new TStatus(TStatusCode.OK);
+    }
+
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index bac84996dd1..e2bd85a0f43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -64,6 +64,7 @@ import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
+import org.apache.doris.thrift.TQueryColumn;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -1191,4 +1192,18 @@ public class AnalysisManager implements Writable {
             }
         }
     }
+
+    /**
+     * Merges query columns received over thrift into the local high and mid
+     * priority column queues.
+     *
+     * <p>Either argument may be {@code null}: both lists are optional fields of
+     * the thrift request and are left unset when the sender has nothing to
+     * report. A {@code null} collection is treated as empty.
+     *
+     * <p>For each collection, merging stops at the first element the target
+     * queue refuses to accept; the remaining elements of that collection are
+     * dropped.
+     *
+     * @param highColumns columns to add to the high priority queue, may be null
+     * @param midColumns columns to add to the mid priority queue, may be null
+     */
+    public void mergeFollowerQueryColumns(Collection<TQueryColumn> highColumns,
+            Collection<TQueryColumn> midColumns) {
+        if (highColumns != null) {
+            for (TQueryColumn c : highColumns) {
+                if (!highPriorityColumns.offer(new HighPriorityColumn(c.catalogId, c.dbId, c.tblId, c.colName))) {
+                    break;
+                }
+            }
+        }
+        if (midColumns != null) {
+            for (TQueryColumn c : midColumns) {
+                if (!midPriorityColumns.offer(new HighPriorityColumn(c.catalogId, c.dbId, c.tblId, c.colName))) {
+                    break;
+                }
+            }
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java
new file mode 100644
index 00000000000..181000c1ef2
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryColumn;
+import org.apache.doris.thrift.TSyncQueryColumns;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FollowerColumnSender extends MasterDaemon {
+
+    private static final Logger LOG = 
LogManager.getLogger(FollowerColumnSender.class);
+
+    public static final long INTERVAL = 5000;
+
+    public FollowerColumnSender() {
+        super("Follower Column Sender", INTERVAL);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!StatisticsUtil.enableAutoAnalyze()) {
+            return;
+        }
+        if (Env.getCurrentEnv().isMaster()) {
+            return;
+        }
+        if (Env.isCheckpointThread()) {
+            return;
+        }
+        send();
+    }
+
+    protected void send() {
+        if (Env.getCurrentEnv().isMaster()) {
+            return;
+        }
+        Env currentEnv = Env.getCurrentEnv();
+        AnalysisManager analysisManager = currentEnv.getAnalysisManager();
+        if (analysisManager.highPriorityColumns.isEmpty() && 
analysisManager.midPriorityColumns.isEmpty()) {
+            return;
+        }
+        List<TQueryColumn> highPriorityColumns
+                = analysisManager.highPriorityColumns
+                .stream()
+                .map(HighPriorityColumn::toThrift)
+                .collect(Collectors.toList());
+        List<TQueryColumn> midPriorityColumns
+                = analysisManager.midPriorityColumns
+                .stream()
+                .map(HighPriorityColumn::toThrift)
+                .collect(Collectors.toList());
+        analysisManager.highPriorityColumns.clear();
+        analysisManager.midPriorityColumns.clear();
+        TSyncQueryColumns queryColumns = new TSyncQueryColumns();
+        queryColumns.highPriorityColumns = highPriorityColumns;
+        queryColumns.midPriorityColumns = midPriorityColumns;
+        Frontend master = null;
+        try {
+            InetSocketAddress masterAddress = 
currentEnv.getHaProtocol().getLeader();
+            for (Frontend fe : 
currentEnv.getFrontends(FrontendNodeType.FOLLOWER)) {
+                InetSocketAddress socketAddress = new 
InetSocketAddress(fe.getHost(), fe.getEditLogPort());
+                if (socketAddress.equals(masterAddress)) {
+                    master = fe;
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to find master FE.", e);
+            return;
+        }
+
+        if (master == null) {
+            LOG.warn("No master found in cluster.");
+            return;
+        }
+        TNetworkAddress address = new TNetworkAddress(master.getHost(), 
master.getRpcPort());
+        FrontendService.Client client = null;
+        try {
+            client = ClientPool.frontendPool.borrowObject(address);
+            client.syncQueryColumns(queryColumns);
+            LOG.info("Send {} high priority columns and {} mid priority 
columns to master.",
+                    highPriorityColumns.size(), midPriorityColumns.size());
+        } catch (Throwable t) {
+            LOG.warn("Failed to sync stats to master: {}", address, t);
+        } finally {
+            if (client != null) {
+                ClientPool.frontendPool.returnObject(address, client);
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
index c4bc20c399a..b2292ef725d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.thrift.TQueryColumn;
+
 import java.util.Objects;
 
 public class HighPriorityColumn {
@@ -52,4 +54,13 @@ public class HighPriorityColumn {
             && this.tblId == otherCriticalColumn.tblId
             && this.colName.equals(otherCriticalColumn.colName);
     }
+
+    /**
+     * Converts this column into its thrift representation.
+     *
+     * <p>The generated setters are used instead of direct field assignment:
+     * {@code catalogId}, {@code dbId} and {@code tblId} are optional primitive
+     * fields, and thrift only serializes an optional primitive when its isset
+     * flag is set, which the setters do and a plain field write does not.
+     *
+     * @return a new {@code TQueryColumn} carrying this column's ids and name
+     */
+    public TQueryColumn toThrift() {
+        TQueryColumn tQueryColumn = new TQueryColumn();
+        tQueryColumn.setCatalogId(catalogId);
+        tQueryColumn.setDbId(dbId);
+        tQueryColumn.setTblId(tblId);
+        tQueryColumn.setColName(colName);
+        return tQueryColumn;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index c498881bfbf..e0df94b5cb0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -132,6 +132,7 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
         }
     }
 
+    // TODO: Need refactor, hard to understand now.
     protected boolean needAnalyzeColumn(TableIf table, String column) {
         AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
         TableStatsMeta tableStatsStatus = 
manager.findTableStatsStatus(table.getId());
@@ -151,11 +152,17 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
             if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) {
                 return true;
             }
+            if (lastAnalyzeUpdateRows > currentUpdatedRows) {
+                // Shouldn't happen. Just in case.
+                return true;
+            }
             OlapTable olapTable = (OlapTable) table;
+            long currentRowCount = olapTable.getRowCount();
+            long lastAnalyzeRowCount = columnStatsMeta.rowCount;
             if (tableStatsStatus.newPartitionLoaded.get() && 
olapTable.isPartitionColumn(column)) {
                 return true;
             }
-            if (columnStatsMeta.rowCount == 0 && olapTable.getRowCount() > 0) {
+            if (lastAnalyzeRowCount == 0 && currentRowCount > 0) {
                 return true;
             }
             if (currentUpdatedRows == lastAnalyzeUpdateRows) {
@@ -163,7 +170,17 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
             }
             double healthValue = ((double) (currentUpdatedRows - 
lastAnalyzeUpdateRows)
                     / (double) currentUpdatedRows) * 100.0;
-            LOG.info("Column " + column + " health value is " + healthValue);
+            LOG.info("Column " + column + " update rows health value is " + 
healthValue);
+            if (healthValue < StatisticsUtil.getTableStatsHealthThreshold()) {
+                return true;
+            }
+            if (currentRowCount == 0 && lastAnalyzeRowCount != 0) {
+                return true;
+            }
+            if (currentRowCount == 0 && lastAnalyzeRowCount == 0) {
+                return false;
+            }
+            healthValue = ((double) (currentRowCount - lastAnalyzeRowCount) / 
(double) currentRowCount) * 100.0;
             return healthValue < StatisticsUtil.getTableStatsHealthThreshold();
         } else {
             if (!(table instanceof HMSExternalTable)) {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index c7bdbb4fc0e..2ddf52afd49 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1422,6 +1422,18 @@ struct TReportCommitTxnResultRequest {
     4: optional binary payload
 }
 
+struct TQueryColumn { // One query column, addressed by catalog, database and table id plus column name.
+    1: optional i64 catalogId
+    2: optional i64 dbId
+    3: optional i64 tblId
+    4: optional string colName
+}
+
+// Request payload of syncQueryColumns: the query columns to report,
+// split into a high priority list and a mid priority list.
+struct TSyncQueryColumns {
+    1: optional list<TQueryColumn> highPriorityColumns
+    2: optional list<TQueryColumn> midPriorityColumns
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1509,4 +1521,5 @@ service FrontendService {
 
     TShowProcessListResult showProcessList(1: TShowProcessListRequest request)
     Status.TStatus reportCommitTxnResult(1: TReportCommitTxnResultRequest 
request)
+    Status.TStatus syncQueryColumns(1: TSyncQueryColumns request)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to