This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new e1021ecc73 [ISSUE #9416] Fix batch send messages have the same message id when consumed (#9417) e1021ecc73 is described below commit e1021ecc73790ce4d4d0acc6570d0b6381f84076 Author: qianye <wuxingcan....@alibaba-inc.com> AuthorDate: Wed May 21 10:27:23 2025 +0800 [ISSUE #9416] Fix batch send messages have the same message id when consumed (#9417) --- .../org/apache/rocketmq/store/MessageExtEncoder.java | 20 +------------------- .../apache/rocketmq/store/AppendCallbackTest.java | 7 +++---- .../apache/rocketmq/store/BatchPutMessageTest.java | 14 +++++++------- 3 files changed, 11 insertions(+), 30 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index 7531c96d11..500b0e6f53 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -290,15 +290,6 @@ public class MessageExtEncoder { throw new RuntimeException("message body size exceeded"); } - // properties from MessageExtBatch - String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties()); - final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8); - int batchPropDataLen = batchPropData.length; - if (batchPropDataLen > Short.MAX_VALUE) { - CommitLog.log.warn("Properties size of messageExtBatch exceeded, properties size: {}, maxSize: {}.", batchPropDataLen, Short.MAX_VALUE); - throw new RuntimeException("Properties size of messageExtBatch exceeded!"); - } - final short batchPropLen = (short) batchPropDataLen; int batchSize = 0; while (messagesByteBuff.hasRemaining()) { @@ -320,14 +311,11 @@ public class MessageExtEncoder { short propertiesLen = messagesByteBuff.getShort(); int propertiesPos = messagesByteBuff.position(); messagesByteBuff.position(propertiesPos + propertiesLen); - boolean needAppendLastPropertySeparator = propertiesLen > 0 && batchPropLen > 0 - && messagesByteBuff.get(messagesByteBuff.position() - 1) != MessageDecoder.PROPERTY_SEPARATOR; final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; - int totalPropLen = needAppendLastPropertySeparator ? - propertiesLen + batchPropLen + 1 : propertiesLen + batchPropLen; + int totalPropLen = propertiesLen; // properties need to add crc32 totalPropLen += crc32ReservedLength; @@ -386,12 +374,6 @@ public class MessageExtEncoder { if (propertiesLen > 0) { this.byteBuf.writeBytes(messagesByteBuff.array(), propertiesPos, propertiesLen); } - if (batchPropLen > 0) { - if (needAppendLastPropertySeparator) { - this.byteBuf.writeByte((byte) MessageDecoder.PROPERTY_SEPARATOR); - } - this.byteBuf.writeBytes(batchPropData, 0, batchPropLen); - } this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength); } putMessageContext.setBatchSize(batchSize); diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java index 3748571496..e8785994f8 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -17,6 +17,9 @@ package org.apache.rocketmq.store; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.File; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -24,7 +27,6 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; - import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; @@ -37,9 +39,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - public class AppendCallbackTest { AppendMessageCallback callback; diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java index 768029ca1a..e8b1218098 100644 --- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java @@ -17,6 +17,12 @@ package org.apache.rocketmq.store; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertTrue; + import java.io.File; import java.net.InetSocketAddress; import java.nio.charset.Charset; @@ -39,12 +45,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.junit.Assert.assertTrue; - public class BatchPutMessageTest { private MessageStore messageStore; @@ -108,7 +108,7 @@ public class BatchPutMessageTest { short propertiesLength = (short) propertiesBytes.length; final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; - msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen) + msgLengthArr[j - 1]; + msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength) + msgLengthArr[j - 1]; j++; } byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);