This is an automated email from the ASF dual-hosted git repository. zhangyang pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new e2a4b37 fix(client): fetch and commit offset use master broker firstly e2a4b37 is described below commit e2a4b3778519ef761dac9ce4963eb20905fb35f4 Author: lushilin <lushilin...@bytedance.com> AuthorDate: Wed Dec 8 15:58:42 2021 +0800 fix(client): fetch and commit offset use master broker firstly --- .../consumer/store/RemoteBrokerOffsetStore.java | 8 +++--- .../client/impl/factory/MQClientInstance.java | 30 ---------------------- .../store/RemoteBrokerOffsetStoreTest.java | 3 ++- 3 files changed, 6 insertions(+), 35 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 6b76238..15b5bec 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -199,10 +199,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore { @Override public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false); } if (findBrokerResult != null) { @@ -226,11 +226,11 @@ public class RemoteBrokerOffsetStore implements OffsetStore { private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false); } if (findBrokerResult != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 9651943..db81143 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -967,36 +967,6 @@ public class MQClientInstance { return this.consumerTable.get(group); } - public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) { - String brokerAddr = null; - boolean slave = false; - boolean found = false; - - HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); - if (map != null && !map.isEmpty()) { - for (Map.Entry<Long, String> entry : map.entrySet()) { - Long id = entry.getKey(); - brokerAddr = entry.getValue(); - if (brokerAddr != null) { - found = true; - if (MixAll.MASTER_ID == id) { - slave = false; - } else { - slave = true; - } - break; - - } - } // end of for - } - - if (found) { - return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr)); - } - - return null; - } - public String findBrokerAddressInPublish(final String brokerName) { HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); if (map != null && !map.isEmpty()) { diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java index f762910..ec7a4cf 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java @@ -23,6 +23,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; @@ -58,7 +59,7 @@ public class RemoteBrokerOffsetStoreTest { System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets"); String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis(); when(mQClientFactory.getClientId()).thenReturn(clientId); - when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false)); + when(mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false)).thenReturn(new FindBrokerResult("127.0.0.1", false)); when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI); }