This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b30afe8184 [ISSUE #9111] Support export broker RocksDB Config to json file (#9114)
b30afe8184 is described below

commit b30afe81840a0aacac730556432ebcdb276bbe85
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Tue Jan 14 10:29:40 2025 +0800

    [ISSUE #9111] Support export broker RocksDB Config to json file (#9114)
    
    * [ISSUE #9111] Broker Support export RocksDB Config to json file and enhance admin tools
---
 .../config/v1/RocksDBConsumerOffsetManager.java    |  13 +-
 .../config/v1/RocksDBSubscriptionGroupManager.java |   7 +-
 .../config/v1/RocksDBTopicConfigManager.java       |   7 +-
 .../broker/processor/AdminBrokerProcessor.java     |  54 ++++-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  16 ++
 .../rocketmq/remoting/protocol/RequestCode.java    |   1 +
 .../ExportRocksDBConfigToJsonRequestHeader.java    | 100 +++++++++
 ...ExportRocksDBConfigToJsonRequestHeaderTest.java |  51 +++++
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   8 +
 .../tools/admin/DefaultMQAdminExtImpl.java         |   8 +
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   5 +
 .../export/ExportMetadataInRocksDBCommand.java     |  11 +-
 .../metadata/RocksDBConfigToJsonCommand.java       | 224 ++++++++++++++++++---
 13 files changed, 463 insertions(+), 42 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
index 824fc0fee3..963c5046f2 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
@@ -38,7 +38,7 @@ public class RocksDBConsumerOffsetManager extends 
ConsumerOffsetManager {
 
     protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
-    protected RocksDBConfigManager rocksDBConfigManager;
+    protected transient RocksDBConfigManager rocksDBConfigManager;
 
     public RocksDBConsumerOffsetManager(BrokerController brokerController) {
         super(brokerController);
@@ -100,7 +100,7 @@ public class RocksDBConsumerOffsetManager extends 
ConsumerOffsetManager {
             byte[] keyBytes = 
topicAtGroup.getBytes(DataConverter.CHARSET_UTF8);
             this.rocksDBConfigManager.delete(keyBytes);
         } catch (Exception e) {
-            LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup);
+            log.error("kv remove consumerOffset Failed, {}", topicAtGroup);
         }
     }
 
@@ -109,7 +109,7 @@ public class RocksDBConsumerOffsetManager extends 
ConsumerOffsetManager {
         RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, 
RocksDBOffsetSerializeWrapper.class);
 
         this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable());
-        LOG.info("load exist local offset, {}, {}", topicAtGroup, 
wrapper.getOffsetTable());
+        log.info("load exist local offset, {}, {}", topicAtGroup, 
wrapper.getOffsetTable());
     }
 
     public String rocksdbConfigFilePath() {
@@ -132,12 +132,17 @@ public class RocksDBConsumerOffsetManager extends 
ConsumerOffsetManager {
             this.rocksDBConfigManager.batchPutWithWal(writeBatch);
             this.rocksDBConfigManager.flushWAL();
         } catch (Exception e) {
-            LOG.error("consumer offset persist Failed", e);
+            log.error("consumer offset persist Failed", e);
         } finally {
             writeBatch.close();
         }
     }
 
+    /**
+     * Logs that a consumer-offset export was requested and then calls the superclass
+     * {@code persist()} explicitly via {@code super}. The method is {@code synchronized}
+     * on this manager instance.
+     * NOTE(review): the inherited persist() is presumably what writes the JSON file named
+     * in the log message; confirm against ConsumerOffsetManager.
+     */
+    public synchronized void exportToJson() {
+        log.info("RocksDBConsumerOffsetManager export consumer offset to json 
file");
+        super.persist();
+    }
+
     private void putWriteBatch(final WriteBatch writeBatch, final String 
topicGroupName, final ConcurrentMap<Integer, Long> offsetMap) throws Exception {
         byte[] keyBytes = topicGroupName.getBytes(DataConverter.CHARSET_UTF8);
         RocksDBOffsetSerializeWrapper wrapper = new 
RocksDBOffsetSerializeWrapper();
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index 8fc7a4d6ed..ff47152569 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -37,7 +37,7 @@ import org.rocksdb.RocksIterator;
 
 public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
 
-    protected RocksDBConfigManager rocksDBConfigManager;
+    protected transient RocksDBConfigManager rocksDBConfigManager;
 
     public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
         super(brokerController, false);
@@ -184,6 +184,11 @@ public class RocksDBSubscriptionGroupManager extends 
SubscriptionGroupManager {
         }
     }
 
+    /**
+     * Logs that a subscription-group export was requested and then calls the superclass
+     * {@code persist()} explicitly via {@code super}. The method is {@code synchronized}
+     * on this manager instance.
+     * NOTE(review): the inherited persist() is presumably what writes the JSON file named
+     * in the log message; confirm against SubscriptionGroupManager.
+     */
+    public synchronized void exportToJson() {
+        log.info("RocksDBSubscriptionGroupManager export subscription group to 
json file");
+        super.persist();
+    }
+
     public String rocksdbConfigFilePath() {
         return 
this.brokerController.getMessageStoreConfig().getStorePathRootDir() + 
File.separator + "config" + File.separator + "subscriptionGroups" + 
File.separator;
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
index 18e633d348..d64f808067 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
@@ -32,7 +32,7 @@ import org.rocksdb.CompressionType;
 
 public class RocksDBTopicConfigManager extends TopicConfigManager {
 
-    protected RocksDBConfigManager rocksDBConfigManager;
+    protected transient RocksDBConfigManager rocksDBConfigManager;
 
     public RocksDBTopicConfigManager(BrokerController brokerController) {
         super(brokerController, false);
@@ -139,6 +139,11 @@ public class RocksDBTopicConfigManager extends 
TopicConfigManager {
         }
     }
 
+    /**
+     * Logs that a topic-config export was requested and then calls the superclass
+     * {@code persist()} explicitly via {@code super}. The method is {@code synchronized}
+     * on this manager instance.
+     * NOTE(review): the inherited persist() is presumably what writes the JSON file named
+     * in the log message; confirm against TopicConfigManager.
+     */
+    public synchronized void exportToJson() {
+        log.info("RocksDBTopicConfigManager export topic config to json file");
+        super.persist();
+    }
+
     public String rocksdbConfigFilePath() {
         return 
this.brokerController.getMessageStoreConfig().getStorePathRootDir() + 
File.separator + "config" + File.separator + "topics" + File.separator;
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 6fb7584aa9..a9b913192f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -60,6 +61,9 @@ import org.apache.rocketmq.broker.auth.converter.AclConverter;
 import org.apache.rocketmq.broker.auth.converter.UserConverter;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
 import org.apache.rocketmq.broker.controller.ReplicasManager;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
@@ -159,6 +163,7 @@ import 
org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
@@ -239,7 +244,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     protected final BrokerController brokerController;
     protected Set<String> configBlackList = new HashSet<>();
-    private final ExecutorService asyncExecuteWorker = new 
ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+    private final ExecutorService asyncExecuteWorker = new 
ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
 
     public AdminBrokerProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
@@ -356,6 +361,8 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
                 return queryConsumeQueue(ctx, request);
             case RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS:
                 return this.checkRocksdbCqWriteProgress(ctx, request);
+            case RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON:
+                return this.exportRocksDBConfigToJson(ctx, request);
             case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN:
                 return this.updateAndGetGroupForbidden(ctx, request);
             case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG:
@@ -495,6 +502,51 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
+    /**
+     * Exports the broker's RocksDB-backed config to JSON for every config type named in the
+     * request header.
+     * <p>
+     * Each requested type whose manager is a RocksDB implementation has its
+     * {@code exportToJson()} run on {@code asyncExecuteWorker}; this method then blocks until
+     * all submitted exports finish. Types whose manager is not RocksDB-backed are skipped.
+     *
+     * @param ctx     channel context of the request (not used by this handler)
+     * @param request the {@code EXPORT_ROCKSDB_CONFIG_TO_JSON} command
+     * @return {@code SUCCESS} with remark "export done.", or {@code SYSTEM_ERROR} carrying the
+     *         failure text when a config type name is unknown, the worker pool rejects a
+     *         task, or an export throws
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    private RemotingCommand exportRocksDBConfigToJson(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        ExportRocksDBConfigToJsonRequestHeader requestHeader = request.decodeCommandCustomHeader(ExportRocksDBConfigToJsonRequestHeader.class);
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        List<CompletableFuture<Void>> futureList = new ArrayList<>();
+
+        try {
+            // fetchConfigType() throws IllegalArgumentException for an unknown type name.
+            List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configTypes = requestHeader.fetchConfigType();
+            for (ExportRocksDBConfigToJsonRequestHeader.ConfigType type : configTypes) {
+                switch (type) {
+                    case TOPICS:
+                        if (this.brokerController.getTopicConfigManager() instanceof RocksDBTopicConfigManager) {
+                            RocksDBTopicConfigManager rocksDBTopicConfigManager = (RocksDBTopicConfigManager) this.brokerController.getTopicConfigManager();
+                            futureList.add(CompletableFuture.runAsync(rocksDBTopicConfigManager::exportToJson, asyncExecuteWorker));
+                        }
+                        break;
+                    case SUBSCRIPTION_GROUPS:
+                        if (this.brokerController.getSubscriptionGroupManager() instanceof RocksDBSubscriptionGroupManager) {
+                            RocksDBSubscriptionGroupManager rocksDBSubscriptionGroupManager = (RocksDBSubscriptionGroupManager) this.brokerController.getSubscriptionGroupManager();
+                            futureList.add(CompletableFuture.runAsync(rocksDBSubscriptionGroupManager::exportToJson, asyncExecuteWorker));
+                        }
+                        break;
+                    case CONSUMER_OFFSETS:
+                        if (this.brokerController.getConsumerOffsetManager() instanceof RocksDBConsumerOffsetManager) {
+                            RocksDBConsumerOffsetManager rocksDBConsumerOffsetManager = (RocksDBConsumerOffsetManager) this.brokerController.getConsumerOffsetManager();
+                            futureList.add(CompletableFuture.runAsync(rocksDBConsumerOffsetManager::exportToJson, asyncExecuteWorker));
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
+        } catch (IllegalArgumentException | java.util.concurrent.RejectedExecutionException | CompletionException e) {
+            // asyncExecuteWorker hands tasks directly to a capped number of threads, so
+            // runAsync can throw RejectedExecutionException while they are all busy.
+            // Exports that were already submitted keep running; only the reply is an error.
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.valueOf(e));
+            return response;
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark("export done.");
+        return response;
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7d4b51cfc5..114093e350 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -164,6 +164,7 @@ import 
org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupReque
 import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
@@ -3036,6 +3037,21 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback, StartAndShutdo
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
+    /**
+     * Asks the broker at {@code brokerAddr} to export the given RocksDB config types to JSON
+     * by sending a synchronous {@code EXPORT_ROCKSDB_CONFIG_TO_JSON} request.
+     *
+     * @param brokerAddr    address of the target broker
+     * @param configType    config types to export; encoded into the request header
+     * @param timeoutMillis timeout handed to the synchronous invoke
+     * @throws MQClientException if the broker replies with any code other than SUCCESS;
+     *                           carries the response code and remark
+     */
+    public void exportRocksDBConfigToJson(final String brokerAddr,
+        final List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configType,
+        final long timeoutMillis) throws InterruptedException,
+        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+        final ExportRocksDBConfigToJsonRequestHeader requestHeader = new ExportRocksDBConfigToJsonRequestHeader();
+        requestHeader.updateConfigType(configType);
+
+        final RemotingCommand exportRequest =
+            RemotingCommand.createRequestCommand(RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, requestHeader);
+        final RemotingCommand exportResponse = this.remotingClient.invokeSync(brokerAddr, exportRequest, timeoutMillis);
+        assert exportResponse != null;
+
+        // Success needs no further handling; anything else is surfaced to the caller.
+        final int code = exportResponse.getCode();
+        if (code == ResponseCode.SUCCESS) {
+            return;
+        }
+        throw new MQClientException(code, exportResponse.getRemark());
+    }
+
     public void checkClientInBroker(final String brokerAddr, final String 
consumerGroup,
         final String clientId, final SubscriptionData subscriptionData,
         final long timeoutMillis)
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index 623f5748d5..e3b180a537 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -219,6 +219,7 @@ public class RequestCode {
     public static final int GET_SUBSCRIPTIONGROUP_CONFIG = 352;
     public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353;
     public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354;
+    public static final int EXPORT_ROCKSDB_CONFIG_TO_JSON = 355;
 
     public static final int LITE_PULL_MESSAGE = 361;
     public static final int RECALL_MESSAGE = 370;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
new file mode 100644
index 0000000000..7b1f9470e1
--- /dev/null
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.action.RocketMQAction;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+
+/**
+ * Request header for {@code EXPORT_ROCKSDB_CONFIG_TO_JSON}. The config types to export are
+ * carried in a single string in which every type name is followed by {@code ;}.
+ */
+@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = Action.GET)
+public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader {
+    private static final String CONFIG_TYPE_SEPARATOR = ";";
+
+    /** Kinds of broker config that can be exported, each with the name used in the header. */
+    public enum ConfigType {
+        TOPICS("topics"),
+        SUBSCRIPTION_GROUPS("subscriptionGroups"),
+        CONSUMER_OFFSETS("consumerOffsets");
+
+        private final String typeName;
+
+        ConfigType(String typeName) {
+            this.typeName = typeName;
+        }
+
+        /**
+         * Looks up a config type by name, ignoring case and surrounding whitespace.
+         *
+         * @param typeName name to resolve
+         * @return the matching config type
+         * @throws IllegalArgumentException if {@code typeName} is null or matches no type
+         */
+        public static ConfigType getConfigTypeByName(String typeName) {
+            if (typeName == null) {
+                // Report null like any other unknown name rather than failing on trim().
+                throw new IllegalArgumentException("Unknown config type: " + typeName);
+            }
+            String trimmedName = typeName.trim();
+            for (ConfigType configType : ConfigType.values()) {
+                if (configType.getTypeName().equalsIgnoreCase(trimmedName)) {
+                    return configType;
+                }
+            }
+            throw new IllegalArgumentException("Unknown config type: " + typeName);
+        }
+
+        /**
+         * Parses a {@code ;}-separated list of config type names.
+         *
+         * @param ordinal the separated type names (despite the parameter name, this is not
+         *                an enum ordinal); null or blank input yields an empty list
+         * @return the parsed config types in input order
+         * @throws IllegalArgumentException if a non-blank segment matches no config type
+         */
+        public static List<ConfigType> fromString(String ordinal) {
+            List<ConfigType> configTypes = new ArrayList<>();
+            // StringUtils.split returns null for null input, so guard before iterating.
+            String[] configTypeNames = StringUtils.split(ordinal, CONFIG_TYPE_SEPARATOR);
+            if (configTypeNames == null) {
+                return configTypes;
+            }
+            for (String configTypeName : configTypeNames) {
+                // Skip whitespace-only segments such as the one left by "topics; ".
+                if (StringUtils.isNotBlank(configTypeName)) {
+                    configTypes.add(getConfigTypeByName(configTypeName));
+                }
+            }
+            return configTypes;
+        }
+
+        /**
+         * Encodes config types as their names, each followed by {@code ;}.
+         *
+         * @param configTypes config types to encode
+         * @return the encoded string; empty when the list is empty
+         */
+        public static String toString(List<ConfigType> configTypes) {
+            StringBuilder sb = new StringBuilder();
+            for (ConfigType configType : configTypes) {
+                sb.append(configType.getTypeName()).append(CONFIG_TYPE_SEPARATOR);
+            }
+            return sb.toString();
+        }
+
+        public String getTypeName() {
+            return typeName;
+        }
+    }
+
+    @CFNotNull
+    private String configType;
+
+    /** Intentionally empty: this header performs no field validation of its own. */
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    /** Parses the encoded {@code configType} string into a list of config types. */
+    public List<ConfigType> fetchConfigType() {
+        return ConfigType.fromString(configType);
+    }
+
+    /** Stores the given config types in their encoded string form. */
+    public void updateConfigType(List<ConfigType> configType) {
+        this.configType = ConfigType.toString(configType);
+    }
+
+    public String getConfigType() {
+        return configType;
+    }
+
+    public void setConfigType(String configType) {
+        this.configType = configType;
+    }
+}
\ No newline at end of file
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java
 
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java
new file mode 100644
index 0000000000..bbe625a42a
--- /dev/null
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExportRocksDBConfigToJsonRequestHeaderTest {
+    /**
+     * Round-trips config type lists through toString/fromString and checks that parsing
+     * ignores case and surrounding whitespace and rejects unknown names.
+     * Uses JUnit assertions so the checks still run when the JVM has assertions disabled.
+     */
+    @Test
+    public void configTypeTest() {
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configTypes = new ArrayList<>();
+        configTypes.add(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS);
+        configTypes.add(ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS);
+
+        String string = ExportRocksDBConfigToJsonRequestHeader.ConfigType.toString(configTypes);
+
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> newConfigTypes = ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString(string);
+        Assert.assertEquals(2, newConfigTypes.size());
+        Assert.assertEquals(configTypes, newConfigTypes);
+
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> topics = ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString("topics");
+        Assert.assertEquals(1, topics.size());
+        Assert.assertEquals(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS, topics.get(0));
+
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> mix = ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString("toPics; subScriptiongroups");
+        Assert.assertEquals(2, mix.size());
+        Assert.assertEquals(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS, mix.get(0));
+        Assert.assertEquals(ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS, mix.get(1));
+
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString("topics; subscription");
+        });
+
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 4b97e14866..f224f749cb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -65,6 +65,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicList;
 import org.apache.rocketmq.remoting.protocol.body.UserInfo;
+import 
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -778,6 +779,13 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
         return 
this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic, 
checkStoreTime);
     }
 
+    /**
+     * Delegates to {@code defaultMQAdminExtImpl.exportRocksDBConfigToJson} with the same
+     * broker address and config types.
+     */
+    @Override
+    public void exportRocksDBConfigToJson(String brokerAddr,
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configType)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException {
+        this.defaultMQAdminExtImpl.exportRocksDBConfigToJson(brokerAddr, 
configType);
+    }
+
     @Override
     public boolean resumeCheckHalfMessage(String topic,
         String msgId)
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 2523013af0..5be99606dc 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -103,6 +103,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicList;
 import org.apache.rocketmq.remoting.protocol.body.UserInfo;
+import 
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
@@ -1824,6 +1825,13 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
         return 
this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr,
 topic, checkStoreTime, timeoutMillis);
     }
 
+    /**
+     * Forwards the export request to the client API obtained from {@code mqClientInstance},
+     * passing this instance's {@code timeoutMillis} as the timeout.
+     */
+    @Override
+    public void exportRocksDBConfigToJson(String brokerAddr,
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configType)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException {
+        
this.mqClientInstance.getMQClientAPIImpl().exportRocksDBConfigToJson(brokerAddr,
 configType, timeoutMillis);
+    }
+
     @Override
     public boolean resumeCheckHalfMessage(final String topic,
         final String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 69a0821864..2f01b6cba8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -61,6 +61,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicList;
 import org.apache.rocketmq.remoting.protocol.body.UserInfo;
+import 
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -392,6 +393,10 @@ public interface MQAdminExt extends MQAdmin {
         final long index, final int count, final String consumerGroup)
         throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException;
 
+    /**
+     * Requests that one broker export the given RocksDB config types to JSON.
+     * NOTE(review): contract inferred from the method and parameter type names; confirm
+     * against the implementation.
+     *
+     * @param brokerAddr address of the broker the request is sent to
+     * @param configType config types to export
+     */
+    void exportRocksDBConfigToJson(String brokerAddr,
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> configType)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException;
+
     boolean resumeCheckHalfMessage(final String topic,
         final String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException;
 
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
index d5726985e3..438d17d668 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
@@ -18,6 +18,10 @@
 package org.apache.rocketmq.tools.command.export;
 
 import com.alibaba.fastjson.JSONObject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -30,11 +34,6 @@ import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 import org.rocksdb.RocksIterator;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-
 public class ExportMetadataInRocksDBCommand implements SubCommand {
     private static final String TOPICS_JSON_CONFIG = "topics";
     private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = 
"subscriptionGroups";
@@ -46,7 +45,7 @@ public class ExportMetadataInRocksDBCommand implements 
SubCommand {
 
     @Override
     public String commandDesc() {
-        return "export RocksDB kv config (topics/subscriptionGroups)";
+        return "export RocksDB kv config (topics/subscriptionGroups). 
Recommend to use [mqadmin rocksDBConfigToJson]";
     }
 
     @Override
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
index f2803b0cbb..48bc163678 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
@@ -18,27 +18,38 @@
 package org.apache.rocketmq.tools.command.metadata;
 
 import com.alibaba.fastjson.JSONObject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 import org.rocksdb.RocksIterator;
 
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 public class RocksDBConfigToJsonCommand implements SubCommand {
-    private static final String TOPICS_JSON_CONFIG = "topics";
-    private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups";
-    private static final String CONSUMER_OFFSETS_JSON_CONFIG = "consumerOffsets";
 
     @Override
     public String commandName() {
@@ -47,41 +58,140 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
 
     @Override
     public String commandDesc() {
-        return "Convert RocksDB kv config (topics/subscriptionGroups/consumerOffsets) to json";
+        return "Convert RocksDB kv config (topics/subscriptionGroups/consumerOffsets) to json. " +
+            "[rpc mode] Use [-n, -c, -b, -t] to send Request to broker ( version >= 5.3.2 ) or [local mode] use [-p, -t, -j, -e] to load RocksDB. " +
+            "If -e is provided, tools will export json file instead of std print";
     }
 
     @Override
     public Options buildCommandlineOptions(Options options) {
+        Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
+            "topics/subscriptionGroups/consumerOffsets. Required in local mode and default all in rpc mode.");
+        options.addOption(configTypeOption);
+
+        // [local mode] options
         Option pathOption = new Option("p", "configPath", true,
-                "Absolute path to the metadata config directory");
-        pathOption.setRequired(true);
+            "[local mode] Absolute path to the metadata config directory");
         options.addOption(pathOption);
 
-        Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
-                "topics/subscriptionGroups/consumerOffsets");
-        configTypeOption.setRequired(true);
-        options.addOption(configTypeOption);
+        Option exportPathOption = new Option("e", "exportFile", true,
+            "[local mode] Absolute file path for exporting, auto backup existing file, not directory. If exportFile is provided, will export Json file and ignore [-j].");
+        options.addOption(exportPathOption);
+
+        Option jsonEnableOption = new Option("j", "jsonEnable", true,
+            "[local mode] Json format enable, Default: true. If exportFile is provided, will export Json file and ignore [-j].");
+        options.addOption(jsonEnableOption);
+
+        // [rpc mode] options
+        Option nameserverOption = new Option("n", "nameserverAddr", true,
+            "[rpc mode] nameserverAddr. If nameserverAddr and clusterName are provided, will ignore [-p, -e, -j, -b] args");
+        options.addOption(nameserverOption);
+
+        Option clusterOption = new Option("c", "cluster", true,
+            "[rpc mode] Cluster name. If nameserverAddr and clusterName are provided, will ignore [-p, -e, -j, -b] args");
+        options.addOption(clusterOption);
+
+        Option brokerAddrOption = new Option("b", "brokerAddr", true,
+            "[rpc mode] Broker address. If brokerAddr is provided, will ignore [-p, -e, -j] args");
+        options.addOption(brokerAddrOption);
 
         return options;
     }
 
     @Override
     public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> typeList = getConfigTypeList(commandLine);
+
+        if (commandLine.hasOption("nameserverAddr")) {
+            // [rpc mode] call all brokers in cluster to export to json file
+            System.out.print("Use [rpc mode] call all brokers in cluster to export to json file \n");
+            checkRequiredArgsProvided(commandLine, "rpc mode", "cluster");
+            handleRpcMode(commandLine, rpcHook, typeList);
+        } else if (commandLine.hasOption("brokerAddr")) {
+            // [rpc mode] call broker to export to json file
+            System.out.print("Use [rpc mode] call broker to export to json file \n");
+            handleRpcMode(commandLine, rpcHook, typeList);
+        } else if (commandLine.hasOption("configPath")) {
+            // [local mode] load rocksdb to print or export file
+            System.out.print("Use [local mode] load rocksdb to print or export file \n");
+            checkRequiredArgsProvided(commandLine, "local mode", "configType");
+            handleLocalMode(commandLine);
+        } else {
+            System.out.print(commandDesc() + "\n");
+        }
+    }
+
+    private void handleLocalMode(CommandLine commandLine) {
+        ExportRocksDBConfigToJsonRequestHeader.ConfigType type = Objects.requireNonNull(getConfigTypeList(commandLine)).get(0);
         String path = commandLine.getOptionValue("configPath").trim();
         if (StringUtils.isEmpty(path) || !new File(path).exists()) {
             System.out.print("Rocksdb path is invalid.\n");
             return;
         }
+        path = Paths.get(path, type.toString()).toString();
+        String exportFile = commandLine.hasOption("exportFile") ? commandLine.getOptionValue("exportFile").trim() : null;
+        Map<String, JSONObject> configMap = getConfigMapFromRocksDB(path, type);
+        if (configMap != null) {
+            if (exportFile == null) {
+                if (commandLine.hasOption("jsonEnable") && "false".equalsIgnoreCase(commandLine.getOptionValue("jsonEnable").trim())) {
+                    printConfigMapJsonDisable(configMap);
+                } else {
+                    System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
+                }
+            } else {
+                String jsonString = JSONObject.toJSONString(configMap, true);
+                try {
+                    MixAll.string2File(jsonString, exportFile);
+                } catch (IOException e) {
+                    System.out.print("persist file " + exportFile + " exception" + e);
+                }
+            }
+        }
+    }
 
-        String configType = commandLine.getOptionValue("configType").trim();
-        if (!path.endsWith("/")) {
-            path += "/";
+    private void checkRequiredArgsProvided(CommandLine commandLine, String mode,
+        String... args) throws SubCommandException {
+        for (String arg : args) {
+            if (!commandLine.hasOption(arg)) {
+                System.out.printf("%s Invalid args, please input %s\n", mode, String.join(",", args));
+                throw new SubCommandException("Invalid args");
+            }
         }
-        path += configType;
-        if (CONSUMER_OFFSETS_JSON_CONFIG.equalsIgnoreCase(configType)) {
-            printConsumerOffsets(path);
-            return;
+    }
+
+    private List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> getConfigTypeList(CommandLine commandLine) {
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> typeList = new ArrayList<>();
+        if (commandLine.hasOption("configType")) {
+            String configType = commandLine.getOptionValue("configType").trim();
+            try {
+                typeList.addAll(ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString(configType));
+            } catch (IllegalArgumentException e) {
+                System.out.print("Invalid configType: " + configType + " please input topics/subscriptionGroups/consumerOffsets \n");
+                return null;
+            }
+        } else {
+            typeList.addAll(Arrays.asList(ExportRocksDBConfigToJsonRequestHeader.ConfigType.values()));
         }
+        return typeList;
+    }
+
+    private static void printConfigMapJsonDisable(Map<String, JSONObject> configMap) {
+        AtomicLong count = new AtomicLong(0);
+        for (Map.Entry<String, JSONObject> entry : configMap.entrySet()) {
+            String configKey = entry.getKey();
+            System.out.printf("type: %s", configKey);
+            JSONObject jsonObject = entry.getValue();
+            jsonObject.forEach((k, v) -> System.out.printf("%d, Key: %s, Value: %s%n", count.incrementAndGet(), k, v));
+        }
+    }
+
+    private static Map<String, JSONObject> getConfigMapFromRocksDB(String path,
+        ExportRocksDBConfigToJsonRequestHeader.ConfigType configType) {
+
+        if (ExportRocksDBConfigToJsonRequestHeader.ConfigType.CONSUMER_OFFSETS.equals(configType)) {
+            return loadConsumerOffsets(path);
+        }
+
         ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true);
         configRocksDBStorage.start();
         RocksIterator iterator = configRocksDBStorage.iterator();
@@ -101,24 +211,79 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
             byte[] kvDataVersion = configRocksDBStorage.getKvDataVersion();
             if (kvDataVersion != null) {
                 configMap.put("dataVersion",
-                        JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8)));
+                    JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8)));
             }
 
-            if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType)) {
+            if (ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS.equals(configType)) {
                 configMap.put("topicConfigTable", configTable);
             }
-            if (SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) {
+            if (ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS.equals(configType)) {
                 configMap.put("subscriptionGroupTable", configTable);
             }
-            System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
+            return configMap;
         } catch (Exception e) {
             System.out.print("Error occurred while converting RocksDB kv config to json, " + "configType=" + configType + ", " + e.getMessage() + "\n");
         } finally {
             configRocksDBStorage.shutdown();
         }
+        return null;
+    }
+
+    private void handleRpcMode(CommandLine commandLine, RPCHook rpcHook,
+        List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> type) {
+        String nameserverAddr = commandLine.hasOption('n') ? commandLine.getOptionValue("nameserverAddr").trim() : null;
+        String inputBrokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null;
+        String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null;
+
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook, 30 * 1000);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        defaultMQAdminExt.setNamesrvAddr(nameserverAddr);
+
+        List<CompletableFuture<Void>> futureList = new ArrayList<>();
+
+        try {
+            defaultMQAdminExt.start();
+            if (clusterName != null) {
+                ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+                Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+                Map<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
+                if (clusterAddrTable.get(clusterName) == null) {
+                    System.out.print("clusterAddrTable is empty");
+                    return;
+                }
+                for (Map.Entry<String, BrokerData> entry : brokerAddrTable.entrySet()) {
+                    String brokerName = entry.getKey();
+                    BrokerData brokerData = entry.getValue();
+                    String brokerAddr = brokerData.getBrokerAddrs().get(0L);
+                    futureList.add(sendRequest(type, defaultMQAdminExt, brokerAddr, brokerName));
+                }
+            } else if (inputBrokerAddr != null) {
+                futureList.add(sendRequest(type, defaultMQAdminExt, inputBrokerAddr, null));
+            }
+            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete(
+                (v, t) -> System.out.print("broker export done.")
+            ).join();
+        } catch (Exception e) {
+            throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    private CompletableFuture<Void> sendRequest(List<ExportRocksDBConfigToJsonRequestHeader.ConfigType> type,
+        DefaultMQAdminExt defaultMQAdminExt, String brokerAddr, String brokerName) {
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                defaultMQAdminExt.exportRocksDBConfigToJson(brokerAddr, type);
+            } catch (Throwable t) {
+                System.out.print((brokerName != null) ? brokerName : brokerAddr + " export error");
+                throw new CompletionException(this.getClass().getSimpleName() + " command failed", t);
+            }
+            return null;
+        });
     }
 
-    private void printConsumerOffsets(String path) {
+    private static Map<String, JSONObject> loadConsumerOffsets(String path) {
         ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true);
         configRocksDBStorage.start();
         RocksIterator iterator = configRocksDBStorage.iterator();
@@ -136,12 +301,13 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
                 iterator.next();
             }
             configMap.put("offsetTable", configTable);
-            System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
+            return configMap;
         } catch (Exception e) {
             System.out.print("Error occurred while converting RocksDB kv config to json, " + "configType=consumerOffsets, " + e.getMessage() + "\n");
         } finally {
             configRocksDBStorage.shutdown();
         }
+        return null;
     }
 
     static class RocksDBOffsetSerializeWrapper {


Reply via email to