This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 280804c559 [ISSUE #8693] Fix checking MultiDispatchMessage when appending commitlog 280804c559 is described below commit 280804c5592341f92a43b6d72ec6e94db77b74ac Author: Liu Shengzhong <szliu0...@gmail.com> AuthorDate: Wed Sep 18 13:57:20 2024 +0800 [ISSUE #8693] Fix checking MultiDispatchMessage when appending commitlog --- store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 8 +++++--- .../main/java/org/apache/rocketmq/store/MessageExtEncoder.java | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index f34c6944c9..972e71aadd 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -61,6 +61,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.store.queue.MultiDispatchUtils; import org.apache.rocketmq.store.util.LibC; import org.rocksdb.RocksDBException; @@ -1903,7 +1904,7 @@ public class CommitLog implements Swappable { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); - boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner); + final boolean isMultiDispatchMsg = CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner); if (isMultiDispatchMsg) { AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner); if (appendMessageResult != null) { @@ -2244,8 +2245,9 @@ public class CommitLog implements Swappable { return flushManager; } - public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) { - return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); + public static boolean isMultiDispatchMsg(MessageStoreConfig messageStoreConfig, MessageExtBrokerInner msg) { + return StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && + MultiDispatchUtils.isNeedHandleMultiDispatch(messageStoreConfig, msg.getTopic()); } private boolean isCloseReadAhead() { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index 20e9a652b7..5c74918d9e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -175,7 +175,7 @@ public class MessageExtEncoder { public PutMessageResult encode(MessageExtBrokerInner msgInner) { this.byteBuf.clear(); - if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) { + if (CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner)) { return encodeWithoutProperties(msgInner); }