This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d63373a152 [ISSUE #8957] Remove excess traffic and fix cache 
inconsistencies (#8958)
d63373a152 is described below

commit d63373a152ebd395cdce6a2e04e01b62e54c76af
Author: hqbfz <125714719+3424672...@users.noreply.github.com>
AuthorDate: Wed Dec 25 14:38:57 2024 +0800

    [ISSUE #8957] Remove excess traffic and fix cache inconsistencies (#8958)
---
 .../client/impl/consumer/RebalanceImpl.java        | 53 +---------------------
 1 file changed, 1 insertion(+), 52 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index d1f0d116e0..b6f1d99b1c 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -36,7 +36,6 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageQueueAssignment;
 import org.apache.rocketmq.common.message.MessageRequestMode;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
@@ -60,12 +59,8 @@ public abstract class RebalanceImpl {
     protected MessageModel messageModel;
     protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
     protected MQClientInstance mQClientFactory;
-    private static final int TIMEOUT_CHECK_TIMES = 3;
     private static final int QUERY_ASSIGNMENT_TIMEOUT = 3000;
 
-    private Map<String, String> topicBrokerRebalance = new 
ConcurrentHashMap<>();
-    private Map<String, String> topicClientRebalance = new 
ConcurrentHashMap<>();
-
     public RebalanceImpl(String consumerGroup, MessageModel messageModel,
         AllocateMessageQueueStrategy allocateMessageQueueStrategy,
         MQClientInstance mQClientFactory) {
@@ -241,7 +236,7 @@ public abstract class RebalanceImpl {
             for (final Map.Entry<String, SubscriptionData> entry : 
subTable.entrySet()) {
                 final String topic = entry.getKey();
                 try {
-                    if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
+                    if (!clientRebalance(topic)) {
                         boolean result = 
this.getRebalanceResultFromBroker(topic, isOrder);
                         if (!result) {
                             balanced = false;
@@ -266,38 +261,6 @@ public abstract class RebalanceImpl {
         return balanced;
     }
 
-    private boolean tryQueryAssignment(String topic) {
-        if (topicClientRebalance.containsKey(topic)) {
-            return false;
-        }
-
-        if (topicBrokerRebalance.containsKey(topic)) {
-            return true;
-        }
-        String strategyName = allocateMessageQueueStrategy != null ? 
allocateMessageQueueStrategy.getName() : null;
-        int retryTimes = 0;
-        while (retryTimes++ < TIMEOUT_CHECK_TIMES) {
-            try {
-                Set<MessageQueueAssignment> resultSet = 
mQClientFactory.queryAssignment(topic, consumerGroup,
-                    strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT / 
TIMEOUT_CHECK_TIMES * retryTimes);
-                topicBrokerRebalance.put(topic, topic);
-                return true;
-            } catch (Throwable t) {
-                if (!(t instanceof RemotingTimeoutException)) {
-                    log.error("tryQueryAssignment error.", t);
-                    topicClientRebalance.put(topic, topic);
-                    return false;
-                }
-            }
-        }
-        if (retryTimes >= TIMEOUT_CHECK_TIMES) {
-            // if never success before and timeout exceed TIMEOUT_CHECK_TIMES, 
force client rebalance
-            topicClientRebalance.put(topic, topic);
-            return false;
-        }
-        return true;
-    }
-
     public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
         return subscriptionInner;
     }
@@ -460,20 +423,6 @@ public abstract class RebalanceImpl {
                 }
             }
         }
-
-        Iterator<Map.Entry<String, String>> clientIter = 
topicClientRebalance.entrySet().iterator();
-        while (clientIter.hasNext()) {
-            if (!subTable.containsKey(clientIter.next().getKey())) {
-                clientIter.remove();
-            }
-        }
-
-        Iterator<Map.Entry<String, String>> brokerIter = 
topicBrokerRebalance.entrySet().iterator();
-        while (brokerIter.hasNext()) {
-            if (!subTable.containsKey(brokerIter.next().getKey())) {
-                brokerIter.remove();
-            }
-        }
     }
 
     private boolean updateProcessQueueTableInRebalance(final String topic, 
final Set<MessageQueue> mqSet,

Reply via email to