This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3e100103af [ISSUE #7277] Enhance rocksDBConfigToJson to support 
metadata counting (#7276)
3e100103af is described below

commit 3e100103af68588528bf32f3752a85e8023f46f8
Author: Ziyi Tan <tanziyi0...@gmail.com>
AuthorDate: Tue Aug 29 13:48:51 2023 +0800

    [ISSUE #7277] Enhance rocksDBConfigToJson to support metadata counting 
(#7276)
---
 .../common/config/AbstractRocksDBStorage.java      |   4 +-
 .../common/config/ConfigRocksDBStorage.java        |   6 +
 .../rocketmq/tools/command/MQAdminStartup.java     |   4 +-
 .../export/ExportMetadataInRocksDBCommand.java     | 138 +++++++++++++++++++++
 .../metadata/RocksDBConfigToJsonCommand.java       | 122 ------------------
 ...ava => ExportMetadataInRocksDBCommandTest.java} |  38 +++---
 6 files changed, 173 insertions(+), 139 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index e3673baad0..a720a5be32 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -385,8 +385,10 @@ public abstract class AbstractRocksDBStorage {
                 this.options.close();
             }
             //4. close db.
-            if (db != null) {
+            if (db != null && !this.readOnly) {
                 this.db.syncWal();
+            }
+            if (db != null) {
                 this.db.closeE();
             }
             //5. help gc.
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index 9d05ed2828..463bd8fed0 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -60,6 +60,12 @@ public class ConfigRocksDBStorage extends 
AbstractRocksDBStorage {
         this.readOnly = false;
     }
 
+    public ConfigRocksDBStorage(final String dbPath, boolean readOnly) {
+        super();
+        this.dbPath = dbPath;
+        this.readOnly = readOnly;
+    }
+
     private void initOptions() {
         this.options = createConfigDBOptions();
 
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java 
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 324aa18569..788fa83c2c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -80,7 +80,7 @@ import 
org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand;
 import org.apache.rocketmq.tools.command.message.SendMessageCommand;
-import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand;
+import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand;
 import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand;
 import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
 import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
@@ -212,7 +212,6 @@ public class MQAdminStartup {
 
         initCommand(new ClusterListSubCommand());
         initCommand(new TopicListSubCommand());
-        initCommand(new RocksDBConfigToJsonCommand());
 
         initCommand(new UpdateKvConfigCommand());
         initCommand(new DeleteKvConfigCommand());
@@ -257,6 +256,7 @@ public class MQAdminStartup {
         initCommand(new ExportMetadataCommand());
         initCommand(new ExportConfigsCommand());
         initCommand(new ExportMetricsCommand());
+        initCommand(new ExportMetadataInRocksDBCommand());
 
         initCommand(new HAStatusSubCommand());
 
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
new file mode 100644
index 0000000000..2a7d3fba43
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.export;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
+import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.rocksdb.RocksIterator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+public class ExportMetadataInRocksDBCommand implements SubCommand {
+    private static final String TOPICS_JSON_CONFIG = "topics";
+    private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = 
"subscriptionGroups";
+
+    @Override
+    public String commandName() {
+        return "exportMetadataInRocksDB";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "export RocksDB kv config (topics/subscriptionGroups)";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option pathOption = new Option("p", "path", true,
+            "Absolute path for the metadata directory");
+        pathOption.setRequired(true);
+        options.addOption(pathOption);
+
+        Option configTypeOption = new Option("t", "configType", true, "Name of 
kv config, e.g. " +
+            "topics/subscriptionGroups");
+        configTypeOption.setRequired(true);
+        options.addOption(configTypeOption);
+
+        Option jsonEnableOption = new Option("j", "jsonEnable", true,
+            "Json format enable, Default: false");
+        jsonEnableOption.setRequired(false);
+        options.addOption(jsonEnableOption);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook 
rpcHook) throws SubCommandException {
+        String path = commandLine.getOptionValue("path").trim();
+        if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) {
+            System.out.print("RocksDB path is invalid.\n");
+            return;
+        }
+
+        String configType = 
commandLine.getOptionValue("configType").trim().toLowerCase();
+
+        boolean jsonEnable = false;
+        if (commandLine.hasOption("jsonEnable")) {
+            jsonEnable = 
Boolean.parseBoolean(commandLine.getOptionValue("jsonEnable").trim());
+        }
+
+
+        ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* 
readOnly */);
+        if (!kvStore.start()) {
+            System.out.print("RocksDB load error, path=" + path + "\n");
+            return;
+        }
+
+        try {
+            if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType) || 
SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) {
+                handleExportMetadata(kvStore, configType, jsonEnable);
+            } else {
+                System.out.printf("Invalid config type=%s, Options: 
topics,subscriptionGroups\n", configType);
+            }
+        } finally {
+            kvStore.shutdown();
+        }
+    }
+
+    private static void handleExportMetadata(ConfigRocksDBStorage kvStore, 
String configType, boolean jsonEnable) {
+        if (jsonEnable) {
+            final Map<String, JSONObject> jsonConfig = new HashMap<>();
+            final Map<String, JSONObject> configTable = new HashMap<>();
+            iterateKvStore(kvStore, (key, value) -> {
+                    final String configKey = new String(key, 
DataConverter.charset);
+                    final String configValue = new String(value, 
DataConverter.charset);
+                    final JSONObject jsonObject = 
JSONObject.parseObject(configValue);
+                    configTable.put(configKey, jsonObject);
+                }
+            );
+
+            jsonConfig.put(configType.equalsIgnoreCase(TOPICS_JSON_CONFIG) ? 
"topicConfigTable" : "subscriptionGroupTable",
+                (JSONObject) JSONObject.toJSON(configTable));
+            final String jsonConfigStr = JSONObject.toJSONString(jsonConfig, 
true);
+            System.out.print(jsonConfigStr + "\n");
+        } else {
+            AtomicLong count = new AtomicLong(0);
+            iterateKvStore(kvStore, (key, value) -> {
+                final String configKey = new String(key, 
DataConverter.charset);
+                final String configValue = new String(value, 
DataConverter.charset);
+                System.out.printf("%d, Key: %s, Value: %s%n", 
count.incrementAndGet(), configKey, configValue);
+            });
+        }
+    }
+
+    private static void iterateKvStore(ConfigRocksDBStorage kvStore, 
BiConsumer<byte[], byte[]> biConsumer) {
+        try (RocksIterator iterator = kvStore.iterator()) {
+            iterator.seekToFirst();
+            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
+                biConsumer.accept(iterator.key(), iterator.value());
+            }
+        }
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
deleted file mode 100644
index 3fc63e4dd4..0000000000
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.tools.command.metadata;
-
-import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
-import org.apache.rocketmq.common.utils.DataConverter;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.rocketmq.tools.command.SubCommandException;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class RocksDBConfigToJsonCommand implements SubCommand {
-    private static final String TOPICS_JSON_CONFIG = "topics";
-    private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = 
"subscriptionGroups";
-
-    @Override
-    public String commandName() {
-        return "rocksDBConfigToJson";
-    }
-
-    @Override
-    public String commandDesc() {
-        return "Convert RocksDB kv config (topics/subscriptionGroups) to json";
-    }
-
-    @Override
-    public Options buildCommandlineOptions(Options options) {
-        Option pathOption = new Option("p", "path", true,
-                "Absolute path for the metadata directory");
-        pathOption.setRequired(true);
-        options.addOption(pathOption);
-
-        Option configTypeOption = new Option("t", "configType", true, "Name of 
kv config, e.g. " +
-                "topics/subscriptionGroups");
-        configTypeOption.setRequired(true);
-        options.addOption(configTypeOption);
-
-        return options;
-    }
-
-    @Override
-    public void execute(CommandLine commandLine, Options options, RPCHook 
rpcHook) throws SubCommandException {
-        String path = commandLine.getOptionValue("path").trim();
-        if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) {
-            System.out.print("Rocksdb path is invalid.\n");
-            return;
-        }
-
-        String configType = 
commandLine.getOptionValue("configType").trim().toLowerCase();
-
-        RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(60 * 
60 * 1000L);
-        try {
-            if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
-                // for topics.json
-                final Map<String, JSONObject> topicsJsonConfig = new 
HashMap<>();
-                final Map<String, JSONObject> topicConfigTable = new 
HashMap<>();
-                boolean isLoad = kvConfigManager.load(path, (key, value) -> {
-                    final String topic = new String(key, 
DataConverter.charset);
-                    final String topicConfig = new String(value, 
DataConverter.charset);
-                    final JSONObject jsonObject = 
JSONObject.parseObject(topicConfig);
-                    topicConfigTable.put(topic, jsonObject);
-                });
-
-                if (!isLoad) {
-                    System.out.print("RocksDB load error, path=" + path);
-                    return;
-                }
-                topicsJsonConfig.put("topicConfigTable", (JSONObject) 
JSONObject.toJSON(topicConfigTable));
-                final String topicsJsonStr = 
JSONObject.toJSONString(topicsJsonConfig, true);
-                System.out.print(topicsJsonStr + "\n");
-                return;
-            }
-
-            if 
(SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) {
-                // for subscriptionGroup.json
-                final Map<String, JSONObject> subscriptionGroupJsonConfig = 
new HashMap<>();
-                final Map<String, JSONObject> subscriptionGroupTable = new 
HashMap<>();
-                boolean isLoad = kvConfigManager.load(path, (key, value) -> {
-                    final String subscriptionGroup = new String(key, 
DataConverter.charset);
-                    final String subscriptionGroupConfig = new String(value, 
DataConverter.charset);
-                    final JSONObject jsonObject = 
JSONObject.parseObject(subscriptionGroupConfig);
-                    subscriptionGroupTable.put(subscriptionGroup, jsonObject);
-                });
-
-                if (!isLoad) {
-                    System.out.print("RocksDB load error, path=" + path);
-                    return;
-                }
-                subscriptionGroupJsonConfig.put("subscriptionGroupTable",
-                        (JSONObject) 
JSONObject.toJSON(subscriptionGroupTable));
-                final String subscriptionGroupJsonStr = 
JSONObject.toJSONString(subscriptionGroupJsonConfig, true);
-                System.out.print(subscriptionGroupJsonStr + "\n");
-                return;
-            }
-            System.out.print("Config type was not recognized, configType=" + 
configType + "\n");
-        } finally {
-            kvConfigManager.stop();
-        }
-    }
-}
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java
similarity index 62%
rename from 
tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java
rename to 
tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java
index b2f66c7b0b..2b938c90fb 100644
--- 
a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java
@@ -21,43 +21,53 @@ import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.command.SubCommandException;
+import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand;
 import org.junit.Test;
 
 import java.io.File;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-public class KvConfigToJsonCommandTest {
+public class ExportMetadataInRocksDBCommandTest {
     private static final String BASE_PATH = System.getProperty("user.home") + 
File.separator + "store/config/";
 
     @Test
     public void testExecute() throws SubCommandException {
         {
-            String[] cases = new String[]{"topics", "subscriptionGroups"};
-            for (String c : cases) {
-                RocksDBConfigToJsonCommand cmd = new 
RocksDBConfigToJsonCommand();
+            String[][] cases = new String[][] {
+                {"topics", "false"},
+                {"topics", "false1"},
+                {"topics", "true"},
+                {"subscriptionGroups", "false"},
+                {"subscriptionGroups", "false2"},
+                {"subscriptionGroups", "true"}
+            };
+
+            for (String[] c : cases) {
+                ExportMetadataInRocksDBCommand cmd = new 
ExportMetadataInRocksDBCommand();
                 Options options = ServerUtil.buildCommandlineOptions(new 
Options());
-                String[] subargs = new String[]{"-p " + BASE_PATH + c, "-t " + 
c};
+                String[] subargs = new String[] {"-p " + BASE_PATH + c[0], "-t 
" + c[0], "-j " + c[1]};
                 final CommandLine commandLine = 
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
-                        cmd.buildCommandlineOptions(options), new 
DefaultParser());
+                    cmd.buildCommandlineOptions(options), new DefaultParser());
                 cmd.execute(commandLine, options, null);
-                
assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c);
-                
assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c);
+                
assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c[0]);
+                
assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c[0]);
+                
assertThat(commandLine.getOptionValue("j").trim()).isEqualTo(c[1]);
             }
         }
         // invalid cases
         {
-            String[][] cases = new String[][]{
-                    {"-p " + BASE_PATH + "tmpPath", "-t topics"},
-                    {"-p  ", "-t topics"},
-                    {"-p " + BASE_PATH + "topics", "-t invalid_type"}
+            String[][] cases = new String[][] {
+                {"-p " + BASE_PATH + "tmpPath", "-t topics", "-j true"},
+                {"-p  ", "-t topics", "-j true"},
+                {"-p " + BASE_PATH + "topics", "-t invalid_type", "-j true"}
             };
 
             for (String[] c : cases) {
-                RocksDBConfigToJsonCommand cmd = new 
RocksDBConfigToJsonCommand();
+                ExportMetadataInRocksDBCommand cmd = new 
ExportMetadataInRocksDBCommand();
                 Options options = ServerUtil.buildCommandlineOptions(new 
Options());
                 final CommandLine commandLine = 
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), c,
-                        cmd.buildCommandlineOptions(options), new 
DefaultParser());
+                    cmd.buildCommandlineOptions(options), new DefaultParser());
                 cmd.execute(commandLine, options, null);
             }
         }

Reply via email to