This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/4.9.x by this push:
     new 8d2c6613be [ISSUE #7713] MQFaultStrategy check queue if writable
8d2c6613be is described below

commit 8d2c6613be0500b1a9289a9d8db43d2d12642a74
Author: Lei Zhiyuan <leizhiy...@gmail.com>
AuthorDate: Thu Jan 25 14:54:07 2024 +0800

    [ISSUE #7713] MQFaultStrategy check queue if writable
---
 .../apache/rocketmq/client/impl/producer/TopicPublishInfo.java |  3 ++-
 .../org/apache/rocketmq/client/latency/MQFaultStrategy.java    | 10 +++++++---
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 2f8337edef..22d4eb9234 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.producer;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
+import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -95,7 +96,7 @@ public class TopicPublishInfo {
     public int getQueueIdByBroker(final String brokerName) {
         for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
             final QueueData queueData = 
this.topicRouteData.getQueueDatas().get(i);
-            if (queueData.getBrokerName().equals(brokerName)) {
+            if (PermName.isWriteable(queueData.getPerm()) && 
queueData.getBrokerName().equals(brokerName)) {
                 return queueData.getWriteQueueNums();
             }
         }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java 
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index ea3d07e6d0..b6915cda07 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -73,10 +73,14 @@ public class MQFaultStrategy {
                 if (writeQueueNums > 0) {
                     final MessageQueue mq = tpInfo.selectOneMessageQueue();
                     if (notBestBroker != null) {
-                        mq.setBrokerName(notBestBroker);
-                        
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
+                        MessageQueue selectedMessageQueue = new MessageQueue();
+                        selectedMessageQueue.setTopic(mq.getTopic());
+                        selectedMessageQueue.setBrokerName(notBestBroker);
+                        
selectedMessageQueue.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % 
writeQueueNums);
+                        return selectedMessageQueue;
+                    } else {
+                        return mq;
                     }
-                    return mq;
                 } else {
                     latencyFaultTolerance.remove(notBestBroker);
                 }

Reply via email to