This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new fb3b87da1b [ISSUE #8984] Fix the broker switch enableMixedMessageType doesn't work fb3b87da1b is described below commit fb3b87da1bb3337039cc80d7a3fcf2dff4bd6ce3 Author: Liu Shengzhong <szliu0...@gmail.com> AuthorDate: Mon Dec 2 10:01:51 2024 +0800 [ISSUE #8984] Fix the broker switch enableMixedMessageType doesn't work --- .../broker/processor/AdminBrokerProcessor.java | 29 +++++++++------ .../broker/processor/AdminBrokerProcessorTest.java | 42 ++++++++++++++++++++++ 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index cc70e69a46..fc3b618273 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -76,6 +76,7 @@ import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; +import org.apache.rocketmq.common.TopicAttributes; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UnlockCallback; import org.apache.rocketmq.common.UtilAll; @@ -534,11 +535,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { String attributesModification = requestHeader.getAttributes(); topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification)); - if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED - && !brokerController.getBrokerConfig().isEnableMixedMessageType()) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("MIXED message type is not supported."); - return response; + if (!brokerController.getBrokerConfig().isEnableMixedMessageType() && topicConfig.getAttributes() != null) { + // Get attribute by key with prefix sign + String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN + TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(); + String msgTypeAttrValue = topicConfig.getAttributes().get(msgTypeAttrKey); + if (msgTypeAttrValue != null && msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("MIXED message type is not supported."); + return response; + } } if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) { @@ -609,11 +614,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } } - if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED - && !brokerController.getBrokerConfig().isEnableMixedMessageType()) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("MIXED message type is not supported."); - return response; + if (!brokerController.getBrokerConfig().isEnableMixedMessageType() && topicConfig.getAttributes() != null) { + // Get attribute by key with prefix sign + String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN + TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(); + String msgTypeAttrValue = topicConfig.getAttributes().get(msgTypeAttrKey); + if (msgTypeAttrValue != null && msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("MIXED message type is not supported."); + return response; + } } if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) { LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}", diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index d87f513355..48ddb89172 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.TopicQueueId; import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.attribute.AttributeParser; import org.apache.rocketmq.common.constant.FIleReadaheadMode; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -330,6 +331,19 @@ public class AdminBrokerProcessorTest { request = buildCreateTopicRequest(topic); response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + // test deny MIXED topic type + brokerController.getBrokerConfig().setEnableMixedMessageType(false); + topic = "TEST_MIXED_TYPE"; + Map<String, String> attributes = new HashMap<>(); + attributes.put("+message.type", "MIXED"); + request = buildCreateTopicRequest(topic, attributes); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + // test allow MIXED topic type + brokerController.getBrokerConfig().setEnableMixedMessageType(true); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } @Test @@ -355,6 +369,20 @@ public class AdminBrokerProcessorTest { //test no changes response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + // test deny MIXED topic type + brokerController.getBrokerConfig().setEnableMixedMessageType(false); + topicList.add("TEST_MIXED_TYPE"); + topicList.add("TEST_MIXED_TYPE1"); + Map<String, String> attributes = new HashMap<>(); + attributes.put("+message.type", "MIXED"); + request = buildCreateTopicListRequest(topicList, attributes); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + // test allow MIXED topic type + brokerController.getBrokerConfig().setEnableMixedMessageType(true); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } @Test @@ -1312,18 +1340,29 @@ public class AdminBrokerProcessorTest { } private RemotingCommand buildCreateTopicRequest(String topic) { + return buildCreateTopicRequest(topic, null); + } + + private RemotingCommand buildCreateTopicRequest(String topic, Map<String, String> attributes) { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topic); requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name()); requestHeader.setReadQueueNums(8); requestHeader.setWriteQueueNums(8); requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + if (attributes != null) { + requestHeader.setAttributes(AttributeParser.parseToString(attributes)); + } RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); request.makeCustomHeaderToNet(); return request; } private RemotingCommand buildCreateTopicListRequest(List<String> topicList) { + return buildCreateTopicListRequest(topicList, null); + } + + private RemotingCommand buildCreateTopicListRequest(List<String> topicList, Map<String, String> attributes) { List<TopicConfig> topicConfigList = new ArrayList<>(); for (String topic:topicList) { TopicConfig topicConfig = new TopicConfig(topic); @@ -1333,6 +1372,9 @@ public class AdminBrokerProcessorTest { topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); topicConfig.setTopicSysFlag(0); topicConfig.setOrder(false); + if (attributes != null) { + topicConfig.setAttributes(new HashMap<>(attributes)); + } topicConfigList.add(topicConfig); } RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, null);