This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 8bf81c46849837703d7acc4802558ed3bdc122b7
Author: nowinkey <[email protected]>
AuthorDate: Sun Feb 5 20:38:35 2023 +0800

    Set a custom reject policy for batchDispatchRequestExecutor
---
 .../apache/rocketmq/store/DefaultMessageStore.java | 23 +++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 97a0ee835..cf1745154 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -43,6 +43,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutionException;
@@ -55,6 +56,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -2632,9 +2634,9 @@ public class DefaultMessageStore implements MessageStore {
 
         DispatchRequest[][] buffer;
 
-        int ptr = 0;
+        long ptr = 0;
 
-        AtomicInteger maxPtr = new AtomicInteger();
+        AtomicLong maxPtr = new AtomicLong();
 
         public DispatchRequestOrderlyQueue(int bufferNum) {
             this.buffer = new DispatchRequest[bufferNum][];
@@ -2658,7 +2660,7 @@ public class DefaultMessageStore implements MessageStore {
         public DispatchRequest[] get(List<DispatchRequest[]> rets) {
             synchronized (this) {
                 for (int i = 0; i < this.buffer.length; i++) {
-                    int mod = ptr % this.buffer.length;
+                    int mod = (int) (ptr % this.buffer.length);
                     DispatchRequest[] ret = this.buffer[mod];
                     if (ret == null) {
                         this.notifyAll();
@@ -2860,8 +2862,19 @@ public class DefaultMessageStore implements MessageStore {
                     DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
                     1000 * 60,
                     TimeUnit.MICROSECONDS,
-                    new LinkedBlockingQueue<>(1024),
-                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
         }
 
         private void pollBatchDispatchRequest() {
