This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 280804c559 [ISSUE #8693] Fix checking MultiDispatchMessage when 
appending commitlog
280804c559 is described below

commit 280804c5592341f92a43b6d72ec6e94db77b74ac
Author: Liu Shengzhong <szliu0...@gmail.com>
AuthorDate: Wed Sep 18 13:57:20 2024 +0800

    [ISSUE #8693] Fix checking MultiDispatchMessage when appending commitlog
---
 store/src/main/java/org/apache/rocketmq/store/CommitLog.java      | 8 +++++---
 .../main/java/org/apache/rocketmq/store/MessageExtEncoder.java    | 2 +-
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index f34c6944c9..972e71aadd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.ha.HAService;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
 import org.apache.rocketmq.store.util.LibC;
 import org.rocksdb.RocksDBException;
 
@@ -1903,7 +1904,7 @@ public class CommitLog implements Swappable {
             // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
 
             ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
-            boolean isMultiDispatchMsg = 
messageStoreConfig.isEnableMultiDispatch() && 
CommitLog.isMultiDispatchMsg(msgInner);
+            final boolean isMultiDispatchMsg = 
CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner);
             if (isMultiDispatchMsg) {
                 AppendMessageResult appendMessageResult = 
handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
                 if (appendMessageResult != null) {
@@ -2244,8 +2245,9 @@ public class CommitLog implements Swappable {
         return flushManager;
     }
 
-    public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
-        return 
StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
 && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+    public static boolean isMultiDispatchMsg(MessageStoreConfig 
messageStoreConfig, MessageExtBrokerInner msg) {
+        return 
StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
 &&
+            MultiDispatchUtils.isNeedHandleMultiDispatch(messageStoreConfig, 
msg.getTopic());
     }
 
     private boolean isCloseReadAhead() {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
index 20e9a652b7..5c74918d9e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
@@ -175,7 +175,7 @@ public class MessageExtEncoder {
     public PutMessageResult encode(MessageExtBrokerInner msgInner) {
         this.byteBuf.clear();
 
-        if (messageStoreConfig.isEnableMultiDispatch() && 
CommitLog.isMultiDispatchMsg(msgInner)) {
+        if (CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner)) {
             return encodeWithoutProperties(msgInner);
         }
 

Reply via email to