This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c6458aa77 [enhancement](nereids) Execute sync analyze task with 
multi-thread (#22211)
4c6458aa77 is described below

commit 4c6458aa778a94d27d15ee579803c9aec584fa31
Author: AKIRA <33112463+kikyou1...@users.noreply.github.com>
AuthorDate: Mon Jul 31 15:05:07 2023 +0800

    [enhancement](nereids) Execute sync analyze task with multi-thread (#22211)
    
    Sync analyze tasks were previously executed sequentially, which could take a long time.
---
 .../apache/doris/statistics/AnalysisManager.java   | 64 +++++++++++++++-------
 1 file changed, 45 insertions(+), 19 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index cb6a3dfe5c..66f0b94aa8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.Daemon;
 import org.apache.doris.common.util.Util;
@@ -59,6 +60,7 @@ import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
@@ -82,6 +84,9 @@ import java.util.StringJoiner;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -738,12 +743,23 @@ public class AnalysisManager extends Daemon implements 
Writable {
         ConnectContext ctx = ConnectContext.get();
         try {
             ctxToSyncTask.put(ctx, syncTaskCollection);
-            syncTaskCollection.execute();
+            ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze();
+            syncTaskCollection.execute(syncExecPool);
         } finally {
             ctxToSyncTask.remove(ctx);
         }
     }
 
+    private ThreadPoolExecutor createThreadPoolForSyncAnalyze() {
+        String poolName = "SYNC ANALYZE THREAD POOL";
+        return new ThreadPoolExecutor(0, 64,
+                0, TimeUnit.SECONDS,
+                new SynchronousQueue(),
+                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC 
ANALYZE" + "-%d")
+                        .build(), new BlockedPolicy(poolName,
+                (int) 
TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours)));
+    }
+
     public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
         if (dropStatsStmt.dropExpired) {
             Env.getCurrentEnv().getStatisticsCleaner().clear();
@@ -844,28 +860,38 @@ public class AnalysisManager extends Daemon implements 
Writable {
             tasks.forEach(BaseAnalysisTask::cancel);
         }
 
-        public void execute() {
-            List<String> colNames = new ArrayList<>();
-            List<String> errorMessages = new ArrayList<>();
+        public void execute(ThreadPoolExecutor executor) {
+            List<String> colNames = Collections.synchronizedList(new 
ArrayList<>());
+            List<String> errorMessages = Collections.synchronizedList(new 
ArrayList<>());
+            CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
             for (BaseAnalysisTask task : tasks) {
-                if (cancelled) {
-                    colNames.add(task.info.colName);
-                    errorMessages.add("Cancelled");
-                    continue;
-                }
-                try {
-                    task.doExecute();
-                    updateSyncTaskStatus(task, AnalysisState.FINISHED);
-                } catch (Throwable t) {
-                    colNames.add(task.info.colName);
-                    errorMessages.add(Util.getRootCauseMessage(t));
-                    updateSyncTaskStatus(task, AnalysisState.FAILED);
-                    LOG.warn("Failed to analyze, info: {}", task, t);
-                }
+                executor.submit(() -> {
+                    try {
+                        if (cancelled) {
+                            return;
+                        }
+                        try {
+                            task.doExecute();
+                            updateSyncTaskStatus(task, AnalysisState.FINISHED);
+                        } catch (Throwable t) {
+                            colNames.add(task.info.colName);
+                            errorMessages.add(Util.getRootCauseMessage(t));
+                            updateSyncTaskStatus(task, AnalysisState.FAILED);
+                            LOG.warn("Failed to analyze, info: {}", task, t);
+                        }
+                    } finally {
+                        countDownLatch.countDown();
+                    }
+                });
+            }
+            try {
+                countDownLatch.await();
+            } catch (InterruptedException t) {
+                LOG.warn("Thread got interrupted when waiting sync analyze 
task execution finished", t);
             }
             if (!colNames.isEmpty()) {
                 throw new RuntimeException("Failed to analyze following 
columns:[" + String.join(",", colNames)
-                    + "] Reasons: " + String.join(",", errorMessages));
+                        + "] Reasons: " + String.join(",", errorMessages));
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to