This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 70a66eda2c Fix LEAK: HAProxyMessage.release() was not called before it's garbage-collected (#7025)
70a66eda2c is described below

commit 70a66eda2c08eb5fca38356659cb6de1ac75e25e
Author: ShuangxiDing <[email protected]>
AuthorDate: Fri Jul 14 16:46:40 2023 +0800

    Fix LEAK: HAProxyMessage.release() was not called before it's garbage-collected (#7025)
    
    Call HAProxyMessage.release() after reading it.
    
    ---------
    
    Co-authored-by: 徒钟 <[email protected]>
---
 .../proxy/grpc/ProxyAndTlsProtocolNegotiator.java  | 52 ++++++++++++----------
 .../remoting/netty/NettyRemotingServer.java        | 46 ++++++++++---------
 2 files changed, 53 insertions(+), 45 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
index ceb9becc0c..ee167bd7be 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
@@ -160,7 +160,7 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
         @Override
         public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
             if (msg instanceof HAProxyMessage) {
-                replaceEventWithMessage((HAProxyMessage) msg);
+                handleWithMessage((HAProxyMessage) msg);
                 ctx.fireUserEventTriggered(pne);
             } else {
                 super.channelRead(ctx, msg);
@@ -174,30 +174,34 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
          *
          * @param msg
          */
-        private void replaceEventWithMessage(HAProxyMessage msg) {
-            Attributes.Builder builder = 
InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
-            if (StringUtils.isNotBlank(msg.sourceAddress())) {
-                builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, 
msg.sourceAddress());
-            }
-            if (msg.sourcePort() > 0) {
-                builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, 
String.valueOf(msg.sourcePort()));
-            }
-            if (StringUtils.isNotBlank(msg.destinationAddress())) {
-                builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, 
msg.destinationAddress());
-            }
-            if (msg.destinationPort() > 0) {
-                builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, 
String.valueOf(msg.destinationPort()));
-            }
-            if (CollectionUtils.isNotEmpty(msg.tlvs())) {
-                msg.tlvs().forEach(tlv -> {
-                    Attributes.Key<String> key = AttributeKeys.valueOf(
-                            HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + 
String.format("%02x", tlv.typeByteValue()));
-                    String value = 
StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
-                    builder.set(key, value);
-                });
+        private void handleWithMessage(HAProxyMessage msg) {
+            try {
+                Attributes.Builder builder = 
InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
+                if (StringUtils.isNotBlank(msg.sourceAddress())) {
+                    builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, 
msg.sourceAddress());
+                }
+                if (msg.sourcePort() > 0) {
+                    builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, 
String.valueOf(msg.sourcePort()));
+                }
+                if (StringUtils.isNotBlank(msg.destinationAddress())) {
+                    builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, 
msg.destinationAddress());
+                }
+                if (msg.destinationPort() > 0) {
+                    builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, 
String.valueOf(msg.destinationPort()));
+                }
+                if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+                    msg.tlvs().forEach(tlv -> {
+                        Attributes.Key<String> key = AttributeKeys.valueOf(
+                                HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + 
String.format("%02x", tlv.typeByteValue()));
+                        String value = 
StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+                        builder.set(key, value);
+                    });
+                }
+                pne = InternalProtocolNegotiationEvent
+                        
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
+            } finally {
+                msg.release();
             }
-            pne = InternalProtocolNegotiationEvent
-                    
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
         }
     }
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 8ae87a6fa5..90e358ce3b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -758,7 +758,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         @Override
         public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
             if (msg instanceof HAProxyMessage) {
-                fillChannelWithMessage((HAProxyMessage) msg, ctx.channel());
+                handleWithMessage((HAProxyMessage) msg, ctx.channel());
             } else {
                 super.channelRead(ctx, msg);
             }
@@ -771,26 +771,30 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
          * @param msg
          * @param channel
          */
-        private void fillChannelWithMessage(HAProxyMessage msg, Channel 
channel) {
-            if (StringUtils.isNotBlank(msg.sourceAddress())) {
-                
channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
-            }
-            if (msg.sourcePort() > 0) {
-                
channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
-            }
-            if (StringUtils.isNotBlank(msg.destinationAddress())) {
-                
channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
-            }
-            if (msg.destinationPort() > 0) {
-                
channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
-            }
-            if (CollectionUtils.isNotEmpty(msg.tlvs())) {
-                msg.tlvs().forEach(tlv -> {
-                    AttributeKey<String> key = AttributeKeys.valueOf(
-                            HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + 
String.format("%02x", tlv.typeByteValue()));
-                    String value = 
StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
-                    channel.attr(key).set(value);
-                });
+        private void handleWithMessage(HAProxyMessage msg, Channel channel) {
+            try {
+                if (StringUtils.isNotBlank(msg.sourceAddress())) {
+                    
channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
+                }
+                if (msg.sourcePort() > 0) {
+                    
channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
+                }
+                if (StringUtils.isNotBlank(msg.destinationAddress())) {
+                    
channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
+                }
+                if (msg.destinationPort() > 0) {
+                    
channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
+                }
+                if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+                    msg.tlvs().forEach(tlv -> {
+                        AttributeKey<String> key = AttributeKeys.valueOf(
+                                HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + 
String.format("%02x", tlv.typeByteValue()));
+                        String value = 
StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+                        channel.attr(key).set(value);
+                    });
+                }
+            } finally {
+                msg.release();
             }
         }
     }

Reply via email to