absolute8511 commented on code in PR #8267:
URL: https://github.com/apache/rocketmq/pull/8267#discussion_r1708497551


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -536,6 +539,84 @@ private synchronized RemotingCommand 
updateAndCreateTopic(ChannelHandlerContext
         return response;
     }
 
+    private synchronized RemotingCommand 
updateAndCreateTopicList(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        long startTime = System.currentTimeMillis();
+
+        final CreateTopicListRequestBody requestBody = 
CreateTopicListRequestBody.decode(request.getBody(), 
CreateTopicListRequestBody.class);
+        List<TopicConfig> topicConfigList = requestBody.getTopicConfigList();
+
+        StringBuilder builder = new StringBuilder();
+        for (TopicConfig topicConfig : topicConfigList) {
+            builder.append(topicConfig.getTopicName()).append(";");
+        }
+        String topicNames = builder.toString();
+        LOGGER.info("AdminBrokerProcessor#updateAndCreateTopicList: 
topicNames: {}, called by {}", topicNames, 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+
+        long executionTime;
+
+        try {
+            // Valid topics
+            for (TopicConfig topicConfig : topicConfigList) {
+                String topic = topicConfig.getTopicName();
+                TopicValidator.ValidateTopicResult result = 
TopicValidator.validateTopic(topic);
+                if (!result.isValid()) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark(result.getRemark());
+                    return response;
+                }
+                if 
(brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
+                    if (TopicValidator.isSystemTopic(topic)) {
+                        response.setCode(ResponseCode.SYSTEM_ERROR);
+                        response.setRemark("The topic[" + topic + "] is 
conflict with system topic.");
+                        return response;
+                    }
+                }
+                if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
+                    && 
!brokerController.getBrokerConfig().isEnableMixedMessageType()) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("MIXED message type is not supported.");
+                    return response;
+                }
+                if 
(topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic)))
 {
+                    LOGGER.info("Broker receive request to update or create 
topic={}, but topicConfig has  no changes , so idempotent, caller address={}",
+                        topic, 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                    response.setCode(ResponseCode.SUCCESS);
+                    return response;

Review Comment:
   We should `continue` here instead of returning; otherwise the remaining 
topics in the list are left uncreated even though the response reports success.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to