This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 620e6a2544 [ISSUE #7642] Add return value for sendHeartbeat related method 620e6a2544 is described below commit 620e6a25441441b6430ce377ba1c5734a5cc7dfa Author: Zhouxiang Zhan <zhouxz...@apache.org> AuthorDate: Wed Dec 27 10:42:22 2023 +0800 [ISSUE #7642] Add return value for sendHeartbeat related method Add sendHeartbeatToBroker --- .../impl/consumer/DefaultMQPushConsumerImpl.java | 5 +- .../client/impl/factory/MQClientInstance.java | 194 +++++++++++++-------- 2 files changed, 126 insertions(+), 73 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index cbde258655..15563a4f0e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -999,8 +999,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.checkClientInBroker(); - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); - this.mQClientFactory.rebalanceImmediately(); + if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) { + this.mQClientFactory.rebalanceImmediately(); + } } private void checkConfig() throws MQClientException { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index ba72a6dce7..ad39372d35 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -176,9 +176,14 @@ public class MQClientInstance { @Override public void onChannelActive(String remoteAddr, Channel channel) { for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) { - for (String address : addressEntry.getValue().values()) { - if (address.equals(remoteAddr)) { - sendHeartbeatToAllBrokerWithLockV2(false); + for (Map.Entry<Long, String> entry : addressEntry.getValue().entrySet()) { + String addr = entry.getValue(); + if (addr.equals(remoteAddr)) { + long id = entry.getKey(); + String brokerName = addressEntry.getKey(); + if (sendHeartbeatToBroker(id, brokerName, addr)) { + rebalanceImmediately(); + } break; } } @@ -504,13 +509,13 @@ public class MQClientInstance { } } - public void sendHeartbeatToAllBrokerWithLockV2(boolean isRebalance) { + public boolean sendHeartbeatToAllBrokerWithLockV2(boolean isRebalance) { if (this.lockHeartbeat.tryLock()) { try { if (clientConfig.isUseHeartbeatV2()) { - this.sendHeartbeatToAllBrokerV2(isRebalance); + return this.sendHeartbeatToAllBrokerV2(isRebalance); } else { - this.sendHeartbeatToAllBroker(); + return this.sendHeartbeatToAllBroker(); } } catch (final Exception e) { log.error("sendHeartbeatToAllBrokerWithLockV2 exception", e); @@ -520,15 +525,16 @@ public class MQClientInstance { } else { log.warn("sendHeartbeatToAllBrokerWithLockV2 lock heartBeat, but failed."); } + return false; } - public void sendHeartbeatToAllBrokerWithLock() { + public boolean sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { if (clientConfig.isUseHeartbeatV2()) { - this.sendHeartbeatToAllBrokerV2(false); + return this.sendHeartbeatToAllBrokerV2(false); } else { - this.sendHeartbeatToAllBroker(); + return this.sendHeartbeatToAllBroker(); } } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); @@ -538,6 +544,7 @@ public class MQClientInstance { } else { log.warn("lock heartBeat, but failed. [{}]", this.clientId); } + return false; } private void persistAllConsumerOffset() { @@ -582,19 +589,72 @@ public class MQClientInstance { return false; } - private void sendHeartbeatToAllBroker() { + public boolean sendHeartbeatToBroker(long id, String brokerName, String addr) { + if (this.lockHeartbeat.tryLock()) { + final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false); + final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty(); + final boolean consumerEmpty = heartbeatDataWithSub.getConsumerDataSet().isEmpty(); + if (producerEmpty && consumerEmpty) { + log.warn("sendHeartbeatToBroker sending heartbeat, but no consumer and no producer. [{}]", this.clientId); + return false; + } + try { + if (clientConfig.isUseHeartbeatV2()) { + int currentHeartbeatFingerprint = heartbeatDataWithSub.computeHeartbeatFingerprint(); + heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint); + HeartbeatData heartbeatDataWithoutSub = this.prepareHeartbeatData(true); + heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint); + return this.sendHeartbeatToBrokerV2(id, brokerName, addr, heartbeatDataWithSub, heartbeatDataWithoutSub, currentHeartbeatFingerprint); + } else { + return this.sendHeartbeatToBroker(id, brokerName, addr, heartbeatDataWithSub); + } + } catch (final Exception e) { + log.error("sendHeartbeatToAllBroker exception", e); + } finally { + this.lockHeartbeat.unlock(); + } + } else { + log.warn("lock heartBeat, but failed. [{}]", this.clientId); + } + return false; + } + + private boolean sendHeartbeatToBroker(long id, String brokerName, String addr, HeartbeatData heartbeatData) { + try { + int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout()); + if (!this.brokerVersionTable.containsKey(brokerName)) { + this.brokerVersionTable.put(brokerName, new HashMap<>(4)); + } + this.brokerVersionTable.get(brokerName).put(addr, version); + long times = this.sendHeartbeatTimesTotal.getAndIncrement(); + if (times % 20 == 0) { + log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); + log.info(heartbeatData.toString()); + } + return true; + } catch (Exception e) { + if (this.isBrokerInNameServer(addr)) { + log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e); + } else { + log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, + id, addr, e); + } + } + return false; + } + + private boolean sendHeartbeatToAllBroker() { final HeartbeatData heartbeatData = this.prepareHeartbeatData(false); final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty(); final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty(); if (producerEmpty && consumerEmpty) { log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId); - return; + return false; } if (this.brokerAddrTable.isEmpty()) { - return; + return false; } - long times = this.sendHeartbeatTimesTotal.getAndIncrement(); for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) { String brokerName = brokerClusterInfo.getKey(); HashMap<Long, String> oneTable = brokerClusterInfo.getValue(); @@ -611,43 +671,71 @@ public class MQClientInstance { continue; } - try { - int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout()); - if (!this.brokerVersionTable.containsKey(brokerName)) { - this.brokerVersionTable.put(brokerName, new HashMap<>(4)); - } - this.brokerVersionTable.get(brokerName).put(addr, version); - if (times % 20 == 0) { - log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); - log.info(heartbeatData.toString()); - } - } catch (Exception e) { - if (this.isBrokerInNameServer(addr)) { - log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e); - } else { - log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, - id, addr, e); + sendHeartbeatToBroker(id, brokerName, addr, heartbeatData); + } + } + return true; + } + + private boolean sendHeartbeatToBrokerV2(long id, String brokerName, String addr, HeartbeatData heartbeatDataWithSub, + HeartbeatData heartbeatDataWithoutSub, int currentHeartbeatFingerprint) { + try { + int version = 0; + boolean isBrokerSupportV2 = brokerSupportV2HeartbeatSet.contains(addr); + HeartbeatV2Result heartbeatV2Result = null; + if (isBrokerSupportV2 && null != brokerAddrHeartbeatFingerprintTable.get(addr) && brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) { + heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, clientConfig.getMqClientApiTimeout()); + if (heartbeatV2Result.isSubChange()) { + brokerAddrHeartbeatFingerprintTable.remove(addr); + } + log.info("sendHeartbeatToAllBrokerV2 simple brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable)); + } else { + heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithSub, clientConfig.getMqClientApiTimeout()); + if (heartbeatV2Result.isSupportV2()) { + brokerSupportV2HeartbeatSet.add(addr); + if (heartbeatV2Result.isSubChange()) { + brokerAddrHeartbeatFingerprintTable.remove(addr); + } else if (!brokerAddrHeartbeatFingerprintTable.containsKey(addr) || brokerAddrHeartbeatFingerprintTable.get(addr) != currentHeartbeatFingerprint) { + brokerAddrHeartbeatFingerprintTable.put(addr, currentHeartbeatFingerprint); } } + log.info("sendHeartbeatToAllBrokerV2 normal brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable)); + } + version = heartbeatV2Result.getVersion(); + if (!this.brokerVersionTable.containsKey(brokerName)) { + this.brokerVersionTable.put(brokerName, new HashMap<>(4)); + } + this.brokerVersionTable.get(brokerName).put(addr, version); + long times = this.sendHeartbeatTimesTotal.getAndIncrement(); + if (times % 20 == 0) { + log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); + log.info(heartbeatDataWithSub.toString()); + } + return true; + } catch (Exception e) { + if (this.isBrokerInNameServer(addr)) { + log.warn("sendHeartbeatToAllBrokerV2 send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e); + } else { + log.warn("sendHeartbeatToAllBrokerV2 send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, id, addr, e); } } + return false; } - private void sendHeartbeatToAllBrokerV2(boolean isRebalance) { + private boolean sendHeartbeatToAllBrokerV2(boolean isRebalance) { final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false); final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty(); final boolean consumerEmpty = heartbeatDataWithSub.getConsumerDataSet().isEmpty(); if (producerEmpty && consumerEmpty) { log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no consumer and no producer. [{}]", this.clientId); - return; + return false; } if (this.brokerAddrTable.isEmpty()) { - return; + return false; } if (isRebalance) { resetBrokerAddrHeartbeatFingerprintMap(); } - long times = this.sendHeartbeatTimesTotal.getAndIncrement(); int currentHeartbeatFingerprint = heartbeatDataWithSub.computeHeartbeatFingerprint(); heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint); HeartbeatData heartbeatDataWithoutSub = this.prepareHeartbeatData(true); @@ -668,46 +756,10 @@ public class MQClientInstance { if (consumerEmpty && MixAll.MASTER_ID != id) { continue; } - try { - int version = 0; - boolean isBrokerSupportV2 = brokerSupportV2HeartbeatSet.contains(addr); - HeartbeatV2Result heartbeatV2Result = null; - if (isBrokerSupportV2 && null != brokerAddrHeartbeatFingerprintTable.get(addr) && brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) { - heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, clientConfig.getMqClientApiTimeout()); - if (heartbeatV2Result.isSubChange()) { - brokerAddrHeartbeatFingerprintTable.remove(addr); - } - log.info("sendHeartbeatToAllBrokerV2 simple brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable)); - } else { - heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithSub, clientConfig.getMqClientApiTimeout()); - if (heartbeatV2Result.isSupportV2()) { - brokerSupportV2HeartbeatSet.add(addr); - if (heartbeatV2Result.isSubChange()) { - brokerAddrHeartbeatFingerprintTable.remove(addr); - } else if (!brokerAddrHeartbeatFingerprintTable.containsKey(addr) || brokerAddrHeartbeatFingerprintTable.get(addr) != currentHeartbeatFingerprint) { - brokerAddrHeartbeatFingerprintTable.put(addr, currentHeartbeatFingerprint); - } - } - log.info("sendHeartbeatToAllBrokerV2 normal brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable)); - } - version = heartbeatV2Result.getVersion(); - if (!this.brokerVersionTable.containsKey(brokerName)) { - this.brokerVersionTable.put(brokerName, new HashMap<>(4)); - } - this.brokerVersionTable.get(brokerName).put(addr, version); - if (times % 20 == 0) { - log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); - log.info(heartbeatDataWithSub.toString()); - } - } catch (Exception e) { - if (this.isBrokerInNameServer(addr)) { - log.warn("sendHeartbeatToAllBrokerV2 send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e); - } else { - log.warn("sendHeartbeatToAllBrokerV2 send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, id, addr, e); - } - } + sendHeartbeatToBrokerV2(id, brokerName, addr, heartbeatDataWithSub, heartbeatDataWithoutSub, currentHeartbeatFingerprint); } } + return true; } public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,