This is an automated email from the ASF dual-hosted git repository. zhangyang pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 97198fa [ISSUE #3449] Delayed message supports asynchronous delivery (#3458) 97198fa is described below commit 97198fadf691a2f65c1f588b0466fa9ee7da1657 Author: zhangyang <zhangyan...@xiaomi.com> AuthorDate: Thu Jan 20 22:22:43 2022 +0800 [ISSUE #3449] Delayed message supports asynchronous delivery (#3458) * [schedule] Delayed message repeated delivery optimization Signed-off-by: zhangyang21 <zhangyan...@xiaomi.com> * [schedule] Support asynchronous delivery Signed-off-by: zhangyang <git_y...@163.com> * use maxDelayLevel as deliverThreadPoolNums Signed-off-by: zhangyang21 <zhangyan...@xiaomi.com> * [HAConnection] fix code style Signed-off-by: zhangyang21 <zhangyan...@xiaomi.com> --- .../rocketmq/store/config/MessageStoreConfig.java | 28 + .../org/apache/rocketmq/store/ha/HAConnection.java | 2 +- .../store/schedule/ScheduleMessageService.java | 621 ++++++++++++++++----- .../rocketmq/store/ScheduleMessageServiceTest.java | 116 +++- 4 files changed, 625 insertions(+), 142 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 1188f21..bb1e01f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -163,6 +163,10 @@ public class MessageStoreConfig { private boolean enableMultiDispatch = false; private int maxLmqConsumeQueueNum = 20000; + private boolean enableScheduleAsyncDeliver = false; + private int scheduleAsyncDeliverMaxPendingLimit = 2000; + private int scheduleAsyncDeliverMaxResendNum2Blocked = 3; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -772,4 +776,28 @@ public class MessageStoreConfig { public void setMaxLmqConsumeQueueNum(int maxLmqConsumeQueueNum) { this.maxLmqConsumeQueueNum = maxLmqConsumeQueueNum; } + + public boolean isEnableScheduleAsyncDeliver() { + return enableScheduleAsyncDeliver; + } + + public void setEnableScheduleAsyncDeliver(boolean enableScheduleAsyncDeliver) { + this.enableScheduleAsyncDeliver = enableScheduleAsyncDeliver; + } + + public int getScheduleAsyncDeliverMaxPendingLimit() { + return scheduleAsyncDeliverMaxPendingLimit; + } + + public void setScheduleAsyncDeliverMaxPendingLimit(int scheduleAsyncDeliverMaxPendingLimit) { + this.scheduleAsyncDeliverMaxPendingLimit = scheduleAsyncDeliverMaxPendingLimit; + } + + public int getScheduleAsyncDeliverMaxResendNum2Blocked() { + return scheduleAsyncDeliverMaxResendNum2Blocked; + } + + public void setScheduleAsyncDeliverMaxResendNum2Blocked(int scheduleAsyncDeliverMaxResendNum2Blocked) { + this.scheduleAsyncDeliverMaxResendNum2Blocked = scheduleAsyncDeliverMaxResendNum2Blocked; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java index dd68c73..c08c515 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java @@ -168,7 +168,7 @@ public class HAConnection { if (HAConnection.this.slaveRequestOffset < 0) { HAConnection.this.slaveRequestOffset = readOffset; log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset); - } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()){ + } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) { log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ", HAConnection.this.clientAddr, HAConnection.this.slaveAckOffset, diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index c45287f..d5b4e8d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -19,8 +19,12 @@ package org.apache.rocketmq.store.schedule; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -54,6 +58,8 @@ public class ScheduleMessageService extends ConfigManager { private static final long FIRST_DELAY_TIME = 1000L; private static final long DELAY_FOR_A_WHILE = 100L; private static final long DELAY_FOR_A_PERIOD = 10000L; + private static final long WAIT_FOR_SHUTDOWN = 5000L; + private static final long DELAY_FOR_A_SLEEP = 10L; private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32); @@ -63,13 +69,19 @@ public class ScheduleMessageService extends ConfigManager { private final DefaultMessageStore defaultMessageStore; private final AtomicBoolean started = new AtomicBoolean(false); private ScheduledExecutorService deliverExecutorService; - private int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors(); private MessageStore writeMessageStore; private int maxDelayLevel; + private boolean enableAsyncDeliver = false; + private ScheduledExecutorService handleExecutorService; + private final Map<Integer /* level */, LinkedBlockingQueue<PutResultProcess>> deliverPendingTable = + new ConcurrentHashMap<>(32); public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) { this.defaultMessageStore = defaultMessageStore; this.writeMessageStore = defaultMessageStore; + if (defaultMessageStore != null) { + this.enableAsyncDeliver = defaultMessageStore.getMessageStoreConfig().isEnableScheduleAsyncDeliver(); + } } public static int queueId2DelayLevel(final int queueId) { @@ -116,7 +128,10 @@ public class ScheduleMessageService extends ConfigManager { public void start() { if (started.compareAndSet(false, true)) { super.load(); - this.deliverExecutorService = new ScheduledThreadPoolExecutor(deliverThreadPoolNums, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); + this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); + if (this.enableAsyncDeliver) { + this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_")); + } for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); @@ -126,6 +141,9 @@ public class ScheduleMessageService extends ConfigManager { } if (timeDelay != null) { + if (this.enableAsyncDeliver) { + this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); + } this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } } @@ -148,7 +166,29 @@ public class ScheduleMessageService extends ConfigManager { public void shutdown() { if (this.started.compareAndSet(true, false) && null != this.deliverExecutorService) { - this.deliverExecutorService.shutdownNow(); + this.deliverExecutorService.shutdown(); + try { + this.deliverExecutorService.awaitTermination(WAIT_FOR_SHUTDOWN, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.error("deliverExecutorService awaitTermination error", e); + } + + if (this.handleExecutorService != null) { + this.handleExecutorService.shutdown(); + try { + this.handleExecutorService.awaitTermination(WAIT_FOR_SHUTDOWN, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.error("handleExecutorService awaitTermination error", e); + } + } + + if (this.deliverPendingTable != null) { + for (int i = 1; i <= this.deliverPendingTable.size(); i++) { + log.warn("deliverPendingTable level: {}, size: {}", i, this.deliverPendingTable.get(i).size()); + } + } + + this.persist(); } } @@ -255,6 +295,9 @@ public class ScheduleMessageService extends ConfigManager { long num = Long.parseLong(value.substring(0, value.length() - 1)); long delayTimeMillis = tu * num; this.delayLevelTable.put(level, delayTimeMillis); + if (this.enableAsyncDeliver) { + this.deliverPendingTable.put(level, new LinkedBlockingQueue<>()); + } } } catch (Exception e) { log.error("parseDelayLevel exception", e); @@ -265,6 +308,36 @@ public class ScheduleMessageService extends ConfigManager { return true; } + private MessageExtBrokerInner messageTimeup(MessageExt msgExt) { + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setBody(msgExt.getBody()); + msgInner.setFlag(msgExt.getFlag()); + MessageAccessor.setProperties(msgInner, msgExt.getProperties()); + + TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag()); + long tagsCodeValue = + MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); + msgInner.setTagsCode(tagsCodeValue); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); + + msgInner.setSysFlag(msgExt.getSysFlag()); + msgInner.setBornTimestamp(msgExt.getBornTimestamp()); + msgInner.setBornHost(msgExt.getBornHost()); + msgInner.setStoreHost(msgExt.getStoreHost()); + msgInner.setReconsumeTimes(msgExt.getReconsumeTimes()); + + msgInner.setWaitStoreMsgOK(false); + MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL); + + msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); + + String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID); + int queueId = Integer.parseInt(queueIdStr); + msgInner.setQueueId(queueId); + + return msgInner; + } + class DeliverDelayedMessageTimerTask implements Runnable { private final int delayLevel; private final long offset; @@ -283,8 +356,7 @@ public class ScheduleMessageService extends ConfigManager { } catch (Exception e) { // XXX: warn and notify me log.error("ScheduleMessageService, executeOnTimeup exception", e); - ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask( - this.delayLevel, this.offset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS); + this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD); } } @@ -308,158 +380,431 @@ public class ScheduleMessageService extends ConfigManager { ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); - long failScheduleOffset = offset; + if (cq == null) { + this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE); + return; + } - if (cq != null) { - SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); - if (bufferCQ != null) { - try { - long nextOffset = offset; - int i = 0; - ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); - for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { - long offsetPy = bufferCQ.getByteBuffer().getLong(); - int sizePy = bufferCQ.getByteBuffer().getInt(); - long tagsCode = bufferCQ.getByteBuffer().getLong(); - - if (cq.isExtAddr(tagsCode)) { - if (cq.getExt(tagsCode, cqExtUnit)) { - tagsCode = cqExtUnit.getTagsCode(); - } else { - //can't find ext content.So re compute tags code. - log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", - tagsCode, offsetPy, sizePy); - long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy); - tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime); - } - } + SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); + if (bufferCQ == null) { + long resetOffset; + if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) { + log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}", + this.offset, resetOffset, cq.getQueueId()); + } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) { + log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}", + this.offset, resetOffset, cq.getQueueId()); + } else { + resetOffset = this.offset; + } - long now = System.currentTimeMillis(); - long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); - - nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); - - long countdown = deliverTimestamp - now; - - if (countdown <= 0) { - MessageExt msgExt = - ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( - offsetPy, sizePy); - - if (msgExt != null) { - try { - MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); - if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { - log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", - msgInner.getTopic(), msgInner); - continue; - } - PutMessageResult putMessageResult = - ScheduleMessageService.this.writeMessageStore - .putMessage(msgInner); - - if (putMessageResult != null - && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { - if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) { - ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum()); - ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes()); - ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum()); - ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes()); - ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); - ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); - ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); - } - continue; - } else { - // XXX: warn and notify me - log.error( - "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", - msgExt.getTopic(), msgExt.getMsgId()); - ScheduleMessageService.this.deliverExecutorService.schedule( - new DeliverDelayedMessageTimerTask(this.delayLevel, - nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS); - ScheduleMessageService.this.updateOffset(this.delayLevel, - nextOffset); - return; - } - } catch (Exception e) { - /* - * XXX: warn and notify me - */ - log.error( - "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e); - } - } - } else { - ScheduleMessageService.this.deliverExecutorService.schedule( - new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), - countdown, TimeUnit.MILLISECONDS); - ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); - return; - } - } // end of for + this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE); + return; + } - nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); - ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask( - this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS); - ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); + long nextOffset = this.offset; + try { + int i = 0; + ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); + for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { + long offsetPy = bufferCQ.getByteBuffer().getLong(); + int sizePy = bufferCQ.getByteBuffer().getInt(); + long tagsCode = bufferCQ.getByteBuffer().getLong(); + + if (cq.isExtAddr(tagsCode)) { + if (cq.getExt(tagsCode, cqExtUnit)) { + tagsCode = cqExtUnit.getTagsCode(); + } else { + //can't find ext content.So re compute tags code. + log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", + tagsCode, offsetPy, sizePy); + long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy); + tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime); + } + } + + long now = System.currentTimeMillis(); + long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); + nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); + + long countdown = deliverTimestamp - now; + if (countdown > 0) { + this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; - } finally { + } - bufferCQ.release(); + MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); + if (msgExt == null) { + continue; } - } // end of if (bufferCQ != null) - else { - - long cqMinOffset = cq.getMinOffsetInQueue(); - long cqMaxOffset = cq.getMaxOffsetInQueue(); - if (offset < cqMinOffset) { - failScheduleOffset = cqMinOffset; - log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}", - offset, cqMinOffset, cqMaxOffset, cq.getQueueId()); + + MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt); + if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { + log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", + msgInner.getTopic(), msgInner); + continue; + } + + boolean deliverSuc; + if (ScheduleMessageService.this.enableAsyncDeliver) { + deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy); + } else { + deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy); } - if (offset > cqMaxOffset) { - failScheduleOffset = cqMaxOffset; - log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}", - offset, cqMinOffset, cqMaxOffset, cq.getQueueId()); + if (!deliverSuc) { + this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); + return; } } - } // end of if (cq != null) - ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, - failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS); + nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); + } catch (Exception e) { + log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e); + } finally { + bufferCQ.release(); + } + + this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); } - private MessageExtBrokerInner messageTimeup(MessageExt msgExt) { - MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setBody(msgExt.getBody()); - msgInner.setFlag(msgExt.getFlag()); - MessageAccessor.setProperties(msgInner, msgExt.getProperties()); + public void scheduleNextTimerTask(long offset, long delay) { + ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask( + this.delayLevel, offset), delay, TimeUnit.MILLISECONDS); + } - TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag()); - long tagsCodeValue = - MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); - msgInner.setTagsCode(tagsCodeValue); - msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); + private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy, + int sizePy) { + PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false); + PutMessageResult result = resultProcess.get(); + boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK; + if (sendStatus) { + ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset()); + } + return sendStatus; + } - msgInner.setSysFlag(msgExt.getSysFlag()); - msgInner.setBornTimestamp(msgExt.getBornTimestamp()); - msgInner.setBornHost(msgExt.getBornHost()); - msgInner.setStoreHost(msgExt.getStoreHost()); - msgInner.setReconsumeTimes(msgExt.getReconsumeTimes()); + private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy, + int sizePy) { + Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel); + + //Flow Control + int currentPendingNum = processesQueue.size(); + int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig() + .getScheduleAsyncDeliverMaxPendingLimit(); + if (currentPendingNum > maxPendingLimit) { + log.warn("Asynchronous deliver triggers flow control, " + + "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit); + return false; + } - msgInner.setWaitStoreMsgOK(false); - MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL); + //Blocked + PutResultProcess firstProcess = processesQueue.peek(); + if (firstProcess != null && firstProcess.need2Blocked()) { + log.warn("Asynchronous deliver block. info={}", firstProcess.toString()); + return false; + } - msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); + PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true); + processesQueue.add(resultProcess); + return true; + } - String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID); - int queueId = Integer.parseInt(queueIdStr); - msgInner.setQueueId(queueId); + private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset, + long offsetPy, int sizePy, boolean autoResend) { + CompletableFuture<PutMessageResult> future = + ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner); + return new PutResultProcess() + .setTopic(msgInner.getTopic()) + .setDelayLevel(this.delayLevel) + .setOffset(offset) + .setPhysicOffset(offsetPy) + .setPhysicSize(sizePy) + .setMsgId(msgId) + .setAutoResend(autoResend) + .setFuture(future) + .thenProcess(); + } + } + + public class HandlePutResultTask implements Runnable { + private final int delayLevel; - return msgInner; + public HandlePutResultTask(int delayLevel) { + this.delayLevel = delayLevel; } + + @Override + public void run() { + LinkedBlockingQueue<PutResultProcess> pendingQueue = + ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel); + + PutResultProcess putResultProcess; + while ((putResultProcess = pendingQueue.peek()) != null) { + try { + switch (putResultProcess.getStatus()) { + case SUCCESS: + ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset()); + pendingQueue.remove(); + break; + case RUNNING: + break; + case EXCEPTION: + if (!isStarted()) { + log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString()); + return; + } + log.warn("putResultProcess error, info={}", putResultProcess.toString()); + putResultProcess.onException(); + break; + case SKIP: + log.warn("putResultProcess skip, info={}", putResultProcess.toString()); + pendingQueue.remove(); + break; + } + } catch (Exception e) { + log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e); + putResultProcess.onException(); + } + } + + if (isStarted()) { + ScheduleMessageService.this.handleExecutorService + .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS); + } + } + } + + public class PutResultProcess { + private String topic; + private long offset; + private long physicOffset; + private int physicSize; + private int delayLevel; + private String msgId; + private boolean autoResend = false; + private CompletableFuture<PutMessageResult> future; + + private volatile int resendCount = 0; + private volatile ProcessStatus status = ProcessStatus.RUNNING; + + public PutResultProcess setTopic(String topic) { + this.topic = topic; + return this; + } + + public PutResultProcess setOffset(long offset) { + this.offset = offset; + return this; + } + + public PutResultProcess setPhysicOffset(long physicOffset) { + this.physicOffset = physicOffset; + return this; + } + + public PutResultProcess setPhysicSize(int physicSize) { + this.physicSize = physicSize; + return this; + } + + public PutResultProcess setDelayLevel(int delayLevel) { + this.delayLevel = delayLevel; + return this; + } + + public PutResultProcess setMsgId(String msgId) { + this.msgId = msgId; + return this; + } + + public PutResultProcess setAutoResend(boolean autoResend) { + this.autoResend = autoResend; + return this; + } + + public PutResultProcess setFuture(CompletableFuture<PutMessageResult> future) { + this.future = future; + return this; + } + + public String getTopic() { + return topic; + } + + public long getOffset() { + return offset; + } + + public long getNextOffset() { + return offset + 1; + } + + public long getPhysicOffset() { + return physicOffset; + } + + public int getPhysicSize() { + return physicSize; + } + + public Integer getDelayLevel() { + return delayLevel; + } + + public String getMsgId() { + return msgId; + } + + public boolean isAutoResend() { + return autoResend; + } + + public CompletableFuture<PutMessageResult> getFuture() { + return future; + } + + public int getResendCount() { + return resendCount; + } + + public PutResultProcess thenProcess() { + this.future.thenAccept(result -> { + this.handleResult(result); + }); + + this.future.exceptionally(e -> { + log.error("ScheduleMessageService put message exceptionally, info: {}", + PutResultProcess.this.toString(), e); + + onException(); + return null; + }); + return this; + } + + private void handleResult(PutMessageResult result) { + if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + onSuccess(result); + } else { + log.warn("ScheduleMessageService put message failed. info: {}.", result); + onException(); + } + } + + public void onSuccess(PutMessageResult result) { + this.status = ProcessStatus.SUCCESS; + if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) { + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getMsgNum()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getWroteBytes()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getMsgNum()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getWroteBytes()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum()); + } + } + + public void onException() { + log.warn("ScheduleMessageService onException, info: {}", this.toString()); + if (this.autoResend) { + this.resend(); + } else { + this.status = ProcessStatus.SKIP; + } + } + + public ProcessStatus getStatus() { + return this.status; + } + + public PutMessageResult get() { + try { + return this.future.get(); + } catch (InterruptedException | ExecutionException e) { + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null); + } + } + + private void resend() { + log.info("Resend message, info: {}", this.toString()); + + // Gradually increase the resend interval. + try { + Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + try { + MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize); + if (msgExt == null) { + log.warn("ScheduleMessageService resend not found message. info: {}", this.toString()); + this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION; + return; + } + + MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt); + PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner); + this.handleResult(result); + if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + log.info("Resend message success, info: {}", this.toString()); + } + } catch (Exception e) { + this.status = ProcessStatus.EXCEPTION; + log.error("Resend message error, info: {}", this.toString(), e); + } + } + + public boolean need2Blocked() { + int maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig() + .getScheduleAsyncDeliverMaxResendNum2Blocked(); + return this.resendCount > maxResendNum2Blocked; + } + + public boolean need2Skip() { + int maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig() + .getScheduleAsyncDeliverMaxResendNum2Blocked(); + return this.resendCount > maxResendNum2Blocked * 2; + } + + @Override + public String toString() { + return "PutResultProcess{" + + "topic='" + topic + '\'' + + ", offset=" + offset + + ", physicOffset=" + physicOffset + + ", physicSize=" + physicSize + + ", delayLevel=" + delayLevel + + ", msgId='" + msgId + '\'' + + ", autoResend=" + autoResend + + ", resendCount=" + resendCount + + ", status=" + status + + '}'; + } + } + + public enum ProcessStatus { + /** + * In process, the processing result has not yet been returned. + * */ + RUNNING, + + /** + * Put message success. + * */ + SUCCESS, + + /** + * Put message exception. + * When autoResend is true, the message will be resend. + * */ + EXCEPTION, + + /** + * Skip put message. + * When the message cannot be looked, the message will be skipped. + * */ + SKIP, } } diff --git a/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java index bfcebd7..1c0451c 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java @@ -18,18 +18,36 @@ package org.apache.rocketmq.store; import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.schedule.ScheduleMessageService; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ScheduleMessageServiceTest { @@ -40,7 +58,9 @@ public class ScheduleMessageServiceTest { ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = null; - ScheduleMessageService scheduleMessageService = new ScheduleMessageService((DefaultMessageStore) buildMessageStore()); + DefaultMessageStore defaultMessageStore = new DefaultMessageStore(buildMessageStoreConfig(), + new BrokerStatsManager("simpleTest", true), null, new BrokerConfig()); + ScheduleMessageService scheduleMessageService = new ScheduleMessageService(defaultMessageStore); scheduleMessageService.parseDelayLevel(); ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable1 = new ConcurrentHashMap<>(); @@ -71,7 +91,7 @@ public class ScheduleMessageServiceTest { } - private MessageStore buildMessageStore() throws Exception { + private MessageStoreConfig buildMessageStoreConfig() throws Exception { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10); messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10); @@ -79,6 +99,96 @@ public class ScheduleMessageServiceTest { messageStoreConfig.setMaxIndexNum(100 * 100); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); messageStoreConfig.setFlushIntervalConsumeQueue(1); - return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig()); + return messageStoreConfig; + } + + @Test + public void testHandlePutResultTask() throws Exception { + DefaultMessageStore messageStore = mock(DefaultMessageStore.class); + MessageStoreConfig config = buildMessageStoreConfig(); + config.setEnableScheduleMessageStats(false); + config.setEnableScheduleAsyncDeliver(true); + when(messageStore.getMessageStoreConfig()).thenReturn(config); + ScheduleMessageService scheduleMessageService = new ScheduleMessageService(messageStore); + scheduleMessageService.parseDelayLevel(); + + Field field = scheduleMessageService.getClass().getDeclaredField("deliverPendingTable"); + field.setAccessible(true); + Map<Integer /* level */, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>> deliverPendingTable = + (Map<Integer, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>>) field.get(scheduleMessageService); + + field = scheduleMessageService.getClass().getDeclaredField("offsetTable"); + field.setAccessible(true); + ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = + (ConcurrentMap<Integer /* level */, Long/* offset */>) field.get(scheduleMessageService); + for (int i = 1; i <= scheduleMessageService.getMaxDelayLevel(); i++) { + offsetTable.put(i, 0L); + } + + int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors(); + ScheduledExecutorService handleExecutorService = new ScheduledThreadPoolExecutor(deliverThreadPoolNums, + new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_")); + field = scheduleMessageService.getClass().getDeclaredField("handleExecutorService"); + field.setAccessible(true); + field.set(scheduleMessageService, handleExecutorService); + + field = scheduleMessageService.getClass().getDeclaredField("started"); + field.setAccessible(true); + AtomicBoolean started = (AtomicBoolean) field.get(scheduleMessageService); + started.set(true); + + for (int level = 1; level <= scheduleMessageService.getMaxDelayLevel(); level++) { + ScheduleMessageService.HandlePutResultTask handlePutResultTask = scheduleMessageService.new HandlePutResultTask(level); + handleExecutorService.schedule(handlePutResultTask, 10L, TimeUnit.MILLISECONDS); + } + + MessageExt messageExt = new MessageExt(); + messageExt.putUserProperty("init", "test"); + messageExt.getProperties().put(MessageConst.PROPERTY_REAL_QUEUE_ID, "0"); + when(messageStore.lookMessageByOffset(anyLong(), anyInt())).thenReturn(messageExt); + when(messageStore.putMessage(any())).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, null)); + + int msgNum = 100; + int totalMsgNum = msgNum * scheduleMessageService.getMaxDelayLevel(); + List<CompletableFuture<PutMessageResult>> putMsgFutrueList = new ArrayList<>(totalMsgNum); + for (int level = 1; level <= scheduleMessageService.getMaxDelayLevel(); level++) { + for (int num = 0; num < msgNum; num++) { + CompletableFuture<PutMessageResult> future = new CompletableFuture<>(); + ScheduleMessageService.PutResultProcess putResultProcess = scheduleMessageService.new PutResultProcess(); + putResultProcess = putResultProcess + .setOffset(num) + .setAutoResend(true) + .setFuture(future) + .thenProcess(); + deliverPendingTable.get(level).add(putResultProcess); + putMsgFutrueList.add(future); + } + } + + Collections.shuffle(putMsgFutrueList); + Random random = new Random(); + for (CompletableFuture<PutMessageResult> future : putMsgFutrueList) { + PutMessageStatus status; + if (random.nextInt(1000) % 2 == 0) { + status = PutMessageStatus.PUT_OK; + } else { + status = PutMessageStatus.OS_PAGECACHE_BUSY; + } + + if (random.nextInt(1000) % 2 == 0) { + PutMessageResult result = new PutMessageResult(status, null); + future.complete(result); + } else { + future.completeExceptionally(new Throwable("complete exceptionally")); + } + } + + Thread.sleep(1000); + for (int level = 1; level <= scheduleMessageService.getMaxDelayLevel(); level++) { + Assert.assertEquals(0, deliverPendingTable.get(level).size()); + Assert.assertEquals(msgNum, offsetTable.get(level).longValue()); + } + + scheduleMessageService.shutdown(); } }