This is an automated email from the ASF dual-hosted git repository. huangli pushed a commit to branch 4.9.2_dev_community in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 10428801df75187ba914e553dd341adf8a1ae6cb Author: huangli <[email protected]> AuthorDate: Fri Nov 5 17:48:40 2021 +0800 优化rocketmq编解码实现零拷贝,配合前面的header编解码优化,和fastjson版本相比,在生产者(client)的火焰图中,encode的占比从7.37%下降到1.78%,decode的占比从3.84降低到1.64% --- .../processor/AbstractSendMessageProcessor.java | 13 --- .../broker/processor/SendMessageProcessor.java | 4 +- .../protocol/header/PullMessageRequestHeader.java | 17 +++ .../protocol/header/PullMessageResponseHeader.java | 10 ++ .../header/SendMessageRequestHeaderV2.java | 21 ++++ .../protocol/header/SendMessageResponseHeader.java | 10 ++ .../rocketmq/example/benchmark/BatchProducer.java | 3 + .../rocketmq/example/benchmark/Consumer.java | 3 + .../rocketmq/example/benchmark/Producer.java | 3 + .../example/benchmark/TransactionProducer.java | 3 + .../rocketmq/remoting/netty/NettyDecoder.java | 5 +- .../rocketmq/remoting/netty/NettyEncoder.java | 3 +- .../remoting/netty/NettyRemotingAbstract.java | 1 + .../remoting/protocol/FastCodesHeader.java | 11 ++ .../remoting/protocol/RemotingCommand.java | 59 ++++++---- .../remoting/protocol/RocketMQSerializable.java | 119 ++++++++++++++------- .../remoting/protocol/RemotingCommandTest.java | 15 ++- .../protocol/RocketMQSerializableTest.java | 68 +++++++++++- 18 files changed, 284 insertions(+), 84 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 85cb705..1f0744e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -279,19 +279,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc this.sendMessageHookList = sendMessageHookList; } - protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request, - final RemotingCommand response) { - if (!request.isOnewayRPC()) { - try { - ctx.writeAndFlush(response); - } catch (Throwable e) { - log.error("SendMessageProcessor process request over, but response failed", e); - log.error(request.toString()); - log.error(response.toString()); - } - } - } - public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request, SendMessageContext context) { if (hasSendMessageHook()) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 994d596..0499695 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -545,8 +545,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); - doResponse(ctx, request, response); - if (hasSendMessageHook()) { sendMessageContext.setMsgId(responseHeader.getMsgId()); sendMessageContext.setQueueId(responseHeader.getQueueId()); @@ -561,7 +559,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement sendMessageContext.setCommercialSendSize(wroteSize); sendMessageContext.setCommercialOwner(owner); } - return null; + return response; } else { if (hasSendMessageHook()) { int wroteSize = request.getBody().length; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java index adc32df..e351344 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java @@ -28,6 +28,8 @@ import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.FastCodesHeader; +import io.netty.buffer.ByteBuf; + public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader { @CFNotNull private String consumerGroup; @@ -56,6 +58,21 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH } @Override + public void encode(ByteBuf out) { + writeIfNotNull(out, "consumerGroup", consumerGroup); + writeIfNotNull(out, "topic", topic); + writeIfNotNull(out, "queueId", queueId); + writeIfNotNull(out, "queueOffset", queueOffset); + writeIfNotNull(out, "maxMsgNums", maxMsgNums); + writeIfNotNull(out, "sysFlag", sysFlag); + writeIfNotNull(out, "commitOffset", commitOffset); + writeIfNotNull(out, "suspendTimeoutMillis", suspendTimeoutMillis); + writeIfNotNull(out, "subscription", subscription); + writeIfNotNull(out, "subVersion", subVersion); + writeIfNotNull(out, "expressionType", expressionType); + } + + @Override public void decode(HashMap<String, String> fields) throws RemotingCommandException { String str = fields.get("consumerGroup"); checkNotNull(str, "the custom field <consumerGroup> is null"); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java index db7f24b..1ac5050 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java @@ -27,6 +27,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.FastCodesHeader; +import io.netty.buffer.ByteBuf; + public class PullMessageResponseHeader implements CommandCustomHeader, FastCodesHeader { @CFNotNull private Long suggestWhichBrokerId; @@ -42,6 +44,14 @@ public class PullMessageResponseHeader implements CommandCustomHeader, FastCodes } @Override + public void encode(ByteBuf out) { + writeIfNotNull(out, "suggestWhichBrokerId", suggestWhichBrokerId); + writeIfNotNull(out, "nextBeginOffset", nextBeginOffset); + writeIfNotNull(out, "minOffset", minOffset); + writeIfNotNull(out, "maxOffset", maxOffset); + } + + @Override public void decode(HashMap<String, String> fields) throws RemotingCommandException { String str = fields.get("suggestWhichBrokerId"); checkNotNull(str, "the custom field <suggestWhichBrokerId> is null"); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java index de26947..f0cd9e5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java @@ -25,6 +25,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import io.netty.buffer.ByteBuf; + /** * Use short variable name to speed up FastJson deserialization process. */ @@ -108,6 +110,25 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode } @Override + public void encode(ByteBuf out) { + writeIfNotNull(out, "a", a); + writeIfNotNull(out, "b", b); + writeIfNotNull(out, "c", c); + writeIfNotNull(out, "d", d); + writeIfNotNull(out, "e", e); + writeIfNotNull(out, "f", f); + writeIfNotNull(out, "g", g); + writeIfNotNull(out, "h", h); + writeIfNotNull(out, "i", i); + writeIfNotNull(out, "j", j); + writeIfNotNull(out, "k", k); + writeIfNotNull(out, "l", l); + writeIfNotNull(out, "m", m); + writeIfNotNull(out, "n", n); + writeIfNotNull(out, "o", o); + } + + @Override public void decode(HashMap<String, String> fields) throws RemotingCommandException { String str = fields.get("a"); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java index 9d8786f..cc60e37 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java @@ -27,6 +27,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.FastCodesHeader; +import io.netty.buffer.ByteBuf; + public class SendMessageResponseHeader implements CommandCustomHeader, FastCodesHeader { @CFNotNull private String msgId; @@ -41,6 +43,14 @@ public class SendMessageResponseHeader implements CommandCustomHeader, FastCodes } @Override + public void encode(ByteBuf out) { + writeIfNotNull(out, "msgId", msgId); + writeIfNotNull(out, "queueId", queueId); + writeIfNotNull(out, "queueOffset", queueOffset); + writeIfNotNull(out, "transactionId", transactionId); + } + + @Override public void decode(HashMap<String, String> fields) throws RemotingCommandException { String str = fields.get("msgId"); checkNotNull(str, "the custom field <msgId> is null"); diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java index cf207cd..eadb9c3 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java @@ -46,6 +46,8 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.SerializeType; import org.apache.rocketmq.srvutil.ServerUtil; public class BatchProducer { @@ -53,6 +55,7 @@ public class BatchProducer { private static byte[] msgBody; public static void main(String[] args) throws MQClientException { + System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name()); Options options = ServerUtil.buildCommandlineOptions(new Options()); CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new PosixParser()); diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index d08795d..c9e64f3 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -34,6 +34,8 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.SerializeType; import org.apache.rocketmq.srvutil.ServerUtil; import java.io.IOException; @@ -49,6 +51,7 @@ import java.util.concurrent.atomic.AtomicLong; public class Consumer { public static void main(String[] args) throws MQClientException, IOException { + System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name()); Options options = ServerUtil.buildCommandlineOptions(new Options()); CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index c32e00e..bdef16e 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -32,6 +32,8 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.SerializeType; import org.apache.rocketmq.srvutil.ServerUtil; import java.util.Arrays; @@ -50,6 +52,7 @@ public class Producer { private static byte[] msgBody; public static void main(String[] args) throws MQClientException { + System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name()); Options options = ServerUtil.buildCommandlineOptions(new Options()); CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser()); diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 5e2f287..be5ccf2 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -32,6 +32,8 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.SerializeType; import org.apache.rocketmq.srvutil.ServerUtil; import java.io.UnsupportedEncodingException; @@ -61,6 +63,7 @@ public class TransactionProducer { static final int MAX_CHECK_RESULT_IN_MSG = 20; public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { + System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name()); Options options = ServerUtil.buildCommandlineOptions(new Options()); CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new PosixParser()); TxSendConfig config = new TxSendConfig(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java index f64ab2d..57ee601 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java @@ -44,10 +44,7 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder { if (null == frame) { return null; } - - ByteBuffer byteBuffer = frame.nioBuffer(); - - return RemotingCommand.decode(byteBuffer); + return RemotingCommand.decode(frame); } catch (Exception e) { log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); RemotingUtil.closeChannel(ctx.channel()); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java index 4506e71..7463619 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java @@ -35,8 +35,7 @@ public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> { public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { - ByteBuffer header = remotingCommand.encodeHeader(); - out.writeBytes(header); + remotingCommand.fastEncodeHeader(out); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index b2e7294..eaa2e0d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -209,6 +209,7 @@ public abstract class NettyRemotingAbstract { if (response != null) { response.setOpaque(opaque); response.markResponseType(); + response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC()); try { ctx.writeAndFlush(response); } catch (Throwable e) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java index 4604ae1..f313da2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java @@ -21,6 +21,8 @@ import java.util.HashMap; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import io.netty.buffer.ByteBuf; + public interface FastCodesHeader { default void checkNotNull(String s, String msg) throws RemotingCommandException { if (s == null) { @@ -28,6 +30,15 @@ public interface FastCodesHeader { } } + default void writeIfNotNull(ByteBuf out, String key, Object value) { + if (value != null) { + RocketMQSerializable.writeStr(out, true, key); + RocketMQSerializable.writeStr(out, false, value.toString()); + } + } + + public void encode(ByteBuf out); + void decode(HashMap<String, String> fields) throws RemotingCommandException; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 912eea5..d469d10 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -31,6 +31,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + public class RemotingCommand { public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type"; public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE"; @@ -142,20 +145,21 @@ public class RemotingCommand { } public static RemotingCommand decode(final ByteBuffer byteBuffer) { - int length = byteBuffer.limit(); - int oriHeaderLen = byteBuffer.getInt(); - int headerLength = getHeaderLength(oriHeaderLen); + return decode(Unpooled.wrappedBuffer(byteBuffer)); + } - byte[] headerData = new byte[headerLength]; - byteBuffer.get(headerData); + public static RemotingCommand decode(final ByteBuf byteBuffer) { + int length = byteBuffer.readableBytes(); + int oriHeaderLen = byteBuffer.readInt(); + int headerLength = getHeaderLength(oriHeaderLen); - RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); + RemotingCommand cmd = headerDecode(byteBuffer, headerLength, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; - byteBuffer.get(bodyData); + byteBuffer.readBytes(bodyData); } cmd.body = bodyData; @@ -166,14 +170,16 @@ public class RemotingCommand { return length & 0xFFFFFF; } - private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { + private static RemotingCommand headerDecode(ByteBuf byteBuffer, int len, SerializeType type) { switch (type) { case JSON: + byte[] headerData = new byte[len]; + byteBuffer.readBytes(headerData); RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: - RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); + RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(byteBuffer); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: @@ -208,14 +214,8 @@ public class RemotingCommand { return true; } - public static byte[] markProtocolType(int source, SerializeType type) { - byte[] result = new byte[4]; - - result[0] = type.getCode(); - result[1] = (byte) ((source >> 16) & 0xFF); - result[2] = (byte) ((source >> 8) & 0xFF); - result[3] = (byte) (source & 0xFF); - return result; + public static int markProtocolType(int source, SerializeType type) { + return (type.getCode() << 24) | (source & 0x00FFFFFF); } public void markResponseType() { @@ -349,7 +349,7 @@ public class RemotingCommand { result.putInt(length); // header length - result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); + result.putInt(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); @@ -401,6 +401,27 @@ public class RemotingCommand { } } + public void fastEncodeHeader(ByteBuf out) { + int bodySize = this.body != null ? this.body.length : 0; + int beginIndex = out.writerIndex(); + // skip 8 bytes + out.writeLong(0); + int headerSize; + if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { + if (customHeader != null && !(customHeader instanceof FastCodesHeader)) { + this.makeCustomHeaderToNet(); + } + headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, out); + } else { + this.makeCustomHeaderToNet(); + byte[] header = RemotingSerializable.encode(this); + headerSize = header.length; + out.writeBytes(header); + } + out.setInt(beginIndex, 4 + headerSize + bodySize); + out.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC)); + } + public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); } @@ -424,7 +445,7 @@ public class RemotingCommand { result.putInt(length); // header length - result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); + result.putInt(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java index 66119e0..ed8a28f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java @@ -18,12 +18,77 @@ package org.apache.rocketmq.remoting.protocol; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import io.netty.buffer.ByteBuf; + public class RocketMQSerializable { - private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8"); + private static final Charset CHARSET_UTF8 = StandardCharsets.UTF_8; + + public static void writeStr(ByteBuf buf, boolean useShortLength, String str) { + int lenIndex = buf.writerIndex(); + if (useShortLength) { + buf.writeShort(0); + } else { + buf.writeInt(0); + } + int len = buf.writeCharSequence(str, StandardCharsets.UTF_8); + if (useShortLength) { + buf.setShort(lenIndex, len); + } else { + buf.setInt(lenIndex, len); + } + } + + public static String readStr(ByteBuf buf, boolean useShortLength) { + int len = useShortLength ? buf.readShort() : buf.readInt(); + if (len == 0) { + return null; + } + CharSequence cs = buf.readCharSequence(len, StandardCharsets.UTF_8); + return cs == null ? null : cs.toString(); + } + + public static int rocketMQProtocolEncode(RemotingCommand cmd, ByteBuf out) { + int beginIndex = out.writerIndex(); + // int code(~32767) + out.writeShort(cmd.getCode()); + // LanguageCode language + out.writeByte(cmd.getLanguage().getCode()); + // int version(~32767) + out.writeShort(cmd.getVersion()); + // int opaque + out.writeInt(cmd.getOpaque()); + // int flag + out.writeInt(cmd.getFlag()); + // String remark + String remark = cmd.getRemark(); + if (remark != null && !remark.isEmpty()) { + writeStr(out, false, remark); + } else { + out.writeInt(0); + } + + int mapLenIndex = out.writerIndex(); + out.writeInt(0); + if (cmd.readCustomHeader() instanceof FastCodesHeader) { + ((FastCodesHeader) cmd.readCustomHeader()).encode(out); + } + HashMap<String, String> map = cmd.getExtFields(); + if (map != null && !map.isEmpty()) { + map.forEach((k, v) -> { + if (k != null && v != null) { + writeStr(out, true, k); + writeStr(out, false, v); + } + }); + } + out.setInt(mapLenIndex, out.writerIndex() - mapLenIndex - 4); + return out.writerIndex() - beginIndex; + } public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) { // String remark @@ -133,58 +198,38 @@ public class RocketMQSerializable { return length; } - public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) { + public static RemotingCommand rocketMQProtocolDecode(final ByteBuf headerBuffer) { RemotingCommand cmd = new RemotingCommand(); - ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray); // int code(~32767) - cmd.setCode(headerBuffer.getShort()); + cmd.setCode(headerBuffer.readShort()); // LanguageCode language - cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get())); + cmd.setLanguage(LanguageCode.valueOf(headerBuffer.readByte())); // int version(~32767) - cmd.setVersion(headerBuffer.getShort()); + cmd.setVersion(headerBuffer.readShort()); // int opaque - cmd.setOpaque(headerBuffer.getInt()); + cmd.setOpaque(headerBuffer.readInt()); // int flag - cmd.setFlag(headerBuffer.getInt()); + cmd.setFlag(headerBuffer.readInt()); // String remark - int remarkLength = headerBuffer.getInt(); - if (remarkLength > 0) { - byte[] remarkContent = new byte[remarkLength]; - headerBuffer.get(remarkContent); - cmd.setRemark(new String(remarkContent, CHARSET_UTF8)); - } + cmd.setRemark(readStr(headerBuffer, false)); // HashMap<String, String> extFields - int extFieldsLength = headerBuffer.getInt(); + int extFieldsLength = headerBuffer.readInt(); if (extFieldsLength > 0) { - byte[] extFieldsBytes = new byte[extFieldsLength]; - headerBuffer.get(extFieldsBytes); - cmd.setExtFields(mapDeserialize(extFieldsBytes)); + cmd.setExtFields(mapDeserialize(headerBuffer, extFieldsLength)); } return cmd; } - public static HashMap<String, String> mapDeserialize(byte[] bytes) { - if (bytes == null || bytes.length <= 0) - return null; - - HashMap<String, String> map = new HashMap<String, String>(); - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - - short keySize; - byte[] keyContent; - int valSize; - byte[] valContent; - while (byteBuffer.hasRemaining()) { - keySize = byteBuffer.getShort(); - keyContent = new byte[keySize]; - byteBuffer.get(keyContent); + public static HashMap<String, String> mapDeserialize(ByteBuf byteBuffer, int len) { - valSize = byteBuffer.getInt(); - valContent = new byte[valSize]; - byteBuffer.get(valContent); + HashMap<String, String> map = new HashMap<>(); + int endIndex = byteBuffer.readerIndex() + len; - map.put(new String(keyContent, CHARSET_UTF8), new String(valContent, CHARSET_UTF8)); + while (byteBuffer.readerIndex() < endIndex) { + String k = readStr(byteBuffer, true); + String v = readStr(byteBuffer, false); + map.put(k, v); } return map; } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java index 2bd41ce..a5d1993 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java @@ -31,7 +31,13 @@ public class RemotingCommandTest { public void testMarkProtocolType_JSONProtocolType() { int source = 261; SerializeType type = SerializeType.JSON; - byte[] result = RemotingCommand.markProtocolType(source, type); + + byte[] result = new byte[4]; + int x = RemotingCommand.markProtocolType(source, type); + result[0] = (byte) (x >> 24); + result[1] = (byte) (x >> 16); + result[2] = (byte) (x >> 8); + result[3] = (byte) x; assertThat(result).isEqualTo(new byte[] {0, 0, 1, 5}); } @@ -39,7 +45,12 @@ public class RemotingCommandTest { public void testMarkProtocolType_ROCKETMQProtocolType() { int source = 16777215; SerializeType type = SerializeType.ROCKETMQ; - byte[] result = RemotingCommand.markProtocolType(source, type); + byte[] result = new byte[4]; + int x = RemotingCommand.markProtocolType(source, type); + result[0] = (byte) (x >> 24); + result[1] = (byte) (x >> 16); + result[2] = (byte) (x >> 8); + result[3] = (byte) x; assertThat(result).isEqualTo(new byte[] {1, -1, -1, -1}); } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java index f1db54f..83e3cae 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java @@ -17,10 +17,17 @@ package org.apache.rocketmq.remoting.protocol; import java.util.HashMap; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; + public class RocketMQSerializableTest { @Test public void testRocketMQProtocolEncodeAndDecode_WithoutRemarkWithoutExtFields() { @@ -42,7 +49,7 @@ public class RocketMQSerializableTest { assertThat(parseToInt(result, 13)).isEqualTo(0); //empty remark assertThat(parseToInt(result, 17)).isEqualTo(0); //empty extFields - RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result); + RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result)); assertThat(decodedCommand.getCode()).isEqualTo(code); assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA); @@ -80,7 +87,7 @@ public class RocketMQSerializableTest { assertThat(parseToInt(result, 30)).isEqualTo(0); //empty extFields - RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result); + RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result)); assertThat(decodedCommand.getCode()).isEqualTo(code); assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA); @@ -115,10 +122,11 @@ public class RocketMQSerializableTest { byte[] extFieldsArray = new byte[14]; System.arraycopy(result, 21, extFieldsArray, 0, 14); - HashMap<String, String> extFields = RocketMQSerializable.mapDeserialize(extFieldsArray); + HashMap<String, String> extFields = + RocketMQSerializable.mapDeserialize(Unpooled.wrappedBuffer(extFieldsArray), extFieldsArray.length); assertThat(extFields).contains(new HashMap.SimpleEntry("key", "value")); - RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result); + RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result)); assertThat(decodedCommand.getCode()).isEqualTo(code); assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA); @@ -150,4 +158,56 @@ public class RocketMQSerializableTest { return array[index] * 16777216 + array[++index] * 65536 + array[++index] * 256 + array[++index]; } + + public static class MyHeader1 implements CommandCustomHeader { + private String str; + private int num; + + @Override + public void checkFields() throws RemotingCommandException { + } + + public String getStr() { + return str; + } + + public void setStr(String str) { + this.str = str; + } + + public int getNum() { + return num; + } + + public void setNum(int num) { + this.num = num; + } + } + + @Test + public void testFastEncode() throws Exception { + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(16); + MyHeader1 header1 = new MyHeader1(); + header1.setStr("s1"); + header1.setNum(100); + RemotingCommand cmd = RemotingCommand.createRequestCommand(1, header1); + cmd.setRemark("remark"); + cmd.setOpaque(1001); + cmd.setVersion(99); + cmd.setLanguage(LanguageCode.JAVA); + cmd.setFlag(3); + cmd.makeCustomHeaderToNet(); + RocketMQSerializable.rocketMQProtocolEncode(cmd, buf); + RemotingCommand cmd2 = RocketMQSerializable.rocketMQProtocolDecode(buf); + assertThat(cmd2.getRemark()).isEqualTo("remark"); + assertThat(cmd2.getCode()).isEqualTo(1); + assertThat(cmd2.getOpaque()).isEqualTo(1001); + assertThat(cmd2.getVersion()).isEqualTo(99); + assertThat(cmd2.getLanguage()).isEqualTo(LanguageCode.JAVA); + assertThat(cmd2.getFlag()).isEqualTo(3); + + MyHeader1 h2 = (MyHeader1) cmd2.decodeCommandCustomHeader(MyHeader1.class); + assertThat(h2.getStr()).isEqualTo("s1"); + assertThat(h2.getNum()).isEqualTo(100); + } } \ No newline at end of file
