This is an automated email from the ASF dual-hosted git repository.

zhangyang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 97198fa  [ISSUE #3449] Delayed message supports asynchronous delivery (#3458)
97198fa is described below

commit 97198fadf691a2f65c1f588b0466fa9ee7da1657
Author: zhangyang <zhangyan...@xiaomi.com>
AuthorDate: Thu Jan 20 22:22:43 2022 +0800

    [ISSUE #3449] Delayed message supports asynchronous delivery (#3458)
    
    * [schedule] Delayed message repeated delivery optimization
    
    Signed-off-by: zhangyang21 <zhangyan...@xiaomi.com>
    
    * [schedule] Support asynchronous delivery
    
    Signed-off-by: zhangyang <git_y...@163.com>
    
    * use maxDelayLevel as deliverThreadPoolNums
    
    Signed-off-by: zhangyang21 <zhangyan...@xiaomi.com>
    
    * [HAConnection] fix code style
    
    Signed-off-by: zhangyang21 <zhangyan...@xiaomi.com>
---
 .../rocketmq/store/config/MessageStoreConfig.java  |  28 +
 .../org/apache/rocketmq/store/ha/HAConnection.java |   2 +-
 .../store/schedule/ScheduleMessageService.java     | 621 ++++++++++++++++-----
 .../rocketmq/store/ScheduleMessageServiceTest.java | 116 +++-
 4 files changed, 625 insertions(+), 142 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 1188f21..bb1e01f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -163,6 +163,10 @@ public class MessageStoreConfig {
     private boolean enableMultiDispatch = false;
     private int maxLmqConsumeQueueNum = 20000;
 
+    private boolean enableScheduleAsyncDeliver = false;
+    private int scheduleAsyncDeliverMaxPendingLimit = 2000;
+    private int scheduleAsyncDeliverMaxResendNum2Blocked = 3;
+
     public boolean isDebugLockEnable() {
         return debugLockEnable;
     }
@@ -772,4 +776,28 @@ public class MessageStoreConfig {
     public void setMaxLmqConsumeQueueNum(int maxLmqConsumeQueueNum) {
         this.maxLmqConsumeQueueNum = maxLmqConsumeQueueNum;
     }
+
+    public boolean isEnableScheduleAsyncDeliver() {
+        return enableScheduleAsyncDeliver;
+    }
+
+    public void setEnableScheduleAsyncDeliver(boolean 
enableScheduleAsyncDeliver) {
+        this.enableScheduleAsyncDeliver = enableScheduleAsyncDeliver;
+    }
+
+    public int getScheduleAsyncDeliverMaxPendingLimit() {
+        return scheduleAsyncDeliverMaxPendingLimit;
+    }
+
+    public void setScheduleAsyncDeliverMaxPendingLimit(int 
scheduleAsyncDeliverMaxPendingLimit) {
+        this.scheduleAsyncDeliverMaxPendingLimit = 
scheduleAsyncDeliverMaxPendingLimit;
+    }
+
+    public int getScheduleAsyncDeliverMaxResendNum2Blocked() {
+        return scheduleAsyncDeliverMaxResendNum2Blocked;
+    }
+
+    public void setScheduleAsyncDeliverMaxResendNum2Blocked(int 
scheduleAsyncDeliverMaxResendNum2Blocked) {
+        this.scheduleAsyncDeliverMaxResendNum2Blocked = 
scheduleAsyncDeliverMaxResendNum2Blocked;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
index dd68c73..c08c515 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
@@ -168,7 +168,7 @@ public class HAConnection {
                             if (HAConnection.this.slaveRequestOffset < 0) {
                                 HAConnection.this.slaveRequestOffset = 
readOffset;
                                 log.info("slave[" + 
HAConnection.this.clientAddr + "] request offset " + readOffset);
-                            } else if (HAConnection.this.slaveAckOffset > 
HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()){
+                            } else if (HAConnection.this.slaveAckOffset > 
HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
                                 log.warn("slave[{}] request offset={} greater 
than local commitLog offset={}. ",
                                         HAConnection.this.clientAddr,
                                         HAConnection.this.slaveAckOffset,
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index c45287f..d5b4e8d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -19,8 +19,12 @@ package org.apache.rocketmq.store.schedule;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +58,8 @@ public class ScheduleMessageService extends ConfigManager {
     private static final long FIRST_DELAY_TIME = 1000L;
     private static final long DELAY_FOR_A_WHILE = 100L;
     private static final long DELAY_FOR_A_PERIOD = 10000L;
+    private static final long WAIT_FOR_SHUTDOWN = 5000L;
+    private static final long DELAY_FOR_A_SLEEP = 10L;
 
     private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis 
*/> delayLevelTable =
         new ConcurrentHashMap<Integer, Long>(32);
@@ -63,13 +69,19 @@ public class ScheduleMessageService extends ConfigManager {
     private final DefaultMessageStore defaultMessageStore;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private ScheduledExecutorService deliverExecutorService;
-    private int deliverThreadPoolNums = 
Runtime.getRuntime().availableProcessors();
     private MessageStore writeMessageStore;
     private int maxDelayLevel;
+    private boolean enableAsyncDeliver = false;
+    private ScheduledExecutorService handleExecutorService;
+    private final Map<Integer /* level */, 
LinkedBlockingQueue<PutResultProcess>> deliverPendingTable =
+        new ConcurrentHashMap<>(32);
 
     public ScheduleMessageService(final DefaultMessageStore 
defaultMessageStore) {
         this.defaultMessageStore = defaultMessageStore;
         this.writeMessageStore = defaultMessageStore;
+        if (defaultMessageStore != null) {
+            this.enableAsyncDeliver = 
defaultMessageStore.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
+        }
     }
 
     public static int queueId2DelayLevel(final int queueId) {
@@ -116,7 +128,10 @@ public class ScheduleMessageService extends ConfigManager {
     public void start() {
         if (started.compareAndSet(false, true)) {
             super.load();
-            this.deliverExecutorService = new 
ScheduledThreadPoolExecutor(deliverThreadPoolNums, new 
ThreadFactoryImpl("ScheduleMessageTimerThread_"));
+            this.deliverExecutorService = new 
ScheduledThreadPoolExecutor(this.maxDelayLevel, new 
ThreadFactoryImpl("ScheduleMessageTimerThread_"));
+            if (this.enableAsyncDeliver) {
+                this.handleExecutorService = new 
ScheduledThreadPoolExecutor(this.maxDelayLevel, new 
ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
+            }
             for (Map.Entry<Integer, Long> entry : 
this.delayLevelTable.entrySet()) {
                 Integer level = entry.getKey();
                 Long timeDelay = entry.getValue();
@@ -126,6 +141,9 @@ public class ScheduleMessageService extends ConfigManager {
                 }
 
                 if (timeDelay != null) {
+                    if (this.enableAsyncDeliver) {
+                        this.handleExecutorService.schedule(new 
HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
+                    }
                     this.deliverExecutorService.schedule(new 
DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, 
TimeUnit.MILLISECONDS);
                 }
             }
@@ -148,7 +166,29 @@ public class ScheduleMessageService extends ConfigManager {
 
     public void shutdown() {
         if (this.started.compareAndSet(true, false) && null != 
this.deliverExecutorService) {
-            this.deliverExecutorService.shutdownNow();
+            this.deliverExecutorService.shutdown();
+            try {
+                
this.deliverExecutorService.awaitTermination(WAIT_FOR_SHUTDOWN, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                log.error("deliverExecutorService awaitTermination error", e);
+            }
+
+            if (this.handleExecutorService != null) {
+                this.handleExecutorService.shutdown();
+                try {
+                    
this.handleExecutorService.awaitTermination(WAIT_FOR_SHUTDOWN, 
TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    log.error("handleExecutorService awaitTermination error", 
e);
+                }
+            }
+
+            if (this.deliverPendingTable != null) {
+                for (int i = 1; i <= this.deliverPendingTable.size(); i++) {
+                    log.warn("deliverPendingTable level: {}, size: {}", i, 
this.deliverPendingTable.get(i).size());
+                }
+            }
+
+            this.persist();
         }
     }
 
@@ -255,6 +295,9 @@ public class ScheduleMessageService extends ConfigManager {
                 long num = Long.parseLong(value.substring(0, value.length() - 
1));
                 long delayTimeMillis = tu * num;
                 this.delayLevelTable.put(level, delayTimeMillis);
+                if (this.enableAsyncDeliver) {
+                    this.deliverPendingTable.put(level, new 
LinkedBlockingQueue<>());
+                }
             }
         } catch (Exception e) {
             log.error("parseDelayLevel exception", e);
@@ -265,6 +308,36 @@ public class ScheduleMessageService extends ConfigManager {
         return true;
     }
 
+    private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setBody(msgExt.getBody());
+        msgInner.setFlag(msgExt.getFlag());
+        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+
+        TopicFilterType topicFilterType = 
MessageExt.parseTopicFilterType(msgInner.getSysFlag());
+        long tagsCodeValue =
+            MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, 
msgInner.getTags());
+        msgInner.setTagsCode(tagsCodeValue);
+        
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+
+        msgInner.setSysFlag(msgExt.getSysFlag());
+        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
+        msgInner.setBornHost(msgExt.getBornHost());
+        msgInner.setStoreHost(msgExt.getStoreHost());
+        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+
+        msgInner.setWaitStoreMsgOK(false);
+        MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+
+        
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+
+        String queueIdStr = 
msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
+        int queueId = Integer.parseInt(queueIdStr);
+        msgInner.setQueueId(queueId);
+
+        return msgInner;
+    }
+
     class DeliverDelayedMessageTimerTask implements Runnable {
         private final int delayLevel;
         private final long offset;
@@ -283,8 +356,7 @@ public class ScheduleMessageService extends ConfigManager {
             } catch (Exception e) {
                 // XXX: warn and notify me
                 log.error("ScheduleMessageService, executeOnTimeup exception", 
e);
-                
ScheduleMessageService.this.deliverExecutorService.schedule(new 
DeliverDelayedMessageTimerTask(
-                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD, 
TimeUnit.MILLISECONDS);
+                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
             }
         }
 
@@ -308,158 +380,431 @@ public class ScheduleMessageService extends ConfigManager {
                 
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                     delayLevel2QueueId(delayLevel));
 
-            long failScheduleOffset = offset;
+            if (cq == null) {
+                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-            if (cq != null) {
-                SelectMappedBufferResult bufferCQ = 
cq.getIndexBuffer(this.offset);
-                if (bufferCQ != null) {
-                    try {
-                        long nextOffset = offset;
-                        int i = 0;
-                        ConsumeQueueExt.CqExtUnit cqExtUnit = new 
ConsumeQueueExt.CqExtUnit();
-                        for (; i < bufferCQ.getSize(); i += 
ConsumeQueue.CQ_STORE_UNIT_SIZE) {
-                            long offsetPy = bufferCQ.getByteBuffer().getLong();
-                            int sizePy = bufferCQ.getByteBuffer().getInt();
-                            long tagsCode = bufferCQ.getByteBuffer().getLong();
-
-                            if (cq.isExtAddr(tagsCode)) {
-                                if (cq.getExt(tagsCode, cqExtUnit)) {
-                                    tagsCode = cqExtUnit.getTagsCode();
-                                } else {
-                                    //can't find ext content.So re compute 
tags code.
-                                    log.error("[BUG] can't find consume queue 
extend file content!addr={}, offsetPy={}, sizePy={}",
-                                        tagsCode, offsetPy, sizePy);
-                                    long msgStoreTime = 
defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
-                                    tagsCode = 
computeDeliverTimestamp(delayLevel, msgStoreTime);
-                                }
-                            }
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                long resetOffset;
+                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, 
cqMinOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < 
this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, 
cqMaxOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else {
+                    resetOffset = this.offset;
+                }
 
-                            long now = System.currentTimeMillis();
-                            long deliverTimestamp = 
this.correctDeliverTimestamp(now, tagsCode);
-
-                            nextOffset = offset + (i / 
ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
-                            long countdown = deliverTimestamp - now;
-
-                            if (countdown <= 0) {
-                                MessageExt msgExt =
-                                    
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
-                                        offsetPy, sizePy);
-
-                                if (msgExt != null) {
-                                    try {
-                                        MessageExtBrokerInner msgInner = 
this.messageTimeup(msgExt);
-                                        if 
(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
-                                            log.error("[BUG] the real topic of 
schedule msg is {}, discard the msg. msg={}",
-                                                msgInner.getTopic(), msgInner);
-                                            continue;
-                                        }
-                                        PutMessageResult putMessageResult =
-                                            
ScheduleMessageService.this.writeMessageStore
-                                                .putMessage(msgInner);
-
-                                        if (putMessageResult != null
-                                            && 
putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
-                                            if 
(ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats())
 {
-                                                
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP,
 TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, 
putMessageResult.getAppendMessageResult().getMsgNum());
-                                                
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP,
 TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, 
putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP,
 TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, 
putMessageResult.getAppendMessageResult().getMsgNum());
-                                                
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP,
 TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, 
putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(),
 putMessageResult.getAppendMessageResult().getMsgNum(), 1);
-                                                
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                                                    
putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
-                                            }
-                                            continue;
-                                        } else {
-                                            // XXX: warn and notify me
-                                            log.error(
-                                                "ScheduleMessageService, a 
message time up, but reput it failed, topic: {} msgId {}",
-                                                msgExt.getTopic(), 
msgExt.getMsgId());
-                                            
ScheduleMessageService.this.deliverExecutorService.schedule(
-                                                new 
DeliverDelayedMessageTimerTask(this.delayLevel,
-                                                    nextOffset), 
DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
-                                            
ScheduleMessageService.this.updateOffset(this.delayLevel,
-                                                nextOffset);
-                                            return;
-                                        }
-                                    } catch (Exception e) {
-                                        /*
-                                         * XXX: warn and notify me
-                                         */
-                                        log.error(
-                                            "ScheduleMessageService, 
messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, 
sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
-                                    }
-                                }
-                            } else {
-                                
ScheduleMessageService.this.deliverExecutorService.schedule(
-                                    new 
DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
-                                    countdown, TimeUnit.MILLISECONDS);
-                                
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
-                                return;
-                            }
-                        } // end of for
+                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-                        nextOffset = offset + (i / 
ConsumeQueue.CQ_STORE_UNIT_SIZE);
-                        
ScheduleMessageService.this.deliverExecutorService.schedule(new 
DeliverDelayedMessageTimerTask(
-                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, 
TimeUnit.MILLISECONDS);
-                        
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+            long nextOffset = this.offset;
+            try {
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new 
ConsumeQueueExt.CqExtUnit();
+                for (; i < bufferCQ.getSize() && isStarted(); i += 
ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            //can't find ext content.So re compute tags code.
+                            log.error("[BUG] can't find consume queue extend 
file content!addr={}, offsetPy={}, sizePy={}",
+                                tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = 
defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, 
msgStoreTime);
+                        }
+                    }
+
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, 
tagsCode);
+                    nextOffset = offset + (i / 
ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.scheduleNextTimerTask(nextOffset, 
DELAY_FOR_A_WHILE);
                         return;
-                    } finally {
+                    }
 
-                        bufferCQ.release();
+                    MessageExt msgExt = 
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, 
sizePy);
+                    if (msgExt == null) {
+                        continue;
                     }
-                } // end of if (bufferCQ != null)
-                else {
-
-                    long cqMinOffset = cq.getMinOffsetInQueue();
-                    long cqMaxOffset = cq.getMaxOffsetInQueue();
-                    if (offset < cqMinOffset) {
-                        failScheduleOffset = cqMinOffset;
-                        log.error("schedule CQ offset invalid. offset={}, 
cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+
+                    MessageExtBrokerInner msgInner = 
ScheduleMessageService.this.messageTimeup(msgExt);
+                    if 
(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                        log.error("[BUG] the real topic of schedule msg is {}, 
discard the msg. msg={}",
+                            msgInner.getTopic(), msgInner);
+                        continue;
+                    }
+
+                    boolean deliverSuc;
+                    if (ScheduleMessageService.this.enableAsyncDeliver) {
+                        deliverSuc = this.asyncDeliver(msgInner, 
msgExt.getMsgId(), offset, offsetPy, sizePy);
+                    } else {
+                        deliverSuc = this.syncDeliver(msgInner, 
msgExt.getMsgId(), offset, offsetPy, sizePy);
                     }
 
-                    if (offset > cqMaxOffset) {
-                        failScheduleOffset = cqMaxOffset;
-                        log.error("schedule CQ offset invalid. offset={}, 
cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                    if (!deliverSuc) {
+                        this.scheduleNextTimerTask(nextOffset, 
DELAY_FOR_A_WHILE);
+                        return;
                     }
                 }
-            } // end of if (cq != null)
 
-            ScheduleMessageService.this.deliverExecutorService.schedule(new 
DeliverDelayedMessageTimerTask(this.delayLevel,
-                failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
+                nextOffset = this.offset + (i / 
ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, messageTimeup execute 
error, offset = {}", nextOffset, e);
+            } finally {
+                bufferCQ.release();
+            }
+
+            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
         }
 
-        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
-            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-            msgInner.setBody(msgExt.getBody());
-            msgInner.setFlag(msgExt.getFlag());
-            MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+        public void scheduleNextTimerTask(long offset, long delay) {
+            ScheduleMessageService.this.deliverExecutorService.schedule(new 
DeliverDelayedMessageTimerTask(
+                this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
+        }
 
-            TopicFilterType topicFilterType = 
MessageExt.parseTopicFilterType(msgInner.getSysFlag());
-            long tagsCodeValue =
-                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, 
msgInner.getTags());
-            msgInner.setTagsCode(tagsCodeValue);
-            
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        private boolean syncDeliver(MessageExtBrokerInner msgInner, String 
msgId, long offset, long offsetPy,
+            int sizePy) {
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, 
offset, offsetPy, sizePy, false);
+            PutMessageResult result = resultProcess.get();
+            boolean sendStatus = result != null && 
result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
+            if (sendStatus) {
+                ScheduleMessageService.this.updateOffset(this.delayLevel, 
resultProcess.getNextOffset());
+            }
+            return sendStatus;
+        }
 
-            msgInner.setSysFlag(msgExt.getSysFlag());
-            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-            msgInner.setBornHost(msgExt.getBornHost());
-            msgInner.setStoreHost(msgExt.getStoreHost());
-            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+        private boolean asyncDeliver(MessageExtBrokerInner msgInner, String 
msgId, long offset, long offsetPy,
+            int sizePy) {
+            Queue<PutResultProcess> processesQueue = 
ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            //Flow Control
+            int currentPendingNum = processesQueue.size();
+            int maxPendingLimit = 
ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxPendingLimit();
+            if (currentPendingNum > maxPendingLimit) {
+                log.warn("Asynchronous deliver triggers flow control, " +
+                    "currentPendingNum={}, maxPendingLimit={}", 
currentPendingNum, maxPendingLimit);
+                return false;
+            }
 
-            msgInner.setWaitStoreMsgOK(false);
-            MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+            //Blocked
+            PutResultProcess firstProcess = processesQueue.peek();
+            if (firstProcess != null && firstProcess.need2Blocked()) {
+                log.warn("Asynchronous deliver block. info={}", 
firstProcess.toString());
+                return false;
+            }
 
-            
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, 
offset, offsetPy, sizePy, true);
+            processesQueue.add(resultProcess);
+            return true;
+        }
 
-            String queueIdStr = 
msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
-            int queueId = Integer.parseInt(queueIdStr);
-            msgInner.setQueueId(queueId);
+        private PutResultProcess deliverMessage(MessageExtBrokerInner 
msgInner, String msgId, long offset,
+            long offsetPy, int sizePy, boolean autoResend) {
+            CompletableFuture<PutMessageResult> future =
+                
ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+            return new PutResultProcess()
+                .setTopic(msgInner.getTopic())
+                .setDelayLevel(this.delayLevel)
+                .setOffset(offset)
+                .setPhysicOffset(offsetPy)
+                .setPhysicSize(sizePy)
+                .setMsgId(msgId)
+                .setAutoResend(autoResend)
+                .setFuture(future)
+                .thenProcess();
+        }
+    }
+
+    /**
+     * Periodic task that drains, in FIFO order, the pending-delivery queue of one delay level.
+     *
+     * <p>Only the head of the queue is ever inspected, so the delay level's offset is advanced
+     * strictly in order: a later entry is never acknowledged before an earlier one.</p>
+     */
+    public class HandlePutResultTask implements Runnable {
+        private final int delayLevel;
 
-            return msgInner;
+        /**
+         * @param delayLevel the delay level whose pending queue this task drains
+         */
+        public HandlePutResultTask(int delayLevel) {
+            this.delayLevel = delayLevel;
         }
+
+        /**
+         * Processes queue heads until the queue is empty or the head is still in flight,
+         * then re-arms itself (while the service is started).
+         */
+        @Override
+        public void run() {
+            LinkedBlockingQueue<PutResultProcess> pendingQueue =
+                ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            PutResultProcess putResultProcess;
+            while ((putResultProcess = pendingQueue.peek()) != null) {
+                try {
+                    switch (putResultProcess.getStatus()) {
+                        case SUCCESS:
+                            // Advance the offset before removing the entry, so an empty queue
+                            // always implies the offset has already been recorded.
+                            ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());
+                            pendingQueue.remove();
+                            break;
+                        case RUNNING:
+                            // The head is still in flight. A plain "break" would only leave the
+                            // switch and make the loop re-peek the same entry, busy-spinning this
+                            // thread. Stop draining and look again after the usual delay.
+                            scheduleNextTask();
+                            return;
+                        case EXCEPTION:
+                            if (!isStarted()) {
+                                log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
+                                return;
+                            }
+                            log.warn("putResultProcess error, info={}", putResultProcess.toString());
+                            // May resend synchronously; the next iteration sees the new status.
+                            putResultProcess.onException();
+                            break;
+                        case SKIP:
+                            // Dropped without advancing the offset.
+                            log.warn("putResultProcess skip, info={}", putResultProcess.toString());
+                            pendingQueue.remove();
+                            break;
+                    }
+                } catch (Exception e) {
+                    log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);
+                    putResultProcess.onException();
+                }
+            }
+
+            scheduleNextTask();
+        }
+
+        /** Re-arms this task after DELAY_FOR_A_SLEEP milliseconds, unless the service has been stopped. */
+        private void scheduleNextTask() {
+            if (isStarted()) {
+                ScheduleMessageService.this.handleExecutorService
+                    .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    /**
+     * Tracks the outcome of one asynchronous put of a delayed message and, optionally,
+     * resends the message when the put fails.
+     *
+     * <p>Instances are configured through the fluent setters, then armed with
+     * {@link #thenProcess()}. The {@code status} field starts as {@link ProcessStatus#RUNNING}
+     * and is updated by the future's callbacks or by a resend.</p>
+     */
+    public class PutResultProcess {
+        private String topic;
+        private long offset;
+        private long physicOffset;
+        private int physicSize;
+        private int delayLevel;
+        private String msgId;
+        private boolean autoResend = false;
+        private CompletableFuture<PutMessageResult> future;
+
+        // NOTE(review): volatile only guarantees visibility; "resendCount++" in resend() is not
+        // atomic if onException() can run on two threads at once. Confirm against callers.
+        private volatile int resendCount = 0;
+        private volatile ProcessStatus status = ProcessStatus.RUNNING;
+
+        public PutResultProcess setTopic(String topic) {
+            this.topic = topic;
+            return this;
+        }
+
+        public PutResultProcess setOffset(long offset) {
+            this.offset = offset;
+            return this;
+        }
+
+        public PutResultProcess setPhysicOffset(long physicOffset) {
+            this.physicOffset = physicOffset;
+            return this;
+        }
+
+        public PutResultProcess setPhysicSize(int physicSize) {
+            this.physicSize = physicSize;
+            return this;
+        }
+
+        public PutResultProcess setDelayLevel(int delayLevel) {
+            this.delayLevel = delayLevel;
+            return this;
+        }
+
+        public PutResultProcess setMsgId(String msgId) {
+            this.msgId = msgId;
+            return this;
+        }
+
+        public PutResultProcess setAutoResend(boolean autoResend) {
+            this.autoResend = autoResend;
+            return this;
+        }
+
+        public PutResultProcess setFuture(CompletableFuture<PutMessageResult> future) {
+            this.future = future;
+            return this;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public long getOffset() {
+            return offset;
+        }
+
+        /**
+         * @return the offset immediately after this entry's offset, i.e. {@code offset + 1}
+         */
+        public long getNextOffset() {
+            return offset + 1;
+        }
+
+        public long getPhysicOffset() {
+            return physicOffset;
+        }
+
+        public int getPhysicSize() {
+            return physicSize;
+        }
+
+        public Integer getDelayLevel() {
+            return delayLevel;
+        }
+
+        public String getMsgId() {
+            return msgId;
+        }
+
+        public boolean isAutoResend() {
+            return autoResend;
+        }
+
+        public CompletableFuture<PutMessageResult> getFuture() {
+            return future;
+        }
+
+        public int getResendCount() {
+            return resendCount;
+        }
+
+        /**
+         * Attaches the completion callbacks to the future set via
+         * {@link #setFuture(CompletableFuture)}: a normal completion goes through the result
+         * check, an exceptional completion goes straight to {@link #onException()}.
+         *
+         * @return this instance, for chaining
+         */
+        public PutResultProcess thenProcess() {
+            this.future.thenAccept(result -> {
+                this.handleResult(result);
+            });
+
+            this.future.exceptionally(e -> {
+                log.error("ScheduleMessageService put message exceptionally, info: {}",
+                    PutResultProcess.this.toString(), e);
+
+                onException();
+                return null;
+            });
+            return this;
+        }
+
+        /** Treats only a non-null result with status PUT_OK as success; everything else is a failure. */
+        private void handleResult(PutMessageResult result) {
+            if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                onSuccess(result);
+            } else {
+                log.warn("ScheduleMessageService put message failed. info: {}.", result);
+                onException();
+            }
+        }
+
+        /**
+         * Marks this entry as delivered and, when schedule-message stats are enabled,
+         * updates the broker statistics from the append result.
+         *
+         * @param result the successful put result
+         */
+        public void onSuccess(PutMessageResult result) {
+            this.status = ProcessStatus.SUCCESS;
+            if (!ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
+                return;
+            }
+            // Stats are best-effort. A result without append details must not throw here:
+            // on the resend path the exception would be caught and turn an already
+            // successful entry back into EXCEPTION, causing a duplicate delivery.
+            if (result.getAppendMessageResult() == null) {
+                return;
+            }
+
+            int msgNum = result.getAppendMessageResult().getMsgNum();
+            int wroteBytes = result.getAppendMessageResult().getWroteBytes();
+
+            ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, msgNum);
+            ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, wroteBytes);
+            ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, msgNum);
+            ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, wroteBytes);
+            ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(this.topic, msgNum, 1);
+            ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(this.topic, wroteBytes);
+            ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(msgNum);
+        }
+
+        /**
+         * Handles a failed put. With {@code autoResend} the message is resent synchronously on
+         * the calling thread (which may sleep for the back-off); otherwise the entry is marked
+         * {@link ProcessStatus#SKIP}.
+         */
+        public void onException() {
+            log.warn("ScheduleMessageService onException, info: {}", this.toString());
+            if (this.autoResend) {
+                this.resend();
+            } else {
+                this.status = ProcessStatus.SKIP;
+            }
+        }
+
+        public ProcessStatus getStatus() {
+            return this.status;
+        }
+
+        /**
+         * Blocks until the underlying future completes.
+         *
+         * @return the put result, or a result with status UNKNOWN_ERROR if the wait was
+         *         interrupted or the future completed exceptionally
+         */
+        public PutMessageResult get() {
+            try {
+                return this.future.get();
+            } catch (InterruptedException e) {
+                // Keep the interruption visible to the caller instead of silently clearing it.
+                Thread.currentThread().interrupt();
+                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
+            } catch (ExecutionException e) {
+                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
+            }
+        }
+
+        /**
+         * Re-reads the original message from the store and puts it again synchronously,
+         * after a back-off that grows by 100 ms per previous resend (capped at 60 s).
+         */
+        private void resend() {
+            log.info("Resend message, info: {}", this.toString());
+
+            // Gradually increase the resend interval.
+            try {
+                Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
+            } catch (InterruptedException e) {
+                // Restore the flag and abandon this attempt. Leaving the entry in EXCEPTION
+                // keeps it eligible for a later retry.
+                Thread.currentThread().interrupt();
+                log.warn("Resend message interrupted, info: {}", this.toString());
+                this.status = ProcessStatus.EXCEPTION;
+                return;
+            }
+
+            try {
+                MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize);
+                if (msgExt == null) {
+                    log.warn("ScheduleMessageService resend not found message. info: {}", this.toString());
+                    // Give up only after enough attempts; until then keep retrying.
+                    this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;
+                    return;
+                }
+
+                MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
+                this.handleResult(result);
+                if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                    log.info("Resend message success, info: {}", this.toString());
+                }
+            } catch (Exception e) {
+                this.status = ProcessStatus.EXCEPTION;
+                log.error("Resend message error, info: {}", this.toString(), e);
+            }
+        }
+
+        /**
+         * @return {@code true} once the resend count exceeds the configured
+         *         scheduleAsyncDeliverMaxResendNum2Blocked
+         */
+        public boolean need2Blocked() {
+            int maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxResendNum2Blocked();
+            return this.resendCount > maxResendNum2Blocked;
+        }
+
+        /**
+         * @return {@code true} once the resend count exceeds twice the configured
+         *         scheduleAsyncDeliverMaxResendNum2Blocked
+         */
+        public boolean need2Skip() {
+            int maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxResendNum2Blocked();
+            return this.resendCount > maxResendNum2Blocked * 2;
+        }
+
+        @Override
+        public String toString() {
+            return "PutResultProcess{" +
+                "topic='" + topic + '\'' +
+                ", offset=" + offset +
+                ", physicOffset=" + physicOffset +
+                ", physicSize=" + physicSize +
+                ", delayLevel=" + delayLevel +
+                ", msgId='" + msgId + '\'' +
+                ", autoResend=" + autoResend +
+                ", resendCount=" + resendCount +
+                ", status=" + status +
+                '}';
+        }
+    }
+
+    /**
+     * Delivery state of a {@link PutResultProcess}.
+     */
+    public enum ProcessStatus {
+        /**
+         * In progress: the put result has not been returned yet.
+         * This is the initial status of every PutResultProcess.
+         * */
+        RUNNING,
+
+        /**
+         * The message was put successfully.
+         * */
+        SUCCESS,
+
+        /**
+         * The put failed or threw an exception.
+         * When autoResend is true, the message will be resent.
+         * */
+        EXCEPTION,
+
+        /**
+         * The put is skipped.
+         * Set when a put fails and autoResend is false, or when the message can no longer
+         * be looked up in the store after repeated resend attempts.
+         * */
+        SKIP,
     }
 }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java 
b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
index bfcebd7..1c0451c 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
@@ -18,18 +18,36 @@
 package org.apache.rocketmq.store;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.schedule.ScheduleMessageService;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ScheduleMessageServiceTest {
 
@@ -40,7 +58,9 @@ public class ScheduleMessageServiceTest {
 
         ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = 
null;
 
-        ScheduleMessageService scheduleMessageService = new 
ScheduleMessageService((DefaultMessageStore) buildMessageStore());
+        DefaultMessageStore defaultMessageStore = new 
DefaultMessageStore(buildMessageStoreConfig(),
+            new BrokerStatsManager("simpleTest", true), null, new 
BrokerConfig());
+        ScheduleMessageService scheduleMessageService = new 
ScheduleMessageService(defaultMessageStore);
         scheduleMessageService.parseDelayLevel();
 
         ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable1 = 
new ConcurrentHashMap<>();
@@ -71,7 +91,7 @@ public class ScheduleMessageServiceTest {
 
     }
 
-    private MessageStore buildMessageStore() throws Exception {
+    private MessageStoreConfig buildMessageStoreConfig() throws Exception {
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
         messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10);
@@ -79,6 +99,96 @@ public class ScheduleMessageServiceTest {
         messageStoreConfig.setMaxIndexNum(100 * 100);
         messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
         messageStoreConfig.setFlushIntervalConsumeQueue(1);
-        return new DefaultMessageStore(messageStoreConfig, new 
BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
+        return messageStoreConfig;
+    }
+
+    /**
+     * Fills every delay level's pending queue with futures, completes them in random order
+     * with a mix of PUT_OK, failed and exceptional outcomes, and verifies that
+     * HandlePutResultTask (with auto-resend against a mocked store) drains every queue and
+     * advances each level's offset to the number of messages.
+     */
+    @Test
+    public void testHandlePutResultTask() throws Exception {
+        DefaultMessageStore messageStore = mock(DefaultMessageStore.class);
+        MessageStoreConfig config = buildMessageStoreConfig();
+        config.setEnableScheduleMessageStats(false);
+        config.setEnableScheduleAsyncDeliver(true);
+        when(messageStore.getMessageStoreConfig()).thenReturn(config);
+        ScheduleMessageService scheduleMessageService = new ScheduleMessageService(messageStore);
+        scheduleMessageService.parseDelayLevel();
+        int maxDelayLevel = scheduleMessageService.getMaxDelayLevel();
+
+        Field field = scheduleMessageService.getClass().getDeclaredField("deliverPendingTable");
+        field.setAccessible(true);
+        Map<Integer /* level */, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>> deliverPendingTable =
+            (Map<Integer, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>>) field.get(scheduleMessageService);
+
+        field = scheduleMessageService.getClass().getDeclaredField("offsetTable");
+        field.setAccessible(true);
+        ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
+            (ConcurrentMap<Integer /* level */, Long/* offset */>) field.get(scheduleMessageService);
+        for (int i = 1; i <= maxDelayLevel; i++) {
+            offsetTable.put(i, 0L);
+        }
+
+        // The service is not start()-ed here, so inject the handler executor and the
+        // "started" flag by reflection.
+        int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();
+        ScheduledExecutorService handleExecutorService = new ScheduledThreadPoolExecutor(deliverThreadPoolNums,
+            new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
+        field = scheduleMessageService.getClass().getDeclaredField("handleExecutorService");
+        field.setAccessible(true);
+        field.set(scheduleMessageService, handleExecutorService);
+
+        field = scheduleMessageService.getClass().getDeclaredField("started");
+        field.setAccessible(true);
+        AtomicBoolean started = (AtomicBoolean) field.get(scheduleMessageService);
+        started.set(true);
+
+        try {
+            for (int level = 1; level <= maxDelayLevel; level++) {
+                ScheduleMessageService.HandlePutResultTask handlePutResultTask = scheduleMessageService.new HandlePutResultTask(level);
+                handleExecutorService.schedule(handlePutResultTask, 10L, TimeUnit.MILLISECONDS);
+            }
+
+            // Every resend finds a message and succeeds.
+            MessageExt messageExt = new MessageExt();
+            messageExt.putUserProperty("init", "test");
+            messageExt.getProperties().put(MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
+            when(messageStore.lookMessageByOffset(anyLong(), anyInt())).thenReturn(messageExt);
+            when(messageStore.putMessage(any())).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, null));
+
+            int msgNum = 100;
+            int totalMsgNum = msgNum * maxDelayLevel;
+            List<CompletableFuture<PutMessageResult>> putMsgFutureList = new ArrayList<>(totalMsgNum);
+            for (int level = 1; level <= maxDelayLevel; level++) {
+                for (int num = 0; num < msgNum; num++) {
+                    CompletableFuture<PutMessageResult> future = new CompletableFuture<>();
+                    ScheduleMessageService.PutResultProcess putResultProcess = scheduleMessageService.new PutResultProcess();
+                    putResultProcess = putResultProcess
+                        .setOffset(num)
+                        .setAutoResend(true)
+                        .setFuture(future)
+                        .thenProcess();
+                    deliverPendingTable.get(level).add(putResultProcess);
+                    putMsgFutureList.add(future);
+                }
+            }
+
+            // Complete the futures out of order, with a random outcome.
+            Collections.shuffle(putMsgFutureList);
+            Random random = new Random();
+            for (CompletableFuture<PutMessageResult> future : putMsgFutureList) {
+                PutMessageStatus status;
+                if (random.nextInt(1000) % 2 == 0) {
+                    status = PutMessageStatus.PUT_OK;
+                } else {
+                    status = PutMessageStatus.OS_PAGECACHE_BUSY;
+                }
+
+                if (random.nextInt(1000) % 2 == 0) {
+                    PutMessageResult result = new PutMessageResult(status, null);
+                    future.complete(result);
+                } else {
+                    future.completeExceptionally(new Throwable("complete exceptionally"));
+                }
+            }
+
+            // Poll with a deadline instead of a single fixed sleep, so a slow machine does
+            // not make the assertions below fail spuriously.
+            long deadline = System.currentTimeMillis() + 10 * 1000L;
+            boolean drained = false;
+            while (!drained && System.currentTimeMillis() < deadline) {
+                drained = true;
+                for (int level = 1; level <= maxDelayLevel; level++) {
+                    if (!deliverPendingTable.get(level).isEmpty()) {
+                        drained = false;
+                        break;
+                    }
+                }
+                if (!drained) {
+                    Thread.sleep(10);
+                }
+            }
+
+            for (int level = 1; level <= maxDelayLevel; level++) {
+                Assert.assertEquals(0, deliverPendingTable.get(level).size());
+                Assert.assertEquals(msgNum, offsetTable.get(level).longValue());
+            }
+        } finally {
+            // Always stop the service and the executor created by this test, even when an
+            // assertion fails, so no handler threads outlive the test.
+            scheduleMessageService.shutdown();
+            handleExecutorService.shutdownNow();
+        }
     }
 }

Reply via email to