This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new dd99468b8f [fix](stats) Fix jdbc timeout with multiple FE when execute analyze table (#21115) dd99468b8f is described below commit dd99468b8f6bf8237c654bbd34fbe0eab27411ab Author: AKIRA <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Sun Jun 25 16:49:36 2023 +0900 [fix](stats) Fix jdbc timeout with multiple FE when execute analyze table (#21115) SQL may be forwarded to the master to execute when the client is connected to a follower node; in that case the result should be set to `StmtExecutor#proxyResultSet`. Before this PR, in the above scenario, submitting an analyze SQL statement through the mysql client/jdbc would fail with a "malformed packet" / "Communication failed" error. --- .../org/apache/doris/analysis/AnalyzeStmt.java | 7 +++++- .../org/apache/doris/analysis/AnalyzeTblStmt.java | 5 ----- .../org/apache/doris/analysis/RedirectStatus.java | 4 ++-- .../main/java/org/apache/doris/catalog/Env.java | 14 ------------ .../main/java/org/apache/doris/qe/DdlExecutor.java | 6 ------ .../java/org/apache/doris/qe/StmtExecutor.java | 11 ++++++++++ .../apache/doris/statistics/AnalysisManager.java | 25 ++++++++++++++++------ 7 files changed, 38 insertions(+), 34 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java index 202a870f12..6f1f7c64d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java @@ -25,7 +25,7 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import java.util.Map; -public class AnalyzeStmt extends DdlStmt { +public class AnalyzeStmt extends StatementBase { protected AnalyzeProperties analyzeProperties; @@ -81,4 +81,9 @@ public class AnalyzeStmt extends DdlStmt { public AnalyzeProperties getAnalyzeProperties() { return analyzeProperties; } + + @Override + public RedirectStatus getRedirectStatus() { + return 
RedirectStatus.FORWARD_WITH_SYNC; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java index 71d25c97f6..03681bdb36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java @@ -252,11 +252,6 @@ public class AnalyzeTblStmt extends AnalyzeStmt { return table instanceof HMSExternalTable && table.getPartitionNames().size() > partNum; } - @Override - public RedirectStatus getRedirectStatus() { - return RedirectStatus.FORWARD_NO_SYNC; - } - private void checkAnalyzePriv(String dbName, String tblName) throws AnalysisException { if (!Env.getCurrentEnv().getAccessManager() .checkTblPriv(ConnectContext.get(), dbName, tblName, PrivPredicate.SELECT)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java index 478f9fdf07..628d5a3080 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java @@ -28,7 +28,7 @@ public class RedirectStatus { public RedirectStatus(boolean isForwardToMaster, boolean needToWaitJournalSync) { this.isForwardToMaster = isForwardToMaster; - this.needToWaitJournalSync = needToWaitJournalSync; + this.needToWaitJournalSync = needToWaitJournalSync; } public boolean isForwardToMaster() { @@ -47,7 +47,7 @@ public class RedirectStatus { this.needToWaitJournalSync = needToWaitJournalSync; } - public static RedirectStatus FORWARD_NO_SYNC = new RedirectStatus(true, false); + public static RedirectStatus FORWARD_NO_SYNC = new RedirectStatus(true, false); public static RedirectStatus FORWARD_WITH_SYNC = new RedirectStatus(true, true); public static RedirectStatus NO_FORWARD = new RedirectStatus(false, false); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 90e4d06118..933b7714a6 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -39,7 +39,6 @@ import org.apache.doris.analysis.AlterMaterializedViewStmt; import org.apache.doris.analysis.AlterSystemStmt; import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.AlterViewStmt; -import org.apache.doris.analysis.AnalyzeTblStmt; import org.apache.doris.analysis.BackupStmt; import org.apache.doris.analysis.CancelAlterSystemStmt; import org.apache.doris.analysis.CancelAlterTableStmt; @@ -211,7 +210,6 @@ import org.apache.doris.resource.Tag; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.AnalysisManager; -import org.apache.doris.statistics.AnalysisTaskScheduler; import org.apache.doris.statistics.StatisticsAutoAnalyzer; import org.apache.doris.statistics.StatisticsCache; import org.apache.doris.statistics.StatisticsCleaner; @@ -5345,18 +5343,6 @@ public class Env { return count; } - public AnalysisTaskScheduler getAnalysisJobScheduler() { - return analysisManager.taskScheduler; - } - - // TODO: - // 1. handle partition level analysis statement properly - // 2. support sample job - // 3. 
support period job - public void createAnalysisJob(AnalyzeTblStmt analyzeTblStmt) throws DdlException { - analysisManager.createAnalysisJob(analyzeTblStmt); - } - public AnalysisManager getAnalysisManager() { return analysisManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index faff8fdfb4..15b7bcc883 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -43,8 +43,6 @@ import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.AlterUserStmt; import org.apache.doris.analysis.AlterViewStmt; import org.apache.doris.analysis.AlterWorkloadGroupStmt; -import org.apache.doris.analysis.AnalyzeDBStmt; -import org.apache.doris.analysis.AnalyzeTblStmt; import org.apache.doris.analysis.BackupStmt; import org.apache.doris.analysis.CancelAlterSystemStmt; import org.apache.doris.analysis.CancelAlterTableStmt; @@ -297,8 +295,6 @@ public class DdlExecutor { env.getRefreshManager().handleRefreshTable((RefreshTableStmt) ddlStmt); } else if (ddlStmt instanceof RefreshDbStmt) { env.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt); - } else if (ddlStmt instanceof AnalyzeTblStmt) { - env.createAnalysisJob((AnalyzeTblStmt) ddlStmt); } else if (ddlStmt instanceof AlterResourceStmt) { env.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt); } else if (ddlStmt instanceof AlterWorkloadGroupStmt) { @@ -337,8 +333,6 @@ public class DdlExecutor { env.getAnalysisManager().dropStats((DropStatsStmt) ddlStmt); } else if (ddlStmt instanceof KillAnalysisJobStmt) { env.getAnalysisManager().handleKillAnalyzeStmt((KillAnalysisJobStmt) ddlStmt); - } else if (ddlStmt instanceof AnalyzeDBStmt) { - env.getAnalysisManager().createAnalysisJobs((AnalyzeDBStmt) ddlStmt); } else if (ddlStmt instanceof CleanQueryStatsStmt) { CleanQueryStatsStmt stmt = (CleanQueryStatsStmt) ddlStmt; 
CleanQueryStatsInfo cleanQueryStatsInfo = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index ed68e03889..9195738b75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -736,6 +736,8 @@ public class StmtExecutor { handleLockTablesStmt(); } else if (parsedStmt instanceof UnsupportedStmt) { handleUnsupportedStmt(); + } else if (parsedStmt instanceof AnalyzeStmt) { + handleAnalyzeStmt(); } else { context.getState().setError(ErrorCode.ERR_NOT_SUPPORTED_YET, "Do not support this query."); } @@ -1899,6 +1901,10 @@ public class StmtExecutor { context.getState().setOk(); } + private void handleAnalyzeStmt() throws DdlException { + context.env.getAnalysisManager().createAnalyze((AnalyzeStmt) parsedStmt, isProxy); + } + // Process switch catalog private void handleSwitchStmt() throws AnalysisException { SwitchStmt switchStmt = (SwitchStmt) parsedStmt; @@ -2537,5 +2543,10 @@ public class StmtExecutor { public void setProfileType(ProfileType profileType) { this.profileType = profileType; } + + + public void setProxyResultSet(ShowResultSet proxyResultSet) { + this.proxyResultSet = proxyResultSet; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index fc29affadb..c4443e5733 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -18,6 +18,7 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AnalyzeDBStmt; +import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.analysis.AnalyzeTblStmt; import org.apache.doris.analysis.DropAnalyzeJobStmt; import org.apache.doris.analysis.DropStatsStmt; @@ -151,7 +152,15 @@ public class 
AnalysisManager extends Daemon implements Writable { return statisticsCache; } - public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt) throws DdlException { + public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws DdlException { + if (analyzeStmt instanceof AnalyzeDBStmt) { + createAnalysisJobs((AnalyzeDBStmt) analyzeStmt, proxy); + } else if (analyzeStmt instanceof AnalyzeTblStmt) { + createAnalysisJob((AnalyzeTblStmt) analyzeStmt, proxy); + } + } + + public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) throws DdlException { DatabaseIf<TableIf> db = analyzeDBStmt.getDb(); List<TableIf> tbls = db.getTables(); List<AnalysisInfo> analysisInfos = new ArrayList<>(); @@ -179,7 +188,7 @@ public class AnalysisManager extends Daemon implements Writable { analysisInfos.add(buildAndAssignJob(analyzeTblStmt)); } if (!analyzeDBStmt.isSync()) { - sendJobId(analysisInfos); + sendJobId(analysisInfos, proxy); } } finally { db.readUnlock(); @@ -188,12 +197,12 @@ public class AnalysisManager extends Daemon implements Writable { } // Each analyze stmt corresponding to an analysis job. 
- public void createAnalysisJob(AnalyzeTblStmt stmt) throws DdlException { + public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws DdlException { AnalysisInfo jobInfo = buildAndAssignJob(stmt); if (jobInfo == null) { return; } - sendJobId(ImmutableList.of(jobInfo)); + sendJobId(ImmutableList.of(jobInfo), proxy); } @Nullable @@ -259,7 +268,7 @@ public class AnalysisManager extends Daemon implements Writable { analysisTaskInfos.values().forEach(taskScheduler::schedule); } - private void sendJobId(List<AnalysisInfo> analysisInfos) { + private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) { List<Column> columns = new ArrayList<>(); columns.add(new Column("Catalog_Name", ScalarType.createVarchar(1024))); columns.add(new Column("DB_Name", ScalarType.createVarchar(1024))); @@ -279,7 +288,11 @@ public class AnalysisManager extends Daemon implements Writable { } ShowResultSet commonResultSet = new ShowResultSet(commonResultSetMetaData, resultRows); try { - ConnectContext.get().getExecutor().sendResultSet(commonResultSet); + if (!proxy) { + ConnectContext.get().getExecutor().sendResultSet(commonResultSet); + } else { + ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet); + } } catch (Throwable t) { LOG.warn("Failed to send job id to user", t); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org