This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 92e058c823 [ISSUE #7774] Make the handle of ppv2 tlv more extendable (#7775) 92e058c823 is described below commit 92e058c823d7d064834710ff470a0d61dc067074 Author: dingshuangxi888 <dingshuangxi...@gmail.com> AuthorDate: Wed Jan 24 09:50:15 2024 +0800 [ISSUE #7774] Make the handle of ppv2 tlv more extendable (#7775) * Fix ascii validate for ppv2 tls. * make the handle of ppv2 tlv extendable. --- .../proxy/grpc/ProxyAndTlsProtocolNegotiator.java | 38 ++++++------- .../remoting/MultiProtocolRemotingServer.java | 2 +- .../http2proxy/HAProxyMessageForwarder.java | 23 +++++--- .../http2proxy/Http2ProtocolProxyHandler.java | 12 +++-- .../grpc/ProxyAndTlsProtocolNegotiatorTest.java | 49 +++++++++++++++++ .../http2proxy/HAProxyMessageForwarderTest.java | 47 ++++++++++++++++ .../http2proxy/Http2ProtocolProxyHandlerTest.java | 61 +++++++++++++++++++++ .../remoting/netty/NettyRemotingServer.java | 23 ++++---- .../remoting/netty/NettyRemotingServerTest.java | 63 ++++++++++++++++++++++ 9 files changed, 276 insertions(+), 42 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java index cdf33165d7..7c92866803 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java @@ -34,6 +34,7 @@ import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionState; import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessage; import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder; import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion; +import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyTLV; import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler; @@ -41,7 +42,10 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactor import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate; import io.grpc.netty.shaded.io.netty.util.AsciiString; import io.grpc.netty.shaded.io.netty.util.CharsetUtil; -import java.nio.charset.StandardCharsets; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.constant.HAProxyConstants; @@ -55,11 +59,6 @@ import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys; import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.List; - public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator { protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -123,7 +122,7 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator } } - private static class ProxyAndTlsProtocolHandler extends ByteToMessageDecoder { + private class ProxyAndTlsProtocolHandler extends ByteToMessageDecoder { private final GrpcHttp2ConnectionHandler grpcHandler; @@ -156,7 +155,7 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator } } - private static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter { + private class HAProxyMessageHandler extends ChannelInboundHandlerAdapter { private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault(); @@ -193,16 +192,7 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort())); } if (CollectionUtils.isNotEmpty(msg.tlvs())) { - msg.tlvs().forEach(tlv -> { - Attributes.Key<String> key = AttributeKeys.valueOf( - HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); - byte[] valueBytes = ByteBufUtil.getBytes(tlv.content()); - String value = StringUtils.trim(new String(valueBytes, CharsetUtil.UTF_8)); - if (!BinaryUtil.isAscii(value.getBytes(StandardCharsets.UTF_8))) { - return; - } - builder.set(key, value); - }); + msg.tlvs().forEach(tlv -> handleHAProxyTLV(tlv, builder)); } pne = InternalProtocolNegotiationEvent .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build()); @@ -212,7 +202,17 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator } } - private static class TlsModeHandler extends ByteToMessageDecoder { + protected void handleHAProxyTLV(HAProxyTLV tlv, Attributes.Builder builder) { + byte[] valueBytes = ByteBufUtil.getBytes(tlv.content()); + if (!BinaryUtil.isAscii(valueBytes)) { + return; + } + Attributes.Key<String> key = AttributeKeys.valueOf( + HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); + builder.set(key, new String(valueBytes, CharsetUtil.UTF_8)); + } + + private class TlsModeHandler extends ByteToMessageDecoder { private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java index 12d728fff1..d7c2820b27 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java @@ -46,7 +46,7 @@ public class MultiProtocolRemotingServer extends NettyRemotingServer { private final NettyServerConfig nettyServerConfig; private final RemotingProtocolHandler remotingProtocolHandler; - private final Http2ProtocolProxyHandler http2ProtocolProxyHandler; + protected Http2ProtocolProxyHandler http2ProtocolProxyHandler; public MultiProtocolRemotingServer(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) { super(nettyServerConfig, channelEventListener); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java index 8f139d3d9a..99cb99d530 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java @@ -29,6 +29,7 @@ import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; import io.netty.handler.codec.haproxy.HAProxyTLV; import io.netty.util.Attribute; import io.netty.util.DefaultAttributeMap; +import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; @@ -50,7 +51,7 @@ public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); private static final Field FIELD_ATTRIBUTE = - FieldUtils.getField(DefaultAttributeMap.class, "attributes", true); + FieldUtils.getField(DefaultAttributeMap.class, "attributes", true); private final Channel outboundChannel; @@ -111,19 +112,25 @@ public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter { destinationPort = Integer.parseInt(attributeValue); } if (StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) { - String typeString = StringUtils.substringAfter(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX); - ByteBuf byteBuf = Unpooled.buffer(); - byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset())); - HAProxyTLV haProxyTLV = new HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf); - haProxyTLVs.add(haProxyTLV); + HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey, attributeValue); + if (haProxyTLV != null) { + haProxyTLVs.add(haProxyTLV); + } } } HAProxyProxiedProtocol proxiedProtocol = AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 : - HAProxyProxiedProtocol.TCP4; + HAProxyProxiedProtocol.TCP4; HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, - proxiedProtocol, sourceAddress, destinationAddress, sourcePort, destinationPort, haProxyTLVs); + proxiedProtocol, sourceAddress, destinationAddress, sourcePort, destinationPort, haProxyTLVs); outboundChannel.writeAndFlush(message).sync(); } + + protected HAProxyTLV buildHAProxyTLV(String attributeKey, String attributeValue) throws DecoderException { + String typeString = StringUtils.substringAfter(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX); + ByteBuf byteBuf = Unpooled.buffer(); + byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset())); + return new HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf); + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java index c37db92af4..7ce563b030 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java @@ -121,10 +121,7 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler { } final Channel outboundChannel = f.channel(); - if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) { - ctx.pipeline().addLast(new HAProxyMessageForwarder(outboundChannel)); - outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE); - } + configPipeline(inboundChannel, outboundChannel); SslHandler sslHandler = null; if (sslContext != null) { @@ -132,4 +129,11 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler { } ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel, sslHandler)); } + + protected void configPipeline(Channel inboundChannel, Channel outboundChannel) { + if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) { + inboundChannel.pipeline().addLast(new HAProxyMessageForwarder(outboundChannel)); + outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE); + } + } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiatorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiatorTest.java new file mode 100644 index 0000000000..699491f03d --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiatorTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.grpc; + +import io.grpc.Attributes; +import io.grpc.netty.shaded.io.netty.buffer.ByteBuf; +import io.grpc.netty.shaded.io.netty.buffer.Unpooled; +import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyTLV; +import java.nio.charset.StandardCharsets; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ProxyAndTlsProtocolNegotiatorTest { + + private ProxyAndTlsProtocolNegotiator negotiator; + + @Before + public void setUp() throws Exception { + ConfigurationManager.intConfig(); + ConfigurationManager.getProxyConfig().setTlsTestModeEnable(true); + negotiator = new ProxyAndTlsProtocolNegotiator(); + } + + @Test + public void handleHAProxyTLV() { + ByteBuf content = Unpooled.buffer(); + content.writeBytes("xxxx".getBytes(StandardCharsets.UTF_8)); + HAProxyTLV haProxyTLV = new HAProxyTLV((byte) 0xE1, content); + negotiator.handleHAProxyTLV(haProxyTLV, Attributes.newBuilder()); + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarderTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarderTest.java new file mode 100644 index 0000000000..f57116f0da --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarderTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.remoting.protocol.http2proxy; + +import io.netty.channel.Channel; +import io.netty.handler.codec.haproxy.HAProxyTLV; +import org.apache.commons.codec.DecoderException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class HAProxyMessageForwarderTest { + + private HAProxyMessageForwarder haProxyMessageForwarder; + + @Mock + private Channel outboundChannel; + + @Before + public void setUp() throws Exception { + haProxyMessageForwarder = new HAProxyMessageForwarder(outboundChannel); + } + + @Test + public void buildHAProxyTLV() throws DecoderException { + HAProxyTLV haProxyTLV = haProxyMessageForwarder.buildHAProxyTLV("proxy_protocol_tlv_0xe1", "xxxx"); + assert haProxyTLV != null; + assert haProxyTLV.typeByteValue() == (byte) 0xe1; + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandlerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandlerTest.java new file mode 100644 index 0000000000..bf03786d34 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandlerTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.protocol.http2proxy; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.haproxy.HAProxyMessageEncoder; +import org.apache.rocketmq.remoting.netty.AttributeKeys; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class Http2ProtocolProxyHandlerTest { + + private Http2ProtocolProxyHandler http2ProtocolProxyHandler; + @Mock + private Channel inboundChannel; + @Mock + private ChannelPipeline inboundPipeline; + @Mock + private Channel outboundChannel; + @Mock + private ChannelPipeline outboundPipeline; + + @Before + public void setUp() throws Exception { + http2ProtocolProxyHandler = new Http2ProtocolProxyHandler(); + } + + @Test + public void configPipeline() { + when(inboundChannel.hasAttr(eq(AttributeKeys.PROXY_PROTOCOL_ADDR))).thenReturn(true); + when(inboundChannel.pipeline()).thenReturn(inboundPipeline); + when(inboundPipeline.addLast(any(HAProxyMessageForwarder.class))).thenReturn(inboundPipeline); + when(outboundChannel.pipeline()).thenReturn(outboundPipeline); + when(outboundPipeline.addFirst(any(HAProxyMessageEncoder.class))).thenReturn(outboundPipeline); + http2ProtocolProxyHandler.configPipeline(inboundChannel, outboundChannel); + } +} \ No newline at end of file diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 7213b0c24f..51f8b85009 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -44,6 +44,7 @@ import io.netty.handler.codec.ProtocolDetectionState; import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; +import io.netty.handler.codec.haproxy.HAProxyTLV; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; @@ -55,7 +56,6 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.DefaultEventExecutorGroup; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; import java.security.cert.CertificateException; import java.time.Duration; import java.util.List; @@ -761,7 +761,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } - public static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter { + public class HAProxyMessageHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { @@ -795,14 +795,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } if (CollectionUtils.isNotEmpty(msg.tlvs())) { msg.tlvs().forEach(tlv -> { - AttributeKey<String> key = AttributeKeys.valueOf( - HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); - byte[] valueBytes = ByteBufUtil.getBytes(tlv.content()); - String value = StringUtils.trim(new String(valueBytes, CharsetUtil.UTF_8)); - if (!BinaryUtil.isAscii(value.getBytes(StandardCharsets.UTF_8))) { - return; - } - channel.attr(key).set(value); + handleHAProxyTLV(tlv, channel); }); } } finally { @@ -810,4 +803,14 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } } + + protected void handleHAProxyTLV(HAProxyTLV tlv, Channel channel) { + byte[] valueBytes = ByteBufUtil.getBytes(tlv.content()); + if (!BinaryUtil.isAscii(valueBytes)) { + return; + } + AttributeKey<String> key = AttributeKeys.valueOf( + HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); + channel.attr(key).set(new String(valueBytes, CharsetUtil.UTF_8)); + } } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingServerTest.java new file mode 100644 index 0000000000..c69fcebd45 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingServerTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.handler.codec.haproxy.HAProxyTLV; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import java.nio.charset.StandardCharsets; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class NettyRemotingServerTest { + + private NettyRemotingServer nettyRemotingServer; + + @Mock + private Channel channel; + + @Mock + private Attribute attribute; + + @Before + public void setUp() throws Exception { + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + nettyRemotingServer = new NettyRemotingServer(nettyServerConfig); + } + + @Test + public void handleHAProxyTLV() { + when(channel.attr(any(AttributeKey.class))).thenReturn(attribute); + doNothing().when(attribute).set(any()); + + ByteBuf content = Unpooled.buffer(); + content.writeBytes("xxxx".getBytes(StandardCharsets.UTF_8)); + HAProxyTLV haProxyTLV = new HAProxyTLV((byte) 0xE1, content); + nettyRemotingServer.handleHAProxyTLV(haProxyTLV, channel); + } +} \ No newline at end of file