This is an automated email from the ASF dual-hosted git repository.

fuyou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4be8fd4372 [ISSUE #8265] Implement Batch Creation of Topics in RocketMQ Admin (#8267)
4be8fd4372 is described below

commit 4be8fd43720c8635fe135404a7fd000c00bb2a15
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Fri Jun 7 10:28:36 2024 +0800

    [ISSUE #8265] Implement Batch Creation of Topics in RocketMQ Admin (#8267)
    
    * add UPDATE_AND_CREATE_TOPIC_LIST
    
    * support creating or updating topic config in batch
    
    
    ---------
    
    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
    Co-authored-by: gaoyang.cgy <gaoyang....@alibaba-inc.com>
---
 .../broker/processor/AdminBrokerProcessor.java     |  81 ++++++++++++++
 .../rocketmq/broker/topic/TopicConfigManager.java  |  11 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  20 ++++
 .../rocketmq/client/impl/MQClientAPIImplTest.java  |  23 ++++
 .../rocketmq/remoting/protocol/RequestCode.java    |   1 +
 .../protocol/body/CreateTopicListRequestBody.java  |  42 ++++++++
 .../header/CreateTopicListRequestHeader.java       |  31 ++++++
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   5 +
 .../tools/admin/DefaultMQAdminExtImpl.java         |   6 ++
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   3 +
 .../rocketmq/tools/command/MQAdminStartup.java     |   2 +
 .../command/topic/UpdateTopicListSubCommand.java   | 118 +++++++++++++++++++++
 .../topic/UpdateTopicListSubCommandTest.java       |  41 +++++++
 13 files changed, 383 insertions(+), 1 deletion(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 40a7a461e8..44bf2a4813 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -116,6 +116,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.Connection;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeQueueData;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -243,6 +244,8 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         switch (request.getCode()) {
             case RequestCode.UPDATE_AND_CREATE_TOPIC:
                 return this.updateAndCreateTopic(ctx, request);
+            case RequestCode.UPDATE_AND_CREATE_TOPIC_LIST:
+                return this.updateAndCreateTopicList(ctx, request);
             case RequestCode.DELETE_TOPIC_IN_BROKER:
                 return this.deleteTopic(ctx, request);
             case RequestCode.GET_ALL_TOPIC_CONFIG:
@@ -536,6 +539,84 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
+    private synchronized RemotingCommand 
updateAndCreateTopicList(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        long startTime = System.currentTimeMillis();
+
+        final CreateTopicListRequestBody requestBody = 
CreateTopicListRequestBody.decode(request.getBody(), 
CreateTopicListRequestBody.class);
+        List<TopicConfig> topicConfigList = requestBody.getTopicConfigList();
+
+        StringBuilder builder = new StringBuilder();
+        for (TopicConfig topicConfig : topicConfigList) {
+            builder.append(topicConfig.getTopicName()).append(";");
+        }
+        String topicNames = builder.toString();
+        LOGGER.info("AdminBrokerProcessor#updateAndCreateTopicList: 
topicNames: {}, called by {}", topicNames, 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+
+        long executionTime;
+
+        try {
+            // Valid topics
+            for (TopicConfig topicConfig : topicConfigList) {
+                String topic = topicConfig.getTopicName();
+                TopicValidator.ValidateTopicResult result = 
TopicValidator.validateTopic(topic);
+                if (!result.isValid()) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark(result.getRemark());
+                    return response;
+                }
+                if 
(brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
+                    if (TopicValidator.isSystemTopic(topic)) {
+                        response.setCode(ResponseCode.SYSTEM_ERROR);
+                        response.setRemark("The topic[" + topic + "] is 
conflict with system topic.");
+                        return response;
+                    }
+                }
+                if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
+                    && 
!brokerController.getBrokerConfig().isEnableMixedMessageType()) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("MIXED message type is not supported.");
+                    return response;
+                }
+                if 
(topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic)))
 {
+                    LOGGER.info("Broker receive request to update or create 
topic={}, but topicConfig has  no changes , so idempotent, caller address={}",
+                        topic, 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                    response.setCode(ResponseCode.SUCCESS);
+                    return response;
+                }
+            }
+
+            
this.brokerController.getTopicConfigManager().updateTopicConfigList(topicConfigList);
+            if 
(brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+                for (TopicConfig topicConfig : topicConfigList) {
+                    this.brokerController.registerSingleTopicAll(topicConfig);
+                }
+            } else {
+                
this.brokerController.registerIncrementBrokerData(topicConfigList, 
this.brokerController.getTopicConfigManager().getDataVersion());
+            }
+            response.setCode(ResponseCode.SUCCESS);
+        } catch (Exception e) {
+            LOGGER.error("Update / create topic failed for [{}]", request, e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(e.getMessage());
+            return response;
+        }
+        finally {
+            executionTime = System.currentTimeMillis() - startTime;
+            InvocationStatus status = response.getCode() == 
ResponseCode.SUCCESS ?
+                InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
+            Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                .put(LABEL_INVOCATION_STATUS, status.getName())
+                .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topicNames))
+                .build();
+            BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, 
attributes);
+        }
+        LOGGER.info("executionTime of all topics:{} is {} ms" , topicNames, 
executionTime);
+        return response;
+    }
+
     private synchronized RemotingCommand 
updateAndCreateStaticTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 1ed9cbab5f..d7c06180e9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.topic;
 
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -490,7 +491,7 @@ public class TopicConfigManager extends ConfigManager {
         }
     }
 
-    public void updateTopicConfig(final TopicConfig topicConfig) {
+    private void updateSingleTopicConfigWithoutPersist(final TopicConfig 
topicConfig) {
         checkNotNull(topicConfig, "topicConfig shouldn't be null");
 
         Map<String, String> newAttributes = request(topicConfig);
@@ -515,10 +516,18 @@ public class TopicConfigManager extends ConfigManager {
 
         long stateMachineVersion = brokerController.getMessageStore() != null 
? brokerController.getMessageStore().getStateMachineVersion() : 0;
         dataVersion.nextVersion(stateMachineVersion);
+    }
 
+    public void updateTopicConfig(final TopicConfig topicConfig) {
+        updateSingleTopicConfigWithoutPersist(topicConfig);
         this.persist(topicConfig.getTopicName(), topicConfig);
     }
 
+    public void updateTopicConfigList(final List<TopicConfig> topicConfigList) 
{
+        topicConfigList.forEach(this::updateSingleTopicConfigWithoutPersist);
+        this.persist();
+    }
+
     private synchronized void updateTieredStoreTopicMetadata(final TopicConfig 
topicConfig, Map<String, String> newAttributes) {
         if (!(brokerController.getMessageStore() instanceof 
TieredMessageStore)) {
             if 
(newAttributes.get(TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getName()) != 
null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 816ae877ac..f3d7e7c70f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -118,6 +118,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
@@ -150,6 +151,7 @@ import 
org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResult
 import 
org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.CreateAccessConfigRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.CreateTopicListRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.DeleteAccessConfigRequestHeader;
@@ -430,6 +432,24 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
+    public void createTopicList(final String address, final List<TopicConfig> 
topicConfigList, final long timeoutMillis)
+        throws InterruptedException, RemotingException, MQClientException {
+        CreateTopicListRequestHeader requestHeader = new 
CreateTopicListRequestHeader();
+        CreateTopicListRequestBody requestBody = new 
CreateTopicListRequestBody(topicConfigList);
+
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, 
requestHeader);
+        request.setBody(requestBody.encode());
+
+        RemotingCommand response = this.remotingClient.invokeSync(
+            MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
address), request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() == ResponseCode.SUCCESS) {
+            return;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
     public void createPlainAccessConfig(final String addr, final 
PlainAccessConfig plainAccessConfig,
         final long timeoutMillis)
         throws RemotingException, InterruptedException, MQClientException {
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 97d8d04e64..b0876c7c0d 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -1068,4 +1069,26 @@ public class MQClientAPIImplTest {
         int topicCnt = mqClientAPI.addWritePermOfBroker("127.0.0.1", 
"default-broker", 1000);
         assertThat(topicCnt).isEqualTo(7);
     }
+
+    @Test
+    public void testCreateTopicList_Success() throws RemotingException, 
InterruptedException, MQClientException {
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
+
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
+        }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
+
+        final List<TopicConfig> topicConfigList = new LinkedList<>();
+        for (int i = 0; i < 16; i++) {
+            TopicConfig topicConfig = new TopicConfig();
+            topicConfig.setTopicName("Topic" + i);
+            topicConfigList.add(topicConfig);
+        }
+
+        mqClientAPI.createTopicList(brokerAddr, topicConfigList, 10000);
+    }
+
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index 1de724e0f1..3be22fc56b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -28,6 +28,7 @@ public class RequestCode {
     public static final int QUERY_CONSUMER_OFFSET = 14;
     public static final int UPDATE_CONSUMER_OFFSET = 15;
     public static final int UPDATE_AND_CREATE_TOPIC = 17;
+    public static final int UPDATE_AND_CREATE_TOPIC_LIST = 18;
     public static final int GET_ALL_TOPIC_CONFIG = 21;
     public static final int GET_TOPIC_CONFIG_LIST = 22;
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CreateTopicListRequestBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CreateTopicListRequestBody.java
new file mode 100644
index 0000000000..a72be31ac9
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CreateTopicListRequestBody.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.body;
+
+import java.util.List;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class CreateTopicListRequestBody extends RemotingSerializable {
+    @CFNotNull
+    private List<TopicConfig> topicConfigList;
+
+    public CreateTopicListRequestBody() {}
+
+    public CreateTopicListRequestBody(List<TopicConfig> topicConfigList) {
+        this.topicConfigList = topicConfigList;
+    }
+
+    public List<TopicConfig> getTopicConfigList() {
+        return topicConfigList;
+    }
+
+    public void setTopicConfigList(List<TopicConfig> topicConfigList) {
+        this.topicConfigList = topicConfigList;
+    }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicListRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicListRequestHeader.java
new file mode 100644
index 0000000000..615de750c4
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicListRequestHeader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.action.RocketMQAction;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;
+
+@RocketMQAction(value = RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, action = 
Action.CREATE)
+public class CreateTopicListRequestHeader extends RpcRequestHeader {
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index a02c878d96..37dd322488 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -195,6 +195,11 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
         defaultMQAdminExtImpl.createAndUpdateTopicConfig(addr, config);
     }
 
+    @Override
+    public void createAndUpdateTopicConfigList(String addr, List<TopicConfig> 
topicConfigList) throws InterruptedException, RemotingException, 
MQClientException {
+        defaultMQAdminExtImpl.createAndUpdateTopicConfigList(addr, 
topicConfigList);
+    }
+
     @Override
     public void createAndUpdatePlainAccessConfig(String addr,
         PlainAccessConfig config) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 2046b1a44c..b5a20673da 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -275,6 +275,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, 
this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
     }
 
+    @Override
+    public void createAndUpdateTopicConfigList(final String brokerAddr,
+        final List<TopicConfig> topicConfigList) throws RemotingException, 
InterruptedException, MQClientException {
+        this.mqClientInstance.getMQClientAPIImpl().createTopicList(brokerAddr, 
topicConfigList, timeoutMillis);
+    }
+
     @Override
     public void createAndUpdatePlainAccessConfig(String addr,
         PlainAccessConfig config) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 50deb7edfc..96940c38b2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -92,6 +92,9 @@ public interface MQAdminExt extends MQAdmin {
         final TopicConfig config) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
+    void createAndUpdateTopicConfigList(final String addr,
+        final List<TopicConfig> topicConfigList) throws InterruptedException, 
RemotingException, MQClientException;
+
     void createAndUpdatePlainAccessConfig(final String addr,
         final PlainAccessConfig plainAccessConfig) throws RemotingException, 
MQBrokerException,
         InterruptedException, MQClientException;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index f8b8ec248a..e785934ba3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -113,6 +113,7 @@ import 
org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand;
 import org.apache.rocketmq.tools.command.topic.TopicStatusSubCommand;
 import org.apache.rocketmq.tools.command.topic.UpdateOrderConfCommand;
 import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand;
+import org.apache.rocketmq.tools.command.topic.UpdateTopicListSubCommand;
 import org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand;
 import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
 
@@ -187,6 +188,7 @@ public class MQAdminStartup {
 
     public static void initCommand() {
         initCommand(new UpdateTopicSubCommand());
+        initCommand(new UpdateTopicListSubCommand());
         initCommand(new DeleteTopicSubCommand());
         initCommand(new UpdateSubGroupSubCommand());
         initCommand(new SetConsumeModeSubCommand());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommand.java
new file mode 100644
index 0000000000..a246059e11
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommand.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.topic;
+
+import com.alibaba.fastjson2.JSON;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class UpdateTopicListSubCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "updateTopicList";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "create or update topic in batch";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        final OptionGroup optionGroup = new OptionGroup();
+        Option opt = new Option("b", "brokerAddr", true, "create topic to 
which broker");
+        optionGroup.addOption(opt);
+        opt = new Option("c", "clusterName", true, "create topic to which 
cluster");
+        optionGroup.addOption(opt);
+        optionGroup.setRequired(true);
+        options.addOptionGroup(optionGroup);
+
+        opt = new Option("f", "filename", true, "Path to a file with list of 
org.apache.rocketmq.common.TopicConfig in json format");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options,
+        RPCHook rpcHook) throws SubCommandException {
+        final DefaultMQAdminExt defaultMQAdminExt = new 
DefaultMQAdminExt(rpcHook);
+        
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        final String fileName = commandLine.getOptionValue('f').trim();
+
+
+        try {
+            final Path filePath = Paths.get(fileName);
+            if (!Files.exists(filePath)) {
+                System.out.printf("the file path %s does not exists%n", 
fileName);
+                return;
+            }
+            final byte[] topicConfigListBytes = Files.readAllBytes(filePath);
+            final List<TopicConfig> topicConfigs = 
JSON.parseArray(topicConfigListBytes, TopicConfig.class);
+            if (null == topicConfigs || topicConfigs.isEmpty()) {
+                return;
+            }
+
+            if (commandLine.hasOption('b')) {
+                String brokerAddress = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.start();
+                
defaultMQAdminExt.createAndUpdateTopicConfigList(brokerAddress, topicConfigs);
+
+                System.out.printf("submit batch of topic config to %s success, 
please check the result later.%n",
+                    brokerAddress);
+                return;
+
+            } else if (commandLine.hasOption('c')) {
+                final String clusterName = 
commandLine.getOptionValue('c').trim();
+
+                defaultMQAdminExt.start();
+
+                Set<String> masterSet =
+                    
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String brokerAddress : masterSet) {
+                    
defaultMQAdminExt.createAndUpdateTopicConfigList(brokerAddress, topicConfigs);
+
+                    System.out.printf("submit batch of topic config to %s 
success, please check the result later.%n",
+                        brokerAddress);
+                }
+            }
+
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), 
options);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommandTest.java
new file mode 100644
index 0000000000..95bb579da8
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommandTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class UpdateTopicListSubCommandTest {
+
+    @Test
+    public void testArguments() {
+        UpdateTopicListSubCommand cmd = new UpdateTopicListSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-f 
topics.json"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
+                cmd.buildCommandlineOptions(options), new DefaultParser());
+        assertEquals("127.0.0.1:10911", 
commandLine.getOptionValue('b').trim());
+        assertEquals("topics.json", commandLine.getOptionValue('f').trim());
+    }
+}
\ No newline at end of file

Reply via email to