This is an automated email from the ASF dual-hosted git repository. huangli pushed a commit to branch 4.9.2_dev_community in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f5e0151fc4ce725317834525fc5a91a310c9aafb Author: huangli <[email protected]> AuthorDate: Thu Nov 4 16:46:12 2021 +0800 优化发送、消费的解码速度 --- .../processor/AbstractSendMessageProcessor.java | 77 +--------------------- .../protocol/header/PullMessageRequestHeader.java | 48 +++++++++++++- .../protocol/header/PullMessageResponseHeader.java | 24 ++++++- .../header/SendMessageRequestHeaderV2.java | 66 ++++++++++++++++++- .../protocol/header/SendMessageResponseHeader.java | 23 ++++++- .../protocol/header/FastCodesHeaderTest.java | 28 +++++--- .../remoting/protocol/FastCodesHeader.java | 34 ++++++++++ .../remoting/protocol/RemotingCommand.java | 5 ++ 8 files changed, 219 insertions(+), 86 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 3303d70..66480ad 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -288,7 +288,9 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc switch (request.getCode()) { case RequestCode.SEND_BATCH_MESSAGE: case RequestCode.SEND_MESSAGE_V2: - requestHeaderV2 = decodeSendMessageHeaderV2(request); + requestHeaderV2 = + (SendMessageRequestHeaderV2) request + .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); case RequestCode.SEND_MESSAGE: if (null == requestHeaderV2) { requestHeader = @@ -303,79 +305,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc return requestHeader; } - static SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(RemotingCommand request) - throws RemotingCommandException { - SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2(); - HashMap<String, String> fields = request.getExtFields(); - if (fields == null) { - throw new RemotingCommandException("the ext fields is null"); - } - - String s = fields.get("a"); - checkNotNull(s, "the custom field <a> is null"); - r.setA(s); - - s = fields.get("b"); - checkNotNull(s, "the custom field <b> is null"); - r.setB(s); - - s = fields.get("c"); - checkNotNull(s, "the custom field <c> is null"); - r.setC(s); - - s = fields.get("d"); - checkNotNull(s, "the custom field <d> is null"); - r.setD(Integer.parseInt(s)); - - s = fields.get("e"); - checkNotNull(s, "the custom field <e> is null"); - r.setE(Integer.parseInt(s)); - - s = fields.get("f"); - checkNotNull(s, "the custom field <f> is null"); - r.setF(Integer.parseInt(s)); - - s = fields.get("g"); - checkNotNull(s, "the custom field <g> is null"); - r.setG(Long.parseLong(s)); - - s = fields.get("h"); - checkNotNull(s, "the custom field <h> is null"); - r.setH(Integer.parseInt(s)); - - s = fields.get("i"); - if (s != null) { - r.setI(s); - } - - s = fields.get("j"); - if (s != null) { - r.setJ(Integer.parseInt(s)); - } - - s = fields.get("k"); - if (s != null) { - r.setK(Boolean.parseBoolean(s)); - } - - s = fields.get("l"); - if (s != null) { - r.setL(Integer.parseInt(s)); - } - - s = fields.get("m"); - if (s != null) { - r.setM(Boolean.parseBoolean(s)); - } - return r; - } - - private static void checkNotNull(String s, String msg) throws RemotingCommandException { - if (s == null) { - throw new RemotingCommandException(msg); - } - } - public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) { if (hasSendMessageHook()) { for (SendMessageHook hook : this.sendMessageHookList) { diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java index 106e89e..adc32df 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java @@ -20,12 +20,15 @@ */ package org.apache.rocketmq.common.protocol.header; +import java.util.HashMap; + import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.FastCodesHeader; -public class PullMessageRequestHeader implements CommandCustomHeader { +public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader { @CFNotNull private String consumerGroup; @CFNotNull @@ -52,6 +55,49 @@ public class PullMessageRequestHeader implements CommandCustomHeader { public void checkFields() throws RemotingCommandException { } + @Override + public void decode(HashMap<String, String> fields) throws RemotingCommandException { + String str = fields.get("consumerGroup"); + checkNotNull(str, "the custom field <consumerGroup> is null"); + this.consumerGroup = str; + + str = fields.get("topic"); + checkNotNull(str, "the custom field <topic> is null"); + this.topic = str; + + str = fields.get("queueId"); + checkNotNull(str, "the custom field <queueId> is null"); + this.queueId = Integer.parseInt(str); + + str = fields.get("queueOffset"); + checkNotNull(str, "the custom field <queueOffset> is null"); + this.queueOffset = Long.parseLong(str); + + str = fields.get("maxMsgNums"); + checkNotNull(str, "the custom field <maxMsgNums> is null"); + this.maxMsgNums = Integer.parseInt(str); + + str = fields.get("sysFlag"); + checkNotNull(str, "the custom field <sysFlag> is null"); + this.sysFlag = Integer.parseInt(str); + + str = fields.get("commitOffset"); + checkNotNull(str, "the custom field <commitOffset> is null"); + this.commitOffset = Long.parseLong(str); + + str = fields.get("suspendTimeoutMillis"); + checkNotNull(str, "the custom field <suspendTimeoutMillis> is null"); + this.suspendTimeoutMillis = Long.parseLong(str); + + this.subscription = fields.get("subscription");; + + str = fields.get("subVersion"); + checkNotNull(str, "the custom field <subVersion> is null"); + this.subVersion = Long.parseLong(str); + + this.expressionType = fields.get("expressionType"); + } + public String getConsumerGroup() { return consumerGroup; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java index 0112f7d..db7f24b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java @@ -20,11 +20,14 @@ */ package org.apache.rocketmq.common.protocol.header; +import java.util.HashMap; + import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.FastCodesHeader; -public class PullMessageResponseHeader implements CommandCustomHeader { +public class PullMessageResponseHeader implements CommandCustomHeader, FastCodesHeader { @CFNotNull private Long suggestWhichBrokerId; @CFNotNull @@ -38,6 +41,25 @@ public class PullMessageResponseHeader implements CommandCustomHeader { public void checkFields() throws RemotingCommandException { } + @Override + public void decode(HashMap<String, String> fields) throws RemotingCommandException { + String str = fields.get("suggestWhichBrokerId"); + checkNotNull(str, "the custom field <suggestWhichBrokerId> is null"); + this.suggestWhichBrokerId = Long.parseLong(str); + + str = fields.get("nextBeginOffset"); + checkNotNull(str, "the custom field <nextBeginOffset> is null"); + this.nextBeginOffset = Long.parseLong(str); + + str = fields.get("minOffset"); + checkNotNull(str, "the custom field <minOffset> is null"); + this.minOffset = Long.parseLong(str); + + str = fields.get("maxOffset"); + checkNotNull(str, "the custom field <maxOffset> is null"); + this.maxOffset = Long.parseLong(str); + } + public Long getNextBeginOffset() { return nextBeginOffset; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java index 4e0098b..498a7fa 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java @@ -17,6 +17,9 @@ package org.apache.rocketmq.common.protocol.header; +import java.util.HashMap; + +import org.apache.rocketmq.remoting.protocol.FastCodesHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; @@ -25,7 +28,7 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; /** * Use short variable name to speed up FastJson deserialization process. */ -public class SendMessageRequestHeaderV2 implements CommandCustomHeader { +public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCodesHeader { @CFNotNull private String a; // producerGroup; @CFNotNull @@ -94,6 +97,67 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { public void checkFields() throws RemotingCommandException { } + @Override + public void decode(HashMap<String, String> fields) throws RemotingCommandException { + + String str = fields.get("a"); + checkNotNull(str, "the custom field <a> is null"); + a = str; + + str = fields.get("b"); + checkNotNull(str, "the custom field <b> is null"); + b = str; + + str = fields.get("c"); + checkNotNull(str, "the custom field <c> is null"); + c = str; + + str = fields.get("d"); + checkNotNull(str, "the custom field <d> is null"); + d = Integer.parseInt(str); + + str = fields.get("e"); + checkNotNull(str, "the custom field <e> is null"); + e = Integer.parseInt(str); + + str = fields.get("f"); + checkNotNull(str, "the custom field <f> is null"); + f = Integer.parseInt(str); + + str = fields.get("g"); + checkNotNull(str, "the custom field <g> is null"); + g = Long.parseLong(str); + + str = fields.get("h"); + checkNotNull(str, "the custom field <h> is null"); + h = Integer.parseInt(str); + + str = fields.get("i"); + if (str != null) { + i = str; + } + + str = fields.get("j"); + if (str != null) { + j = Integer.parseInt(str); + } + + str = fields.get("k"); + if (str != null) { + k = Boolean.parseBoolean(str); + } + + str = fields.get("l"); + if (str != null) { + l = Integer.parseInt(str); + } + + str = fields.get("m"); + if (str != null) { + m = Boolean.parseBoolean(str); + } + } + public String getA() { return a; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java index 6834881..9d8786f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java @@ -20,11 +20,14 @@ */ package org.apache.rocketmq.common.protocol.header; +import java.util.HashMap; + import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.FastCodesHeader; -public class SendMessageResponseHeader implements CommandCustomHeader { +public class SendMessageResponseHeader implements CommandCustomHeader, FastCodesHeader { @CFNotNull private String msgId; @CFNotNull @@ -37,6 +40,24 @@ public class SendMessageResponseHeader implements CommandCustomHeader { public void checkFields() throws RemotingCommandException { } + @Override + public void decode(HashMap<String, String> fields) throws RemotingCommandException { + String str = fields.get("msgId"); + checkNotNull(str, "the custom field <msgId> is null"); + this.msgId = str; + + str = fields.get("queueId"); + checkNotNull(str, "the custom field <queueId> is null"); + this.queueId = Integer.parseInt(str); + + str = fields.get("queueOffset"); + checkNotNull(str, "the custom field <queueOffset> is null"); + this.queueOffset = Long.parseLong(str); + + str = fields.get("transactionId"); + this.transactionId = str; + } + public String getMsgId() { return msgId; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java similarity index 73% rename from broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java rename to common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java index da2611b..9e28aa9 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.broker.processor; +package org.apache.rocketmq.common.protocol.header; import java.lang.reflect.Field; import java.util.ArrayList; @@ -24,14 +24,24 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.protocol.FastCodesHeader; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.Assert; import org.junit.Test; -public class AbstractSendMessageProcessorTest { +public class FastCodesHeaderTest { + @Test - public void testDecodeSendMessageHeaderV2() throws Exception { - Field[] declaredFields = SendMessageRequestHeaderV2.class.getDeclaredFields(); + public void testFastDecode() throws Exception { + testFastDecode(SendMessageRequestHeaderV2.class); + testFastDecode(SendMessageResponseHeader.class); + testFastDecode(PullMessageRequestHeader.class); + testFastDecode(PullMessageResponseHeader.class); + } + + private void testFastDecode(Class<? extends CommandCustomHeader> classHeader) throws Exception { + Field[] declaredFields = classHeader.getDeclaredFields(); List<Field> declaredFieldsList = new ArrayList<>(); for (Field f : declaredFields) { if (f.getName().startsWith("$")) { @@ -43,7 +53,7 @@ public class AbstractSendMessageProcessorTest { RemotingCommand command = RemotingCommand.createRequestCommand(0, null); HashMap<String, String> m = buildExtFields(declaredFieldsList); command.setExtFields(m); - check(command, declaredFieldsList); + check(command, declaredFieldsList, classHeader); } private HashMap<String, String> buildExtFields(List<Field> fields) { @@ -65,9 +75,11 @@ public class AbstractSendMessageProcessorTest { return extFields; } - private void check(RemotingCommand command, List<Field> fields) throws Exception { - SendMessageRequestHeaderV2 o1 = (SendMessageRequestHeaderV2) command.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); - SendMessageRequestHeaderV2 o2 = AbstractSendMessageProcessor.decodeSendMessageHeaderV2(command); + private void check(RemotingCommand command, List<Field> fields, + Class<? extends CommandCustomHeader> classHeader) throws Exception { + CommandCustomHeader o1 = command.decodeCommandCustomHeader(classHeader); + CommandCustomHeader o2 = classHeader.newInstance(); + ((FastCodesHeader)o2).decode(command.getExtFields()); for (Field f : fields) { Object value1 = f.get(o1); Object value2 = f.get(o2); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java new file mode 100644 index 0000000..4604ae1 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol; + +import java.util.HashMap; + +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public interface FastCodesHeader { + default void checkNotNull(String s, String msg) throws RemotingCommandException { + if (s == null) { + throw new RemotingCommandException(msg); + } + } + + void decode(HashMap<String, String> fields) throws RemotingCommandException; + + +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 51b6194..912eea5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -243,6 +243,11 @@ public class RemotingCommand { } if (this.extFields != null) { + if (objectHeader instanceof FastCodesHeader) { + ((FastCodesHeader) objectHeader).decode(this.extFields); + objectHeader.checkFields(); + return objectHeader; + } Field[] fields = getClazzFields(classHeader); for (Field field : fields) {
