This is an automated email from the ASF dual-hosted git repository.

zhangyang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e2a4b37  fix(client): fetch and commit offset use master broker firstly
e2a4b37 is described below

commit e2a4b3778519ef761dac9ce4963eb20905fb35f4
Author: lushilin <lushilin...@bytedance.com>
AuthorDate: Wed Dec 8 15:58:42 2021 +0800

    fix(client): fetch and commit offset use master broker firstly
---
 .../consumer/store/RemoteBrokerOffsetStore.java    |  8 +++---
 .../client/impl/factory/MQClientInstance.java      | 30 ----------------------
 .../store/RemoteBrokerOffsetStoreTest.java         |  3 ++-
 3 files changed, 6 insertions(+), 35 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 6b76238..15b5bec 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -199,10 +199,10 @@ public class RemoteBrokerOffsetStore implements 
OffsetStore {
     @Override
     public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, 
boolean isOneway) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException {
-        FindBrokerResult findBrokerResult = 
this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+        FindBrokerResult findBrokerResult = 
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 
MixAll.MASTER_ID, true);
         if (null == findBrokerResult) {
             
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            findBrokerResult = 
this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+            findBrokerResult = 
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 
MixAll.MASTER_ID, false);
         }
 
         if (findBrokerResult != null) {
@@ -226,11 +226,11 @@ public class RemoteBrokerOffsetStore implements 
OffsetStore {
 
     private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws 
RemotingException, MQBrokerException,
         InterruptedException, MQClientException {
-        FindBrokerResult findBrokerResult = 
this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+        FindBrokerResult findBrokerResult = 
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 
MixAll.MASTER_ID, true);
         if (null == findBrokerResult) {
 
             
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            findBrokerResult = 
this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+            findBrokerResult = 
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 
MixAll.MASTER_ID, false);
         }
 
         if (findBrokerResult != null) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9651943..db81143 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -967,36 +967,6 @@ public class MQClientInstance {
         return this.consumerTable.get(group);
     }
 
-    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
-        String brokerAddr = null;
-        boolean slave = false;
-        boolean found = false;
-
-        HashMap<Long/* brokerId */, String/* address */> map = 
this.brokerAddrTable.get(brokerName);
-        if (map != null && !map.isEmpty()) {
-            for (Map.Entry<Long, String> entry : map.entrySet()) {
-                Long id = entry.getKey();
-                brokerAddr = entry.getValue();
-                if (brokerAddr != null) {
-                    found = true;
-                    if (MixAll.MASTER_ID == id) {
-                        slave = false;
-                    } else {
-                        slave = true;
-                    }
-                    break;
-
-                }
-            } // end of for
-        }
-
-        if (found) {
-            return new FindBrokerResult(brokerAddr, slave, 
findBrokerVersion(brokerName, brokerAddr));
-        }
-
-        return null;
-    }
-
     public String findBrokerAddressInPublish(final String brokerName) {
         HashMap<Long/* brokerId */, String/* address */> map = 
this.brokerAddrTable.get(brokerName);
         if (map != null && !map.isEmpty()) {
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index f762910..ec7a4cf 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageQueue;
 import 
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
@@ -58,7 +59,7 @@ public class RemoteBrokerOffsetStoreTest {
         System.setProperty("rocketmq.client.localOffsetStoreDir", 
System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
         String clientId = new ClientConfig().buildMQClientId() + 
"#TestNamespace" + System.currentTimeMillis();
         when(mQClientFactory.getClientId()).thenReturn(clientId);
-        
when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new 
FindBrokerResult("127.0.0.1", false));
+        when(mQClientFactory.findBrokerAddressInSubscribe(brokerName, 
MixAll.MASTER_ID, false)).thenReturn(new FindBrokerResult("127.0.0.1", false));
         when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
     }
 

Reply via email to