This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 586a274839 [ISSUE #9279] Restrict system subscription group creation and add pull request rejection policy (#9280) 586a274839 is described below commit 586a2748394be4825d17af45f31731fb7d5649fc Author: ymwneu <ymw...@126.com> AuthorDate: Tue Apr 1 17:12:36 2025 +0800 [ISSUE #9279] Restrict system subscription group creation and add pull request rejection policy (#9280) --- .../rocketmq/broker/coldctr/ColdDataCgCtrService.java | 2 +- .../rocketmq/broker/processor/PullMessageProcessor.java | 13 +++++++++++++ .../broker/subscription/SubscriptionGroupManager.java | 3 ++- .../main/java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++ common/src/main/java/org/apache/rocketmq/common/MixAll.java | 6 +++++- .../java/org/apache/rocketmq/store/DefaultMessageStore.java | 2 +- 6 files changed, 32 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java index 2e24930405..5b8b2fb9ce 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java @@ -187,7 +187,7 @@ public class ColdDataCgCtrService extends ServiceThread { if (!this.messageStoreConfig.isColdDataFlowControlEnable()) { return false; } - if (MixAll.isSysConsumerGroupForNoColdReadLimit(consumerGroup)) { + if (MixAll.isSysConsumerGroupPullMessage(consumerGroup)) { return false; } AccAndTimeStamp accAndTimeStamp = cgColdThresholdMapRuntime.get(consumerGroup); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 5f0735e74c..5d947fd088 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -489,6 +489,19 @@ public class PullMessageProcessor implements NettyRequestProcessor { this.brokerController.getConsumerFilterManager()); } + if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) { + ConsumerGroupInfo consumerGroupInfo = + this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); + if (null == consumerGroupInfo || ConsumeType.CONSUME_ACTIVELY == consumerGroupInfo.getConsumeType()) { + if ((null == consumerGroupInfo || null == consumerGroupInfo.findChannel(channel)) + && !MixAll.isSysConsumerGroupPullMessage(requestHeader.getConsumerGroup())) { + response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); + response.setRemark("the consumer's group info not exist, or the pull consumer is rejected by server." + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + return response; + } + } + } + final MessageStore messageStore = brokerController.getMessageStore(); if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) { DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index d85342e1a1..f3e669fb3e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -260,7 +260,8 @@ public class SubscriptionGroupManager extends ConfigManager { public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { SubscriptionGroupConfig subscriptionGroupConfig = getSubscriptionGroupConfig(group); if (null == subscriptionGroupConfig) { - if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) { + if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() + || MixAll.isSysConsumerGroupAndEnableCreate(group, brokerController.getBrokerConfig().isEnableCreateSysGroup())) { if (group.length() > Validators.CHARACTER_MAX_LENGTH || TopicValidator.isTopicOrGroupIllegal(group)) { return null; } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index b7ec944505..44f5e1eff0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -457,6 +457,8 @@ public class BrokerConfig extends BrokerIdentity { private boolean recallMessageEnable = false; + private boolean enableCreateSysGroup = true; + public String getConfigBlackList() { return configBlackList; } @@ -2016,4 +2018,12 @@ public class BrokerConfig extends BrokerIdentity { public void setRecallMessageEnable(boolean recallMessageEnable) { this.recallMessageEnable = recallMessageEnable; } + + public boolean isEnableCreateSysGroup() { + return enableCreateSysGroup; + } + + public void setEnableCreateSysGroup(boolean enableCreateSysGroup) { + this.enableCreateSysGroup = enableCreateSysGroup; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index c05a1d1926..aca9bd4ed7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -178,6 +178,10 @@ public class MixAll { return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX); } + public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup, final boolean isEnableCreateSysGroup) { + return isEnableCreateSysGroup && isSysConsumerGroup(consumerGroup); + } + public static boolean isPredefinedGroup(final String consumerGroup) { return PREDEFINE_GROUP_SET.contains(consumerGroup); } @@ -530,7 +534,7 @@ public class MixAll { return path.normalize().toString(); } - public static boolean isSysConsumerGroupForNoColdReadLimit(String consumerGroup) { + public static boolean isSysConsumerGroupPullMessage(String consumerGroup) { if (DEFAULT_CONSUMER_GROUP.equals(consumerGroup) || TOOLS_CONSUMER_GROUP.equals(consumerGroup) || SCHEDULE_CONSUMER_GROUP.equals(consumerGroup) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 13af812c3f..fc6bc4213a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -917,7 +917,7 @@ public class DefaultMessageStore implements MessageStore { continue; } - if (messageStoreConfig.isColdDataFlowControlEnable() && !MixAll.isSysConsumerGroupForNoColdReadLimit(group) && !selectResult.isInCache()) { + if (messageStoreConfig.isColdDataFlowControlEnable() && !MixAll.isSysConsumerGroupPullMessage(group) && !selectResult.isInCache()) { getResult.setColdDataSum(getResult.getColdDataSum() + sizePy); }