This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e1021ecc73 [ISSUE #9416] Fix batch send messages have the same message 
id when consumed (#9417)
e1021ecc73 is described below

commit e1021ecc73790ce4d4d0acc6570d0b6381f84076
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Wed May 21 10:27:23 2025 +0800

    [ISSUE #9416] Fix batch send messages have the same message id when 
consumed (#9417)
---
 .../org/apache/rocketmq/store/MessageExtEncoder.java | 20 +-------------------
 .../apache/rocketmq/store/AppendCallbackTest.java    |  7 +++----
 .../apache/rocketmq/store/BatchPutMessageTest.java   | 14 +++++++-------
 3 files changed, 11 insertions(+), 30 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
index 7531c96d11..500b0e6f53 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
@@ -290,15 +290,6 @@ public class MessageExtEncoder {
             throw new RuntimeException("message body size exceeded");
         }
 
-        // properties from MessageExtBatch
-        String batchPropStr = 
MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
-        final byte[] batchPropData = 
batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
-        int batchPropDataLen = batchPropData.length;
-        if (batchPropDataLen > Short.MAX_VALUE) {
-            CommitLog.log.warn("Properties size of messageExtBatch exceeded, 
properties size: {}, maxSize: {}.", batchPropDataLen, Short.MAX_VALUE);
-            throw new RuntimeException("Properties size of messageExtBatch 
exceeded!");
-        }
-        final short batchPropLen = (short) batchPropDataLen;
 
         int batchSize = 0;
         while (messagesByteBuff.hasRemaining()) {
@@ -320,14 +311,11 @@ public class MessageExtEncoder {
             short propertiesLen = messagesByteBuff.getShort();
             int propertiesPos = messagesByteBuff.position();
             messagesByteBuff.position(propertiesPos + propertiesLen);
-            boolean needAppendLastPropertySeparator = propertiesLen > 0 && 
batchPropLen > 0
-                && messagesByteBuff.get(messagesByteBuff.position() - 1) != 
MessageDecoder.PROPERTY_SEPARATOR;
 
             final byte[] topicData = 
messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
 
             final int topicLength = topicData.length;
-            int totalPropLen = needAppendLastPropertySeparator ?
-                propertiesLen + batchPropLen + 1 : propertiesLen + 
batchPropLen;
+            int totalPropLen = propertiesLen;
 
             // properties need to add crc32
             totalPropLen += crc32ReservedLength;
@@ -386,12 +374,6 @@ public class MessageExtEncoder {
             if (propertiesLen > 0) {
                 this.byteBuf.writeBytes(messagesByteBuff.array(), 
propertiesPos, propertiesLen);
             }
-            if (batchPropLen > 0) {
-                if (needAppendLastPropertySeparator) {
-                    this.byteBuf.writeByte((byte) 
MessageDecoder.PROPERTY_SEPARATOR);
-                }
-                this.byteBuf.writeBytes(batchPropData, 0, batchPropLen);
-            }
             this.byteBuf.writerIndex(this.byteBuf.writerIndex() + 
crc32ReservedLength);
         }
         putMessageContext.setBatchSize(batchSize);
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java 
b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
index 3748571496..e8785994f8 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.store;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -24,7 +27,6 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
@@ -37,9 +39,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 public class AppendCallbackTest {
 
     AppendMessageCallback callback;
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java 
b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 768029ca1a..e8b1218098 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -17,6 +17,12 @@
 
 package org.apache.rocketmq.store;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
@@ -39,12 +45,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static 
org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertTrue;
-
 public class BatchPutMessageTest {
 
     private MessageStore messageStore;
@@ -108,7 +108,7 @@ public class BatchPutMessageTest {
             short propertiesLength = (short) propertiesBytes.length;
             final byte[] topicData = 
msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
             final int topicLength = topicData.length;
-            msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, 
propertiesLength + batchPropLen) + msgLengthArr[j - 1];
+            msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, 
propertiesLength) + msgLengthArr[j - 1];
             j++;
         }
         byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);

Reply via email to