This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 3e100103af [ISSUE #7277] Enhance rocksDBConfigToJson to support metadata counting (#7276) 3e100103af is described below commit 3e100103af68588528bf32f3752a85e8023f46f8 Author: Ziyi Tan <tanziyi0...@gmail.com> AuthorDate: Tue Aug 29 13:48:51 2023 +0800 [ISSUE #7277] Enhance rocksDBConfigToJson to support metadata counting (#7276) --- .../common/config/AbstractRocksDBStorage.java | 4 +- .../common/config/ConfigRocksDBStorage.java | 6 + .../rocketmq/tools/command/MQAdminStartup.java | 4 +- .../export/ExportMetadataInRocksDBCommand.java | 138 +++++++++++++++++++++ .../metadata/RocksDBConfigToJsonCommand.java | 122 ------------------ ...ava => ExportMetadataInRocksDBCommandTest.java} | 38 +++--- 6 files changed, 173 insertions(+), 139 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index e3673baad0..a720a5be32 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -385,8 +385,10 @@ public abstract class AbstractRocksDBStorage { this.options.close(); } //4. close db. - if (db != null) { + if (db != null && !this.readOnly) { this.db.syncWal(); + } + if (db != null) { this.db.closeE(); } //5. help gc. diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java index 9d05ed2828..463bd8fed0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java @@ -60,6 +60,12 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage { this.readOnly = false; } + public ConfigRocksDBStorage(final String dbPath, boolean readOnly) { + super(); + this.dbPath = dbPath; + this.readOnly = readOnly; + } + private void initOptions() { this.options = createConfigDBOptions(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 324aa18569..788fa83c2c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -80,7 +80,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand; import org.apache.rocketmq.tools.command.message.SendMessageCommand; -import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand; +import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand; import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; @@ -212,7 +212,6 @@ public class MQAdminStartup { initCommand(new ClusterListSubCommand()); initCommand(new TopicListSubCommand()); - initCommand(new RocksDBConfigToJsonCommand()); initCommand(new UpdateKvConfigCommand()); initCommand(new DeleteKvConfigCommand()); @@ -257,6 +256,7 @@ public class MQAdminStartup { initCommand(new ExportMetadataCommand()); initCommand(new ExportConfigsCommand()); initCommand(new ExportMetricsCommand()); + initCommand(new ExportMetadataInRocksDBCommand()); initCommand(new HAStatusSubCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java new file mode 100644 index 0000000000..2a7d3fba43 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.export; + +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.config.ConfigRocksDBStorage; +import org.apache.rocketmq.common.utils.DataConverter; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.rocksdb.RocksIterator; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +public class ExportMetadataInRocksDBCommand implements SubCommand { + private static final String TOPICS_JSON_CONFIG = "topics"; + private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; + + @Override + public String commandName() { + return "exportMetadataInRocksDB"; + } + + @Override + public String commandDesc() { + return "export RocksDB kv config (topics/subscriptionGroups)"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option pathOption = new Option("p", "path", true, + "Absolute path for the metadata directory"); + pathOption.setRequired(true); + options.addOption(pathOption); + + Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " + + "topics/subscriptionGroups"); + configTypeOption.setRequired(true); + options.addOption(configTypeOption); + + Option jsonEnableOption = new Option("j", "jsonEnable", true, + "Json format enable, Default: false"); + jsonEnableOption.setRequired(false); + options.addOption(jsonEnableOption); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + String path = commandLine.getOptionValue("path").trim(); + if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) { + System.out.print("RocksDB path is invalid.\n"); + return; + } + + String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); + + boolean jsonEnable = false; + if (commandLine.hasOption("jsonEnable")) { + jsonEnable = Boolean.parseBoolean(commandLine.getOptionValue("jsonEnable").trim()); + } + + + ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* readOnly */); + if (!kvStore.start()) { + System.out.print("RocksDB load error, path=" + path + "\n"); + return; + } + + try { + if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType) || SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) { + handleExportMetadata(kvStore, configType, jsonEnable); + } else { + System.out.printf("Invalid config type=%s, Options: topics,subscriptionGroups\n", configType); + } + } finally { + kvStore.shutdown(); + } + } + + private static void handleExportMetadata(ConfigRocksDBStorage kvStore, String configType, boolean jsonEnable) { + if (jsonEnable) { + final Map<String, JSONObject> jsonConfig = new HashMap<>(); + final Map<String, JSONObject> configTable = new HashMap<>(); + iterateKvStore(kvStore, (key, value) -> { + final String configKey = new String(key, DataConverter.charset); + final String configValue = new String(value, DataConverter.charset); + final JSONObject jsonObject = JSONObject.parseObject(configValue); + configTable.put(configKey, jsonObject); + } + ); + + jsonConfig.put(configType.equalsIgnoreCase(TOPICS_JSON_CONFIG) ? "topicConfigTable" : "subscriptionGroupTable", + (JSONObject) JSONObject.toJSON(configTable)); + final String jsonConfigStr = JSONObject.toJSONString(jsonConfig, true); + System.out.print(jsonConfigStr + "\n"); + } else { + AtomicLong count = new AtomicLong(0); + iterateKvStore(kvStore, (key, value) -> { + final String configKey = new String(key, DataConverter.charset); + final String configValue = new String(value, DataConverter.charset); + System.out.printf("%d, Key: %s, Value: %s%n", count.incrementAndGet(), configKey, configValue); + }); + } + } + + private static void iterateKvStore(ConfigRocksDBStorage kvStore, BiConsumer<byte[], byte[]> biConsumer) { + try (RocksIterator iterator = kvStore.iterator()) { + iterator.seekToFirst(); + for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) { + biConsumer.accept(iterator.key(), iterator.value()); + } + } + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java deleted file mode 100644 index 3fc63e4dd4..0000000000 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.tools.command.metadata; - -import com.alibaba.fastjson.JSONObject; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.config.RocksDBConfigManager; -import org.apache.rocketmq.common.utils.DataConverter; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.rocketmq.tools.command.SubCommandException; - -import java.util.HashMap; -import java.util.Map; - -public class RocksDBConfigToJsonCommand implements SubCommand { - private static final String TOPICS_JSON_CONFIG = "topics"; - private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; - - @Override - public String commandName() { - return "rocksDBConfigToJson"; - } - - @Override - public String commandDesc() { - return "Convert RocksDB kv config (topics/subscriptionGroups) to json"; - } - - @Override - public Options buildCommandlineOptions(Options options) { - Option pathOption = new Option("p", "path", true, - "Absolute path for the metadata directory"); - pathOption.setRequired(true); - options.addOption(pathOption); - - Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " + - "topics/subscriptionGroups"); - configTypeOption.setRequired(true); - options.addOption(configTypeOption); - - return options; - } - - @Override - public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { - String path = commandLine.getOptionValue("path").trim(); - if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) { - System.out.print("Rocksdb path is invalid.\n"); - return; - } - - String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); - - RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(60 * 60 * 1000L); - try { - if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) { - // for topics.json - final Map<String, JSONObject> topicsJsonConfig = new HashMap<>(); - final Map<String, JSONObject> topicConfigTable = new HashMap<>(); - boolean isLoad = kvConfigManager.load(path, (key, value) -> { - final String topic = new String(key, DataConverter.charset); - final String topicConfig = new String(value, DataConverter.charset); - final JSONObject jsonObject = JSONObject.parseObject(topicConfig); - topicConfigTable.put(topic, jsonObject); - }); - - if (!isLoad) { - System.out.print("RocksDB load error, path=" + path); - return; - } - topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable)); - final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true); - System.out.print(topicsJsonStr + "\n"); - return; - } - - if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) { - // for subscriptionGroup.json - final Map<String, JSONObject> subscriptionGroupJsonConfig = new HashMap<>(); - final Map<String, JSONObject> subscriptionGroupTable = new HashMap<>(); - boolean isLoad = kvConfigManager.load(path, (key, value) -> { - final String subscriptionGroup = new String(key, DataConverter.charset); - final String subscriptionGroupConfig = new String(value, DataConverter.charset); - final JSONObject jsonObject = JSONObject.parseObject(subscriptionGroupConfig); - subscriptionGroupTable.put(subscriptionGroup, jsonObject); - }); - - if (!isLoad) { - System.out.print("RocksDB load error, path=" + path); - return; - } - subscriptionGroupJsonConfig.put("subscriptionGroupTable", - (JSONObject) JSONObject.toJSON(subscriptionGroupTable)); - final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true); - System.out.print(subscriptionGroupJsonStr + "\n"); - return; - } - System.out.print("Config type was not recognized, configType=" + configType + "\n"); - } finally { - kvConfigManager.stop(); - } - } -} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java similarity index 62% rename from tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java rename to tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java index b2f66c7b0b..2b938c90fb 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java @@ -21,43 +21,53 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.command.SubCommandException; +import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand; import org.junit.Test; import java.io.File; import static org.assertj.core.api.Assertions.assertThat; -public class KvConfigToJsonCommandTest { +public class ExportMetadataInRocksDBCommandTest { private static final String BASE_PATH = System.getProperty("user.home") + File.separator + "store/config/"; @Test public void testExecute() throws SubCommandException { { - String[] cases = new String[]{"topics", "subscriptionGroups"}; - for (String c : cases) { - RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand(); + String[][] cases = new String[][] { + {"topics", "false"}, + {"topics", "false1"}, + {"topics", "true"}, + {"subscriptionGroups", "false"}, + {"subscriptionGroups", "false2"}, + {"subscriptionGroups", "true"} + }; + + for (String[] c : cases) { + ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand(); Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] subargs = new String[]{"-p " + BASE_PATH + c, "-t " + c}; + String[] subargs = new String[] {"-p " + BASE_PATH + c[0], "-t " + c[0], "-j " + c[1]}; final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, - cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.buildCommandlineOptions(options), new DefaultParser()); cmd.execute(commandLine, options, null); - assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c); - assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c); + assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c[0]); + assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c[0]); + assertThat(commandLine.getOptionValue("j").trim()).isEqualTo(c[1]); } } // invalid cases { - String[][] cases = new String[][]{ - {"-p " + BASE_PATH + "tmpPath", "-t topics"}, - {"-p ", "-t topics"}, - {"-p " + BASE_PATH + "topics", "-t invalid_type"} + String[][] cases = new String[][] { + {"-p " + BASE_PATH + "tmpPath", "-t topics", "-j true"}, + {"-p ", "-t topics", "-j true"}, + {"-p " + BASE_PATH + "topics", "-t invalid_type", "-j true"} }; for (String[] c : cases) { - RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand(); + ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand(); Options options = ServerUtil.buildCommandlineOptions(new Options()); final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), c, - cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.buildCommandlineOptions(options), new DefaultParser()); cmd.execute(commandLine, options, null); } }