This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 586a274839 [ISSUE #9279] Restrict system subscription group creation and add pull request rejection policy (#9280)
586a274839 is described below

commit 586a2748394be4825d17af45f31731fb7d5649fc
Author: ymwneu <ymw...@126.com>
AuthorDate: Tue Apr 1 17:12:36 2025 +0800

    [ISSUE #9279] Restrict system subscription group creation and add pull request rejection policy (#9280)
---
 .../rocketmq/broker/coldctr/ColdDataCgCtrService.java       |  2 +-
 .../rocketmq/broker/processor/PullMessageProcessor.java     | 13 +++++++++++++
 .../broker/subscription/SubscriptionGroupManager.java       |  3 ++-
 .../main/java/org/apache/rocketmq/common/BrokerConfig.java  | 10 ++++++++++
 common/src/main/java/org/apache/rocketmq/common/MixAll.java |  6 +++++-
 .../java/org/apache/rocketmq/store/DefaultMessageStore.java |  2 +-
 6 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
index 2e24930405..5b8b2fb9ce 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
@@ -187,7 +187,7 @@ public class ColdDataCgCtrService extends ServiceThread {
         if (!this.messageStoreConfig.isColdDataFlowControlEnable()) {
             return false;
         }
-        if (MixAll.isSysConsumerGroupForNoColdReadLimit(consumerGroup)) {
+        if (MixAll.isSysConsumerGroupPullMessage(consumerGroup)) {
             return false;
         }
         AccAndTimeStamp accAndTimeStamp = cgColdThresholdMapRuntime.get(consumerGroup);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 5f0735e74c..5d947fd088 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -489,6 +489,19 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 this.brokerController.getConsumerFilterManager());
         }
 
+        if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) {
+            ConsumerGroupInfo consumerGroupInfo =
+                    this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+            if (null == consumerGroupInfo || ConsumeType.CONSUME_ACTIVELY == consumerGroupInfo.getConsumeType()) {
+                if ((null == consumerGroupInfo || null == consumerGroupInfo.findChannel(channel))
+                        && !MixAll.isSysConsumerGroupPullMessage(requestHeader.getConsumerGroup())) {
+                    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+                    response.setRemark("the consumer's group info not exist, or the pull consumer is rejected by server." + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+                    return response;
+                }
+            }
+        }
+
         final MessageStore messageStore = brokerController.getMessageStore();
         if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
             DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index d85342e1a1..f3e669fb3e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -260,7 +260,8 @@ public class SubscriptionGroupManager extends ConfigManager {
     public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
         SubscriptionGroupConfig subscriptionGroupConfig = getSubscriptionGroupConfig(group);
         if (null == subscriptionGroupConfig) {
-            if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
+            if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup()
+                    || MixAll.isSysConsumerGroupAndEnableCreate(group, brokerController.getBrokerConfig().isEnableCreateSysGroup())) {
                 if (group.length() > Validators.CHARACTER_MAX_LENGTH || TopicValidator.isTopicOrGroupIllegal(group)) {
                     return null;
                 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index b7ec944505..44f5e1eff0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -457,6 +457,8 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean recallMessageEnable = false;
 
+    private boolean enableCreateSysGroup = true;
+
     public String getConfigBlackList() {
         return configBlackList;
     }
@@ -2016,4 +2018,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setRecallMessageEnable(boolean recallMessageEnable) {
         this.recallMessageEnable = recallMessageEnable;
     }
+
+    public boolean isEnableCreateSysGroup() {
+        return enableCreateSysGroup;
+    }
+
+    public void setEnableCreateSysGroup(boolean enableCreateSysGroup) {
+        this.enableCreateSysGroup = enableCreateSysGroup;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index c05a1d1926..aca9bd4ed7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -178,6 +178,10 @@ public class MixAll {
         return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
     }
 
+    public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup, final boolean isEnableCreateSysGroup) {
+        return isEnableCreateSysGroup && isSysConsumerGroup(consumerGroup);
+    }
+
     public static boolean isPredefinedGroup(final String consumerGroup) {
         return PREDEFINE_GROUP_SET.contains(consumerGroup);
     }
@@ -530,7 +534,7 @@ public class MixAll {
         return path.normalize().toString();
     }
 
-    public static boolean isSysConsumerGroupForNoColdReadLimit(String consumerGroup) {
+    public static boolean isSysConsumerGroupPullMessage(String consumerGroup) {
         if (DEFAULT_CONSUMER_GROUP.equals(consumerGroup)
             || TOOLS_CONSUMER_GROUP.equals(consumerGroup)
             || SCHEDULE_CONSUMER_GROUP.equals(consumerGroup)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 13af812c3f..fc6bc4213a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -917,7 +917,7 @@ public class DefaultMessageStore implements MessageStore {
                                 continue;
                             }
 
-                            if (messageStoreConfig.isColdDataFlowControlEnable() && !MixAll.isSysConsumerGroupForNoColdReadLimit(group) && !selectResult.isInCache()) {
+                            if (messageStoreConfig.isColdDataFlowControlEnable() && !MixAll.isSysConsumerGroupPullMessage(group) && !selectResult.isInCache()) {
                                 getResult.setColdDataSum(getResult.getColdDataSum() + sizePy);
                             }
 

Reply via email to