This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 70a66eda2c Fix LEAK: HAProxyMessage.release() was not called before
it's garbage-collected (#7025)
70a66eda2c is described below
commit 70a66eda2c08eb5fca38356659cb6de1ac75e25e
Author: ShuangxiDing <[email protected]>
AuthorDate: Fri Jul 14 16:46:40 2023 +0800
Fix LEAK: HAProxyMessage.release() was not called before it's
garbage-collected (#7025)
Call HAProxyMessage.release() in a finally block once its fields have been read, so the message is released even if reading them throws.
---------
Co-authored-by: 徒钟 <[email protected]>
---
.../proxy/grpc/ProxyAndTlsProtocolNegotiator.java | 52 ++++++++++++----------
.../remoting/netty/NettyRemotingServer.java | 46 ++++++++++---------
2 files changed, 53 insertions(+), 45 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
index ceb9becc0c..ee167bd7be 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
@@ -160,7 +160,7 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
if (msg instanceof HAProxyMessage) {
- replaceEventWithMessage((HAProxyMessage) msg);
+ handleWithMessage((HAProxyMessage) msg);
ctx.fireUserEventTriggered(pne);
} else {
super.channelRead(ctx, msg);
@@ -174,30 +174,34 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
*
* @param msg
*/
- private void replaceEventWithMessage(HAProxyMessage msg) {
- Attributes.Builder builder =
InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
- if (StringUtils.isNotBlank(msg.sourceAddress())) {
- builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR,
msg.sourceAddress());
- }
- if (msg.sourcePort() > 0) {
- builder.set(AttributeKeys.PROXY_PROTOCOL_PORT,
String.valueOf(msg.sourcePort()));
- }
- if (StringUtils.isNotBlank(msg.destinationAddress())) {
- builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR,
msg.destinationAddress());
- }
- if (msg.destinationPort() > 0) {
- builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT,
String.valueOf(msg.destinationPort()));
- }
- if (CollectionUtils.isNotEmpty(msg.tlvs())) {
- msg.tlvs().forEach(tlv -> {
- Attributes.Key<String> key = AttributeKeys.valueOf(
- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX +
String.format("%02x", tlv.typeByteValue()));
- String value =
StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
- builder.set(key, value);
- });
+ private void handleWithMessage(HAProxyMessage msg) {
+ try {
+ Attributes.Builder builder =
InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
+ if (StringUtils.isNotBlank(msg.sourceAddress())) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR,
msg.sourceAddress());
+ }
+ if (msg.sourcePort() > 0) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_PORT,
String.valueOf(msg.sourcePort()));
+ }
+ if (StringUtils.isNotBlank(msg.destinationAddress())) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR,
msg.destinationAddress());
+ }
+ if (msg.destinationPort() > 0) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT,
String.valueOf(msg.destinationPort()));
+ }
+ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+ msg.tlvs().forEach(tlv -> {
+ Attributes.Key<String> key = AttributeKeys.valueOf(
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX +
String.format("%02x", tlv.typeByteValue()));
+ String value =
StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+ builder.set(key, value);
+ });
+ }
+ pne = InternalProtocolNegotiationEvent
+
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
+ } finally {
+ msg.release();
}
- pne = InternalProtocolNegotiationEvent
-
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
}
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 8ae87a6fa5..90e358ce3b 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -758,7 +758,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
if (msg instanceof HAProxyMessage) {
- fillChannelWithMessage((HAProxyMessage) msg, ctx.channel());
+ handleWithMessage((HAProxyMessage) msg, ctx.channel());
} else {
super.channelRead(ctx, msg);
}
@@ -771,26 +771,30 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
* @param msg
* @param channel
*/
- private void fillChannelWithMessage(HAProxyMessage msg, Channel
channel) {
- if (StringUtils.isNotBlank(msg.sourceAddress())) {
-
channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
- }
- if (msg.sourcePort() > 0) {
-
channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
- }
- if (StringUtils.isNotBlank(msg.destinationAddress())) {
-
channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
- }
- if (msg.destinationPort() > 0) {
-
channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
- }
- if (CollectionUtils.isNotEmpty(msg.tlvs())) {
- msg.tlvs().forEach(tlv -> {
- AttributeKey<String> key = AttributeKeys.valueOf(
- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX +
String.format("%02x", tlv.typeByteValue()));
- String value =
StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
- channel.attr(key).set(value);
- });
+ private void handleWithMessage(HAProxyMessage msg, Channel channel) {
+ try {
+ if (StringUtils.isNotBlank(msg.sourceAddress())) {
+
channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
+ }
+ if (msg.sourcePort() > 0) {
+
channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
+ }
+ if (StringUtils.isNotBlank(msg.destinationAddress())) {
+
channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
+ }
+ if (msg.destinationPort() > 0) {
+
channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
+ }
+ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+ msg.tlvs().forEach(tlv -> {
+ AttributeKey<String> key = AttributeKeys.valueOf(
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX +
String.format("%02x", tlv.typeByteValue()));
+ String value =
StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+ channel.attr(key).set(value);
+ });
+ }
+ } finally {
+ msg.release();
}
}
}