This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2219fd8c1a [ISSUE #9244] Avoid writing dirty data in consumption mode (#9245)
2219fd8c1a is described below

commit 2219fd8c1aee3bc42f6a66394e0e4cf131006a26
Author: hqbfz <125714719+3424672...@users.noreply.github.com>
AuthorDate: Mon Mar 17 17:32:41 2025 +0800

    [ISSUE #9244] Avoid writing dirty data in consumption mode (#9245)
---
 .../broker/processor/QueryAssignmentProcessor.java         | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
index 2f4cb7b15f..d29e3d0e06 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
@@ -33,6 +33,7 @@ import 
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageQueueAssignment;
@@ -49,6 +50,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.QueryAssignmentRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.QueryAssignmentResponseBody;
 import 
org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 
 public class QueryAssignmentProcessor implements NettyRequestProcessor {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -314,8 +316,20 @@ public class QueryAssignmentProcessor implements 
NettyRequestProcessor {
             response.setRemark("retry topic is not allowed to set mode");
             return response;
         }
+        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+        if (null == topicConfig) {
+            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+            response.setRemark("topic[" + topic + "] not exist");
+            return response;
+        }
 
         final String consumerGroup = requestBody.getConsumerGroup();
+        SubscriptionGroupConfig groupConfig = 
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(consumerGroup);
+        if (null == groupConfig) {
+            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+            response.setRemark("subscription group does not exist");
+            return response;
+        }
 
         this.messageRequestModeManager.setMessageRequestMode(topic, 
consumerGroup, requestBody);
         this.messageRequestModeManager.persist();

Reply via email to