This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 2219fd8c1a [ISSUE #9244] Avoid writing dirty data in consumption mode (#9245) 2219fd8c1a is described below commit 2219fd8c1aee3bc42f6a66394e0e4cf131006a26 Author: hqbfz <125714719+3424672...@users.noreply.github.com> AuthorDate: Mon Mar 17 17:32:41 2025 +0800 [ISSUE #9244] Avoid writing dirty data in consumption mode (#9245) --- .../broker/processor/QueryAssignmentProcessor.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java index 2f4cb7b15f..d29e3d0e06 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueueAssignment; @@ -49,6 +50,7 @@ import org.apache.rocketmq.remoting.protocol.body.QueryAssignmentRequestBody; import org.apache.rocketmq.remoting.protocol.body.QueryAssignmentResponseBody; import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; public class QueryAssignmentProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -314,8 +316,20 @@ public class QueryAssignmentProcessor implements NettyRequestProcessor { response.setRemark("retry topic is not allowed to set mode"); return response; } + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); + if (null == topicConfig) { + response.setCode(ResponseCode.TOPIC_NOT_EXIST); + response.setRemark("topic[" + topic + "] not exist"); + return response; + } final String consumerGroup = requestBody.getConsumerGroup(); + SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(consumerGroup); + if (null == groupConfig) { + response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); + response.setRemark("subscription group does not exist"); + return response; + } this.messageRequestModeManager.setMessageRequestMode(topic, consumerGroup, requestBody); this.messageRequestModeManager.persist();