This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 87b97f271c [ISSUE #8917]Topic route return none permission message queues for gRPC client (#8919) 87b97f271c is described below commit 87b97f271c96bdbb320b1e127cbeaaa4e83c4c2a Author: dingshuangxi888 <dingshuangxi...@gmail.com> AuthorDate: Thu Nov 14 19:24:19 2024 +0800 [ISSUE #8917]Topic route return none permission message queues for gRPC client (#8919) * When the queue lacks permission, return one queue to allow the client to upload a heartbeat for gRPC Topic route interface. --- .../apache/rocketmq/common/constant/PermName.java | 4 ++++ .../rocketmq/proxy/grpc/v2/route/RouteActivity.java | 12 ++++++++++++ .../proxy/grpc/v2/route/RouteActivityTest.java | 20 ++++++++++++++++++++ 3 files changed, 36 insertions(+) diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java index d87461d7f5..d9a26a2be1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java +++ b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java @@ -68,4 +68,8 @@ public class PermName { public static boolean isPriority(final int perm) { return (perm & PERM_PRIORITY) == PERM_PRIORITY; } + + public static boolean isAccessible(final int perm) { + return isReadable(perm) || isWriteable(perm); + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java index fe14fe01c6..20ae3aa6c8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java @@ -244,6 +244,7 @@ public class RouteActivity extends AbstractMessingActivity { int r = 0; int w = 0; int rw = 0; + int n = 0; if (PermName.isWriteable(queueData.getPerm()) && PermName.isReadable(queueData.getPerm())) { rw = Math.min(queueData.getWriteQueueNums(), queueData.getReadQueueNums()); r = queueData.getReadQueueNums() - rw; @@ -252,6 +253,8 @@ public class RouteActivity extends AbstractMessingActivity { w = queueData.getWriteQueueNums(); } else if (PermName.isReadable(queueData.getPerm())) { r = queueData.getReadQueueNums(); + } else if (!PermName.isAccessible(queueData.getPerm())) { + n = Math.max(1, Math.max(queueData.getWriteQueueNums(), queueData.getReadQueueNums())); } // r here means readOnly queue nums, w means writeOnly queue nums, while rw means both readable and writable queue nums. @@ -283,6 +286,15 @@ public class RouteActivity extends AbstractMessingActivity { messageQueueList.add(messageQueue); } + for (int i = 0; i < n; i++) { + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueIdIndex++) + .setPermission(Permission.NONE) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } + return messageQueueList; } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java index a7ba69098b..abbf82452e 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java @@ -272,6 +272,26 @@ public class RouteActivityTest extends BaseActivityTest { assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); assertEquals(0, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count()); + + // test queueData with 2 read queues, 2 write queues, and no permission, expect 2 no permission queues. + QueueData queueDataWith2R2WNoPerm = createQueueData(2, 2, 0); + List<MessageQueue> partitionWith2R2WNoPerm = this.routeActivity.genMessageQueueFromQueueData(queueDataWith2R2WNoPerm, GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER); + assertEquals(2, partitionWith2R2WNoPerm.size()); + assertEquals(2, partitionWith2R2WNoPerm.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.MESSAGE_TYPE_UNSPECIFIED.getNumber()).count()); + assertEquals(2, partitionWith2R2WNoPerm.stream().filter(a -> a.getPermission() == Permission.NONE).count()); + assertEquals(0, partitionWith2R2WNoPerm.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); + assertEquals(0, partitionWith2R2WNoPerm.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); + assertEquals(0, partitionWith2R2WNoPerm.stream().filter(a -> a.getPermission() == Permission.READ).count()); + + // test queueData with 0 read queues, 0 write queues, and no permission, expect 1 no permission queue. + QueueData queueDataWith0R0WNoPerm = createQueueData(0, 0, 0); + List<MessageQueue> partitionWith0R0WNoPerm = this.routeActivity.genMessageQueueFromQueueData(queueDataWith0R0WNoPerm, GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER); + assertEquals(1, partitionWith0R0WNoPerm.size()); + assertEquals(1, partitionWith0R0WNoPerm.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.MESSAGE_TYPE_UNSPECIFIED.getNumber()).count()); + assertEquals(1, partitionWith0R0WNoPerm.stream().filter(a -> a.getPermission() == Permission.NONE).count()); + assertEquals(0, partitionWith0R0WNoPerm.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); + assertEquals(0, partitionWith0R0WNoPerm.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); + assertEquals(0, partitionWith0R0WNoPerm.stream().filter(a -> a.getPermission() == Permission.READ).count()); } private static QueueData createQueueData(int r, int w, int perm) {