This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 620e6a2544 [ISSUE #7642] Add return value for sendHeartbeat related 
method
620e6a2544 is described below

commit 620e6a25441441b6430ce377ba1c5734a5cc7dfa
Author: Zhouxiang Zhan <zhouxz...@apache.org>
AuthorDate: Wed Dec 27 10:42:22 2023 +0800

    [ISSUE #7642] Add return value for sendHeartbeat related method
    
    Add sendHeartbeatToBroker
---
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |   5 +-
 .../client/impl/factory/MQClientInstance.java      | 194 +++++++++++++--------
 2 files changed, 126 insertions(+), 73 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index cbde258655..15563a4f0e 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -999,8 +999,9 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
 
         this.updateTopicSubscribeInfoWhenSubscriptionChanged();
         this.mQClientFactory.checkClientInBroker();
-        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-        this.mQClientFactory.rebalanceImmediately();
+        if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
+            this.mQClientFactory.rebalanceImmediately();
+        }
     }
 
     private void checkConfig() throws MQClientException {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index ba72a6dce7..ad39372d35 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -176,9 +176,14 @@ public class MQClientInstance {
                 @Override
                 public void onChannelActive(String remoteAddr, Channel 
channel) {
                     for (Map.Entry<String, HashMap<Long, String>> addressEntry 
: brokerAddrTable.entrySet()) {
-                        for (String address : 
addressEntry.getValue().values()) {
-                            if (address.equals(remoteAddr)) {
-                                sendHeartbeatToAllBrokerWithLockV2(false);
+                        for (Map.Entry<Long, String> entry : 
addressEntry.getValue().entrySet()) {
+                            String addr = entry.getValue();
+                            if (addr.equals(remoteAddr)) {
+                                long id = entry.getKey();
+                                String brokerName = addressEntry.getKey();
+                                if (sendHeartbeatToBroker(id, brokerName, 
addr)) {
+                                    rebalanceImmediately();
+                                }
                                 break;
                             }
                         }
@@ -504,13 +509,13 @@ public class MQClientInstance {
         }
     }
 
-    public void sendHeartbeatToAllBrokerWithLockV2(boolean isRebalance) {
+    public boolean sendHeartbeatToAllBrokerWithLockV2(boolean isRebalance) {
         if (this.lockHeartbeat.tryLock()) {
             try {
                 if (clientConfig.isUseHeartbeatV2()) {
-                    this.sendHeartbeatToAllBrokerV2(isRebalance);
+                    return this.sendHeartbeatToAllBrokerV2(isRebalance);
                 } else {
-                    this.sendHeartbeatToAllBroker();
+                    return this.sendHeartbeatToAllBroker();
                 }
             } catch (final Exception e) {
                 log.error("sendHeartbeatToAllBrokerWithLockV2 exception", e);
@@ -520,15 +525,16 @@ public class MQClientInstance {
         } else {
             log.warn("sendHeartbeatToAllBrokerWithLockV2 lock heartBeat, but 
failed.");
         }
+        return false;
     }
 
-    public void sendHeartbeatToAllBrokerWithLock() {
+    public boolean sendHeartbeatToAllBrokerWithLock() {
         if (this.lockHeartbeat.tryLock()) {
             try {
                 if (clientConfig.isUseHeartbeatV2()) {
-                    this.sendHeartbeatToAllBrokerV2(false);
+                    return this.sendHeartbeatToAllBrokerV2(false);
                 } else {
-                    this.sendHeartbeatToAllBroker();
+                    return this.sendHeartbeatToAllBroker();
                 }
             } catch (final Exception e) {
                 log.error("sendHeartbeatToAllBroker exception", e);
@@ -538,6 +544,7 @@ public class MQClientInstance {
         } else {
             log.warn("lock heartBeat, but failed. [{}]", this.clientId);
         }
+        return false;
     }
 
     private void persistAllConsumerOffset() {
@@ -582,19 +589,72 @@ public class MQClientInstance {
         return false;
     }
 
-    private void sendHeartbeatToAllBroker() {
+    public boolean sendHeartbeatToBroker(long id, String brokerName, String 
addr) {
+        if (this.lockHeartbeat.tryLock()) {
+            final HeartbeatData heartbeatDataWithSub = 
this.prepareHeartbeatData(false);
+            final boolean producerEmpty = 
heartbeatDataWithSub.getProducerDataSet().isEmpty();
+            final boolean consumerEmpty = 
heartbeatDataWithSub.getConsumerDataSet().isEmpty();
+            if (producerEmpty && consumerEmpty) {
+                log.warn("sendHeartbeatToBroker sending heartbeat, but no 
consumer and no producer. [{}]", this.clientId);
+                return false;
+            }
+            try {
+                if (clientConfig.isUseHeartbeatV2()) {
+                    int currentHeartbeatFingerprint = 
heartbeatDataWithSub.computeHeartbeatFingerprint();
+                    
heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+                    HeartbeatData heartbeatDataWithoutSub = 
this.prepareHeartbeatData(true);
+                    
heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+                    return this.sendHeartbeatToBrokerV2(id, brokerName, addr, 
heartbeatDataWithSub, heartbeatDataWithoutSub, currentHeartbeatFingerprint);
+                } else {
+                    return this.sendHeartbeatToBroker(id, brokerName, addr, 
heartbeatDataWithSub);
+                }
+            } catch (final Exception e) {
+                log.error("sendHeartbeatToAllBroker exception", e);
+            } finally {
+                this.lockHeartbeat.unlock();
+            }
+        } else {
+            log.warn("lock heartBeat, but failed. [{}]", this.clientId);
+        }
+        return false;
+    }
+
+    private boolean sendHeartbeatToBroker(long id, String brokerName, String 
addr, HeartbeatData heartbeatData) {
+        try {
+            int version = this.mQClientAPIImpl.sendHeartbeat(addr, 
heartbeatData, clientConfig.getMqClientApiTimeout());
+            if (!this.brokerVersionTable.containsKey(brokerName)) {
+                this.brokerVersionTable.put(brokerName, new HashMap<>(4));
+            }
+            this.brokerVersionTable.get(brokerName).put(addr, version);
+            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+            if (times % 20 == 0) {
+                log.info("send heart beat to broker[{} {} {}] success", 
brokerName, id, addr);
+                log.info(heartbeatData.toString());
+            }
+            return true;
+        } catch (Exception e) {
+            if (this.isBrokerInNameServer(addr)) {
+                log.warn("send heart beat to broker[{} {} {}] failed", 
brokerName, id, addr, e);
+            } else {
+                log.warn("send heart beat to broker[{} {} {}] exception, 
because the broker not up, forget it", brokerName,
+                    id, addr, e);
+            }
+        }
+        return false;
+    }
+
+    private boolean sendHeartbeatToAllBroker() {
         final HeartbeatData heartbeatData = this.prepareHeartbeatData(false);
         final boolean producerEmpty = 
heartbeatData.getProducerDataSet().isEmpty();
         final boolean consumerEmpty = 
heartbeatData.getConsumerDataSet().isEmpty();
         if (producerEmpty && consumerEmpty) {
             log.warn("sending heartbeat, but no consumer and no producer. 
[{}]", this.clientId);
-            return;
+            return false;
         }
 
         if (this.brokerAddrTable.isEmpty()) {
-            return;
+            return false;
         }
-        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
         for (Entry<String, HashMap<Long, String>> brokerClusterInfo : 
this.brokerAddrTable.entrySet()) {
             String brokerName = brokerClusterInfo.getKey();
             HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
@@ -611,43 +671,71 @@ public class MQClientInstance {
                     continue;
                 }
 
-                try {
-                    int version = this.mQClientAPIImpl.sendHeartbeat(addr, 
heartbeatData, clientConfig.getMqClientApiTimeout());
-                    if (!this.brokerVersionTable.containsKey(brokerName)) {
-                        this.brokerVersionTable.put(brokerName, new 
HashMap<>(4));
-                    }
-                    this.brokerVersionTable.get(brokerName).put(addr, version);
-                    if (times % 20 == 0) {
-                        log.info("send heart beat to broker[{} {} {}] 
success", brokerName, id, addr);
-                        log.info(heartbeatData.toString());
-                    }
-                } catch (Exception e) {
-                    if (this.isBrokerInNameServer(addr)) {
-                        log.warn("send heart beat to broker[{} {} {}] failed", 
brokerName, id, addr, e);
-                    } else {
-                        log.warn("send heart beat to broker[{} {} {}] 
exception, because the broker not up, forget it", brokerName,
-                            id, addr, e);
+                sendHeartbeatToBroker(id, brokerName, addr, heartbeatData);
+            }
+        }
+        return true;
+    }
+
+    private boolean sendHeartbeatToBrokerV2(long id, String brokerName, String 
addr, HeartbeatData heartbeatDataWithSub,
+        HeartbeatData heartbeatDataWithoutSub, int 
currentHeartbeatFingerprint) {
+        try {
+            int version = 0;
+            boolean isBrokerSupportV2 = 
brokerSupportV2HeartbeatSet.contains(addr);
+            HeartbeatV2Result heartbeatV2Result = null;
+            if (isBrokerSupportV2 && null != 
brokerAddrHeartbeatFingerprintTable.get(addr) && 
brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
+                heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, 
heartbeatDataWithoutSub, clientConfig.getMqClientApiTimeout());
+                if (heartbeatV2Result.isSubChange()) {
+                    brokerAddrHeartbeatFingerprintTable.remove(addr);
+                }
+                log.info("sendHeartbeatToAllBrokerV2 simple brokerName: {} 
subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, 
heartbeatV2Result.isSubChange(), 
JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
+            } else {
+                heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, 
heartbeatDataWithSub, clientConfig.getMqClientApiTimeout());
+                if (heartbeatV2Result.isSupportV2()) {
+                    brokerSupportV2HeartbeatSet.add(addr);
+                    if (heartbeatV2Result.isSubChange()) {
+                        brokerAddrHeartbeatFingerprintTable.remove(addr);
+                    } else if 
(!brokerAddrHeartbeatFingerprintTable.containsKey(addr) || 
brokerAddrHeartbeatFingerprintTable.get(addr) != currentHeartbeatFingerprint) {
+                        brokerAddrHeartbeatFingerprintTable.put(addr, 
currentHeartbeatFingerprint);
                     }
                 }
+                log.info("sendHeartbeatToAllBrokerV2 normal brokerName: {} 
subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, 
heartbeatV2Result.isSubChange(), 
JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
+            }
+            version = heartbeatV2Result.getVersion();
+            if (!this.brokerVersionTable.containsKey(brokerName)) {
+                this.brokerVersionTable.put(brokerName, new HashMap<>(4));
+            }
+            this.brokerVersionTable.get(brokerName).put(addr, version);
+            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+            if (times % 20 == 0) {
+                log.info("send heart beat to broker[{} {} {}] success", 
brokerName, id, addr);
+                log.info(heartbeatDataWithSub.toString());
+            }
+            return true;
+        } catch (Exception e) {
+            if (this.isBrokerInNameServer(addr)) {
+                log.warn("sendHeartbeatToAllBrokerV2 send heart beat to 
broker[{} {} {}] failed", brokerName, id, addr, e);
+            } else {
+                log.warn("sendHeartbeatToAllBrokerV2 send heart beat to 
broker[{} {} {}] exception, because the broker not up, forget it", brokerName, 
id, addr, e);
             }
         }
+        return false;
     }
 
-    private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
+    private boolean sendHeartbeatToAllBrokerV2(boolean isRebalance) {
         final HeartbeatData heartbeatDataWithSub = 
this.prepareHeartbeatData(false);
         final boolean producerEmpty = 
heartbeatDataWithSub.getProducerDataSet().isEmpty();
         final boolean consumerEmpty = 
heartbeatDataWithSub.getConsumerDataSet().isEmpty();
         if (producerEmpty && consumerEmpty) {
             log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no 
consumer and no producer. [{}]", this.clientId);
-            return;
+            return false;
         }
         if (this.brokerAddrTable.isEmpty()) {
-            return;
+            return false;
         }
         if (isRebalance) {
             resetBrokerAddrHeartbeatFingerprintMap();
         }
-        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
         int currentHeartbeatFingerprint = 
heartbeatDataWithSub.computeHeartbeatFingerprint();
         
heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
         HeartbeatData heartbeatDataWithoutSub = 
this.prepareHeartbeatData(true);
@@ -668,46 +756,10 @@ public class MQClientInstance {
                 if (consumerEmpty && MixAll.MASTER_ID != id) {
                     continue;
                 }
-                try {
-                    int version = 0;
-                    boolean isBrokerSupportV2 = 
brokerSupportV2HeartbeatSet.contains(addr);
-                    HeartbeatV2Result heartbeatV2Result = null;
-                    if (isBrokerSupportV2 && null != 
brokerAddrHeartbeatFingerprintTable.get(addr) && 
brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
-                        heartbeatV2Result = 
this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, 
clientConfig.getMqClientApiTimeout());
-                        if (heartbeatV2Result.isSubChange()) {
-                            brokerAddrHeartbeatFingerprintTable.remove(addr);
-                        }
-                        log.info("sendHeartbeatToAllBrokerV2 simple 
brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", 
brokerName, heartbeatV2Result.isSubChange(), 
JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
-                    } else {
-                        heartbeatV2Result = 
this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithSub, 
clientConfig.getMqClientApiTimeout());
-                        if (heartbeatV2Result.isSupportV2()) {
-                            brokerSupportV2HeartbeatSet.add(addr);
-                            if (heartbeatV2Result.isSubChange()) {
-                                
brokerAddrHeartbeatFingerprintTable.remove(addr);
-                            } else if 
(!brokerAddrHeartbeatFingerprintTable.containsKey(addr) || 
brokerAddrHeartbeatFingerprintTable.get(addr) != currentHeartbeatFingerprint) {
-                                brokerAddrHeartbeatFingerprintTable.put(addr, 
currentHeartbeatFingerprint);
-                            }
-                        }
-                        log.info("sendHeartbeatToAllBrokerV2 normal 
brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", 
brokerName, heartbeatV2Result.isSubChange(), 
JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
-                    }
-                    version = heartbeatV2Result.getVersion();
-                    if (!this.brokerVersionTable.containsKey(brokerName)) {
-                        this.brokerVersionTable.put(brokerName, new 
HashMap<>(4));
-                    }
-                    this.brokerVersionTable.get(brokerName).put(addr, version);
-                    if (times % 20 == 0) {
-                        log.info("send heart beat to broker[{} {} {}] 
success", brokerName, id, addr);
-                        log.info(heartbeatDataWithSub.toString());
-                    }
-                } catch (Exception e) {
-                    if (this.isBrokerInNameServer(addr)) {
-                        log.warn("sendHeartbeatToAllBrokerV2 send heart beat 
to broker[{} {} {}] failed", brokerName, id, addr, e);
-                    } else {
-                        log.warn("sendHeartbeatToAllBrokerV2 send heart beat 
to broker[{} {} {}] exception, because the broker not up, forget it", 
brokerName, id, addr, e);
-                    }
-                }
+                sendHeartbeatToBrokerV2(id, brokerName, addr, 
heartbeatDataWithSub, heartbeatDataWithoutSub, currentHeartbeatFingerprint);
             }
         }
+        return true;
     }
 
     public boolean updateTopicRouteInfoFromNameServer(final String topic, 
boolean isDefault,

Reply via email to