This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

The following commit(s) were added to refs/heads/develop by this push:
     new d63373a152 [ISSUE #8957] Remove excess traffic and fix cache inconsistencies (#8958)
d63373a152 is described below

commit d63373a152ebd395cdce6a2e04e01b62e54c76af
Author: hqbfz <125714719+3424672...@users.noreply.github.com>
AuthorDate: Wed Dec 25 14:38:57 2024 +0800

    [ISSUE #8957] Remove excess traffic and fix cache inconsistencies (#8958)
---
 .../client/impl/consumer/RebalanceImpl.java | 53 +---------------------
 1 file changed, 1 insertion(+), 52 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index d1f0d116e0..b6f1d99b1c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -36,7 +36,6 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageQueueAssignment;
 import org.apache.rocketmq.common.message.MessageRequestMode;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
@@ -60,12 +59,8 @@ public abstract class RebalanceImpl {
     protected MessageModel messageModel;
     protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
     protected MQClientInstance mQClientFactory;
-    private static final int TIMEOUT_CHECK_TIMES = 3;
     private static final int QUERY_ASSIGNMENT_TIMEOUT = 3000;
 
-    private Map<String, String> topicBrokerRebalance = new ConcurrentHashMap<>();
-    private Map<String, String> topicClientRebalance = new ConcurrentHashMap<>();
-
     public RebalanceImpl(String consumerGroup, MessageModel messageModel,
         AllocateMessageQueueStrategy allocateMessageQueueStrategy,
         MQClientInstance mQClientFactory) {
@@ -241,7 +236,7 @@ public abstract class RebalanceImpl {
             for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                 final String topic = entry.getKey();
                 try {
-                    if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
+                    if (!clientRebalance(topic)) {
                         boolean result = this.getRebalanceResultFromBroker(topic, isOrder);
                         if (!result) {
                             balanced = false;
@@ -266,38 +261,6 @@ public abstract class RebalanceImpl {
         return balanced;
     }
 
-    private boolean tryQueryAssignment(String topic) {
-        if (topicClientRebalance.containsKey(topic)) {
-            return false;
-        }
-
-        if (topicBrokerRebalance.containsKey(topic)) {
-            return true;
-        }
-        String strategyName = allocateMessageQueueStrategy != null ? allocateMessageQueueStrategy.getName() : null;
-        int retryTimes = 0;
-        while (retryTimes++ < TIMEOUT_CHECK_TIMES) {
-            try {
-                Set<MessageQueueAssignment> resultSet = mQClientFactory.queryAssignment(topic, consumerGroup,
-                    strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT / TIMEOUT_CHECK_TIMES * retryTimes);
-                topicBrokerRebalance.put(topic, topic);
-                return true;
-            } catch (Throwable t) {
-                if (!(t instanceof RemotingTimeoutException)) {
-                    log.error("tryQueryAssignment error.", t);
-                    topicClientRebalance.put(topic, topic);
-                    return false;
-                }
-            }
-        }
-        if (retryTimes >= TIMEOUT_CHECK_TIMES) {
-            // if never success before and timeout exceed TIMEOUT_CHECK_TIMES, force client rebalance
-            topicClientRebalance.put(topic, topic);
-            return false;
-        }
-        return true;
-    }
-
     public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
         return subscriptionInner;
     }
@@ -460,20 +423,6 @@ public abstract class RebalanceImpl {
                 }
             }
         }
-
-        Iterator<Map.Entry<String, String>> clientIter = topicClientRebalance.entrySet().iterator();
-        while (clientIter.hasNext()) {
-            if (!subTable.containsKey(clientIter.next().getKey())) {
-                clientIter.remove();
-            }
-        }
-
-        Iterator<Map.Entry<String, String>> brokerIter = topicBrokerRebalance.entrySet().iterator();
-        while (brokerIter.hasNext()) {
-            if (!subTable.containsKey(brokerIter.next().getKey())) {
-                brokerIter.remove();
-            }
-        }
     }
 
     private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,