This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 144254a5c444e460377d85a727c3c9074f98c195
Author: nowinkey <[email protected]>
AuthorDate: Mon Jan 16 01:00:43 2023 +0800

    [ISSUE #5884] Concurrent check CommitLog messages
---
 .../apache/rocketmq/store/DefaultMessageStore.java | 283 ++++++++++++++++++++-
 .../rocketmq/store/config/MessageStoreConfig.java  |  13 +
 2 files changed, 290 insertions(+), 6 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 11898f8cf..a4af44222 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -42,14 +42,19 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -2571,10 +2576,181 @@ public class DefaultMessageStore implements MessageStore {
         }
     }
 
+    /**
+     * Immutable description of one contiguous slice of a commit-log buffer that is checked
+     * as a single unit of work.
+     * <p>
+     * Instances are created by {@code ReputMessageService#createBatchDispatchRequest} and read
+     * by the worker task in {@code ReputMessageService#startBatchDispatchRequestService}, which
+     * duplicates {@link #byteBuffer} and restricts the duplicate to
+     * {@code [position, position + size)}. All fields are {@code final} because the object is
+     * handed from one thread to another and is never modified after construction.
+     */
+    class BatchDispatchRequest {
+
+        /** Buffer that contains the slice. */
+        private final ByteBuffer byteBuffer;
+
+        /** Index in {@link #byteBuffer} at which the slice starts. */
+        private final int position;
+
+        /** Length of the slice in bytes. */
+        private final int size;
+
+        /** Sequence number of the batch; used as the slot index in {@code DispatchRequestOrderlyQueue#put}. */
+        private final int id;
+
+        /**
+         * @param byteBuffer buffer that contains the slice
+         * @param position   start index of the slice inside {@code byteBuffer}
+         * @param size       length of the slice in bytes
+         * @param id         sequence number of the batch
+         */
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    /**
+     * Fixed-size reordering window: producers deposit results under an arbitrary sequence
+     * index, the consumer takes them out strictly in index order.
+     * <p>
+     * Slot {@code idx % buffer.length} holds the result for sequence index {@code idx}. A
+     * producer whose index lies outside the window {@code [ptr, ptr + buffer.length)} blocks
+     * until the consumer has advanced {@code ptr} far enough.
+     * <p>
+     * All access to {@code buffer} and {@code ptr} happens while holding this object's
+     * monitor, so a slot written by {@link #put} is visible to {@link #get} and a producer can
+     * never miss the notification that the window has moved.
+     */
+    class DispatchRequestOrderlyQueue {
+
+        /** Window slots; {@code null} marks a slot whose result has not arrived yet. */
+        DispatchRequest[][] buffer;
+
+        /** Sequence index of the next entry to hand out; guarded by {@code this}. */
+        int ptr = 0;
+
+        /** Number of entries deposited so far. */
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        /**
+         * @param bufferNum number of slots in the window
+         */
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        /**
+         * Deposits the result for sequence index {@code idx}, blocking while {@code idx} is
+         * ahead of the current window.
+         *
+         * @param idx sequence index of the result
+         * @param obj the result; must not be {@code null}, because {@code null} marks an empty slot
+         * @throws RuntimeException wrapping the {@link InterruptedException} if the calling
+         *                          thread is interrupted while waiting; the interrupt flag is
+         *                          restored before the exception is thrown
+         */
+        public void put(int idx, DispatchRequest[] obj) {
+            synchronized (this) {
+                // The window test and the wait share one monitor section, so the consumer
+                // cannot advance ptr and notify in between (which would be a missed wake-up).
+                while (ptr + this.buffer.length <= idx) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                }
+                this.buffer[idx % this.buffer.length] = obj;
+                maxPtr.incrementAndGet();
+            }
+        }
+
+        /**
+         * Moves every result that is available in sequence order into {@code rets}, stopping
+         * at the first missing index or after one full window, then wakes up blocked producers.
+         *
+         * @param rets receives the drained results in sequence order
+         * @return always {@code null}; results are delivered through {@code rets}
+         */
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        break;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+                // Notify on every call, including after draining a complete window, so that
+                // producers waiting for the window to move are released promptly.
+                this.notifyAll();
+            }
+            return null;
+        }
+
+        /**
+         * @return {@code true} if every deposited entry has already been handed out
+         */
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        /**
+         * Sequence number given to the next batch; incremented in
+         * {@code createBatchDispatchRequest} each time a batch is queued.
+         * NOTE(review): a plain int wraps around after 2^31 batches - confirm this cannot be
+         * reached on a long-running broker.
+         */
+        private int batchId = 0;
+
+        /**
+         * Number of batches that are queued or currently being checked. Incremented in
+         * {@code createBatchDispatchRequest}, decremented by the worker task once its result
+         * has been put into {@link #dispatchRequestOrderlyQueue}; {@code doReputConcurrently}
+         * waits for it to reach zero before releasing the mapped buffer.
+         */
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        /** Accumulated message bytes (4 MiB) above which {@code doReputConcurrently} cuts a batch. */
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        /** Batches waiting to be submitted to {@link #batchDispatchRequestExecutor}. */
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        /**
+         * Number of slots in {@link #dispatchRequestOrderlyQueue}. Final because the queue is
+         * sized from it exactly once, in the field initializer below.
+         */
+        private final int dispatchRequestOrderlyQueueSize = 16;
+
+        /** Restores batch order before the checked requests are dispatched. */
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        /** Core and maximum size of the worker pool created in {@code initExecutorService}. */
+        private final int batchDispatchRequestThreadPoolNums = 16;
+
+        /** Worker pool that checks batches; only created when concurrent building is enabled. */
+        private ExecutorService batchDispatchRequestExecutor;
+
+        /**
+         * Creates the service. The worker pool and the two helper threads are only set up
+         * when {@code enableBuildConsumeQueueConcurrently} is switched on in the store config.
+         */
+        public ReputMessageService() {
+            if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                return;
+            }
+            initExecutorService();
+            startBatchDispatchRequestService();
+        }
+
+        /**
+         * Creates the fixed-size worker pool that checks commit-log batches.
+         * <p>
+         * Core and maximum size are both {@code batchDispatchRequestThreadPoolNums} and the
+         * work queue is unbounded. The keep-alive is 60 seconds ({@code 1000 * 60}
+         * milliseconds); it only takes effect if core-thread time-out is ever enabled, because
+         * the pool never grows beyond its core size.
+         */
+        private void initExecutorService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    this.batchDispatchRequestThreadPoolNums,
+                    this.batchDispatchRequestThreadPoolNums,
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    new LinkedBlockingDeque<>(),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+        }
+
+        /**
+         * Starts the two helper threads of the concurrent reput pipeline.
+         * <ul>
+         * <li>{@code MainBatchDispatchRequestServiceThread} moves queued batches to the worker
+         * pool, where each batch is checked and its result is put into the orderly queue.</li>
+         * <li>{@code DispatchServiceThread} drains the orderly queue and dispatches every
+         * request in batch order.</li>
+         * </ul>
+         * Both threads poll with a 1 ms pause when idle and exit when they are interrupted.
+         * NOTE(review): neither thread observes the service's stopped flag and the worker pool
+         * is never shut down - confirm whether broker shutdown needs to stop them explicitly.
+         */
+        private void startBatchDispatchRequestService() {
+            new Thread(() -> {
+                while (true) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                    if (task != null) {
+                        batchDispatchRequestExecutor.execute(() -> checkBatchAndEnqueue(task));
+                    } else if (!pauseBriefly()) {
+                        return;
+                    }
+                }
+            }, "MainBatchDispatchRequestServiceThread").start();
+
+            new Thread(() -> {
+                List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+                while (true) {
+                    dispatchRequestsList.clear();
+                    dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+                    if (dispatchRequestsList.isEmpty()) {
+                        if (!pauseBriefly()) {
+                            return;
+                        }
+                        continue;
+                    }
+                    for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                        for (DispatchRequest dispatchRequest : dispatchRequests) {
+                            try {
+                                dispatchAndNotify(dispatchRequest);
+                            } catch (Exception e) {
+                                // One failing request must not kill the only dispatcher thread:
+                                // the orderly queue would never drain again and every worker,
+                                // and then the reput thread, would block forever.
+                                LOGGER.error("Failed to dispatch a request built from the commit log; continuing with the next one.", e);
+                            }
+                        }
+                    }
+                }
+            }, "DispatchServiceThread").start();
+        }
+
+        /**
+         * Sleeps for one millisecond.
+         *
+         * @return {@code false} if the thread was interrupted (the interrupt flag is restored),
+         *         {@code true} otherwise
+         */
+        private boolean pauseBriefly() {
+            try {
+                Thread.sleep(1);
+                return true;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+
+        /**
+         * Checks every message in the slice described by {@code task} and puts the resulting
+         * requests into the orderly queue under the task's id.
+         * <p>
+         * The put and the hold-count decrement are executed even if checking fails: the
+         * dispatcher waits for this id, and {@code doReputConcurrently} waits for the hold
+         * count to reach zero before it releases the mapped buffer.
+         */
+        private void checkBatchAndEnqueue(BatchDispatchRequest task) {
+            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+            try {
+                // Work on a duplicate so the cursor of the shared buffer is left alone.
+                ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                tmpByteBuffer.position(task.position);
+                tmpByteBuffer.limit(task.position + task.size);
+                while (tmpByteBuffer.hasRemaining()) {
+                    DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                    if (dispatchRequest.isSuccess()) {
+                        dispatchRequestList.add(dispatchRequest);
+                    } else {
+                        // NOTE(review): assumes checkMessageAndReturnSize always consumes bytes
+                        // on failure; otherwise this loop would not terminate - confirm.
+                        LOGGER.error("[BUG]read total count not equals msg total size.");
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.error("Failed to check commit log batch " + task.id + "; requests checked so far are still dispatched.", e);
+            } finally {
+                try {
+                    this.dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[0]));
+                } finally {
+                    mappedPageHoldCount.getAndDecrement();
+                }
+            }
+        }
+
+        /** Dispatches one request, wakes up long-polling consumers and updates the slave statistics. */
+        private void dispatchAndNotify(DispatchRequest dispatchRequest) {
+            DefaultMessageStore.this.doDispatch(dispatchRequest);
+            // wake up long-polling
+            if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                    && DefaultMessageStore.this.messageArrivingListener != null) {
+                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                notifyMessageArrive4MultiQueue(dispatchRequest);
+            }
+            if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
+                    && DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                DefaultMessageStore.this.storeStatsService
+                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                DefaultMessageStore.this.storeStatsService
+                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                        .add(dispatchRequest.getMsgSize());
+            }
+        }
+
         public long getReputFromOffset() {
             return reputFromOffset;
         }
@@ -2689,6 +2865,97 @@ public class DefaultMessageStore implements MessageStore {
             }
         }
 
+        /**
+         * Queues the slice {@code [position, position + size)} of {@code byteBuffer} as the
+         * next batch. A negative {@code position} means that no batch is open, in which case
+         * nothing is queued.
+         *
+         * @param byteBuffer buffer that contains the slice
+         * @param position   start index of the slice, or a negative value if there is none
+         * @param size       length of the slice in bytes
+         */
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position >= 0) {
+                // Count the batch as held before it becomes visible to the worker threads.
+                mappedPageHoldCount.getAndIncrement();
+                batchDispatchRequestQueue.offer(new BatchDispatchRequest(byteBuffer, position, size, batchId++));
+            }
+        }
+
+        /**
+         * Walks the commit log from {@code reputFromOffset} up to the confirm offset, groups
+         * consecutive messages into batches and queues each batch for checking by the worker
+         * pool.
+         * <p>
+         * Only the 8-byte prefix of each message (total size and magic code) is read here. A
+         * batch is cut when it grows beyond {@code BATCH_SIZE}, when a blank or unknown magic
+         * code is met, and when the scan of the current buffer ends. The mapped buffer is
+         * released only after every queued batch has been checked.
+         */
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                // -1 in both variables means "no batch is open".
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                // The two ints read above belong to the message, so step back over them.
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            // Skip the rest of the message; its first 8 bytes are already consumed.
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                // Blank magic code: continue with the next commit log file.
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                            batchDispatchRequestStart = -1;
+                            batchDispatchRequestSize = -1;
+                        }
+                    }
+                } finally {
+                    // Flush the batch that is still open (a no-op when the start is -1).
+                    this.createBatchDispatchRequest(result.getByteBuffer(), batchDispatchRequestStart, batchDispatchRequestSize);
+                    // Workers read the mapped buffer, so it must stay alive until they are done.
+                    this.awaitQueuedBatchesChecked();
+                    result.release();
+                }
+            }
+        }
+
+        /**
+         * Blocks until {@code mappedPageHoldCount} is zero, polling once per millisecond.
+         * An interrupt does not end the wait; the interrupt flag is restored once the wait
+         * is over, so the caller can still observe it.
+         */
+        private void awaitQueuedBatchesChecked() {
+            boolean interrupted = false;
+            while (this.mappedPageHoldCount.get() != 0) {
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e) {
+                    if (!interrupted) {
+                        LOGGER.warn("Interrupted while waiting for commit log batches to be checked; continuing to wait.");
+                    }
+                    interrupted = true;
+                }
+            }
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
         private void notifyMessageArrive4MultiQueue(DispatchRequest 
dispatchRequest) {
             Map<String, String> prop = dispatchRequest.getPropertiesMap();
             if (prop == null) {
@@ -2724,7 +2991,11 @@ public class DefaultMessageStore implements MessageStore {
             while (!this.isStopped()) {
                 try {
                     Thread.sleep(1);
-                    this.doReput();
+                    if 
(!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                        this.doReput();
+                    } else {
+                        doReputConcurrently();
+                    }
                 } catch (Exception e) {
                     DefaultMessageStore.LOGGER.warn(this.getServiceName() + " 
service has exception. ", e);
                 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index e29fdc2b0..a55a41df3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -376,6 +376,11 @@ public class MessageStoreConfig {
      */
     private int sampleCountThreshold = 5000;
 
+    /**
+     * Whether the ConsumeQueue is built concurrently. When {@code true}, the reput service
+     * of {@code DefaultMessageStore} checks commit-log messages in batches on a pool of
+     * worker threads instead of on the single reput thread. Disabled by default.
+     */
+    private boolean enableBuildConsumeQueueConcurrently = false;
+
     public boolean isDebugLockEnable() {
         return debugLockEnable;
     }
@@ -1600,4 +1605,12 @@ public class MessageStoreConfig {
     public void setSampleCountThreshold(int sampleCountThreshold) {
         this.sampleCountThreshold = sampleCountThreshold;
     }
+
+    /**
+     * @return the current value of the {@code enableBuildConsumeQueueConcurrently} switch
+     */
+    public boolean isEnableBuildConsumeQueueConcurrently() {
+        return enableBuildConsumeQueueConcurrently;
+    }
+
+    /**
+     * Sets the {@code enableBuildConsumeQueueConcurrently} switch.
+     * NOTE(review): the reput service appears to create its worker threads only in its
+     * constructor, so a change made after the store has been built may not take full
+     * effect - confirm against the caller.
+     *
+     * @param enableBuildConsumeQueueConcurrently the new value of the switch
+     */
+    public void setEnableBuildConsumeQueueConcurrently(boolean 
enableBuildConsumeQueueConcurrently) {
+        this.enableBuildConsumeQueueConcurrently = 
enableBuildConsumeQueueConcurrently;
+    }
 }

Reply via email to