This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 525f877f3b [ISSUE #8589] Support file format CQ and json format offset 
in-place upgrade to rocksdb management (#8600)
525f877f3b is described below

commit 525f877f3bddced2d85d99520fd600bcbbfe3c6d
Author: LetLetMe <43874697+letle...@users.noreply.github.com>
AuthorDate: Mon Sep 23 19:24:15 2024 +0800

    [ISSUE #8589] Support file format CQ and json format offset in-place 
upgrade to rocksdb management (#8600)
---
 .../apache/rocketmq/broker/BrokerController.java   |   6 +-
 .../broker/offset/ConsumerOffsetManager.java       |  20 +++
 .../offset/RocksDBConsumerOffsetManager.java       |  77 +++++++++--
 .../broker/processor/AdminBrokerProcessor.java     |  92 +++++++++++-
 .../RocksDBSubscriptionGroupManager.java           |  36 ++---
 .../subscription/SubscriptionGroupManager.java     |  20 +++
 .../broker/topic/RocksDBTopicConfigManager.java    |  26 ++--
 .../rocketmq/broker/topic/TopicConfigManager.java  |  20 +++
 .../offset/RocksdbTransferOffsetAndCqTest.java     | 154 +++++++++++++++++++++
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  15 ++
 .../common/config/AbstractRocksDBStorage.java      |  23 +--
 .../rocketmq/remoting/protocol/RequestCode.java    |   1 +
 .../CheckRocksdbCqWriteProgressResponseBody.java   |  35 +++++
 .../CheckRocksdbCqWriteProgressRequestHeader.java  |  47 +++++++
 .../apache/rocketmq/store/DefaultMessageStore.java |  42 +++++-
 .../apache/rocketmq/store/RocksDBMessageStore.java |  44 +++++-
 .../rocketmq/store/config/MessageStoreConfig.java  |  31 +++++
 .../org/apache/rocketmq/store/queue/CqUnit.java    |   1 +
 .../rocketmq/store/queue/RocksDBConsumeQueue.java  |   3 +-
 .../store/queue/RocksDBConsumeQueueStore.java      |  10 +-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   7 +
 .../tools/admin/DefaultMQAdminExtImpl.java         |   7 +
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   3 +
 .../export/ExportMetadataInRocksDBCommand.java     |   4 +-
 .../queue/CheckRocksdbCqWriteProgressCommand.java  |  97 +++++++++++++
 25 files changed, 755 insertions(+), 66 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 22ac7fedf1..aaf06caddf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.broker;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.AbstractMap;
 import java.util.ArrayList;
@@ -789,6 +788,9 @@ public class BrokerController {
                 defaultMessageStore = new 
RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
this.messageArrivingListener, this.brokerConfig, 
topicConfigManager.getTopicConfigTable());
             } else {
                 defaultMessageStore = new 
DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
this.messageArrivingListener, this.brokerConfig, 
topicConfigManager.getTopicConfigTable());
+                if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
+                    defaultMessageStore.enableRocksdbCQWrite();
+                }
             }
 
             if (messageStoreConfig.isEnableDLegerCommitLog()) {
@@ -812,7 +814,7 @@ public class BrokerController {
                 this.timerMessageStore.registerEscapeBridgeHook(msg -> 
escapeBridge.putMessage(msg));
                 this.messageStore.setTimerMessageStore(this.timerMessageStore);
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             result = false;
             LOG.error("BrokerController#initialize: unexpected error occurs", 
e);
         }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 21f20dde32..403324137c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -31,6 +31,7 @@ import com.google.common.base.Strings;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -373,6 +374,25 @@ public class ConsumerOffsetManager extends ConfigManager {
         this.dataVersion = dataVersion;
     }
 
+    /**
+     * Reads the JSON file at {@link #configFilePath()} and adopts only its
+     * {@code dataVersion}; the offset table of this manager is not modified here.
+     *
+     * @return {@code true} if the read finished without an exception (including the
+     *         case where {@code MixAll.file2String} yields {@code null} and nothing
+     *         is changed); {@code false} if any exception was raised
+     */
+    public boolean loadDataVersion() {
+        String fileName = null;
+        try {
+            fileName = this.configFilePath();
+            String jsonString = MixAll.file2String(fileName);
+            if (jsonString != null) {
+                ConsumerOffsetManager obj =
+                    RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
+                if (obj != null) {
+                    this.dataVersion = obj.dataVersion;
+                    // Report success only when a version was actually adopted, and log the
+                    // version itself rather than the whole offsets JSON document.
+                    LOG.info("load consumer offset dataVersion success,{},{} ", fileName, obj.dataVersion);
+                }
+            }
+            return true;
+        } catch (Exception e) {
+            LOG.error("load consumer offset dataVersion failed " + fileName, e);
+            return false;
+        }
+    }
+
     public void removeOffset(final String group) {
         Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = 
this.offsetTable.entrySet().iterator();
         while (it.hasNext()) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
index de293fc499..1e7cda71ee 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
@@ -16,26 +16,31 @@
  */
 package org.apache.rocketmq.broker.offset;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import java.io.File;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.rocksdb.WriteBatch;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-
 public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
 
+    protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
     protected RocksDBConfigManager rocksDBConfigManager;
 
     public RocksDBConsumerOffsetManager(BrokerController brokerController) {
         super(brokerController);
-        this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
     }
 
     @Override
@@ -43,9 +48,47 @@ public class RocksDBConsumerOffsetManager extends 
ConsumerOffsetManager {
         if (!rocksDBConfigManager.init()) {
             return false;
         }
-        return this.rocksDBConfigManager.loadData(this::decodeOffset);
+        if (!loadDataVersion() || !loadConsumerOffset()) {
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Loads the offsets held by {@code rocksDBConfigManager} and, if that succeeds,
+     * runs {@link #merge()}.
+     *
+     * @return {@code true} only when both steps return {@code true}; {@code merge()}
+     *         is not invoked when the load step fails
+     */
+    public boolean loadConsumerOffset() {
+        if (!this.rocksDBConfigManager.loadData(this::decodeOffset)) {
+            return false;
+        }
+        return merge();
+    }
+
+    /**
+     * Copies consumer offsets from the JSON file into the RocksDB-backed store when the
+     * JSON data version counter is greater than the KV one.
+     *
+     * <p>Nothing is done when the {@code transferOffsetJsonToRocksdb} switch is off or when
+     * neither the JSON file nor its {@code .bak} sibling exists.
+     *
+     * @return {@code false} if the JSON data version or the JSON offsets cannot be loaded,
+     *         {@code true} otherwise
+     */
+    private boolean merge() {
+        final boolean transferEnabled =
+            brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb();
+        if (!transferEnabled) {
+            log.info("the switch transferOffsetJsonToRocksdb is off, no merge offset operation is needed.");
+            return true;
+        }
+
+        final boolean jsonPresent = UtilAll.isPathExists(this.configFilePath())
+            || UtilAll.isPathExists(this.configFilePath() + ".bak");
+        if (!jsonPresent) {
+            log.info("consumerOffset json file does not exist, so skip merge");
+            return true;
+        }
+
+        // super.loadDataVersion() reads the JSON file; this class overrides
+        // loadDataVersion()/getDataVersion() to use the KV copy instead.
+        if (!super.loadDataVersion()) {
+            log.error("load json consumerOffset dataVersion error, startup will exit");
+            return false;
+        }
+
+        final DataVersion jsonVersion = super.getDataVersion();
+        final DataVersion kvVersion = this.getDataVersion();
+        final boolean jsonIsNewer = jsonVersion.getCounter().get() > kvVersion.getCounter().get();
+        if (!jsonIsNewer) {
+            return true;
+        }
+
+        if (!super.load()) {
+            log.error("load json consumerOffset info failed, startup will exit");
+            return false;
+        }
+        // Order matters: write the offsets first, then stamp the KV store with the JSON version.
+        this.persist();
+        this.getDataVersion().assignNewOne(jsonVersion);
+        updateDataVersion();
+        log.info("update offset from json, dataVersion:{}, offsetTable: {} ",
+            this.getDataVersion(), JSON.toJSONString(this.getOffsetTable()));
+        return true;
+    }
 
+
     @Override
     public boolean stop() {
         return this.rocksDBConfigManager.stop();
@@ -69,8 +112,7 @@ public class RocksDBConsumerOffsetManager extends 
ConsumerOffsetManager {
         LOG.info("load exist local offset, {}, {}", topicAtGroup, 
wrapper.getOffsetTable());
     }
 
-    @Override
-    public String configFilePath() {
+    public String rocksdbConfigFilePath() {
         return 
this.brokerController.getMessageStoreConfig().getStorePathRootDir() + 
File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
     }
 
@@ -103,4 +145,23 @@ public class RocksDBConsumerOffsetManager extends 
ConsumerOffsetManager {
         byte[] valueBytes = JSON.toJSONBytes(wrapper, 
SerializerFeature.BrowserCompatible);
         writeBatch.put(keyBytes, valueBytes);
     }
+
+    /**
+     * Loads the data version through {@code rocksDBConfigManager} instead of the
+     * implementation inherited from the parent class.
+     *
+     * @return whatever {@code rocksDBConfigManager.loadDataVersion()} reports
+     */
+    @Override
+    public boolean loadDataVersion() {
+        final boolean loaded = this.rocksDBConfigManager.loadDataVersion();
+        return loaded;
+    }
+
+    /**
+     * Returns the KV data version held by {@code rocksDBConfigManager}, not the
+     * {@code dataVersion} field of the parent class.
+     */
+    @Override
+    public DataVersion getDataVersion() {
+        final DataVersion kvDataVersion = rocksDBConfigManager.getKvDataVersion();
+        return kvDataVersion;
+    }
+
+    /**
+     * Asks {@code rocksDBConfigManager} to update its KV data version.
+     *
+     * @throws RuntimeException wrapping any exception raised by the update; the
+     *                          failure is logged before being rethrown
+     */
+    public void updateDataVersion() {
+        try {
+            rocksDBConfigManager.updateKvDataVersion();
+        } catch (Exception cause) {
+            log.error("update consumer offset dataVersion error", cause);
+            throw new RuntimeException(cause);
+        }
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 28bd254914..863f16e515 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -18,9 +18,11 @@ package org.apache.rocketmq.broker.processor;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.opentelemetry.api.common.Attributes;
 import java.io.UnsupportedEncodingException;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
@@ -38,7 +40,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import io.opentelemetry.api.common.Attributes;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.AccessValidator;
@@ -69,6 +70,7 @@ import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.LockCallback;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UnlockCallback;
@@ -137,6 +139,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerialize
 import org.apache.rocketmq.remoting.protocol.body.TopicList;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.UserInfo;
+import 
org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.CreateAccessConfigRequestHeader;
@@ -209,6 +212,7 @@ import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.RocksDBMessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
@@ -217,8 +221,9 @@ import org.apache.rocketmq.store.queue.ReferredIterator;
 import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.apache.rocketmq.store.util.LibC;
-import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static 
org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
 public class AdminBrokerProcessor implements NettyRequestProcessor {
@@ -339,6 +344,8 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
                 return fetchAllConsumeStatsInBroker(ctx, request);
             case RequestCode.QUERY_CONSUME_QUEUE:
                 return queryConsumeQueue(ctx, request);
+            case RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS:
+                return this.checkRocksdbCqWriteProgress(ctx, request);
             case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN:
                 return this.updateAndGetGroupForbidden(ctx, request);
             case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG:
@@ -458,6 +465,71 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
+    /**
+     * Handles {@code CHECK_ROCKSDB_CQ_WRITE_PROGRESS}: compares the consume queues in the
+     * message store's consume queue table with the ones served by its RocksDB message store
+     * and returns a textual report under the {@code diffResult} key of a JSON body.
+     *
+     * <p>When the request header names a topic only that topic is checked; otherwise every
+     * topic in the consume queue table is checked. The response code is always
+     * {@code SUCCESS}; problems are reported through the body text.
+     *
+     * @param ctx     channel context of the request (not used by this method)
+     * @param request the admin request carrying a {@code CheckRocksdbCqWriteProgressRequestHeader}
+     * @return the response whose body is {@code {"diffResult": "..."}}
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        CheckRocksdbCqWriteProgressRequestHeader requestHeader =
+            request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
+        String requestTopic = requestHeader.getTopic();
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+
+        DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore();
+        RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore();
+        if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
+            response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult",
+                "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid")));
+            return response;
+        }
+
+        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable =
+            messageStore.getConsumeQueueTable();
+        StringBuilder diffResult = new StringBuilder("check success, all is ok!\n");
+        try {
+            if (StringUtils.isNotBlank(requestTopic)) {
+                ConcurrentMap<Integer, ConsumeQueueInterface> queueMap = cqTable.get(requestTopic);
+                if (queueMap == null) {
+                    // An unknown topic used to surface as a NullPointerException; answer explicitly.
+                    response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult",
+                        "topic " + requestTopic + " has no consume queue on this broker")));
+                    return response;
+                }
+                processConsumeQueuesForTopic(queueMap, requestTopic, rocksDBMessageStore, diffResult, false);
+                response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString())));
+                return response;
+            }
+            for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
+                String topic = topicEntry.getKey();
+                processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, diffResult, true);
+            }
+            diffResult.append("check all topic successful, size:").append(cqTable.size());
+            response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString())));
+
+        } catch (Exception e) {
+            LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
+            // ImmutableMap.of rejects null values, and getMessage() is null for e.g. a bare
+            // NullPointerException, so fall back to the exception's string form.
+            String errorMessage = e.getMessage() != null ? e.getMessage() : e.toString();
+            response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", errorMessage)));
+        }
+        return response;
+    }
+
+    /**
+     * Compares, for one topic, every file-based consume queue in {@code queueMap} with the
+     * consume queue the RocksDB message store returns for the same topic and queue id, and
+     * appends what it finds to {@code diffResult}.
+     *
+     * <p>Offsets are walked from the RocksDB queue's minimum offset up to (excluding) the
+     * file queue's maximum offset. The method returns at the first offset where either side
+     * has no unit or where the two units differ, so later queues of the topic are not
+     * examined after a difference is found.
+     *
+     * @param queueMap            queue id to file-based consume queue; {@code null} is treated as empty
+     * @param topic               topic being checked
+     * @param rocksDBMessageStore store used to look up the RocksDB consume queue
+     * @param diffResult          report buffer that receives the findings
+     * @param checkAll            when {@code false}, a per-queue summary of the earliest and
+     *                            latest units on both sides is appended as well
+     */
+    private void processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap,
+        String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean checkAll) {
+        if (queueMap == null) {
+            // Nothing registered for this topic; there is nothing to compare.
+            return;
+        }
+        for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) {
+            Integer queueId = queueEntry.getKey();
+            ConsumeQueueInterface fileCq = queueEntry.getValue();
+            ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId);
+            if (!checkAll) {
+                // The last label used to read "fileEarliest" although the value is the latest unit.
+                String format = String.format("\n[topic: %s, queue:  %s] \n  kvEarliest : %s |  kvLatest : %s \n fileEarliest: %s | fileLatest: %s ",
+                    topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(),
+                    fileCq.getEarliestUnit(), fileCq.getLatestUnit());
+                diffResult.append(format).append("\n");
+            }
+            long maxFileOffsetInQueue = fileCq.getMaxOffsetInQueue();
+            long minOffsetInQueue = kvCq.getMinOffsetInQueue();
+            for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) {
+                Pair<CqUnit, Long> fileCqUnit = fileCq.getCqUnitAndStoreTime(i);
+                Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
+                if (fileCqUnit == null || kvCqUnit == null) {
+                    diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv  : %s  \n file: %s  \n",
+                        topic, queueId, i,
+                        kvCqUnit != null ? kvCqUnit.getObject1() : "null",
+                        fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
+                    return;
+                }
+                if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
+                    // Arguments follow the label order (file first, then kv); they were swapped before.
+                    String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file: %s  \n  kv: %s",
+                        topic, queueId, i, fileCqUnit.getObject1(), kvCqUnit.getObject1());
+                    LOGGER.error(diffInfo);
+                    diffResult.append(diffInfo).append("\n");
+                    return;
+                }
+            }
+        }
+    }
     @Override
     public boolean rejectRequest() {
         return false;
@@ -3305,4 +3377,20 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         }
         return false;
     }
+
+    /**
+     * Tells whether two consume queue units carry the same queue offset, size, position,
+     * batch number and tags code, compared in that order.
+     *
+     * @param cqUnit1 first unit, must not be {@code null}
+     * @param cqUnit2 second unit, must not be {@code null}
+     * @return {@code true} when all five values match
+     */
+    private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) {
+        return cqUnit1.getQueueOffset() == cqUnit2.getQueueOffset()
+            && cqUnit1.getSize() == cqUnit2.getSize()
+            && cqUnit1.getPos() == cqUnit2.getPos()
+            && cqUnit1.getBatchNum() == cqUnit2.getBatchNum()
+            && cqUnit1.getTagsCode() == cqUnit2.getTagsCode();
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
index 7df72dbe68..5119f78672 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
@@ -19,6 +19,12 @@ package org.apache.rocketmq.broker.subscription;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiConsumer;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.RocksDBConfigManager;
 import org.apache.rocketmq.common.UtilAll;
@@ -27,13 +33,6 @@ import org.apache.rocketmq.remoting.protocol.DataVersion;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.rocksdb.RocksIterator;
 
-import java.io.File;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.function.BiConsumer;
-
 public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
 
     protected RocksDBConfigManager rocksDBConfigManager;
@@ -79,28 +78,30 @@ public class RocksDBSubscriptionGroupManager extends 
SubscriptionGroupManager {
 
     private boolean merge() {
         if 
(!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
-            log.info("The switch is off, no merge operation is needed.");
+            log.info("the switch  transferMetadataJsonToRocksdb is off, no 
merge subGroup operation is needed.");
             return true;
         }
         if (!UtilAll.isPathExists(this.configFilePath()) && 
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
-            log.info("json file and json back file not exist, so skip merge");
+            log.info("subGroup json file does not exist, so skip merge");
             return true;
         }
-
-        if (!super.load()) {
-            log.error("load group and forbidden info from json file error, 
startup will exit");
+        if (!super.loadDataVersion()) {
+            log.error("load json subGroup dataVersion error, startup will 
exit");
             return false;
         }
-
-        final ConcurrentMap<String, SubscriptionGroupConfig> groupTable = 
this.getSubscriptionGroupTable();
-        final ConcurrentMap<String, ConcurrentMap<String, Integer>> 
forbiddenTable = this.getForbiddenTable();
         final DataVersion dataVersion = super.getDataVersion();
         final DataVersion kvDataVersion = this.getDataVersion();
         if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) 
{
+            if (!super.load()) {
+                log.error("load group and forbidden info from json file error, 
startup will exit");
+                return false;
+            }
+            final ConcurrentMap<String, SubscriptionGroupConfig> groupTable = 
this.getSubscriptionGroupTable();
             for (Map.Entry<String, SubscriptionGroupConfig> entry : 
groupTable.entrySet()) {
                 putSubscriptionGroupConfig(entry.getValue());
                 log.info("import subscription config to rocksdb, group={}", 
entry.getValue());
             }
+            final ConcurrentMap<String, ConcurrentMap<String, Integer>> 
forbiddenTable = this.getForbiddenTable();
             for (Map.Entry<String, ConcurrentMap<String, Integer>> entry : 
forbiddenTable.entrySet()) {
                 try {
                     this.rocksDBConfigManager.updateForbidden(entry.getKey(), 
JSON.toJSONString(entry.getValue()));
@@ -110,8 +111,10 @@ public class RocksDBSubscriptionGroupManager extends 
SubscriptionGroupManager {
                     return false;
                 }
             }
-            
this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
+            this.getDataVersion().assignNewOne(dataVersion);
             updateDataVersion();
+        } else {
+            log.info("dataVersion is not greater than kvDataVersion, no need 
to merge group metaData, dataVersion={}, kvDataVersion={}", dataVersion, 
kvDataVersion);
         }
         log.info("finish marge subscription config from json file and merge to 
rocksdb");
         this.persist();
@@ -196,6 +199,7 @@ public class RocksDBSubscriptionGroupManager extends 
SubscriptionGroupManager {
         try {
             rocksDBConfigManager.updateKvDataVersion();
         } catch (Exception e) {
+            log.error("update group config dataVersion error", e);
             throw new RuntimeException(e);
         }
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index f2a7e0482b..e6855ef9a2 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -334,6 +334,26 @@ public class SubscriptionGroupManager extends 
ConfigManager {
         return dataVersion;
     }
 
+    /**
+     * Reads the JSON file at {@code configFilePath()} and copies its {@code dataVersion}
+     * into this manager's data version via {@code assignNewOne}.
+     *
+     * <p>When an object is deserialized it is also passed to
+     * {@code printLoadDataWhenFirstBoot}.
+     *
+     * @return {@code true} if the read finished without an exception (including the case
+     *         where {@code MixAll.file2String} yields {@code null}); {@code false} otherwise
+     */
+    public boolean loadDataVersion() {
+        String fileName = null;
+        try {
+            fileName = this.configFilePath();
+            String jsonString = MixAll.file2String(fileName);
+            if (jsonString != null) {
+                SubscriptionGroupManager obj =
+                    RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
+                if (obj != null) {
+                    this.dataVersion.assignNewOne(obj.dataVersion);
+                    this.printLoadDataWhenFirstBoot(obj);
+                    log.info("load subGroup dataVersion success,{},{}", fileName, obj.dataVersion);
+                }
+            }
+            return true;
+        } catch (Exception e) {
+            // Separator added: the file name used to be glued directly onto "failed".
+            log.error("load subGroup dataVersion failed " + fileName, e);
+            return false;
+        }
+    }
+
     public void deleteSubscriptionGroupConfig(final String groupName) {
         SubscriptionGroupConfig old = removeSubscriptionGroupConfig(groupName);
         this.forbiddenTable.remove(groupName);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
index 2a89dd7e02..466e6416f9 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
@@ -18,6 +18,9 @@ package org.apache.rocketmq.broker.topic;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.RocksDBConfigManager;
 import org.apache.rocketmq.common.TopicConfig;
@@ -25,10 +28,6 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
 
-import java.io.File;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
 public class RocksDBTopicConfigManager extends TopicConfigManager {
 
     protected RocksDBConfigManager rocksDBConfigManager;
@@ -60,29 +59,35 @@ public class RocksDBTopicConfigManager extends 
TopicConfigManager {
 
     private boolean merge() {
         if 
(!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
-            log.info("The switch is off, no merge operation is needed.");
+            log.info("the switch transferMetadataJsonToRocksdb is off, no 
merge topic operation is needed.");
             return true;
         }
         if (!UtilAll.isPathExists(this.configFilePath()) && 
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
-            log.info("json file and json back file not exist, so skip merge");
+            log.info("topic json file does not exist, so skip merge");
             return true;
         }
 
-        if (!super.load()) {
-            log.error("load topic config from json file error, startup will 
exit");
+        if (!super.loadDataVersion()) {
+            log.error("load json topic dataVersion error, startup will exit");
             return false;
         }
 
-        final ConcurrentMap<String, TopicConfig> topicConfigTable = 
this.getTopicConfigTable();
         final DataVersion dataVersion = super.getDataVersion();
         final DataVersion kvDataVersion = this.getDataVersion();
         if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) 
{
+            if (!super.load()) {
+                log.error("load topic config from json file error, startup 
will exit");
+                return false;
+            }
+            final ConcurrentMap<String, TopicConfig> topicConfigTable = 
this.getTopicConfigTable();
             for (Map.Entry<String, TopicConfig> entry : 
topicConfigTable.entrySet()) {
                 putTopicConfig(entry.getValue());
                 log.info("import topic config to rocksdb, topic={}", 
entry.getValue());
             }
-            
this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
+            this.getDataVersion().assignNewOne(dataVersion);
             updateDataVersion();
+        } else {
+            log.info("dataVersion is not greater than kvDataVersion, no need 
to merge topic metaData, dataVersion={}, kvDataVersion={}", dataVersion, 
kvDataVersion);
         }
         log.info("finish read topic config from json file and merge to 
rocksdb");
         this.persist();
@@ -150,6 +155,7 @@ public class RocksDBTopicConfigManager extends 
TopicConfigManager {
         try {
             rocksDBConfigManager.updateKvDataVersion();
         } catch (Exception e) {
+            log.error("update topic config dataVersion error", e);
             throw new RuntimeException(e);
         }
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index eab2896b00..25d3218f2a 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -637,6 +637,26 @@ public class TopicConfigManager extends ConfigManager {
         return encode(false);
     }
 
+    /**
+     * Reads the JSON topic config file and copies only its data version into this
+     * manager's {@code dataVersion}; the topic config table itself is not touched.
+     *
+     * @return {@code true} when no exception occurred (including the cases where no
+     *         JSON text was read or it deserialized to {@code null}); {@code false}
+     *         when reading or parsing the file threw
+     */
+    public boolean loadDataVersion() {
+        String fileName = null;
+        try {
+            fileName = this.configFilePath();
+            String jsonString = MixAll.file2String(fileName);
+            if (jsonString != null) {
+                TopicConfigSerializeWrapper topicConfigSerializeWrapper =
+                    TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
+                if (topicConfigSerializeWrapper != null) {
+                    this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
+                    log.info("load topic metadata dataVersion success {}, {}", fileName, topicConfigSerializeWrapper.getDataVersion());
+                }
+            }
+            return true;
+        } catch (Exception e) {
+            // Parameterized message keeps the file name separated from the text;
+            // the trailing throwable is still logged with its stack trace.
+            log.error("load topic metadata dataVersion failed, fileName={}", fileName, e);
+            return false;
+        }
+    }
+
     @Override
     public String configFilePath() {
         return 
BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
new file mode 100644
index 0000000000..b4800aec24
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.offset;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.collections.MapUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.RocksDBMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
+import org.apache.rocketmq.store.queue.CqUnit;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.rocksdb.RocksDBException;
+
+/**
+ * Covers two upgrade paths: importing consumer offsets persisted by
+ * {@link ConsumerOffsetManager} into {@link RocksDBConsumerOffsetManager}, and
+ * writing the same dispatch requests to both the file-based and the RocksDB
+ * consume queue. Every test returns early when {@code MixAll.isMac()} is true.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RocksdbTransferOffsetAndCqTest {
+
+    // Each test instance gets its own store directory under the user's home.
+    private final String basePath = Paths.get(System.getProperty("user.home"),
+        "unit-test-store", UUID.randomUUID().toString().substring(0, 16).toUpperCase()).toString();
+
+    private final String topic = "topic";
+    private final String group = "group";
+    private final String clientHost = "clientHost";
+    private final int queueId = 1;
+
+    private RocksDBConsumerOffsetManager rocksdbConsumerOffsetManager;
+
+    private ConsumerOffsetManager consumerOffsetManager;
+
+    private DefaultMessageStore defaultMessageStore;
+
+    @Mock
+    private BrokerController brokerController;
+
+    /**
+     * Builds a message store with RocksDB consume-queue double write enabled and both
+     * offset managers backed by the mocked broker controller.
+     */
+    @Before
+    public void init() throws IOException {
+        if (notToBeExecuted()) {
+            return;
+        }
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setConsumerOffsetUpdateVersionStep(10);
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(basePath);
+        messageStoreConfig.setTransferOffsetJsonToRocksdb(true);
+        messageStoreConfig.setRocksdbCQDoubleWriteEnable(true);
+        Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+
+        defaultMessageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("aaa", true), null,
+            brokerConfig, new ConcurrentHashMap<String, TopicConfig>());
+        defaultMessageStore.enableRocksdbCQWrite();
+        defaultMessageStore.loadCheckPoint();
+
+        consumerOffsetManager = new ConsumerOffsetManager(brokerController);
+        consumerOffsetManager.load();
+
+        rocksdbConsumerOffsetManager = new RocksDBConsumerOffsetManager(brokerController);
+    }
+
+    /**
+     * Commits 200 offsets through the JSON-backed manager, persists them, then loads
+     * the RocksDB-backed manager and checks that the offset and data version carried over.
+     */
+    @Test
+    public void testTransferOffset() {
+        if (notToBeExecuted()) {
+            return;
+        }
+
+        for (int i = 0; i < 200; i++) {
+            consumerOffsetManager.commitOffset(clientHost, group, topic, queueId, i);
+        }
+
+        ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = consumerOffsetManager.getOffsetTable();
+        ConcurrentMap<Integer, Long> map = offsetTable.get(topic + "@" + group);
+        Assert.assertTrue(MapUtils.isNotEmpty(map));
+
+        Long offset = map.get(queueId);
+        Assert.assertEquals(199L, (long) offset);
+
+        // NOTE(review): 200 commits with an update-version step of 10 presumably
+        // yields 20 version bumps - confirm against ConsumerOffsetManager.
+        long offsetDataVersion = consumerOffsetManager.getDataVersion().getCounter().get();
+        Assert.assertEquals(20L, offsetDataVersion);
+
+        consumerOffsetManager.persist();
+
+        boolean loadResult = rocksdbConsumerOffsetManager.load();
+        Assert.assertTrue(loadResult);
+
+        ConcurrentMap<String, ConcurrentMap<Integer, Long>> rocksdbOffsetTable = rocksdbConsumerOffsetManager.getOffsetTable();
+
+        ConcurrentMap<Integer, Long> rocksdbMap = rocksdbOffsetTable.get(topic + "@" + group);
+        Assert.assertTrue(MapUtils.isNotEmpty(rocksdbMap));
+
+        Long aLong1 = rocksdbMap.get(queueId);
+        Assert.assertEquals(199L, (long) aLong1);
+
+        // NOTE(review): one higher than the JSON version; presumably the import
+        // itself bumps the RocksDB data version once - confirm.
+        long rocksdbOffset = rocksdbConsumerOffsetManager.getDataVersion().getCounter().get();
+        Assert.assertEquals(21L, rocksdbOffset);
+    }
+
+    /**
+     * Writes the same 200 dispatch requests to the file consume queue and to the
+     * RocksDB consume queue store, then compares the commit-log position at index 100.
+     */
+    @Test
+    public void testRocksdbCqWrite() throws RocksDBException {
+        if (notToBeExecuted()) {
+            return;
+        }
+        RocksDBMessageStore kvStore = defaultMessageStore.getRocksDBMessageStore();
+        ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore();
+        ConsumeQueueInterface rocksdbCq = kvStore.findConsumeQueue(topic, queueId);
+        ConsumeQueueInterface fileCq = defaultMessageStore.findConsumeQueue(topic, queueId);
+        for (int i = 0; i < 200; i++) {
+            DispatchRequest request = new DispatchRequest(topic, queueId, i, 200, 0, System.currentTimeMillis(), i, "", "", 0, 0, new HashMap<>());
+            fileCq.putMessagePositionInfoWrapper(request);
+            store.putMessagePositionInfoWrapper(request);
+        }
+        Pair<CqUnit, Long> unit = rocksdbCq.getCqUnitAndStoreTime(100);
+        Pair<CqUnit, Long> unit1 = fileCq.getCqUnitAndStoreTime(100);
+        // assertEquals reports both positions on failure, unlike assertTrue(a == b).
+        Assert.assertEquals(unit1.getObject1().getPos(), unit.getObject1().getPos());
+    }
+
+    /** Returns {@code true} when the tests should be skipped, i.e. when {@code MixAll.isMac()} is true. */
+    private boolean notToBeExecuted() {
+        return MixAll.isMac();
+    }
+
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b539b8f098..0a45f09623 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -113,6 +113,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
+import 
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
 import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
@@ -148,6 +149,7 @@ import 
org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.AddBrokerRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -3017,6 +3019,19 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback, StartAndShutdo
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
+    /**
+     * Sends a {@code CHECK_ROCKSDB_CQ_WRITE_PROGRESS} request for {@code topic} to the
+     * broker at {@code brokerAddr} and decodes the response body on success.
+     *
+     * @param brokerAddr    address of the broker to query
+     * @param topic         topic set on the request header
+     * @param timeoutMillis timeout passed to the synchronous remoting call
+     * @return the decoded response body when the broker answers with {@code SUCCESS}
+     * @throws MQClientException carrying the response code and remark for any other response code
+     */
+    public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(final String brokerAddr, final String topic, final long timeoutMillis) throws InterruptedException,
+        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+        final CheckRocksdbCqWriteProgressRequestHeader requestHeader = new CheckRocksdbCqWriteProgressRequestHeader();
+        requestHeader.setTopic(topic);
+        final RemotingCommand command =
+            RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, requestHeader);
+        final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, command, timeoutMillis);
+        assert reply != null;
+        // Guard clause: anything other than SUCCESS is surfaced to the caller.
+        if (ResponseCode.SUCCESS != reply.getCode()) {
+            throw new MQClientException(reply.getCode(), reply.getRemark());
+        }
+        return CheckRocksdbCqWriteProgressResponseBody.decode(reply.getBody(), CheckRocksdbCqWriteProgressResponseBody.class);
+    }
+
     public void checkClientInBroker(final String brokerAddr, final String 
consumerGroup,
         final String clientId, final SubscriptionData subscriptionData,
         final long timeoutMillis)
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index f88b8e198b..13522889bb 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -17,6 +17,15 @@
 package org.apache.rocketmq.common.config;
 
 import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.DataConverter;
@@ -40,16 +49,6 @@ import org.rocksdb.Status;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 import static org.rocksdb.RocksDB.NOT_FOUND;
 
 public abstract class AbstractRocksDBStorage {
@@ -495,7 +494,9 @@ public abstract class AbstractRocksDBStorage {
             String blocksPinnedByIteratorMemUsage = 
this.db.getProperty("rocksdb.block-cache-pinned-usage");
             logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, 
memtable: {}, blocksPinnedByIterator: {}",
                     blockCacheMemUsage, indexesAndFilterBlockMemUsage, 
memTableMemUsage, blocksPinnedByIteratorMemUsage);
-        } catch (Exception ignored) {
+        } catch (Exception e) {
+            logger.error("statRocksdb Failed. {}", this.dbPath, e);
+            throw new RuntimeException(e);
         }
     }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index f45ff6fa48..cfc5cc2278 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -217,6 +217,7 @@ public class RequestCode {
 
     public static final int GET_SUBSCRIPTIONGROUP_CONFIG = 352;
     public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353;
+    public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354;
 
     public static final int LITE_PULL_MESSAGE = 361;
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
new file mode 100644
index 0000000000..76719ac1a2
--- /dev/null
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+/**
+ * Response body for the {@code CHECK_ROCKSDB_CQ_WRITE_PROGRESS} request; carries a
+ * single textual diff result.
+ */
+public class CheckRocksdbCqWriteProgressResponseBody extends RemotingSerializable {
+
+    // Kept private; all access goes through the accessors below.
+    private String diffResult;
+
+    /** @return the diff result text, or {@code null} if none was set */
+    public String getDiffResult() {
+        return diffResult;
+    }
+
+    /** @param diffResult the diff result text to carry in this body */
+    public void setDiffResult(String diffResult) {
+        this.diffResult = diffResult;
+    }
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
new file mode 100644
index 0000000000..fee158b497
--- /dev/null
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.action.RocketMQAction;
+import org.apache.rocketmq.common.resource.ResourceType;
+import org.apache.rocketmq.common.resource.RocketMQResource;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+
+/**
+ * Custom header for the {@code CHECK_ROCKSDB_CQ_WRITE_PROGRESS} request; its only
+ * field is the topic, which is annotated as required.
+ */
+@RocketMQAction(value = RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, action = Action.GET)
+public class CheckRocksdbCqWriteProgressRequestHeader implements CommandCustomHeader {
+
+    @CFNotNull
+    @RocketMQResource(ResourceType.TOPIC)
+    private String topic;
+
+    /** @return the topic carried by this header */
+    public String getTopic() {
+        return this.topic;
+    }
+
+    /** @param topic the topic to carry in this header */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /** Performs no additional validation in this class. */
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        // intentionally empty
+    }
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 8f564d5bc1..8b46c7f5ce 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -163,11 +163,13 @@ public class DefaultMessageStore implements MessageStore {
     private volatile boolean shutdown = true;
     protected boolean notifyMessageArriveInBatch = false;
 
-    private StoreCheckpoint storeCheckpoint;
+    protected StoreCheckpoint storeCheckpoint;
     private TimerMessageStore timerMessageStore;
 
     private final LinkedList<CommitLogDispatcher> dispatcherList;
 
+    private RocksDBMessageStore rocksDBMessageStore;
+
     private RandomAccessFile lockFile;
 
     private FileLock lock;
@@ -354,12 +356,7 @@ public class DefaultMessageStore implements MessageStore {
             }
 
             if (result) {
-                this.storeCheckpoint =
-                    new StoreCheckpoint(
-                        
StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
-                this.masterFlushedOffset = 
this.storeCheckpoint.getMasterFlushedOffset();
-                setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset());
-
+                loadCheckPoint();
                 result = this.indexService.load(lastExitOK);
                 this.recover(lastExitOK);
                 LOGGER.info("message store recover end, and the max phy offset 
= {}", this.getMaxPhyOffset());
@@ -381,6 +378,14 @@ public class DefaultMessageStore implements MessageStore {
         return result;
     }
 
+    /**
+     * Creates the {@link StoreCheckpoint} for the configured store root directory and
+     * initializes the master flushed offset and the confirm offset from it.
+     *
+     * @throws IOException declared for checkpoint construction
+     */
+    public void loadCheckPoint() throws IOException {
+        final String checkpointPath =
+            StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
+        final StoreCheckpoint checkpoint = new StoreCheckpoint(checkpointPath);
+        this.storeCheckpoint = checkpoint;
+        this.masterFlushedOffset = checkpoint.getMasterFlushedOffset();
+        setConfirmOffset(checkpoint.getConfirmPhyOffset());
+    }
+
     /**
      * @throws Exception
      */
@@ -511,6 +516,10 @@ public class DefaultMessageStore implements MessageStore {
                 this.compactionService.shutdown();
             }
 
+            if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
+                this.rocksDBMessageStore.consumeQueueStore.shutdown();
+            }
+
             this.flushConsumeQueueService.shutdown();
             this.allocateMappedFileService.shutdown();
             this.storeCheckpoint.flush();
@@ -3251,6 +3260,17 @@ public class DefaultMessageStore implements MessageStore 
{
         }
     }
 
+    /**
+     * Creates a companion {@link RocksDBMessageStore} sharing this store's configuration,
+     * starts its consume-queue service and registers its dispatcher on this store.
+     * Any exception is logged and not rethrown.
+     */
+    public void enableRocksdbCQWrite() {
+        try {
+            this.rocksDBMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager,
+                this.messageArrivingListener, this.brokerConfig, this.topicConfigTable);
+            this.rocksDBMessageStore.loadAndStartConsumerServiceOnly();
+            addDispatcher(this.rocksDBMessageStore.getDispatcherBuildRocksdbConsumeQueue());
+        } catch (Exception ex) {
+            LOGGER.error("enableRocksdbCqWrite error", ex);
+        }
+    }
+
     public int getMaxDelayLevel() {
         return maxDelayLevel;
     }
@@ -3338,4 +3358,12 @@ public class DefaultMessageStore implements MessageStore 
{
     public long getReputFromOffset() {
         return this.reputMessageService.getReputFromOffset();
     }
+
+    /** @return the RocksDB message store held by this store; {@code null} if it was never assigned */
+    public RocksDBMessageStore getRocksDBMessageStore() {
+        return rocksDBMessageStore;
+    }
+
+    /** @return the consume queue store used by this message store */
+    public ConsumeQueueStoreInterface getConsumeQueueStore() {
+        return this.consumeQueueStore;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 6141b778bf..90df7aed59 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -16,16 +16,16 @@
  */
 package org.apache.rocketmq.store;
 
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Supplier;
-
-import io.opentelemetry.api.common.AttributesBuilder;
-import io.opentelemetry.api.metrics.Meter;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
@@ -39,6 +39,8 @@ import org.rocksdb.RocksDBException;
 
 public class RocksDBMessageStore extends DefaultMessageStore {
 
+    private CommitLogDispatcherBuildRocksdbConsumeQueue 
dispatcherBuildRocksdbConsumeQueue;
+
     public RocksDBMessageStore(final MessageStoreConfig messageStoreConfig, 
final BrokerStatsManager brokerStatsManager,
         final MessageArrivingListener messageArrivingListener, final 
BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig> 
topicConfigTable) throws
         IOException {
@@ -178,4 +180,40 @@ public class RocksDBMessageStore extends 
DefaultMessageStore {
         // Also add some metrics for rocksdb's monitoring.
         RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, 
this);
     }
+
+    /** @return the RocksDB consume-queue dispatcher; {@code null} until {@code loadAndStartConsumerServiceOnly()} has created it */
+    public CommitLogDispatcherBuildRocksdbConsumeQueue getDispatcherBuildRocksdbConsumeQueue() {
+        return this.dispatcherBuildRocksdbConsumeQueue;
+    }
+
+    /**
+     * Dispatcher that forwards non-transactional and committed messages to
+     * {@code putMessagePositionInfo}; prepared and rolled-back messages are ignored.
+     */
+    class CommitLogDispatcherBuildRocksdbConsumeQueue implements CommitLogDispatcher {
+        @Override
+        public void dispatch(DispatchRequest request) throws RocksDBException {
+            final int transactionType = MessageSysFlag.getTransactionValue(request.getSysFlag());
+            final boolean normalOrCommitted = transactionType == MessageSysFlag.TRANSACTION_NOT_TYPE
+                || transactionType == MessageSysFlag.TRANSACTION_COMMIT_TYPE;
+            if (normalOrCommitted) {
+                putMessagePositionInfo(request);
+            }
+            // Every other transaction type (prepared, rollback) is deliberately skipped.
+        }
+    }
+
+    /**
+     * Creates the RocksDB consume-queue dispatcher, loads the consume queue store,
+     * loads the checkpoint and starts the consume queue store.
+     *
+     * @throws RuntimeException wrapping any failure, including a failed consume queue load
+     */
+    public void loadAndStartConsumerServiceOnly() {
+        try {
+            this.dispatcherBuildRocksdbConsumeQueue = new CommitLogDispatcherBuildRocksdbConsumeQueue();
+            if (!this.consumeQueueStore.load()) {
+                throw new RuntimeException("load consume queue failed");
+            }
+            super.loadCheckPoint();
+            this.consumeQueueStore.start();
+        } catch (Exception ex) {
+            ERROR_LOG.error("loadAndStartConsumerServiceOnly error", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 0b45d92418..c077831f3c 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -424,6 +424,37 @@ public class MessageStoreConfig {
 
     private boolean putConsumeQueueDataByFileChannel = true;
 
+    private boolean transferOffsetJsonToRocksdb = false;
+
+    private boolean rocksdbCQDoubleWriteEnable = false;
+
+    private boolean enableBatchWriteKvCq = true;
+
+
+    /** @return the {@code transferOffsetJsonToRocksdb} flag */
+    public boolean isTransferOffsetJsonToRocksdb() {
+        return this.transferOffsetJsonToRocksdb;
+    }
+
+    /** @param transferOffsetJsonToRocksdb new value of the {@code transferOffsetJsonToRocksdb} flag */
+    public void setTransferOffsetJsonToRocksdb(boolean transferOffsetJsonToRocksdb) {
+        this.transferOffsetJsonToRocksdb = transferOffsetJsonToRocksdb;
+    }
+
+    /** @return the {@code rocksdbCQDoubleWriteEnable} flag */
+    public boolean isRocksdbCQDoubleWriteEnable() {
+        return this.rocksdbCQDoubleWriteEnable;
+    }
+
+    /** @param rocksdbWriteEnable new value of the {@code rocksdbCQDoubleWriteEnable} flag */
+    public void setRocksdbCQDoubleWriteEnable(boolean rocksdbWriteEnable) {
+        this.rocksdbCQDoubleWriteEnable = rocksdbWriteEnable;
+    }
+
+    /** @return the {@code enableBatchWriteKvCq} flag */
+    public boolean isEnableBatchWriteKvCq() {
+        return this.enableBatchWriteKvCq;
+    }
+
+    /** @param enableBatchWriteKvCq new value of the {@code enableBatchWriteKvCq} flag */
+    public void setEnableBatchWriteKvCq(boolean enableBatchWriteKvCq) {
+        this.enableBatchWriteKvCq = enableBatchWriteKvCq;
+    }
+
     public boolean isEnabledAppendPropCRC() {
         return enabledAppendPropCRC;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java
index b8865fd919..34f5cb142b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java
@@ -109,6 +109,7 @@ public class CqUnit {
                 ", size=" + size +
                 ", pos=" + pos +
                 ", batchNum=" + batchNum +
+                ", tagsCode=" + tagsCode +
                 ", compactedOffset=" + compactedOffset +
                 '}';
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 5a981bb4df..2363c2896e 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.store.queue;
 
 import java.nio.ByteBuffer;
 import java.util.List;
-
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.attribute.CQType;
@@ -311,7 +310,7 @@ public class RocksDBConsumeQueue implements 
ConsumeQueueInterface {
     public CqUnit getLatestUnit() {
         try {
             long maxOffset = 
this.messageStore.getQueueStore().getMaxOffsetInQueue(topic, queueId);
-            return get(maxOffset);
+            return get(maxOffset > 0 ? maxOffset - 1 : maxOffset);
         } catch (RocksDBException e) {
             ERROR_LOG.error("getLatestUnit Failed. topic: {}, queueId: {}, 
{}", topic, queueId, e.getMessage());
         }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 3c6b91ec01..34c6d2f395 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.BoundaryType;
@@ -78,6 +77,8 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
     private final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> 
tempTopicQueueMaxOffsetMap;
     private volatile boolean isCQError = false;
 
+    private boolean enableBatchWriteKvCq;
+
     public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
         super(messageStore);
 
@@ -87,6 +88,7 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
         this.rocksDBConsumeQueueOffsetTable = new 
RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, 
messageStore);
 
         this.writeBatch = new WriteBatch();
+        this.enableBatchWriteKvCq = 
messageStoreConfig.isEnableBatchWriteKvCq();
         this.bufferDRList = new ArrayList(BATCH_SIZE);
         this.cqBBPairList = new ArrayList(BATCH_SIZE);
         this.offsetBBPairList = new ArrayList(BATCH_SIZE);
@@ -164,12 +166,12 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
 
     @Override
     public void putMessagePositionInfoWrapper(DispatchRequest request) throws 
RocksDBException {
-        if (request == null || this.bufferDRList.size() >= BATCH_SIZE) {
-            putMessagePosition();
-        }
         if (request != null) {
             this.bufferDRList.add(request);
         }
+        if (request == null || !enableBatchWriteKvCq || 
this.bufferDRList.size() >= BATCH_SIZE) {
+            putMessagePosition();
+        }
     }
 
     public void putMessagePosition() throws RocksDBException {
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6ebee1d0dd..3686bf2644 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -52,6 +52,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import 
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -771,6 +772,12 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
         );
     }
 
+    /**
+     * Forwards the RocksDB consume-queue write-progress check to the wrapped admin implementation.
+     *
+     * @param brokerAddr broker address, passed through unchanged
+     * @param topic topic name, passed through unchanged
+     * @return the response body produced by the wrapped implementation
+     */
+    @Override
+    public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic)
+        throws InterruptedException,
+        RemotingTimeoutException,
+        RemotingSendRequestException,
+        RemotingConnectException,
+        MQClientException {
+        final CheckRocksdbCqWriteProgressResponseBody progress =
+            defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic);
+        return progress;
+    }
+
     @Override
     public boolean resumeCheckHalfMessage(String topic,
         String msgId)
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index dc4d35e704..883dcbe41d 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -90,6 +90,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import 
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -1817,6 +1818,12 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
         return 
this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(brokerAddr, topic, 
queueId, index, count, consumerGroup, timeoutMillis);
     }
 
+    /**
+     * Issues the RocksDB consume-queue write-progress check against one broker through the
+     * client API, passing this instance's {@code timeoutMillis} as the third argument.
+     *
+     * @param brokerAddr broker address, passed through unchanged
+     * @param topic topic name, passed through unchanged
+     * @return the response body returned by the client API call
+     */
+    @Override
+    public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic)
+        throws InterruptedException,
+        RemotingTimeoutException,
+        RemotingSendRequestException,
+        RemotingConnectException,
+        MQClientException {
+        final CheckRocksdbCqWriteProgressResponseBody progress = mqClientInstance
+            .getMQClientAPIImpl()
+            .checkRocksdbCqWriteProgress(brokerAddr, topic, timeoutMillis);
+        return progress;
+    }
+
     @Override
     public boolean resumeCheckHalfMessage(final String topic,
         final String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index ff78f22c70..09204ab7be 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -48,6 +48,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import 
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -148,6 +149,8 @@ public interface MQAdminExt extends MQAdmin {
         final String consumerGroup) throws RemotingException, 
MQClientException, InterruptedException,
         MQBrokerException;
 
+    /**
+     * Requests a RocksDB consume-queue write-progress check from a single broker.
+     *
+     * @param brokerAddr address of the broker to query
+     * @param topic topic to check; the admin command passes an empty string when no topic
+     *              is given (NOTE(review): broker-side meaning of an empty topic is not
+     *              visible here - confirm)
+     * @return the response body; callers read its {@code getDiffResult()} text
+     */
+    CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String 
brokerAddr, String topic) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQClientException;
+
     ConsumeStats examineConsumeStats(final String consumerGroup,
         final String topic) throws RemotingException, MQClientException,
         InterruptedException, MQBrokerException;
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
index 1ecb1fa2cd..c466490b8a 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.tools.command.export;
 
 import com.alibaba.fastjson.JSONObject;
@@ -77,6 +78,7 @@ public class ExportMetadataInRocksDBCommand implements 
SubCommand {
         }
 
         String configType = 
commandLine.getOptionValue("configType").trim().toLowerCase();
+        path += "/" + configType;
 
         boolean jsonEnable = false;
         if (commandLine.hasOption("jsonEnable")) {
@@ -86,7 +88,7 @@ public class ExportMetadataInRocksDBCommand implements 
SubCommand {
 
         ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* 
readOnly */);
         if (!kvStore.start()) {
-            System.out.print("RocksDB load error, path=" + path + "\n");
+            System.out.printf("RocksDB load error, path=%s\n" , path);
             return;
         }
 
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
new file mode 100644
index 0000000000..82dcb74196
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.queue;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.remoting.RPCHook;
+import 
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+
+/**
+ * Admin sub-command that, per its own description, checks whether the RocksDB consume queue
+ * is the same as the file consume queue, and prints each broker's diff result to standard output.
+ *
+ * <p>Options: {@code -c/--cluster} (required), {@code -n/--nameserverAddr} (required) and
+ * {@code -t/--topic} (optional; an empty topic is sent to the broker when it is omitted).
+ */
+public class CheckRocksdbCqWriteProgressCommand implements SubCommand {
+
+    /**
+     * @return the name under which this sub-command is exposed
+     */
+    @Override
+    public String commandName() {
+        // NOTE(review): the doubled "Command" suffix looks accidental; kept unchanged because
+        // this string is what users type to invoke the command - confirm before renaming.
+        return "checkRocksdbCqWriteProgressCommandCommand";
+    }
+
+    /**
+     * @return a one-line description of the sub-command
+     */
+    @Override
+    public String commandDesc() {
+        return "check if rocksdb cq is same as file cq";
+    }
+
+    /**
+     * Registers the {@code -c}, {@code -n} and {@code -t} options on the given option set.
+     *
+     * @param options option set to extend
+     * @return the same {@code options} instance, with the three options added
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("c", "cluster", true, "cluster name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("n", "nameserverAddr", true, "nameserverAddr");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(false);
+        options.addOption(opt);
+        return options;
+    }
+
+    /**
+     * Queries every broker listed in the cluster info and prints its diff result.
+     *
+     * <p>When a topic is given only the diff result is printed; otherwise each result is
+     * prefixed with the broker name and address. The admin client is always shut down
+     * before this method returns.
+     *
+     * @param commandLine parsed command line holding the {@code -c}, {@code -n}, {@code -t} values
+     * @param options the option set (unused here)
+     * @param rpcHook hook handed to the admin client
+     * @throws RuntimeException wrapping any exception raised while starting the client or
+     *                          querying a broker
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        defaultMQAdminExt.setNamesrvAddr(StringUtils.trim(commandLine.getOptionValue('n')));
+        String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : "";
+        String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "";
+
+        try {
+            defaultMQAdminExt.start();
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+            Map<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
+            if (clusterAddrTable.get(clusterName) == null) {
+                System.out.print("clusterAddrTable is empty");
+                return;
+            }
+            // NOTE(review): the cluster name is only used for the existence check above; the
+            // loop still visits every broker in brokerAddrTable, not just the ones belonging
+            // to the requested cluster - confirm whether that is intended.
+            for (Map.Entry<String, BrokerData> entry : brokerAddrTable.entrySet()) {
+                String brokerName = entry.getKey();
+                BrokerData brokerData = entry.getValue();
+                String brokerAddr = brokerData.getBrokerAddrs().get(0L);
+                if (brokerAddr == null) {
+                    // No address is registered under broker id 0, so there is nothing to query;
+                    // skip instead of handing a null address to the remote call.
+                    System.out.print(brokerName + " | no address registered for broker id 0, skipped\n");
+                    continue;
+                }
+                CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic);
+                // print, not printf: the diff result is broker-supplied text and must never be
+                // interpreted as a format string (a '%' in it would throw or corrupt the output).
+                if (StringUtils.isNotBlank(topic)) {
+                    System.out.print(body.getDiffResult());
+                } else {
+                    System.out.print(brokerName + " | " + brokerAddr + " | " + body.getDiffResult());
+                }
+            }
+
+        } catch (Exception e) {
+            throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

Reply via email to