This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f120645a1 [ISSUE #4167] Optimize the logic of
MessageDecoder#decodeMessageId method (#4082)
f120645a1 is described below
commit f120645a11abc1ecd2b6a6f1a07a8d339cc1ea11
Author: 灼华 <[email protected]>
AuthorDate: Thu Apr 14 23:06:09 2022 +0800
[ISSUE #4167] Optimize the logic of MessageDecoder#decodeMessageId method
(#4082)
Co-authored-by: 灼华 <[email protected]>
---
.../rocketmq/common/message/MessageDecoder.java | 19 +++++++---------
.../common/message/MessageDecoderTest.java | 26 ++++++++++++++++++++++
.../java/org/apache/rocketmq/store/MappedFile.java | 2 +-
3 files changed, 35 insertions(+), 12 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 929912772..b127ac6cb 100644
---
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -82,20 +82,17 @@ public class MessageDecoder {
}
public static MessageId decodeMessageId(final String msgId) throws
UnknownHostException {
- SocketAddress address;
- long offset;
- int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
+ byte[] bytes = UtilAll.string2bytes(msgId);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
- byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
- byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength
+ 8));
- ByteBuffer bb = ByteBuffer.wrap(port);
- int portInt = bb.getInt(0);
- address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
+ // address(ip+port)
+ byte[] ip = new byte[msgId.length() == 32 ? 4 : 16];
+ byteBuffer.get(ip);
+ int port = byteBuffer.getInt();
+ SocketAddress address = new
InetSocketAddress(InetAddress.getByAddress(ip), port);
// offset
- byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8,
ipLength + 8 + 16));
- bb = ByteBuffer.wrap(data);
- offset = bb.getLong(0);
+ long offset = byteBuffer.getLong();
return new MessageId(address, offset);
}
diff --git
a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
index b27f24669..51ea5971b 100644
---
a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
+++
b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.common.message;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.junit.Test;
import java.net.InetAddress;
@@ -28,6 +29,7 @@ import java.util.Map;
import static
org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
import static
org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
import static
org.apache.rocketmq.common.message.MessageDecoder.createMessageId;
+import static
org.apache.rocketmq.common.message.MessageDecoder.decodeMessageId;
import static org.assertj.core.api.Assertions.assertThat;
public class MessageDecoderTest {
@@ -373,4 +375,28 @@ public class MessageDecoderTest {
assertThat(m.get("1")).isEqualTo("1");
}
+ @Test
+ public void testMessageId() throws Exception{
+ // ipv4 messageId test
+ MessageExt msgExt = new MessageExt();
+ msgExt.setStoreHost(new InetSocketAddress("127.0.0.1", 9103));
+ msgExt.setCommitLogOffset(123456);
+ verifyMessageId(msgExt);
+
+ // ipv6 messageId test
+ msgExt.setStoreHostAddressV6Flag();
+ msgExt.setStoreHost(new
InetSocketAddress(InetAddress.getByName("::1"), 0));
+ verifyMessageId(msgExt);
+ }
+
+ private void verifyMessageId(MessageExt msgExt) throws
UnknownHostException {
+ int storehostIPLength = (msgExt.getSysFlag() &
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;
+ int msgIDLength = storehostIPLength + 4 + 8;
+ ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);
+ String msgId = createMessageId(byteBufferMsgId,
msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
+
+ MessageId messageId = decodeMessageId(msgId);
+ assertThat(messageId.getAddress()).isEqualTo(msgExt.getStoreHost());
+
assertThat(messageId.getOffset()).isEqualTo(msgExt.getCommitLogOffset());
+ }
}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index b46e7cad1..f07031499 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -262,7 +262,7 @@ public class MappedFile extends ReferenceResource {
}
/**
- * Content of data from offset to offset + length will be wrote to file.
+ * Content of data from offset to offset + length will be written to file.
*
* @param offset The offset of the subarray to be used.
* @param length The length of the subarray to be used.