This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 4.9.x in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
     new 8d2c6613be [ISSUE #7713] MQFaultStrategy check queue if writable
8d2c6613be is described below

commit 8d2c6613be0500b1a9289a9d8db43d2d12642a74
Author: Lei Zhiyuan <leizhiy...@gmail.com>
AuthorDate: Thu Jan 25 14:54:07 2024 +0800

    [ISSUE #7713] MQFaultStrategy check queue if writable
---
 .../apache/rocketmq/client/impl/producer/TopicPublishInfo.java |  3 ++-
 .../org/apache/rocketmq/client/latency/MQFaultStrategy.java    | 10 +++++++---
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 2f8337edef..22d4eb9234 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.producer;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
+import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -95,7 +96,7 @@ public class TopicPublishInfo {
     public int getQueueIdByBroker(final String brokerName) {
         for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
             final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
-            if (queueData.getBrokerName().equals(brokerName)) {
+            if (PermName.isWriteable(queueData.getPerm()) && queueData.getBrokerName().equals(brokerName)) {
                 return queueData.getWriteQueueNums();
             }
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index ea3d07e6d0..b6915cda07 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -73,10 +73,14 @@ public class MQFaultStrategy {
                 if (writeQueueNums > 0) {
                     final MessageQueue mq = tpInfo.selectOneMessageQueue();
                     if (notBestBroker != null) {
-                        mq.setBrokerName(notBestBroker);
-                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
+                        MessageQueue selectedMessageQueue = new MessageQueue();
+                        selectedMessageQueue.setTopic(mq.getTopic());
+                        selectedMessageQueue.setBrokerName(notBestBroker);
+                        selectedMessageQueue.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
+                        return selectedMessageQueue;
+                    } else {
+                        return mq;
                     }
-                    return mq;
                 } else {
                     latencyFaultTolerance.remove(notBestBroker);
                 }