This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

The following commit(s) were added to refs/heads/develop by this push:
     new f120645a1 [ISSUE #4167] Optimize the logic of MessageDecoder#decodeMessageId method (#4082)
f120645a1 is described below

commit f120645a11abc1ecd2b6a6f1a07a8d339cc1ea11
Author: 灼华 <43363120+burnin...@users.noreply.github.com>
AuthorDate: Thu Apr 14 23:06:09 2022 +0800

    [ISSUE #4167] Optimize the logic of MessageDecoder#decodeMessageId method (#4082)

    Co-authored-by: 灼华 <g...@didiglobal.com>
---
 .../rocketmq/common/message/MessageDecoder.java    | 19 +++++++---------
 .../common/message/MessageDecoderTest.java         | 26 ++++++++++++++++++++++
 .../java/org/apache/rocketmq/store/MappedFile.java |  2 +-
 3 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 929912772..b127ac6cb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -82,20 +82,17 @@ public class MessageDecoder {
     }
 
     public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
-        SocketAddress address;
-        long offset;
-        int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
+        byte[] bytes = UtilAll.string2bytes(msgId);
+        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
 
-        byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
-        byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
-        ByteBuffer bb = ByteBuffer.wrap(port);
-        int portInt = bb.getInt(0);
-        address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
+        // address(ip+port)
+        byte[] ip = new byte[msgId.length() == 32 ? 4 : 16];
+        byteBuffer.get(ip);
+        int port = byteBuffer.getInt();
+        SocketAddress address = new InetSocketAddress(InetAddress.getByAddress(ip), port);
 
         // offset
-        byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
-        bb = ByteBuffer.wrap(data);
-        offset = bb.getLong(0);
+        long offset = byteBuffer.getLong();
 
         return new MessageId(address, offset);
     }
diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
index b27f24669..51ea5971b 100644
--- a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.common.message;
 
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -28,6 +29,7 @@ import java.util.Map;
 import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
 import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
 import static org.apache.rocketmq.common.message.MessageDecoder.createMessageId;
+import static org.apache.rocketmq.common.message.MessageDecoder.decodeMessageId;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class MessageDecoderTest {
@@ -373,4 +375,28 @@ public class MessageDecoderTest {
         assertThat(m.get("1")).isEqualTo("1");
     }
 
+    @Test
+    public void testMessageId() throws Exception{
+        // ipv4 messageId test
+        MessageExt msgExt = new MessageExt();
+        msgExt.setStoreHost(new InetSocketAddress("127.0.0.1", 9103));
+        msgExt.setCommitLogOffset(123456);
+        verifyMessageId(msgExt);
+
+        // ipv6 messageId test
+        msgExt.setStoreHostAddressV6Flag();
+        msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByName("::1"), 0));
+        verifyMessageId(msgExt);
+    }
+
+    private void verifyMessageId(MessageExt msgExt) throws UnknownHostException {
+        int storehostIPLength = (msgExt.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;
+        int msgIDLength = storehostIPLength + 4 + 8;
+        ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);
+        String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
+
+        MessageId messageId = decodeMessageId(msgId);
+        assertThat(messageId.getAddress()).isEqualTo(msgExt.getStoreHost());
+        assertThat(messageId.getOffset()).isEqualTo(msgExt.getCommitLogOffset());
+    }
 }
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index b46e7cad1..f07031499 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -262,7 +262,7 @@ public class MappedFile extends ReferenceResource {
     }
 
     /**
-     * Content of data from offset to offset + length will be wrote to file.
+     * Content of data from offset to offset + length will be written to file.
      *
      * @param offset The offset of the subarray to be used.
      * @param length The length of the subarray to be used.