This is an automated email from the ASF dual-hosted git repository.
ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 35c88e3094 [ISSUE #10225] Add RejectedExecutionHandler support for ThreadPoolMonitor (#10222)
35c88e3094 is described below
commit 35c88e3094b54f5a14d86b1d649d4c5a1fe47428
Author: ltamber <[email protected]>
AuthorDate: Mon Mar 30 09:40:11 2026 +0800
[ISSUE #10225] Add RejectedExecutionHandler support for ThreadPoolMonitor (#10222)
---
.../rocketmq/common/thread/ThreadPoolMonitor.java | 101 ++++++++++++++-------
1 file changed, 69 insertions(+), 32 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
index 746128d296..a79674568b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
+++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
@@ -19,13 +19,16 @@ package org.apache.rocketmq.common.thread;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -37,7 +40,7 @@ public class ThreadPoolMonitor {
private static final List<ThreadPoolWrapper> MONITOR_EXECUTOR = new
CopyOnWriteArrayList<>();
private static final ScheduledExecutorService MONITOR_SCHEDULED =
ThreadUtils.newSingleThreadScheduledExecutor(
- new
ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build()
+ new
ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build()
);
private static volatile long threadPoolStatusPeriodTime =
TimeUnit.SECONDS.toMillis(3);
@@ -55,48 +58,82 @@ public class ThreadPoolMonitor {
}
public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- String name,
- int queueCapacity) {
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ String name,
+ int queueCapacity) {
return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime,
unit, name, queueCapacity, Collections.emptyList());
}
public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- String name,
- int queueCapacity,
- ThreadPoolStatusMonitor... threadPoolStatusMonitors) {
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ String name,
+ int queueCapacity,
+ RejectedExecutionHandler
handler) {
+ return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime,
unit, name, queueCapacity, handler, Collections.emptyList());
+ }
+
+ public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ String name,
+ int queueCapacity,
+
ThreadPoolStatusMonitor... threadPoolStatusMonitors) {
+ return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime,
unit, name, queueCapacity,
+ Lists.newArrayList(threadPoolStatusMonitors));
+ }
+
+ public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ String name,
+ int queueCapacity,
+ RejectedExecutionHandler
handler,
+
ThreadPoolStatusMonitor... threadPoolStatusMonitors) {
+ return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime,
unit, name, queueCapacity, handler,
+ Lists.newArrayList(threadPoolStatusMonitors));
+ }
+
+ public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ String name,
+ int queueCapacity,
+
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime,
unit, name, queueCapacity,
- Lists.newArrayList(threadPoolStatusMonitors));
+ new ThreadPoolExecutor.DiscardOldestPolicy(),
threadPoolStatusMonitors);
}
public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- String name,
- int queueCapacity,
- List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ String name,
+ int queueCapacity,
+ RejectedExecutionHandler
handler,
+
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
ThreadPoolExecutor executor = (ThreadPoolExecutor)
ThreadUtils.newThreadPoolExecutor(
- corePoolSize,
- maximumPoolSize,
- keepAliveTime,
- unit,
- new LinkedBlockingQueue<>(queueCapacity),
- new ThreadFactoryBuilder().setNameFormat(name + "-%d").build(),
- new ThreadPoolExecutor.DiscardOldestPolicy());
+ corePoolSize,
+ maximumPoolSize,
+ keepAliveTime,
+ unit,
+ new LinkedBlockingQueue<>(queueCapacity),
+ new ThreadFactoryBuilder().setNameFormat(name + "-%d").build(),
+ handler);
List<ThreadPoolStatusMonitor> printers = Lists.newArrayList(new
ThreadPoolQueueSizeMonitor(queueCapacity));
printers.addAll(threadPoolStatusMonitors);
MONITOR_EXECUTOR.add(ThreadPoolWrapper.builder()
- .name(name)
- .threadPoolExecutor(executor)
- .statusPrinters(printers)
- .build());
+ .name(name)
+ .threadPoolExecutor(executor)
+ .statusPrinters(printers)
+ .build());
return executor;
}
@@ -110,7 +147,7 @@ public class ThreadPoolMonitor {
waterMarkLogger.info("{}{}{}", nameFormatted, descFormatted,
value);
if (enablePrintJstack) {
if
(monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) &&
- System.currentTimeMillis() - jstackTime >
jstackPeriodTime) {
+ System.currentTimeMillis() - jstackTime >
jstackPeriodTime) {
jstackTime = System.currentTimeMillis();
jstackLogger.warn("jstack start\n{}",
UtilAll.jstack());
}
@@ -121,7 +158,7 @@ public class ThreadPoolMonitor {
public static void init() {
MONITOR_SCHEDULED.scheduleAtFixedRate(ThreadPoolMonitor::logThreadPoolStatus,
20,
- threadPoolStatusPeriodTime, TimeUnit.MILLISECONDS);
+ threadPoolStatusPeriodTime, TimeUnit.MILLISECONDS);
}
public static void shutdown() {