This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2daa5f3fef [fix](statistics) Fix statistics related threads continuously spawn as doing checkpoint #16088 2daa5f3fef is described below commit 2daa5f3fef24d8324e723577de5e111e162ec094 Author: AKIRA <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Sat Jan 21 07:58:33 2023 +0800 [fix](statistics) Fix statistics related threads continuously spawn as doing checkpoint #16088 --- .../main/java/org/apache/doris/common/Config.java | 2 +- .../src/main/java/org/apache/doris/catalog/Env.java | 5 +++-- .../doris/statistics/AnalysisTaskExecutor.java | 20 ++++++++++---------- .../doris/statistics/AnalysisTaskExecutorTest.java | 2 +- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index d9472377a3..706e3543f6 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1850,7 +1850,7 @@ public class Config extends ConfigBase { * Used to determined how many statistics collection SQL could run simultaneously. */ @ConfField - public static int statistics_simultaneously_running_job_num = 10; + public static int statistics_simultaneously_running_task_num = 10; /** * Internal table replica num, once set, user should promise the avaible BE is greater than this value, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index e3ac2ff6f2..0a238e9dbd 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -259,7 +259,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -639,7 +638,9 @@ public class Env { this.mtmvJobManager = new MTMVJobManager(); this.extMetaCacheMgr = new ExternalMetaCacheMgr(); this.fqdnManager = new FQDNManager(systemInfo); - this.analysisManager = new AnalysisManager(); + if (!isCheckpointCatalog) { + this.analysisManager = new AnalysisManager(); + } } public static void destroyCheckpoint() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 783c73b6de..919185d287 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -38,8 +38,8 @@ public class AnalysisTaskExecutor extends Thread { private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class); private final ThreadPoolExecutor executors = ThreadPoolManager.newDaemonThreadPool( - Config.statistics_simultaneously_running_job_num, - Config.statistics_simultaneously_running_job_num, 0, + Config.statistics_simultaneously_running_task_num, + Config.statistics_simultaneously_running_task_num, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>(), new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE), "Analysis Job Executor", true); @@ -47,9 +47,9 @@ public class AnalysisTaskExecutor extends Thread { private final AnalysisTaskScheduler taskScheduler; private final BlockingCounter blockingCounter = - new BlockingCounter(Config.statistics_simultaneously_running_job_num); + new BlockingCounter(Config.statistics_simultaneously_running_task_num); - private final BlockingQueue<AnalysisTaskWrapper> jobQueue = + private final BlockingQueue<AnalysisTaskWrapper> taskQueue = new PriorityBlockingQueue<AnalysisTaskWrapper>(20, Comparator.comparingLong(AnalysisTaskWrapper::getStartTime)); @@ -60,11 +60,11 @@ public class AnalysisTaskExecutor extends Thread { @Override public void run() { fetchAndExecute(); - cancelExpiredJob(); + cancelExpiredTask(); } - private void cancelExpiredJob() { - String name = "Expired Analysis Job Killer"; + private void cancelExpiredTask() { + String name = "Expired Analysis Task Killer"; Thread t = new Thread(this::doCancelExpiredJob, name); t.setDaemon(true); t.start(); @@ -73,7 +73,7 @@ public class AnalysisTaskExecutor extends Thread { private void doCancelExpiredJob() { for (;;) { try { - AnalysisTaskWrapper taskWrapper = jobQueue.take(); + AnalysisTaskWrapper taskWrapper = taskQueue.take(); try { long timeout = StatisticConstants.STATISTICS_TASKS_TIMEOUT_IN_MS; taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS); @@ -95,7 +95,7 @@ public class AnalysisTaskExecutor extends Thread { LOG.warn(throwable); } } - }, "Analysis Job Submitter"); + }, "Analysis Task Submitter"); t.setDaemon(true); t.start(); } @@ -119,6 +119,6 @@ public class AnalysisTaskExecutor extends Thread { } public void putJob(AnalysisTaskWrapper wrapper) throws Exception { - jobQueue.put(wrapper); + taskQueue.put(wrapper); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java index ba75d26cec..8dfcff429b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -74,7 +74,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService { }; AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler); - BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "jobQueue"); + BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "taskQueue"); AnalysisTaskWrapper analysisTaskWrapper = new AnalysisTaskWrapper(analysisTaskExecutor, analysisJob); Deencapsulation.setField(analysisTaskWrapper, "startTime", 5); b.put(analysisTaskWrapper); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org