This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new fb3b87da1b  [ISSUE #8984] Fix the broker switch enableMixedMessageType doesn't work
fb3b87da1b is described below

commit fb3b87da1bb3337039cc80d7a3fcf2dff4bd6ce3
Author: Liu Shengzhong <szliu0...@gmail.com>
AuthorDate: Mon Dec 2 10:01:51 2024 +0800

     [ISSUE #8984] Fix the broker switch enableMixedMessageType doesn't work
---
 .../broker/processor/AdminBrokerProcessor.java     | 29 +++++++++------
 .../broker/processor/AdminBrokerProcessorTest.java | 42 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 10 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index cc70e69a46..fc3b618273 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -76,6 +76,7 @@ import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UnlockCallback;
 import org.apache.rocketmq.common.UtilAll;
@@ -534,11 +535,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             String attributesModification = requestHeader.getAttributes();
             
topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
 
-            if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
-                && 
!brokerController.getBrokerConfig().isEnableMixedMessageType()) {
-                response.setCode(ResponseCode.SYSTEM_ERROR);
-                response.setRemark("MIXED message type is not supported.");
-                return response;
+            if (!brokerController.getBrokerConfig().isEnableMixedMessageType() 
&& topicConfig.getAttributes() != null) {
+                // Get attribute by key with prefix sign
+                String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN + 
TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName();
+                String msgTypeAttrValue = 
topicConfig.getAttributes().get(msgTypeAttrKey);
+                if (msgTypeAttrValue != null && 
msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("MIXED message type is not supported.");
+                    return response;
+                }
             }
 
             if 
(topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic)))
 {
@@ -609,11 +614,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                         return response;
                     }
                 }
-                if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
-                    && 
!brokerController.getBrokerConfig().isEnableMixedMessageType()) {
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("MIXED message type is not supported.");
-                    return response;
+                if 
(!brokerController.getBrokerConfig().isEnableMixedMessageType() && 
topicConfig.getAttributes() != null) {
+                    // Get attribute by key with prefix sign
+                    String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN 
+ TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName();
+                    String msgTypeAttrValue = 
topicConfig.getAttributes().get(msgTypeAttrKey);
+                    if (msgTypeAttrValue != null && 
msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) {
+                        response.setCode(ResponseCode.SYSTEM_ERROR);
+                        response.setRemark("MIXED message type is not 
supported.");
+                        return response;
+                    }
                 }
                 if 
(topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic)))
 {
                     LOGGER.info("Broker receive request to update or create 
topic={}, but topicConfig has  no changes , so idempotent, caller address={}",
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index d87f513355..48ddb89172 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.TopicQueueId;
 import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.attribute.AttributeParser;
 import org.apache.rocketmq.common.constant.FIleReadaheadMode;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -330,6 +331,19 @@ public class AdminBrokerProcessorTest {
         request = buildCreateTopicRequest(topic);
         response = adminBrokerProcessor.processRequest(handlerContext, 
request);
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        // test deny MIXED topic type
+        brokerController.getBrokerConfig().setEnableMixedMessageType(false);
+        topic = "TEST_MIXED_TYPE";
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+message.type", "MIXED");
+        request = buildCreateTopicRequest(topic, attributes);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+        // test allow MIXED topic type
+        brokerController.getBrokerConfig().setEnableMixedMessageType(true);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
     @Test
@@ -355,6 +369,20 @@ public class AdminBrokerProcessorTest {
         //test no changes
         response = adminBrokerProcessor.processRequest(handlerContext, 
request);
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        // test deny MIXED topic type
+        brokerController.getBrokerConfig().setEnableMixedMessageType(false);
+        topicList.add("TEST_MIXED_TYPE");
+        topicList.add("TEST_MIXED_TYPE1");
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+message.type", "MIXED");
+        request = buildCreateTopicListRequest(topicList, attributes);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+        // test allow MIXED topic type
+        brokerController.getBrokerConfig().setEnableMixedMessageType(true);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
     @Test
@@ -1312,18 +1340,29 @@ public class AdminBrokerProcessorTest {
     }
 
     private RemotingCommand buildCreateTopicRequest(String topic) {
+        return buildCreateTopicRequest(topic, null);
+    }
+
+    private RemotingCommand buildCreateTopicRequest(String topic, Map<String, 
String> attributes) {
         CreateTopicRequestHeader requestHeader = new 
CreateTopicRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name());
         requestHeader.setReadQueueNums(8);
         requestHeader.setWriteQueueNums(8);
         requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+        if (attributes != null) {
+            
requestHeader.setAttributes(AttributeParser.parseToString(attributes));
+        }
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, 
requestHeader);
         request.makeCustomHeaderToNet();
         return request;
     }
 
     private RemotingCommand buildCreateTopicListRequest(List<String> 
topicList) {
+        return buildCreateTopicListRequest(topicList, null);
+    }
+
+    private RemotingCommand buildCreateTopicListRequest(List<String> 
topicList, Map<String, String> attributes) {
         List<TopicConfig> topicConfigList = new ArrayList<>();
         for (String topic:topicList) {
             TopicConfig topicConfig = new TopicConfig(topic);
@@ -1333,6 +1372,9 @@ public class AdminBrokerProcessorTest {
             topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
             topicConfig.setTopicSysFlag(0);
             topicConfig.setOrder(false);
+            if (attributes != null) {
+                topicConfig.setAttributes(new HashMap<>(attributes));
+            }
             topicConfigList.add(topicConfig);
         }
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, 
null);

Reply via email to