This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 92e058c823 [ISSUE #7774] Make the handle of ppv2 tlv more extendable  
(#7775)
92e058c823 is described below

commit 92e058c823d7d064834710ff470a0d61dc067074
Author: dingshuangxi888 <dingshuangxi...@gmail.com>
AuthorDate: Wed Jan 24 09:50:15 2024 +0800

    [ISSUE #7774] Make the handle of ppv2 tlv more extendable  (#7775)
    
    * Fix ascii validate for ppv2 tls.
    * make the handle of ppv2 tlv extendable.
---
 .../proxy/grpc/ProxyAndTlsProtocolNegotiator.java  | 38 ++++++-------
 .../remoting/MultiProtocolRemotingServer.java      |  2 +-
 .../http2proxy/HAProxyMessageForwarder.java        | 23 +++++---
 .../http2proxy/Http2ProtocolProxyHandler.java      | 12 +++--
 .../grpc/ProxyAndTlsProtocolNegotiatorTest.java    | 49 +++++++++++++++++
 .../http2proxy/HAProxyMessageForwarderTest.java    | 47 ++++++++++++++++
 .../http2proxy/Http2ProtocolProxyHandlerTest.java  | 61 +++++++++++++++++++++
 .../remoting/netty/NettyRemotingServer.java        | 23 ++++----
 .../remoting/netty/NettyRemotingServerTest.java    | 63 ++++++++++++++++++++++
 9 files changed, 276 insertions(+), 42 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
index cdf33165d7..7c92866803 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
@@ -34,6 +34,7 @@ import 
io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionState;
 import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessage;
 import 
io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
 import 
io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyTLV;
 import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
@@ -41,7 +42,10 @@ import 
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactor
 import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
 import io.grpc.netty.shaded.io.netty.util.AsciiString;
 import io.grpc.netty.shaded.io.netty.util.CharsetUtil;
-import java.nio.charset.StandardCharsets;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.constant.HAProxyConstants;
@@ -55,11 +59,6 @@ import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
 import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-
 public class ProxyAndTlsProtocolNegotiator implements 
InternalProtocolNegotiator.ProtocolNegotiator {
     protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
 
@@ -123,7 +122,7 @@ public class ProxyAndTlsProtocolNegotiator implements 
InternalProtocolNegotiator
         }
     }
 
-    private static class ProxyAndTlsProtocolHandler extends 
ByteToMessageDecoder {
+    private class ProxyAndTlsProtocolHandler extends ByteToMessageDecoder {
 
         private final GrpcHttp2ConnectionHandler grpcHandler;
 
@@ -156,7 +155,7 @@ public class ProxyAndTlsProtocolNegotiator implements 
InternalProtocolNegotiator
         }
     }
 
-    private static class HAProxyMessageHandler extends 
ChannelInboundHandlerAdapter {
+    private class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
 
         private ProtocolNegotiationEvent pne = 
InternalProtocolNegotiationEvent.getDefault();
 
@@ -193,16 +192,7 @@ public class ProxyAndTlsProtocolNegotiator implements 
InternalProtocolNegotiator
                     builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, 
String.valueOf(msg.destinationPort()));
                 }
                 if (CollectionUtils.isNotEmpty(msg.tlvs())) {
-                    msg.tlvs().forEach(tlv -> {
-                        Attributes.Key<String> key = AttributeKeys.valueOf(
-                                HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + 
String.format("%02x", tlv.typeByteValue()));
-                        byte[] valueBytes = 
ByteBufUtil.getBytes(tlv.content());
-                        String value = StringUtils.trim(new String(valueBytes, 
CharsetUtil.UTF_8));
-                        if 
(!BinaryUtil.isAscii(value.getBytes(StandardCharsets.UTF_8))) {
-                            return;
-                        }
-                        builder.set(key, value);
-                    });
+                    msg.tlvs().forEach(tlv -> handleHAProxyTLV(tlv, builder));
                 }
                 pne = InternalProtocolNegotiationEvent
                         
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
@@ -212,7 +202,17 @@ public class ProxyAndTlsProtocolNegotiator implements 
InternalProtocolNegotiator
         }
     }
 
-    private static class TlsModeHandler extends ByteToMessageDecoder {
+    protected void handleHAProxyTLV(HAProxyTLV tlv, Attributes.Builder 
builder) {
+        byte[] valueBytes = ByteBufUtil.getBytes(tlv.content());
+        if (!BinaryUtil.isAscii(valueBytes)) {
+            return;
+        }
+        Attributes.Key<String> key = AttributeKeys.valueOf(
+            HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", 
tlv.typeByteValue()));
+        builder.set(key, new String(valueBytes, CharsetUtil.UTF_8));
+    }
+
+    private class TlsModeHandler extends ByteToMessageDecoder {
 
         private ProtocolNegotiationEvent pne = 
InternalProtocolNegotiationEvent.getDefault();
 
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 12d728fff1..d7c2820b27 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -46,7 +46,7 @@ public class MultiProtocolRemotingServer extends 
NettyRemotingServer {
     private final NettyServerConfig nettyServerConfig;
 
     private final RemotingProtocolHandler remotingProtocolHandler;
-    private final Http2ProtocolProxyHandler http2ProtocolProxyHandler;
+    protected Http2ProtocolProxyHandler http2ProtocolProxyHandler;
 
     public MultiProtocolRemotingServer(NettyServerConfig nettyServerConfig, 
ChannelEventListener channelEventListener) {
         super(nettyServerConfig, channelEventListener);
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
index 8f139d3d9a..99cb99d530 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
@@ -29,6 +29,7 @@ import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
 import io.netty.handler.codec.haproxy.HAProxyTLV;
 import io.netty.util.Attribute;
 import io.netty.util.DefaultAttributeMap;
+import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -50,7 +51,7 @@ public class HAProxyMessageForwarder extends 
ChannelInboundHandlerAdapter {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
 
     private static final Field FIELD_ATTRIBUTE =
-            FieldUtils.getField(DefaultAttributeMap.class, "attributes", true);
+        FieldUtils.getField(DefaultAttributeMap.class, "attributes", true);
 
     private final Channel outboundChannel;
 
@@ -111,19 +112,25 @@ public class HAProxyMessageForwarder extends 
ChannelInboundHandlerAdapter {
                 destinationPort = Integer.parseInt(attributeValue);
             }
             if (StringUtils.startsWith(attributeKey, 
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
-                String typeString = StringUtils.substringAfter(attributeKey, 
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
-                ByteBuf byteBuf = Unpooled.buffer();
-                
byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset()));
-                HAProxyTLV haProxyTLV = new 
HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf);
-                haProxyTLVs.add(haProxyTLV);
+                HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey, 
attributeValue);
+                if (haProxyTLV != null) {
+                    haProxyTLVs.add(haProxyTLV);
+                }
             }
         }
 
         HAProxyProxiedProtocol proxiedProtocol = 
AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 :
-                HAProxyProxiedProtocol.TCP4;
+            HAProxyProxiedProtocol.TCP4;
 
         HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, 
HAProxyCommand.PROXY,
-                proxiedProtocol, sourceAddress, destinationAddress, 
sourcePort, destinationPort, haProxyTLVs);
+            proxiedProtocol, sourceAddress, destinationAddress, sourcePort, 
destinationPort, haProxyTLVs);
         outboundChannel.writeAndFlush(message).sync();
     }
+
+    protected HAProxyTLV buildHAProxyTLV(String attributeKey, String 
attributeValue) throws DecoderException {
+        String typeString = StringUtils.substringAfter(attributeKey, 
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
+        ByteBuf byteBuf = Unpooled.buffer();
+        byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset()));
+        return new HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf);
+    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index c37db92af4..7ce563b030 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -121,10 +121,7 @@ public class Http2ProtocolProxyHandler implements 
ProtocolHandler {
         }
 
         final Channel outboundChannel = f.channel();
-        if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
-            ctx.pipeline().addLast(new 
HAProxyMessageForwarder(outboundChannel));
-            
outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE);
-        }
+        configPipeline(inboundChannel, outboundChannel);
 
         SslHandler sslHandler = null;
         if (sslContext != null) {
@@ -132,4 +129,11 @@ public class Http2ProtocolProxyHandler implements 
ProtocolHandler {
         }
         ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel, 
sslHandler));
     }
+
+    protected void configPipeline(Channel inboundChannel, Channel 
outboundChannel) {
+        if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
+            inboundChannel.pipeline().addLast(new 
HAProxyMessageForwarder(outboundChannel));
+            
outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE);
+        }
+    }
 }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiatorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiatorTest.java
new file mode 100644
index 0000000000..699491f03d
--- /dev/null
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiatorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.proxy.grpc;
+
+import io.grpc.Attributes;
+import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
+import io.grpc.netty.shaded.io.netty.buffer.Unpooled;
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyTLV;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProxyAndTlsProtocolNegotiatorTest {
+
+    private ProxyAndTlsProtocolNegotiator negotiator;
+
+    @Before
+    public void setUp() throws Exception {
+        ConfigurationManager.intConfig();
+        ConfigurationManager.getProxyConfig().setTlsTestModeEnable(true);
+        negotiator = new ProxyAndTlsProtocolNegotiator();
+    }
+
+    @Test
+    public void handleHAProxyTLV() {
+        ByteBuf content = Unpooled.buffer();
+        content.writeBytes("xxxx".getBytes(StandardCharsets.UTF_8));
+        HAProxyTLV haProxyTLV = new HAProxyTLV((byte) 0xE1, content);
+        negotiator.handleHAProxyTLV(haProxyTLV, Attributes.newBuilder());
+    }
+}
\ No newline at end of file
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarderTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarderTest.java
new file mode 100644
index 0000000000..f57116f0da
--- /dev/null
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarderTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.channel.Channel;
+import io.netty.handler.codec.haproxy.HAProxyTLV;
+import org.apache.commons.codec.DecoderException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HAProxyMessageForwarderTest {
+
+    private HAProxyMessageForwarder haProxyMessageForwarder;
+
+    @Mock
+    private Channel outboundChannel;
+
+    @Before
+    public void setUp() throws Exception {
+        haProxyMessageForwarder = new HAProxyMessageForwarder(outboundChannel);
+    }
+
+    @Test
+    public void buildHAProxyTLV() throws DecoderException {
+        HAProxyTLV haProxyTLV = 
haProxyMessageForwarder.buildHAProxyTLV("proxy_protocol_tlv_0xe1", "xxxx");
+        assert haProxyTLV != null;
+        assert haProxyTLV.typeByteValue() == (byte) 0xe1;
+    }
+}
\ No newline at end of file
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandlerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandlerTest.java
new file mode 100644
index 0000000000..bf03786d34
--- /dev/null
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandlerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class Http2ProtocolProxyHandlerTest {
+
+    private Http2ProtocolProxyHandler http2ProtocolProxyHandler;
+    @Mock
+    private Channel inboundChannel;
+    @Mock
+    private ChannelPipeline inboundPipeline;
+    @Mock
+    private Channel outboundChannel;
+    @Mock
+    private ChannelPipeline outboundPipeline;
+
+    @Before
+    public void setUp() throws Exception {
+        http2ProtocolProxyHandler = new Http2ProtocolProxyHandler();
+    }
+
+    @Test
+    public void configPipeline() {
+        
when(inboundChannel.hasAttr(eq(AttributeKeys.PROXY_PROTOCOL_ADDR))).thenReturn(true);
+        when(inboundChannel.pipeline()).thenReturn(inboundPipeline);
+        
when(inboundPipeline.addLast(any(HAProxyMessageForwarder.class))).thenReturn(inboundPipeline);
+        when(outboundChannel.pipeline()).thenReturn(outboundPipeline);
+        
when(outboundPipeline.addFirst(any(HAProxyMessageEncoder.class))).thenReturn(outboundPipeline);
+        http2ProtocolProxyHandler.configPipeline(inboundChannel, 
outboundChannel);
+    }
+}
\ No newline at end of file
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 7213b0c24f..51f8b85009 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -44,6 +44,7 @@ import io.netty.handler.codec.ProtocolDetectionState;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
 import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyTLV;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
@@ -55,7 +56,6 @@ import io.netty.util.TimerTask;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
 import java.security.cert.CertificateException;
 import java.time.Duration;
 import java.util.List;
@@ -761,7 +761,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         }
     }
 
-    public static class HAProxyMessageHandler extends 
ChannelInboundHandlerAdapter {
+    public class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
 
         @Override
         public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
@@ -795,14 +795,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                 }
                 if (CollectionUtils.isNotEmpty(msg.tlvs())) {
                     msg.tlvs().forEach(tlv -> {
-                        AttributeKey<String> key = AttributeKeys.valueOf(
-                            HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + 
String.format("%02x", tlv.typeByteValue()));
-                        byte[] valueBytes = 
ByteBufUtil.getBytes(tlv.content());
-                        String value = StringUtils.trim(new String(valueBytes, 
CharsetUtil.UTF_8));
-                        if 
(!BinaryUtil.isAscii(value.getBytes(StandardCharsets.UTF_8))) {
-                            return;
-                        }
-                        channel.attr(key).set(value);
+                        handleHAProxyTLV(tlv, channel);
                     });
                 }
             } finally {
@@ -810,4 +803,14 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
             }
         }
     }
+
+    protected void handleHAProxyTLV(HAProxyTLV tlv, Channel channel) {
+        byte[] valueBytes = ByteBufUtil.getBytes(tlv.content());
+        if (!BinaryUtil.isAscii(valueBytes)) {
+            return;
+        }
+        AttributeKey<String> key = AttributeKeys.valueOf(
+            HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", 
tlv.typeByteValue()));
+        channel.attr(key).set(new String(valueBytes, CharsetUtil.UTF_8));
+    }
 }
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingServerTest.java
 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingServerTest.java
new file mode 100644
index 0000000000..c69fcebd45
--- /dev/null
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingServerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.haproxy.HAProxyTLV;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.nio.charset.StandardCharsets;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NettyRemotingServerTest {
+
+    private NettyRemotingServer nettyRemotingServer;
+
+    @Mock
+    private Channel channel;
+
+    @Mock
+    private Attribute attribute;
+
+    @Before
+    public void setUp() throws Exception {
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyRemotingServer = new NettyRemotingServer(nettyServerConfig);
+    }
+
+    @Test
+    public void handleHAProxyTLV() {
+        when(channel.attr(any(AttributeKey.class))).thenReturn(attribute);
+        doNothing().when(attribute).set(any());
+
+        ByteBuf content = Unpooled.buffer();
+        content.writeBytes("xxxx".getBytes(StandardCharsets.UTF_8));
+        HAProxyTLV haProxyTLV = new HAProxyTLV((byte) 0xE1, content);
+        nettyRemotingServer.handleHAProxyTLV(haProxyTLV, channel);
+    }
+}
\ No newline at end of file

Reply via email to