This is an automated email from the ASF dual-hosted git repository.

ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 35c88e3094 [ISSUE #10225] Add RejectedExecutionHandler support for ThreadPoolMonitor (#10222)
35c88e3094 is described below

commit 35c88e3094b54f5a14d86b1d649d4c5a1fe47428
Author: ltamber <[email protected]>
AuthorDate: Mon Mar 30 09:40:11 2026 +0800

    [ISSUE #10225] Add RejectedExecutionHandler support for ThreadPoolMonitor (#10222)
---
 .../rocketmq/common/thread/ThreadPoolMonitor.java  | 101 ++++++++++++++-------
 1 file changed, 69 insertions(+), 32 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
index 746128d296..a79674568b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
+++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
@@ -19,13 +19,16 @@ package org.apache.rocketmq.common.thread;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -37,7 +40,7 @@ public class ThreadPoolMonitor {
 
     private static final List<ThreadPoolWrapper> MONITOR_EXECUTOR = new 
CopyOnWriteArrayList<>();
     private static final ScheduledExecutorService MONITOR_SCHEDULED = 
ThreadUtils.newSingleThreadScheduledExecutor(
-        new 
ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build()
+            new 
ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build()
     );
 
     private static volatile long threadPoolStatusPeriodTime = 
TimeUnit.SECONDS.toMillis(3);
@@ -55,48 +58,82 @@ public class ThreadPoolMonitor {
     }
 
     public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
-        int maximumPoolSize,
-        long keepAliveTime,
-        TimeUnit unit,
-        String name,
-        int queueCapacity) {
+                                                      int maximumPoolSize,
+                                                      long keepAliveTime,
+                                                      TimeUnit unit,
+                                                      String name,
+                                                      int queueCapacity) {
         return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, 
unit, name, queueCapacity, Collections.emptyList());
     }
 
     public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
-        int maximumPoolSize,
-        long keepAliveTime,
-        TimeUnit unit,
-        String name,
-        int queueCapacity,
-        ThreadPoolStatusMonitor... threadPoolStatusMonitors) {
+                                                      int maximumPoolSize,
+                                                      long keepAliveTime,
+                                                      TimeUnit unit,
+                                                      String name,
+                                                      int queueCapacity,
+                                                      RejectedExecutionHandler 
handler) {
+        return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, 
unit, name, queueCapacity, handler, Collections.emptyList());
+    }
+
+    public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
+                                                      int maximumPoolSize,
+                                                      long keepAliveTime,
+                                                      TimeUnit unit,
+                                                      String name,
+                                                      int queueCapacity,
+                                                      
ThreadPoolStatusMonitor... threadPoolStatusMonitors) {
+        return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, 
unit, name, queueCapacity,
+                Lists.newArrayList(threadPoolStatusMonitors));
+    }
+
+    public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
+                                                      int maximumPoolSize,
+                                                      long keepAliveTime,
+                                                      TimeUnit unit,
+                                                      String name,
+                                                      int queueCapacity,
+                                                      RejectedExecutionHandler 
handler,
+                                                      
ThreadPoolStatusMonitor... threadPoolStatusMonitors) {
+        return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, 
unit, name, queueCapacity, handler,
+                Lists.newArrayList(threadPoolStatusMonitors));
+    }
+
+    public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
+                                                      int maximumPoolSize,
+                                                      long keepAliveTime,
+                                                      TimeUnit unit,
+                                                      String name,
+                                                      int queueCapacity,
+                                                      
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
         return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, 
unit, name, queueCapacity,
-            Lists.newArrayList(threadPoolStatusMonitors));
+                new ThreadPoolExecutor.DiscardOldestPolicy(), 
threadPoolStatusMonitors);
     }
 
     public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
-        int maximumPoolSize,
-        long keepAliveTime,
-        TimeUnit unit,
-        String name,
-        int queueCapacity,
-        List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
+                                                      int maximumPoolSize,
+                                                      long keepAliveTime,
+                                                      TimeUnit unit,
+                                                      String name,
+                                                      int queueCapacity,
+                                                      RejectedExecutionHandler 
handler,
+                                                      
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
         ThreadPoolExecutor executor = (ThreadPoolExecutor) 
ThreadUtils.newThreadPoolExecutor(
-            corePoolSize,
-            maximumPoolSize,
-            keepAliveTime,
-            unit,
-            new LinkedBlockingQueue<>(queueCapacity),
-            new ThreadFactoryBuilder().setNameFormat(name + "-%d").build(),
-            new ThreadPoolExecutor.DiscardOldestPolicy());
+                corePoolSize,
+                maximumPoolSize,
+                keepAliveTime,
+                unit,
+                new LinkedBlockingQueue<>(queueCapacity),
+                new ThreadFactoryBuilder().setNameFormat(name + "-%d").build(),
+                handler);
         List<ThreadPoolStatusMonitor> printers = Lists.newArrayList(new 
ThreadPoolQueueSizeMonitor(queueCapacity));
         printers.addAll(threadPoolStatusMonitors);
 
         MONITOR_EXECUTOR.add(ThreadPoolWrapper.builder()
-            .name(name)
-            .threadPoolExecutor(executor)
-            .statusPrinters(printers)
-            .build());
+                .name(name)
+                .threadPoolExecutor(executor)
+                .statusPrinters(printers)
+                .build());
         return executor;
     }
 
@@ -110,7 +147,7 @@ public class ThreadPoolMonitor {
                 waterMarkLogger.info("{}{}{}", nameFormatted, descFormatted, 
value);
                 if (enablePrintJstack) {
                     if 
(monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) &&
-                        System.currentTimeMillis() - jstackTime > 
jstackPeriodTime) {
+                            System.currentTimeMillis() - jstackTime > 
jstackPeriodTime) {
                         jstackTime = System.currentTimeMillis();
                         jstackLogger.warn("jstack start\n{}", 
UtilAll.jstack());
                     }
@@ -121,7 +158,7 @@ public class ThreadPoolMonitor {
 
     public static void init() {
         
MONITOR_SCHEDULED.scheduleAtFixedRate(ThreadPoolMonitor::logThreadPoolStatus, 
20,
-            threadPoolStatusPeriodTime, TimeUnit.MILLISECONDS);
+                threadPoolStatusPeriodTime, TimeUnit.MILLISECONDS);
     }
 
     public static void shutdown() {

Reply via email to