This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 564e55ea58 [ISSUE #8970] Remove redundant heartbeats (#8971)
564e55ea58 is described below

commit 564e55ea58ba10e366d1136b5381f10e5a5c58e0
Author: weihubeats <we...@apache.org>
AuthorDate: Tue Dec 10 14:54:42 2024 +0800

    [ISSUE #8970] Remove redundant heartbeats (#8971)
---
 .../client/impl/factory/MQClientInstance.java      | 26 +++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 8cc910487c..eba654c22d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.impl.factory;
 
+import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import java.util.Collections;
 import java.util.HashMap;
@@ -35,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import com.alibaba.fastjson.JSON;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -66,6 +66,8 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageQueueAssignment;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.HeartbeatV2Result;
@@ -83,8 +85,6 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
 import org.apache.rocketmq.remoting.protocol.route.QueueData;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 import static org.apache.rocketmq.remoting.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic;
 
@@ -157,7 +157,9 @@ public class MQClientInstance {
         ChannelEventListener channelEventListener;
         if (clientConfig.isEnableHeartbeatChannelEventListener()) {
             channelEventListener = new ChannelEventListener() {
+                
                 private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable;
+                
                 @Override
                 public void onChannelConnect(String remoteAddr, Channel channel) {
                 }
@@ -182,7 +184,7 @@ public class MQClientInstance {
                             if (addr.equals(remoteAddr)) {
                                 long id = entry.getKey();
                                 String brokerName = addressEntry.getKey();
-                                if (sendHeartbeatToBroker(id, brokerName, addr)) {
+                                if (sendHeartbeatToBroker(id, brokerName, addr, false)) {
                                     rebalanceImmediately();
                                 }
                                 break;
@@ -591,6 +593,18 @@ public class MQClientInstance {
     }
 
     public boolean sendHeartbeatToBroker(long id, String brokerName, String addr) {
+        return sendHeartbeatToBroker(id, brokerName, addr, true);
+    }
+
+    /**
+     * @param id
+     * @param brokerName
+     * @param addr
+     * @param strictLockMode When the connection is initially established, sending a heartbeat will simultaneously trigger the onChannelActive event to acquire the lock again, causing an exception. Therefore,
+     *                       the exception that occurs when sending the heartbeat during the initial onChannelActive event can be ignored.
+     * @return
+     */
+    public boolean sendHeartbeatToBroker(long id, String brokerName, String addr, boolean strictLockMode) {
         if (this.lockHeartbeat.tryLock()) {
             final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false);
             final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty();
@@ -615,7 +629,9 @@ public class MQClientInstance {
                 this.lockHeartbeat.unlock();
             }
         } else {
-            log.warn("lock heartBeat, but failed. [{}]", this.clientId);
+            if (strictLockMode) {
+                log.warn("lock heartBeat, but failed. [{}]", this.clientId);
+            }
         }
         return false;
     }

Reply via email to