This is an automated email from the ASF dual-hosted git repository. fuyou pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 4be8fd4372 [ISSUE #8265] Implement Batch Creation of Topics in RocketMQ Admin (#8267) 4be8fd4372 is described below commit 4be8fd43720c8635fe135404a7fd000c00bb2a15 Author: guyinyou <36399867+guyin...@users.noreply.github.com> AuthorDate: Fri Jun 7 10:28:36 2024 +0800 [ISSUE #8265] Implement Batch Creation of Topics in RocketMQ Admin (#8267) * add UPDATE_AND_CREATE_TOPIC_LIST * support creating or updating topic config in batch --------- Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com> Co-authored-by: gaoyang.cgy <gaoyang....@alibaba-inc.com> --- .../broker/processor/AdminBrokerProcessor.java | 81 ++++++++++++++ .../rocketmq/broker/topic/TopicConfigManager.java | 11 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 20 ++++ .../rocketmq/client/impl/MQClientAPIImplTest.java | 23 ++++ .../rocketmq/remoting/protocol/RequestCode.java | 1 + .../protocol/body/CreateTopicListRequestBody.java | 42 ++++++++ .../header/CreateTopicListRequestHeader.java | 31 ++++++ .../rocketmq/tools/admin/DefaultMQAdminExt.java | 5 + .../tools/admin/DefaultMQAdminExtImpl.java | 6 ++ .../apache/rocketmq/tools/admin/MQAdminExt.java | 3 + .../rocketmq/tools/command/MQAdminStartup.java | 2 + .../command/topic/UpdateTopicListSubCommand.java | 118 +++++++++++++++++++++ .../topic/UpdateTopicListSubCommandTest.java | 41 +++++++ 13 files changed, 383 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 40a7a461e8..44bf2a4813 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -116,6 +116,7 @@ import org.apache.rocketmq.remoting.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.ConsumeQueueData; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -243,6 +244,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { switch (request.getCode()) { case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request); + case RequestCode.UPDATE_AND_CREATE_TOPIC_LIST: + return this.updateAndCreateTopicList(ctx, request); case RequestCode.DELETE_TOPIC_IN_BROKER: return this.deleteTopic(ctx, request); case RequestCode.GET_ALL_TOPIC_CONFIG: @@ -536,6 +539,84 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } + private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + long startTime = System.currentTimeMillis(); + + final CreateTopicListRequestBody requestBody = CreateTopicListRequestBody.decode(request.getBody(), CreateTopicListRequestBody.class); + List<TopicConfig> topicConfigList = requestBody.getTopicConfigList(); + + StringBuilder builder = new StringBuilder(); + for (TopicConfig topicConfig : topicConfigList) { + builder.append(topicConfig.getTopicName()).append(";"); + } + String topicNames = builder.toString(); + LOGGER.info("AdminBrokerProcessor#updateAndCreateTopicList: topicNames: {}, called by {}", topicNames, RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + long executionTime; + + try { + // Valid topics + for (TopicConfig topicConfig : topicConfigList) { + String topic = topicConfig.getTopicName(); + TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic); + if (!result.isValid()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(result.getRemark()); + return response; + } + if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) { + if (TopicValidator.isSystemTopic(topic)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The topic[" + topic + "] is conflict with system topic."); + return response; + } + } + if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED + && !brokerController.getBrokerConfig().isEnableMixedMessageType()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("MIXED message type is not supported."); + return response; + } + if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) { + LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}", + topic, RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + response.setCode(ResponseCode.SUCCESS); + return response; + } + } + + this.brokerController.getTopicConfigManager().updateTopicConfigList(topicConfigList); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { + for (TopicConfig topicConfig : topicConfigList) { + this.brokerController.registerSingleTopicAll(topicConfig); + } + } else { + this.brokerController.registerIncrementBrokerData(topicConfigList, this.brokerController.getTopicConfigManager().getDataVersion()); + } + response.setCode(ResponseCode.SUCCESS); + } catch (Exception e) { + LOGGER.error("Update / create topic failed for [{}]", request, e); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(e.getMessage()); + return response; + } + finally { + executionTime = System.currentTimeMillis() - startTime; + InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? + InvocationStatus.SUCCESS : InvocationStatus.FAILURE; + Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + .put(LABEL_INVOCATION_STATUS, status.getName()) + .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topicNames)) + .build(); + BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes); + } + LOGGER.info("executionTime of all topics:{} is {} ms" , topicNames, executionTime); + return response; + } + private synchronized RemotingCommand updateAndCreateStaticTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 1ed9cbab5f..d7c06180e9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.topic; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -490,7 +491,7 @@ public class TopicConfigManager extends ConfigManager { } } - public void updateTopicConfig(final TopicConfig topicConfig) { + private void updateSingleTopicConfigWithoutPersist(final TopicConfig topicConfig) { checkNotNull(topicConfig, "topicConfig shouldn't be null"); Map<String, String> newAttributes = request(topicConfig); @@ -515,10 +516,18 @@ public class TopicConfigManager extends ConfigManager { long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); + } + public void updateTopicConfig(final TopicConfig topicConfig) { + updateSingleTopicConfigWithoutPersist(topicConfig); this.persist(topicConfig.getTopicName(), topicConfig); } + public void updateTopicConfigList(final List<TopicConfig> topicConfigList) { + topicConfigList.forEach(this::updateSingleTopicConfigWithoutPersist); + this.persist(); + } + private synchronized void updateTieredStoreTopicMetadata(final TopicConfig topicConfig, Map<String, String> newAttributes) { if (!(brokerController.getMessageStore() instanceof TieredMessageStore)) { if (newAttributes.get(TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getName()) != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 816ae877ac..f3d7e7c70f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -118,6 +118,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody; import org.apache.rocketmq.remoting.protocol.body.GroupList; @@ -150,6 +151,7 @@ import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResult import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateAccessConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CreateTopicListRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteAccessConfigRequestHeader; @@ -430,6 +432,24 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { throw new MQClientException(response.getCode(), response.getRemark()); } + public void createTopicList(final String address, final List<TopicConfig> topicConfigList, final long timeoutMillis) + throws InterruptedException, RemotingException, MQClientException { + CreateTopicListRequestHeader requestHeader = new CreateTopicListRequestHeader(); + CreateTopicListRequestBody requestBody = new CreateTopicListRequestBody(topicConfigList); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, requestHeader); + request.setBody(requestBody.encode()); + + RemotingCommand response = this.remotingClient.invokeSync( + MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), address), request, timeoutMillis); + assert response != null; + if (response.getCode() == ResponseCode.SUCCESS) { + return; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + public void createPlainAccessConfig(final String addr, final PlainAccessConfig plainAccessConfig, final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException { diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 97d8d04e64..b0876c7c0d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -1068,4 +1069,26 @@ public class MQClientAPIImplTest { int topicCnt = mqClientAPI.addWritePermOfBroker("127.0.0.1", "default-broker", 1000); assertThat(topicCnt).isEqualTo(7); } + + @Test + public void testCreateTopicList_Success() throws RemotingException, InterruptedException, MQClientException { + doAnswer((Answer<RemotingCommand>) mock -> { + RemotingCommand request = mock.getArgument(1); + + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + return response; + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + final List<TopicConfig> topicConfigList = new LinkedList<>(); + for (int i = 0; i < 16; i++) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName("Topic" + i); + topicConfigList.add(topicConfig); + } + + mqClientAPI.createTopicList(brokerAddr, topicConfigList, 10000); + } + } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index 1de724e0f1..3be22fc56b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -28,6 +28,7 @@ public class RequestCode { public static final int QUERY_CONSUMER_OFFSET = 14; public static final int UPDATE_CONSUMER_OFFSET = 15; public static final int UPDATE_AND_CREATE_TOPIC = 17; + public static final int UPDATE_AND_CREATE_TOPIC_LIST = 18; public static final int GET_ALL_TOPIC_CONFIG = 21; public static final int GET_TOPIC_CONFIG_LIST = 22; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CreateTopicListRequestBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CreateTopicListRequestBody.java new file mode 100644 index 0000000000..a72be31ac9 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CreateTopicListRequestBody.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.body; + +import java.util.List; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class CreateTopicListRequestBody extends RemotingSerializable { + @CFNotNull + private List<TopicConfig> topicConfigList; + + public CreateTopicListRequestBody() {} + + public CreateTopicListRequestBody(List<TopicConfig> topicConfigList) { + this.topicConfigList = topicConfigList; + } + + public List<TopicConfig> getTopicConfigList() { + return topicConfigList; + } + + public void setTopicConfigList(List<TopicConfig> topicConfigList) { + this.topicConfigList = topicConfigList; + } + +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicListRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicListRequestHeader.java new file mode 100644 index 0000000000..615de750c4 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicListRequestHeader.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; + +@RocketMQAction(value = RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, action = Action.CREATE) +public class CreateTopicListRequestHeader extends RpcRequestHeader { + @Override + public void checkFields() throws RemotingCommandException { + + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index a02c878d96..37dd322488 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -195,6 +195,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { defaultMQAdminExtImpl.createAndUpdateTopicConfig(addr, config); } + @Override + public void createAndUpdateTopicConfigList(String addr, List<TopicConfig> topicConfigList) throws InterruptedException, RemotingException, MQClientException { + defaultMQAdminExtImpl.createAndUpdateTopicConfigList(addr, topicConfigList); + } + @Override public void createAndUpdatePlainAccessConfig(String addr, PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 2046b1a44c..b5a20673da 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -275,6 +275,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis); } + @Override + public void createAndUpdateTopicConfigList(final String brokerAddr, + final List<TopicConfig> topicConfigList) throws RemotingException, InterruptedException, MQClientException { + this.mqClientInstance.getMQClientAPIImpl().createTopicList(brokerAddr, topicConfigList, timeoutMillis); + } + @Override public void createAndUpdatePlainAccessConfig(String addr, PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 50deb7edfc..96940c38b2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -92,6 +92,9 @@ public interface MQAdminExt extends MQAdmin { final TopicConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + void createAndUpdateTopicConfigList(final String addr, + final List<TopicConfig> topicConfigList) throws InterruptedException, RemotingException, MQClientException; + void createAndUpdatePlainAccessConfig(final String addr, final PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index f8b8ec248a..e785934ba3 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -113,6 +113,7 @@ import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand; import org.apache.rocketmq.tools.command.topic.TopicStatusSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateOrderConfCommand; import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand; +import org.apache.rocketmq.tools.command.topic.UpdateTopicListSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand; @@ -187,6 +188,7 @@ public class MQAdminStartup { public static void initCommand() { initCommand(new UpdateTopicSubCommand()); + initCommand(new UpdateTopicListSubCommand()); initCommand(new DeleteTopicSubCommand()); initCommand(new UpdateSubGroupSubCommand()); initCommand(new SetConsumeModeSubCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommand.java new file mode 100644 index 0000000000..a246059e11 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommand.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.topic; + +import com.alibaba.fastjson2.JSON; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class UpdateTopicListSubCommand implements SubCommand { + @Override + public String commandName() { + return "updateTopicList"; + } + + @Override + public String commandDesc() { + return "create or update topic in batch"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + final OptionGroup optionGroup = new OptionGroup(); + Option opt = new Option("b", "brokerAddr", true, "create topic to which broker"); + optionGroup.addOption(opt); + opt = new Option("c", "clusterName", true, "create topic to which cluster"); + optionGroup.addOption(opt); + optionGroup.setRequired(true); + options.addOptionGroup(optionGroup); + + opt = new Option("f", "filename", true, "Path to a file with list of org.apache.rocketmq.common.TopicConfig in json format"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, + RPCHook rpcHook) throws SubCommandException { + final DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + final String fileName = commandLine.getOptionValue('f').trim(); + + + try { + final Path filePath = Paths.get(fileName); + if (!Files.exists(filePath)) { + System.out.printf("the file path %s does not exists%n", fileName); + return; + } + final byte[] topicConfigListBytes = Files.readAllBytes(filePath); + final List<TopicConfig> topicConfigs = JSON.parseArray(topicConfigListBytes, TopicConfig.class); + if (null == topicConfigs || topicConfigs.isEmpty()) { + return; + } + + if (commandLine.hasOption('b')) { + String brokerAddress = commandLine.getOptionValue('b').trim(); + defaultMQAdminExt.start(); + defaultMQAdminExt.createAndUpdateTopicConfigList(brokerAddress, topicConfigs); + + System.out.printf("submit batch of topic config to %s success, please check the result later.%n", + brokerAddress); + return; + + } else if (commandLine.hasOption('c')) { + final String clusterName = commandLine.getOptionValue('c').trim(); + + defaultMQAdminExt.start(); + + Set<String> masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String brokerAddress : masterSet) { + defaultMQAdminExt.createAndUpdateTopicConfigList(brokerAddress, topicConfigs); + + System.out.printf("submit batch of topic config to %s success, please check the result later.%n", + brokerAddress); + } + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommandTest.java new file mode 100644 index 0000000000..95bb579da8 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommandTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.topic; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class UpdateTopicListSubCommandTest { + + @Test + public void testArguments() { + UpdateTopicListSubCommand cmd = new UpdateTopicListSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-b 127.0.0.1:10911", "-f topics.json"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, + cmd.buildCommandlineOptions(options), new DefaultParser()); + assertEquals("127.0.0.1:10911", commandLine.getOptionValue('b').trim()); + assertEquals("topics.json", commandLine.getOptionValue('f').trim()); + } +} \ No newline at end of file