This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4c6458aa77 [enhancement](nereids) Execute sync analyze task with multi-thread (#22211) 4c6458aa77 is described below commit 4c6458aa778a94d27d15ee579803c9aec584fa31 Author: AKIRA <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Mon Jul 31 15:05:07 2023 +0800 [enhancement](nereids) Execute sync analyze task with multi-thread (#22211) It was executed sequentially, which may take a lot of time --- .../apache/doris/statistics/AnalysisManager.java | 64 +++++++++++++++------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index cb6a3dfe5c..66f0b94aa8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -41,6 +41,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.ThreadPoolManager.BlockedPolicy; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.Util; @@ -59,6 +60,7 @@ import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; @@ -82,6 +84,9 @@ import java.util.StringJoiner; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; +import 
java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; @@ -738,12 +743,23 @@ public class AnalysisManager extends Daemon implements Writable { ConnectContext ctx = ConnectContext.get(); try { ctxToSyncTask.put(ctx, syncTaskCollection); - syncTaskCollection.execute(); + ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze(); + syncTaskCollection.execute(syncExecPool); } finally { ctxToSyncTask.remove(ctx); } } + private ThreadPoolExecutor createThreadPoolForSyncAnalyze() { + String poolName = "SYNC ANALYZE THREAD POOL"; + return new ThreadPoolExecutor(0, 64, + 0, TimeUnit.SECONDS, + new SynchronousQueue(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC ANALYZE" + "-%d") + .build(), new BlockedPolicy(poolName, + (int) TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours))); + } + public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException { if (dropStatsStmt.dropExpired) { Env.getCurrentEnv().getStatisticsCleaner().clear(); @@ -844,28 +860,38 @@ public class AnalysisManager extends Daemon implements Writable { tasks.forEach(BaseAnalysisTask::cancel); } - public void execute() { - List<String> colNames = new ArrayList<>(); - List<String> errorMessages = new ArrayList<>(); + public void execute(ThreadPoolExecutor executor) { + List<String> colNames = Collections.synchronizedList(new ArrayList<>()); + List<String> errorMessages = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); for (BaseAnalysisTask task : tasks) { - if (cancelled) { - colNames.add(task.info.colName); - errorMessages.add("Cancelled"); - continue; - } - try { - task.doExecute(); - updateSyncTaskStatus(task, AnalysisState.FINISHED); - } catch (Throwable t) { - colNames.add(task.info.colName); - errorMessages.add(Util.getRootCauseMessage(t)); - updateSyncTaskStatus(task, AnalysisState.FAILED); - 
LOG.warn("Failed to analyze, info: {}", task, t); - } + executor.submit(() -> { + try { + if (cancelled) { + return; + } + try { + task.doExecute(); + updateSyncTaskStatus(task, AnalysisState.FINISHED); + } catch (Throwable t) { + colNames.add(task.info.colName); + errorMessages.add(Util.getRootCauseMessage(t)); + updateSyncTaskStatus(task, AnalysisState.FAILED); + LOG.warn("Failed to analyze, info: {}", task, t); + } + } finally { + countDownLatch.countDown(); + } + }); + } + try { + countDownLatch.await(); + } catch (InterruptedException t) { + LOG.warn("Thread got interrupted when waiting sync analyze task execution finished", t); } if (!colNames.isEmpty()) { throw new RuntimeException("Failed to analyze following columns:[" + String.join(",", colNames) - + "] Reasons: " + String.join(",", errorMessages)); + + "] Reasons: " + String.join(",", errorMessages)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org