This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dd99468b8f [fix](stats) Fix jdbc timeout with multiple FE when execute 
analyze table (#21115)
dd99468b8f is described below

commit dd99468b8f6bf8237c654bbd34fbe0eab27411ab
Author: AKIRA <33112463+kikyou1...@users.noreply.github.com>
AuthorDate: Sun Jun 25 16:49:36 2023 +0900

    [fix](stats) Fix jdbc timeout with multiple FE when execute analyze table 
(#21115)
    
    SQL may forward to master to execute when connecting to follower node, the 
result should be set to `StmtExecutor#proxyResultSet`
    
    Before this PR, in above scenario , submit analyze sql by  mysql 
client/jdbc whould return get malformed packet/ Communication failed.
---
 .../org/apache/doris/analysis/AnalyzeStmt.java     |  7 +++++-
 .../org/apache/doris/analysis/AnalyzeTblStmt.java  |  5 -----
 .../org/apache/doris/analysis/RedirectStatus.java  |  4 ++--
 .../main/java/org/apache/doris/catalog/Env.java    | 14 ------------
 .../main/java/org/apache/doris/qe/DdlExecutor.java |  6 ------
 .../java/org/apache/doris/qe/StmtExecutor.java     | 11 ++++++++++
 .../apache/doris/statistics/AnalysisManager.java   | 25 ++++++++++++++++------
 7 files changed, 38 insertions(+), 34 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
index 202a870f12..6f1f7c64d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
@@ -25,7 +25,7 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
 
 import java.util.Map;
 
-public class AnalyzeStmt extends DdlStmt {
+public class AnalyzeStmt extends StatementBase {
 
     protected AnalyzeProperties analyzeProperties;
 
@@ -81,4 +81,9 @@ public class AnalyzeStmt extends DdlStmt {
     public AnalyzeProperties getAnalyzeProperties() {
         return analyzeProperties;
     }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.FORWARD_WITH_SYNC;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
index 71d25c97f6..03681bdb36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
@@ -252,11 +252,6 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
         return table instanceof HMSExternalTable && 
table.getPartitionNames().size() > partNum;
     }
 
-    @Override
-    public RedirectStatus getRedirectStatus() {
-        return RedirectStatus.FORWARD_NO_SYNC;
-    }
-
     private void checkAnalyzePriv(String dbName, String tblName) throws 
AnalysisException {
         if (!Env.getCurrentEnv().getAccessManager()
                 .checkTblPriv(ConnectContext.get(), dbName, tblName, 
PrivPredicate.SELECT)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java
index 478f9fdf07..628d5a3080 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java
@@ -28,7 +28,7 @@ public class RedirectStatus {
 
     public RedirectStatus(boolean isForwardToMaster, boolean 
needToWaitJournalSync) {
         this.isForwardToMaster = isForwardToMaster;
-        this.needToWaitJournalSync  = needToWaitJournalSync;
+        this.needToWaitJournalSync = needToWaitJournalSync;
     }
 
     public boolean isForwardToMaster() {
@@ -47,7 +47,7 @@ public class RedirectStatus {
         this.needToWaitJournalSync = needToWaitJournalSync;
     }
 
-    public static RedirectStatus FORWARD_NO_SYNC  = new RedirectStatus(true, 
false);
+    public static RedirectStatus FORWARD_NO_SYNC = new RedirectStatus(true, 
false);
     public static RedirectStatus FORWARD_WITH_SYNC = new RedirectStatus(true, 
true);
     public static RedirectStatus NO_FORWARD =   new RedirectStatus(false, 
false);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 90e4d06118..933b7714a6 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -39,7 +39,6 @@ import org.apache.doris.analysis.AlterMaterializedViewStmt;
 import org.apache.doris.analysis.AlterSystemStmt;
 import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.AlterViewStmt;
-import org.apache.doris.analysis.AnalyzeTblStmt;
 import org.apache.doris.analysis.BackupStmt;
 import org.apache.doris.analysis.CancelAlterSystemStmt;
 import org.apache.doris.analysis.CancelAlterTableStmt;
@@ -211,7 +210,6 @@ import org.apache.doris.resource.Tag;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.AnalysisManager;
-import org.apache.doris.statistics.AnalysisTaskScheduler;
 import org.apache.doris.statistics.StatisticsAutoAnalyzer;
 import org.apache.doris.statistics.StatisticsCache;
 import org.apache.doris.statistics.StatisticsCleaner;
@@ -5345,18 +5343,6 @@ public class Env {
         return count;
     }
 
-    public AnalysisTaskScheduler getAnalysisJobScheduler() {
-        return analysisManager.taskScheduler;
-    }
-
-    // TODO:
-    //  1. handle partition level analysis statement properly
-    //  2. support sample job
-    //  3. support period job
-    public void createAnalysisJob(AnalyzeTblStmt analyzeTblStmt) throws 
DdlException {
-        analysisManager.createAnalysisJob(analyzeTblStmt);
-    }
-
     public AnalysisManager getAnalysisManager() {
         return analysisManager;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index faff8fdfb4..15b7bcc883 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -43,8 +43,6 @@ import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.AlterUserStmt;
 import org.apache.doris.analysis.AlterViewStmt;
 import org.apache.doris.analysis.AlterWorkloadGroupStmt;
-import org.apache.doris.analysis.AnalyzeDBStmt;
-import org.apache.doris.analysis.AnalyzeTblStmt;
 import org.apache.doris.analysis.BackupStmt;
 import org.apache.doris.analysis.CancelAlterSystemStmt;
 import org.apache.doris.analysis.CancelAlterTableStmt;
@@ -297,8 +295,6 @@ public class DdlExecutor {
             env.getRefreshManager().handleRefreshTable((RefreshTableStmt) 
ddlStmt);
         } else if (ddlStmt instanceof RefreshDbStmt) {
             env.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt);
-        } else if (ddlStmt instanceof AnalyzeTblStmt) {
-            env.createAnalysisJob((AnalyzeTblStmt) ddlStmt);
         } else if (ddlStmt instanceof AlterResourceStmt) {
             env.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt);
         } else if (ddlStmt instanceof AlterWorkloadGroupStmt) {
@@ -337,8 +333,6 @@ public class DdlExecutor {
             env.getAnalysisManager().dropStats((DropStatsStmt) ddlStmt);
         } else if (ddlStmt instanceof KillAnalysisJobStmt) {
             
env.getAnalysisManager().handleKillAnalyzeStmt((KillAnalysisJobStmt) ddlStmt);
-        } else if (ddlStmt instanceof AnalyzeDBStmt) {
-            env.getAnalysisManager().createAnalysisJobs((AnalyzeDBStmt) 
ddlStmt);
         } else if (ddlStmt instanceof CleanQueryStatsStmt) {
             CleanQueryStatsStmt stmt = (CleanQueryStatsStmt) ddlStmt;
             CleanQueryStatsInfo cleanQueryStatsInfo = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ed68e03889..9195738b75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -736,6 +736,8 @@ public class StmtExecutor {
                 handleLockTablesStmt();
             } else if (parsedStmt instanceof UnsupportedStmt) {
                 handleUnsupportedStmt();
+            } else if (parsedStmt instanceof AnalyzeStmt) {
+                handleAnalyzeStmt();
             } else {
                 context.getState().setError(ErrorCode.ERR_NOT_SUPPORTED_YET, 
"Do not support this query.");
             }
@@ -1899,6 +1901,10 @@ public class StmtExecutor {
         context.getState().setOk();
     }
 
+    private void handleAnalyzeStmt() throws DdlException {
+        context.env.getAnalysisManager().createAnalyze((AnalyzeStmt) 
parsedStmt, isProxy);
+    }
+
     // Process switch catalog
     private void handleSwitchStmt() throws AnalysisException {
         SwitchStmt switchStmt = (SwitchStmt) parsedStmt;
@@ -2537,5 +2543,10 @@ public class StmtExecutor {
     public void setProfileType(ProfileType profileType) {
         this.profileType = profileType;
     }
+
+
+    public void setProxyResultSet(ShowResultSet proxyResultSet) {
+        this.proxyResultSet = proxyResultSet;
+    }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index fc29affadb..c4443e5733 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -18,6 +18,7 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeDBStmt;
+import org.apache.doris.analysis.AnalyzeStmt;
 import org.apache.doris.analysis.AnalyzeTblStmt;
 import org.apache.doris.analysis.DropAnalyzeJobStmt;
 import org.apache.doris.analysis.DropStatsStmt;
@@ -151,7 +152,15 @@ public class AnalysisManager extends Daemon implements 
Writable {
         return statisticsCache;
     }
 
-    public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt) throws 
DdlException {
+    public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws 
DdlException {
+        if (analyzeStmt instanceof AnalyzeDBStmt) {
+            createAnalysisJobs((AnalyzeDBStmt) analyzeStmt, proxy);
+        } else if (analyzeStmt instanceof AnalyzeTblStmt) {
+            createAnalysisJob((AnalyzeTblStmt) analyzeStmt, proxy);
+        }
+    }
+
+    public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) 
throws DdlException {
         DatabaseIf<TableIf> db = analyzeDBStmt.getDb();
         List<TableIf> tbls = db.getTables();
         List<AnalysisInfo> analysisInfos = new ArrayList<>();
@@ -179,7 +188,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
                 analysisInfos.add(buildAndAssignJob(analyzeTblStmt));
             }
             if (!analyzeDBStmt.isSync()) {
-                sendJobId(analysisInfos);
+                sendJobId(analysisInfos, proxy);
             }
         } finally {
             db.readUnlock();
@@ -188,12 +197,12 @@ public class AnalysisManager extends Daemon implements 
Writable {
     }
 
     // Each analyze stmt corresponding to an analysis job.
-    public void createAnalysisJob(AnalyzeTblStmt stmt) throws DdlException {
+    public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws 
DdlException {
         AnalysisInfo jobInfo = buildAndAssignJob(stmt);
         if (jobInfo == null) {
             return;
         }
-        sendJobId(ImmutableList.of(jobInfo));
+        sendJobId(ImmutableList.of(jobInfo), proxy);
     }
 
     @Nullable
@@ -259,7 +268,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
         analysisTaskInfos.values().forEach(taskScheduler::schedule);
     }
 
-    private void sendJobId(List<AnalysisInfo> analysisInfos) {
+    private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
         List<Column> columns = new ArrayList<>();
         columns.add(new Column("Catalog_Name", 
ScalarType.createVarchar(1024)));
         columns.add(new Column("DB_Name", ScalarType.createVarchar(1024)));
@@ -279,7 +288,11 @@ public class AnalysisManager extends Daemon implements 
Writable {
         }
         ShowResultSet commonResultSet = new 
ShowResultSet(commonResultSetMetaData, resultRows);
         try {
-            ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
+            if (!proxy) {
+                
ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
+            } else {
+                
ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet);
+            }
         } catch (Throwable t) {
             LOG.warn("Failed to send job id to user", t);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to