This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit d211e8664d78713ac3dd5424440b1e119d418b14
Author: nowinkey <[email protected]>
AuthorDate: Tue Feb 7 23:35:43 2023 +0800

    Change the reject policy to AbortPolicy and add try catch final protect
---
 .../apache/rocketmq/store/DefaultMessageStore.java | 62 ++++++++++------------
 1 file changed, 28 insertions(+), 34 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index cf1745154..f12a90eea 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -43,7 +43,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutionException;
@@ -2620,9 +2619,9 @@ public class DefaultMessageStore implements MessageStore {
 
         private int size;
 
-        private int id;
+        private long id;
 
-        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int 
size, int id) {
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int 
size, long id) {
             this.byteBuffer = byteBuffer;
             this.position = position;
             this.size = size;
@@ -2642,7 +2641,7 @@ public class DefaultMessageStore implements MessageStore {
             this.buffer = new DispatchRequest[bufferNum][];
         }
 
-        public void put(int idx, DispatchRequest[] obj) {
+        public void put(long idx, DispatchRequest[] obj) {
             while (ptr + this.buffer.length <= idx) {
                 synchronized (this) {
                     try {
@@ -2652,7 +2651,7 @@ public class DefaultMessageStore implements MessageStore {
                     }
                 }
             }
-            int mod = idx % this.buffer.length;
+            int mod = (int) (idx % this.buffer.length);
             this.buffer[mod] = obj;
             maxPtr.incrementAndGet();
         }
@@ -2864,38 +2863,33 @@ public class DefaultMessageStore implements MessageStore {
                     TimeUnit.MICROSECONDS,
                     new LinkedBlockingQueue<>(4096),
                     new 
ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
-                    new RejectedExecutionHandler() {
-                        @Override
-                        public void rejectedExecution(Runnable r, 
ThreadPoolExecutor executor) {
-                            try {
-                                LOGGER.warn("Task {} is blocking put into the 
workQueue", r);
-                                executor.getQueue().put(r);
-                            } catch (InterruptedException e) {
-                                LOGGER.error("Task {} failed to put into the 
workQueue", r);
-                            }
-                        }
-                    });
+                    new ThreadPoolExecutor.AbortPolicy());
         }
 
         private void pollBatchDispatchRequest() {
-            if (!batchDispatchRequestQueue.isEmpty()) {
-                BatchDispatchRequest task = batchDispatchRequestQueue.poll();
-                batchDispatchRequestExecutor.execute(() -> {
-                    ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
-                    tmpByteBuffer.position(task.position);
-                    tmpByteBuffer.limit(task.position + task.size);
-                    List<DispatchRequest> dispatchRequestList = new 
ArrayList<>();
-                    while (tmpByteBuffer.hasRemaining()) {
-                        DispatchRequest dispatchRequest = 
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, 
false, false, false);
-                        if (dispatchRequest.isSuccess()) {
-                            dispatchRequestList.add(dispatchRequest);
-                        } else {
-                            LOGGER.error("[BUG]read total count not equals msg 
total size.");
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = 
batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                        tmpByteBuffer.position(task.position);
+                        tmpByteBuffer.limit(task.position + task.size);
+                        List<DispatchRequest> dispatchRequestList = new 
ArrayList<>();
+                        while (tmpByteBuffer.hasRemaining()) {
+                            DispatchRequest dispatchRequest = 
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, 
false, false, false);
+                            if (dispatchRequest.isSuccess()) {
+                                dispatchRequestList.add(dispatchRequest);
+                            } else {
+                                LOGGER.error("[BUG]read total count not equals 
msg total size.");
+                            }
                         }
-                    }
-                    dispatchRequestOrderlyQueue.put(task.id, 
dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
-                    mappedPageHoldCount.getAndDecrement();
-                });
+                        dispatchRequestOrderlyQueue.put(task.id, 
dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                        mappedPageHoldCount.getAndDecrement();
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                LOGGER.warn(e.getMessage());
             }
         }
 
@@ -2987,7 +2981,7 @@ public class DefaultMessageStore implements MessageStore {
 
         private static final int BATCH_SIZE = 1024 * 1024 * 4;
 
-        private int batchId = 0;
+        private long batchId = 0;
 
         public void createBatchDispatchRequest(ByteBuffer byteBuffer, int 
position, int size) {
             if (position < 0) {

Reply via email to