This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 87b97f271c [ISSUE #8917]Topic route return none permission message 
queues for gRPC client  (#8919)
87b97f271c is described below

commit 87b97f271c96bdbb320b1e127cbeaaa4e83c4c2a
Author: dingshuangxi888 <dingshuangxi...@gmail.com>
AuthorDate: Thu Nov 14 19:24:19 2024 +0800

    [ISSUE #8917]Topic route return none permission message queues for gRPC 
client  (#8919)
    
    * When the queue lacks permission, return one queue to allow the client to 
upload a heartbeat for gRPC Topic route interface.
---
 .../apache/rocketmq/common/constant/PermName.java    |  4 ++++
 .../rocketmq/proxy/grpc/v2/route/RouteActivity.java  | 12 ++++++++++++
 .../proxy/grpc/v2/route/RouteActivityTest.java       | 20 ++++++++++++++++++++
 3 files changed, 36 insertions(+)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java 
b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
index d87461d7f5..d9a26a2be1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
@@ -68,4 +68,8 @@ public class PermName {
     public static boolean isPriority(final int perm) {
         return (perm & PERM_PRIORITY) == PERM_PRIORITY;
     }
+
+    public static boolean isAccessible(final int perm) {
+        return isReadable(perm) || isWriteable(perm);
+    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index fe14fe01c6..20ae3aa6c8 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -244,6 +244,7 @@ public class RouteActivity extends AbstractMessingActivity {
         int r = 0;
         int w = 0;
         int rw = 0;
+        int n = 0;
         if (PermName.isWriteable(queueData.getPerm()) && 
PermName.isReadable(queueData.getPerm())) {
             rw = Math.min(queueData.getWriteQueueNums(), 
queueData.getReadQueueNums());
             r = queueData.getReadQueueNums() - rw;
@@ -252,6 +253,8 @@ public class RouteActivity extends AbstractMessingActivity {
             w = queueData.getWriteQueueNums();
         } else if (PermName.isReadable(queueData.getPerm())) {
             r = queueData.getReadQueueNums();
+        } else if (!PermName.isAccessible(queueData.getPerm())) {
+            n = Math.max(1, Math.max(queueData.getWriteQueueNums(), 
queueData.getReadQueueNums()));
         }
 
         // r here means readOnly queue nums, w means writeOnly queue nums, 
while rw means both readable and writable queue nums.
@@ -283,6 +286,15 @@ public class RouteActivity extends AbstractMessingActivity 
{
             messageQueueList.add(messageQueue);
         }
 
+        for (int i = 0; i < n; i++) {
+            MessageQueue messageQueue = 
MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
+                .setId(queueIdIndex++)
+                .setPermission(Permission.NONE)
+                
.addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+                .build();
+            messageQueueList.add(messageQueue);
+        }
+
         return messageQueueList;
     }
 
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
index a7ba69098b..abbf82452e 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
@@ -272,6 +272,26 @@ public class RouteActivityTest extends BaseActivityTest {
         assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> 
a.getPermission() == Permission.WRITE).count());
         assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> 
a.getPermission() == Permission.READ_WRITE).count());
         assertEquals(0, partitionWith4R8WPermRW.stream().filter(a -> 
a.getPermission() == Permission.READ).count());
+
+        // test queueData with 2 read queues, 2 write queues, and no 
permission, expect 2 no permission queues.
+        QueueData queueDataWith2R2WNoPerm = createQueueData(2, 2, 0);
+        List<MessageQueue> partitionWith2R2WNoPerm = 
this.routeActivity.genMessageQueueFromQueueData(queueDataWith2R2WNoPerm, 
GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER);
+        assertEquals(2, partitionWith2R2WNoPerm.size());
+        assertEquals(2, partitionWith2R2WNoPerm.stream().filter(a -> 
a.getAcceptMessageTypesValue(0) == 
MessageType.MESSAGE_TYPE_UNSPECIFIED.getNumber()).count());
+        assertEquals(2, partitionWith2R2WNoPerm.stream().filter(a -> 
a.getPermission() == Permission.NONE).count());
+        assertEquals(0, partitionWith2R2WNoPerm.stream().filter(a -> 
a.getPermission() == Permission.WRITE).count());
+        assertEquals(0, partitionWith2R2WNoPerm.stream().filter(a -> 
a.getPermission() == Permission.READ_WRITE).count());
+        assertEquals(0, partitionWith2R2WNoPerm.stream().filter(a -> 
a.getPermission() == Permission.READ).count());
+
+        // test queueData with 0 read queues, 0 write queues, and no 
permission, expect 1 no permission queue.
+        QueueData queueDataWith0R0WNoPerm = createQueueData(0, 0, 0);
+        List<MessageQueue> partitionWith0R0WNoPerm = 
this.routeActivity.genMessageQueueFromQueueData(queueDataWith0R0WNoPerm, 
GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER);
+        assertEquals(1, partitionWith0R0WNoPerm.size());
+        assertEquals(1, partitionWith0R0WNoPerm.stream().filter(a -> 
a.getAcceptMessageTypesValue(0) == 
MessageType.MESSAGE_TYPE_UNSPECIFIED.getNumber()).count());
+        assertEquals(1, partitionWith0R0WNoPerm.stream().filter(a -> 
a.getPermission() == Permission.NONE).count());
+        assertEquals(0, partitionWith0R0WNoPerm.stream().filter(a -> 
a.getPermission() == Permission.WRITE).count());
+        assertEquals(0, partitionWith0R0WNoPerm.stream().filter(a -> 
a.getPermission() == Permission.READ_WRITE).count());
+        assertEquals(0, partitionWith0R0WNoPerm.stream().filter(a -> 
a.getPermission() == Permission.READ).count());
     }
 
     private static QueueData createQueueData(int r, int w, int perm) {

Reply via email to