This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 564e55ea58 [ISSUE #8970] Remove redundant heartbeats (#8971) 564e55ea58 is described below commit 564e55ea58ba10e366d1136b5381f10e5a5c58e0 Author: weihubeats <we...@apache.org> AuthorDate: Tue Dec 10 14:54:42 2024 +0800 [ISSUE #8970] Remove redundant heartbeats (#8971) --- .../client/impl/factory/MQClientInstance.java | 26 +++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 8cc910487c..eba654c22d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.impl.factory; +import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import java.util.Collections; import java.util.HashMap; @@ -35,7 +36,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.admin.MQAdminExtInner; @@ -66,6 +66,8 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueueAssignment; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.HeartbeatV2Result; @@ -83,8 +85,6 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import static org.apache.rocketmq.remoting.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic; @@ -157,7 +157,9 @@ public class MQClientInstance { ChannelEventListener channelEventListener; if (clientConfig.isEnableHeartbeatChannelEventListener()) { channelEventListener = new ChannelEventListener() { + private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable; + @Override public void onChannelConnect(String remoteAddr, Channel channel) { } @@ -182,7 +184,7 @@ public class MQClientInstance { if (addr.equals(remoteAddr)) { long id = entry.getKey(); String brokerName = addressEntry.getKey(); - if (sendHeartbeatToBroker(id, brokerName, addr)) { + if (sendHeartbeatToBroker(id, brokerName, addr, false)) { rebalanceImmediately(); } break; @@ -591,6 +593,18 @@ public class MQClientInstance { } public boolean sendHeartbeatToBroker(long id, String brokerName, String addr) { + return sendHeartbeatToBroker(id, brokerName, addr, true); + } + + /** + * @param id + * @param brokerName + * @param addr + * @param strictLockMode When the connection is initially established, sending a heartbeat will simultaneously trigger the onChannelActive event to acquire the lock again, causing an exception. Therefore, + * the exception that occurs when sending the heartbeat during the initial onChannelActive event can be ignored. + * @return + */ + public boolean sendHeartbeatToBroker(long id, String brokerName, String addr, boolean strictLockMode) { if (this.lockHeartbeat.tryLock()) { final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false); final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty(); @@ -615,7 +629,9 @@ public class MQClientInstance { this.lockHeartbeat.unlock(); } } else { - log.warn("lock heartBeat, but failed. [{}]", this.clientId); + if (strictLockMode) { + log.warn("lock heartBeat, but failed. [{}]", this.clientId); + } } return false; }