This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0b24768500 [ISSUE #8822] Double write cq, reduce unnecessary switches (#8823)
0b24768500 is described below

commit 0b247685007258d1502fa992434402fd40b92cea
Author: LetLetMe <43874697+letle...@users.noreply.github.com>
AuthorDate: Tue Oct 29 19:09:35 2024 +0800

    [ISSUE #8822] Double write cq, reduce unnecessary switches (#8823)
    
    * Reduce unnecessary switches
---
 .../rocketmq/broker/RocksDBConfigManager.java      |  14 +-
 .../config/v1/RocksDBConsumerOffsetManager.java    |   9 +-
 .../config/v1/RocksDBSubscriptionGroupManager.java |   8 +-
 .../config/v1/RocksDBTopicConfigManager.java       |   8 +-
 .../broker/processor/AdminBrokerProcessor.java     | 198 ++++++++++++++-------
 .../offset/RocksdbTransferOffsetAndCqTest.java     |   1 -
 .../RocksdbGroupConfigTransferTest.java            |   1 -
 .../topic/RocksdbTopicConfigManagerTest.java       |   1 -
 .../topic/RocksdbTopicConfigTransferTest.java      |   1 -
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   7 +-
 .../rocketmq/common/CheckRocksdbCqWriteResult.java |  38 +++-
 .../common/config/AbstractRocksDBStorage.java      |   3 +-
 .../common/config/ConfigRocksDBStorage.java        |  12 +-
 .../CheckRocksdbCqWriteProgressRequestHeader.java  |  11 ++
 .../apache/rocketmq/store/RocksDBMessageStore.java |   4 +
 .../rocketmq/store/config/MessageStoreConfig.java  |  32 ++--
 .../store/rocksdb/RocksDBOptionsFactory.java       |   5 +-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   6 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |   6 +-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   5 +-
 .../queue/CheckRocksdbCqWriteProgressCommand.java  |  21 ++-
 21 files changed, 246 insertions(+), 145 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
index 20358c4707..ee2d4e54a6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
@@ -17,40 +17,40 @@
 package org.apache.rocketmq.broker;
 
 import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.function.BiConsumer;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.rocksdb.CompressionType;
 import org.rocksdb.FlushOptions;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.Statistics;
 import org.rocksdb.WriteBatch;
 
-import java.nio.charset.StandardCharsets;
-import java.util.function.BiConsumer;
-
 public class RocksDBConfigManager {
     protected static final Logger BROKER_LOG = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     public volatile boolean isStop = false;
     public ConfigRocksDBStorage configRocksDBStorage = null;
     private FlushOptions flushOptions = null;
     private volatile long lastFlushMemTableMicroSecond = 0;
-
     private final String filePath;
     private final long memTableFlushInterval;
+    private final CompressionType compressionType;
     private DataVersion kvDataVersion = new DataVersion();
 
-
-    public RocksDBConfigManager(String filePath, long memTableFlushInterval) {
+    public RocksDBConfigManager(String filePath, long memTableFlushInterval, 
CompressionType compressionType) {
         this.filePath = filePath;
         this.memTableFlushInterval = memTableFlushInterval;
+        this.compressionType = compressionType;
     }
 
     public boolean init() {
         this.isStop = false;
-        this.configRocksDBStorage = new ConfigRocksDBStorage(filePath);
+        this.configRocksDBStorage = new ConfigRocksDBStorage(filePath, 
compressionType);
         return this.configRocksDBStorage.start();
     }
     public boolean loadDataVersion() {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
index 8066fe769a..824fc0fee3 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.rocksdb.CompressionType;
 import org.rocksdb.WriteBatch;
 
 public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
@@ -41,7 +42,9 @@ public class RocksDBConsumerOffsetManager extends 
ConsumerOffsetManager {
 
     public RocksDBConsumerOffsetManager(BrokerController brokerController) {
         super(brokerController);
-        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
+            
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
+
     }
 
     @Override
@@ -61,10 +64,6 @@ public class RocksDBConsumerOffsetManager extends 
ConsumerOffsetManager {
     }
 
     private boolean merge() {
-        if 
(!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
-            log.info("the switch transferOffsetJsonToRocksdb is off, no merge 
offset operation is needed.");
-            return true;
-        }
         if (!UtilAll.isPathExists(this.configFilePath()) && 
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
             log.info("consumerOffset json file does not exist, so skip merge");
             return true;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index 8175d63cce..8fc7a4d6ed 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.rocksdb.CompressionType;
 import org.rocksdb.RocksIterator;
 
 public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
@@ -40,7 +41,8 @@ public class RocksDBSubscriptionGroupManager extends 
SubscriptionGroupManager {
 
     public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
         super(brokerController, false);
-        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
+            
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
     }
 
     @Override
@@ -78,10 +80,6 @@ public class RocksDBSubscriptionGroupManager extends 
SubscriptionGroupManager {
 
 
     private boolean merge() {
-        if 
(!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
-            log.info("the switch  transferMetadataJsonToRocksdb is off, no 
merge subGroup operation is needed.");
-            return true;
-        }
         if (!UtilAll.isPathExists(this.configFilePath()) && 
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
             log.info("subGroup json file does not exist, so skip merge");
             return true;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
index bce67392f6..18e633d348 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.rocksdb.CompressionType;
 
 public class RocksDBTopicConfigManager extends TopicConfigManager {
 
@@ -35,7 +36,8 @@ public class RocksDBTopicConfigManager extends 
TopicConfigManager {
 
     public RocksDBTopicConfigManager(BrokerController brokerController) {
         super(brokerController, false);
-        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+        this.rocksDBConfigManager = new 
RocksDBConfigManager(rocksdbConfigFilePath(), 
brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
+            
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
     }
 
     @Override
@@ -59,10 +61,6 @@ public class RocksDBTopicConfigManager extends 
TopicConfigManager {
     }
 
     private boolean merge() {
-        if 
(!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
-            log.info("the switch transferMetadataJsonToRocksdb is off, no 
merge topic operation is needed.");
-            return true;
-        }
         if (!UtilAll.isPathExists(this.configFilePath()) && 
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
             log.info("topic json file does not exist, so skip merge");
             return true;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index aa962513df..381889c624 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.broker.processor;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
@@ -39,6 +38,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -65,7 +67,9 @@ import org.apache.rocketmq.broker.metrics.InvocationStatus;
 import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.LockCallback;
 import org.apache.rocketmq.common.MQVersion;
@@ -214,6 +218,7 @@ import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.RocksDBMessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.StoreType;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
@@ -232,6 +237,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     protected final BrokerController brokerController;
     protected Set<String> configBlackList = new HashSet<>();
+    private final ExecutorService asyncExecuteWorker = new 
ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
 
     public AdminBrokerProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
@@ -467,76 +473,23 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
-    private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext 
ctx, RemotingCommand request) throws RemotingCommandException {
-        CheckRocksdbCqWriteProgressRequestHeader requestHeader = 
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
-        String requestTopic = requestHeader.getTopic();
-        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-        response.setCode(ResponseCode.SUCCESS);
-        MessageStore messageStore = brokerController.getMessageStore();
-        DefaultMessageStore defaultMessageStore;
-        if (messageStore instanceof AbstractPluginMessageStore) {
-            defaultMessageStore = (DefaultMessageStore) 
((AbstractPluginMessageStore) messageStore).getNext();
-        } else {
-            defaultMessageStore = (DefaultMessageStore) messageStore;
-        }
-        RocksDBMessageStore rocksDBMessageStore = 
defaultMessageStore.getRocksDBMessageStore();
-        if 
(!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
-            response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", 
"rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is 
invalid")));
-            return response;
-        }
-
-        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> 
cqTable = defaultMessageStore.getConsumeQueueTable();
-        StringBuilder diffResult = new StringBuilder();
-        try {
-            if (StringUtils.isNotBlank(requestTopic)) {
-                processConsumeQueuesForTopic(cqTable.get(requestTopic), 
requestTopic, rocksDBMessageStore, diffResult,false);
-                
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", 
diffResult.toString())));
-                return response;
-            }
-            for (Map.Entry<String, ConcurrentMap<Integer, 
ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
-                String topic = topicEntry.getKey();
-                processConsumeQueuesForTopic(topicEntry.getValue(), topic, 
rocksDBMessageStore, diffResult,true);
+    private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext 
ctx, RemotingCommand request) {
+        CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
+        
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue());
+        Runnable runnable = () -> {
+            try {
+                CheckRocksdbCqWriteResult checkResult = 
doCheckRocksdbCqWriteProgress(ctx, request);
+                LOGGER.info("checkRocksdbCqWriteProgress result: {}", 
JSON.toJSONString(checkResult));
+            } catch (Exception e) {
+                LOGGER.error("checkRocksdbCqWriteProgress error", e);
             }
-            diffResult.append("check all topic successful, 
size:").append(cqTable.size());
-            response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", 
diffResult.toString())));
-
-        } catch (Exception e) {
-            LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
-            response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", 
e.getMessage())));
-        }
+        };
+        asyncExecuteWorker.submit(runnable);
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setBody(JSON.toJSONBytes(result));
         return response;
     }
-
-    private void processConsumeQueuesForTopic(ConcurrentMap<Integer, 
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore 
rocksDBMessageStore, StringBuilder diffResult, boolean checkAll) {
-        for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : 
queueMap.entrySet()) {
-            Integer queueId = queueEntry.getKey();
-            ConsumeQueueInterface jsonCq = queueEntry.getValue();
-            ConsumeQueueInterface kvCq = 
rocksDBMessageStore.getConsumeQueue(topic, queueId);
-            if (!checkAll) {
-                String format = String.format("\n[topic: %s, queue:  %s] \n  
kvEarliest : %s |  kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
-                    topic, queueId, kvCq.getEarliestUnit(), 
kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
-                diffResult.append(format).append("\n");
-            }
-            long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue();
-            long minOffsetInQueue = kvCq.getMinOffsetInQueue();
-            for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) {
-                Pair<CqUnit, Long> fileCqUnit = 
jsonCq.getCqUnitAndStoreTime(i);
-                Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
-                if (fileCqUnit == null || kvCqUnit == null) {
-                    diffResult.append(String.format("[topic: %s, queue: %s, 
offset: %s] \n kv   : %s  \n file : %s  \n",
-                        topic, queueId, i, kvCqUnit != null ? 
kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : 
"null"));
-                    return;
-                }
-                if (!checkCqUnitEqual(kvCqUnit.getObject1(), 
fileCqUnit.getObject1())) {
-                    String diffInfo = String.format("[topic:%s, queue: %s 
offset: %s] \n file : %s  \n  kv : %s \n",
-                        topic, queueId, i, kvCqUnit.getObject1(), 
fileCqUnit.getObject1());
-                    LOGGER.error(diffInfo);
-                    diffResult.append(diffInfo).append(System.lineSeparator());
-                    return;
-                }
-            }
-        }
-    }
     @Override
     public boolean rejectRequest() {
         return false;
@@ -3418,6 +3371,115 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return false;
     }
 
+    private CheckRocksdbCqWriteResult 
doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand 
request) throws RemotingCommandException {
+        CheckRocksdbCqWriteProgressRequestHeader requestHeader = 
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
+        String requestTopic = requestHeader.getTopic();
+        MessageStore messageStore = brokerController.getMessageStore();
+        DefaultMessageStore defaultMessageStore;
+        if (messageStore instanceof AbstractPluginMessageStore) {
+            defaultMessageStore = (DefaultMessageStore) 
((AbstractPluginMessageStore) messageStore).getNext();
+        } else {
+            defaultMessageStore = (DefaultMessageStore) messageStore;
+        }
+        RocksDBMessageStore rocksDBMessageStore = 
defaultMessageStore.getRocksDBMessageStore();
+        CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
+
+        if 
(defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType()))
 {
+            result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need 
check");
+            
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue());
+            return result;
+        }
+
+        if 
(!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
+            result.setCheckResult("rocksdbCQWriteEnable is false, 
checkRocksdbCqWriteProgressCommand is invalid");
+            
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+            return result;
+        }
+
+        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> 
cqTable = defaultMessageStore.getConsumeQueueTable();
+        StringBuilder diffResult = new StringBuilder();
+        try {
+            if (StringUtils.isNotBlank(requestTopic)) {
+                boolean checkResult = 
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, 
rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime());
+                result.setCheckResult(diffResult.toString());
+                result.setCheckStatus(checkResult ? 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+                return result;
+            }
+            int successNum = 0;
+            int checkSize = 0;
+            for (Map.Entry<String, ConcurrentMap<Integer, 
ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
+                boolean checkResult = 
processConsumeQueuesForTopic(topicEntry.getValue(), topicEntry.getKey(), 
rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime());
+                successNum += checkResult ? 1 : 0;
+                checkSize++;
+            }
+            // check all topic finish, all topic is ready, checkSize: 100, 
currentQueueNum: 110      -> ready  (The currentQueueNum means when we do 
checking, new topics are added.)
+            // check all topic finish, success/all : 89/100, currentQueueNum: 
110                    -> not ready
+            boolean checkReady = successNum == checkSize;
+            String checkResultString = checkReady ? String.format("all topic 
is ready, checkSize: %s, currentQueueNum: %s", checkSize, cqTable.size()) :
+                String.format("success/all : %s/%s, currentQueueNum: %s", 
successNum, checkSize, cqTable.size());
+            diffResult.append("check all topic finish, 
").append(checkResultString);
+            result.setCheckResult(diffResult.toString());
+            result.setCheckStatus(checkReady ? 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+        } catch (Exception e) {
+            LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
+            result.setCheckResult(e.getMessage() + 
Arrays.toString(e.getStackTrace()));
+            
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue());
+        }
+        return result;
+    }
+
+    private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, 
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore 
rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long 
checkpointByStoreTime) {
+        boolean processResult = true;
+        for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : 
queueMap.entrySet()) {
+            Integer queueId = queueEntry.getKey();
+            ConsumeQueueInterface jsonCq = queueEntry.getValue();
+            ConsumeQueueInterface kvCq = 
rocksDBMessageStore.getConsumeQueue(topic, queueId);
+            if (printDetail) {
+                String format = String.format("[topic: %s, queue:  %s] \n  
kvEarliest : %s |  kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
+                    topic, queueId, kvCq.getEarliestUnit(), 
kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
+                diffResult.append(format).append("\n");
+            }
+
+            long minOffsetByTime = 0L;
+            try {
+                minOffsetByTime = 
rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, 
queueId, checkpointByStoreTime, BoundaryType.UPPER);
+            } catch (Exception e) {
+                // ignore
+            }
+            long minOffsetInQueue = kvCq.getMinOffsetInQueue();
+            long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime);
+            long checkTo = jsonCq.getMaxOffsetInQueue() - 1;
+            /*
+                                                            
checkTo(maxOffsetInQueue - 1)
+                                                                        v
+        fileCq   +------------------------------------------------------+
+        kvCq             +----------------------------------------------+
+                         ^                ^
+                   minOffsetInQueue   minOffsetByTime
+                                   ^
+                        checkFrom = max(minOffsetInQueue, minOffsetByTime)
+             */
+            // The latest message is earlier than the check time
+            Pair<CqUnit, Long> fileLatestCq = 
jsonCq.getCqUnitAndStoreTime(checkTo);
+            if (fileLatestCq != null) {
+                if (fileLatestCq.getObject2() < checkpointByStoreTime) {
+                    continue;
+                }
+            }
+            for (long i = checkFrom; i <= checkTo; i++) {
+                Pair<CqUnit, Long> fileCqUnit = 
jsonCq.getCqUnitAndStoreTime(i);
+                Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
+                if (fileCqUnit == null || kvCqUnit == null || 
!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
+                    LOGGER.error(String.format("[topic: %s, queue: %s, offset: 
%s] \n file : %s  \n  kv : %s \n",
+                        topic, queueId, i, kvCqUnit != null ? 
kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : 
"null"));
+                    processResult = false;
+                    break;
+                }
+            }
+        }
+        return processResult;
+    }
+
     private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) {
         if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) {
             return false;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
index 64c505eb77..4b320eb53f 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
@@ -76,7 +76,6 @@ public class RocksdbTransferOffsetAndCqTest {
         brokerConfig.setConsumerOffsetUpdateVersionStep(10);
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setStorePathRootDir(basePath);
-        messageStoreConfig.setTransferOffsetJsonToRocksdb(true);
         messageStoreConfig.setRocksdbCQDoubleWriteEnable(true);
         
Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
index 26017af8a6..c75fe0d6a0 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -68,7 +68,6 @@ public class RocksdbGroupConfigTransferTest {
         
Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setStorePathRootDir(basePath);
-        messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
         
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
         
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
         when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
index 080e1dd5a3..fa3ef95f55 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
@@ -72,7 +72,6 @@ public class RocksdbTopicConfigManagerTest {
         when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setStorePathRootDir(basePath);
-        messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
         
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
         
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
         
Mockito.lenient().when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
index fb345548e4..e925ed4bd8 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
@@ -69,7 +69,6 @@ public class RocksdbTopicConfigTransferTest {
         when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setStorePathRootDir(basePath);
-        messageStoreConfig.setTransferMetadataJsonToRocksdb(true);
         
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
         
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
         when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 716d081ef4..554b1efa52 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.client.rpchook.NamespaceRpcHook;
 import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
@@ -113,7 +114,6 @@ import 
org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
-import 
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
 import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
@@ -3019,15 +3019,16 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback, StartAndShutdo
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public CheckRocksdbCqWriteProgressResponseBody 
checkRocksdbCqWriteProgress(final String brokerAddr, final String topic, final 
long timeoutMillis) throws InterruptedException,
+    public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(final String 
brokerAddr, final String topic, final long checkStoreTime, final long 
timeoutMillis) throws InterruptedException,
         RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQClientException {
         CheckRocksdbCqWriteProgressRequestHeader header = new 
CheckRocksdbCqWriteProgressRequestHeader();
         header.setTopic(topic);
+        header.setCheckStoreTime(checkStoreTime);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS,
 header);
         RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, 
request, timeoutMillis);
         assert response != null;
         if (ResponseCode.SUCCESS == response.getCode()) {
-            return 
CheckRocksdbCqWriteProgressResponseBody.decode(response.getBody(), 
CheckRocksdbCqWriteProgressResponseBody.class);
+            return JSON.parseObject(response.getBody(), 
CheckRocksdbCqWriteResult.class);
         }
         throw new MQClientException(response.getCode(), response.getRemark());
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
 
b/common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java
similarity index 52%
rename from 
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
rename to 
common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java
index 76719ac1a2..fc67df86c2 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java
@@ -15,21 +15,43 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.protocol.body;
+package org.apache.rocketmq.common;
 
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+public class CheckRocksdbCqWriteResult {
+    String checkResult;
 
-public class CheckRocksdbCqWriteProgressResponseBody extends 
RemotingSerializable  {
+    int checkStatus;
 
-    String diffResult;
+    public enum CheckStatus {
+        CHECK_OK(0),
+        CHECK_NOT_OK(1),
+        CHECK_IN_PROGRESS(2),
+        CHECK_ERROR(3);
 
-    public String getDiffResult() {
-        return diffResult;
+        private int value;
+
+        CheckStatus(int value) {
+            this.value = value;
+        }
+
+        public int getValue() {
+            return value;
+        }
+    }
+
+    public String getCheckResult() {
+        return checkResult;
     }
 
-    public void setDiffResult(String diffResult) {
-        this.diffResult = diffResult;
+    public void setCheckResult(String checkResult) {
+        this.checkResult = checkResult;
     }
 
+    public int getCheckStatus() {
+        return checkStatus;
+    }
 
+    public void setCheckStatus(int checkStatus) {
+        this.checkStatus = checkStatus;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 42ddbdc728..d434cce745 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -86,6 +86,7 @@ public abstract class AbstractRocksDBStorage {
     protected final List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
 
     protected volatile boolean loaded;
+    protected CompressionType compressionType = 
CompressionType.LZ4_COMPRESSION;
     private volatile boolean closed;
 
     private final Semaphore reloadPermit = new Semaphore(1);
@@ -156,7 +157,7 @@ public abstract class AbstractRocksDBStorage {
 
     protected void initCompactionOptions() {
         this.compactionOptions = new CompactionOptions();
-        this.compactionOptions.setCompression(CompressionType.LZ4_COMPRESSION);
+        this.compactionOptions.setCompression(compressionType);
         this.compactionOptions.setMaxSubcompactions(4);
         this.compactionOptions.setOutputFileSizeLimit(4 * 1024 * 1024 * 1024L);
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index 36da6834ff..3b924a6a0d 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.common.UtilAll;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompressionType;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -37,12 +38,17 @@ public class ConfigRocksDBStorage extends 
AbstractRocksDBStorage {
 
     protected ColumnFamilyHandle kvDataVersionFamilyHandle;
     protected ColumnFamilyHandle forbiddenFamilyHandle;
-
     public static final byte[] KV_DATA_VERSION_KEY = 
"kvDataVersionKey".getBytes(StandardCharsets.UTF_8);
 
+
+
     public ConfigRocksDBStorage(final String dbPath) {
-        super(dbPath);
-        this.readOnly = false;
+        this(dbPath, false);
+    }
+
+    public ConfigRocksDBStorage(final String dbPath, CompressionType 
compressionType) {
+        this(dbPath, false);
+        this.compressionType = compressionType;
     }
 
     public ConfigRocksDBStorage(final String dbPath, boolean readOnly) {
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
index fee158b497..f679077fdd 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
@@ -32,6 +32,8 @@ public class CheckRocksdbCqWriteProgressRequestHeader 
implements CommandCustomHe
     @RocketMQResource(ResourceType.TOPIC)
     private String topic;
 
+    private long checkStoreTime;
+
     @Override
     public void checkFields() throws RemotingCommandException {
 
@@ -44,4 +46,13 @@ public class CheckRocksdbCqWriteProgressRequestHeader 
implements CommandCustomHe
     public void setTopic(String topic) {
         this.topic = topic;
     }
+
+    public long getCheckStoreTime() {
+        return checkStoreTime;
+    }
+
+    public void setCheckStoreTime(long checkStoreTime) {
+        this.checkStoreTime = checkStoreTime;
+    }
+
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 0a7119cab1..321689ac8f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -173,6 +173,10 @@ public class RocksDBMessageStore extends 
DefaultMessageStore {
     class CommitLogDispatcherBuildRocksdbConsumeQueue implements 
CommitLogDispatcher {
         @Override
         public void dispatch(DispatchRequest request) throws RocksDBException {
+            boolean enable = 
getMessageStoreConfig().isRocksdbCQDoubleWriteEnable();
+            if (!enable) {
+                return;
+            }
             final int tranType = 
MessageSysFlag.getTransactionValue(request.getSysFlag());
             switch (tranType) {
                 case MessageSysFlag.TRANSACTION_NOT_TYPE:
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index e31c03dd22..fe090e3fa2 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.StoreType;
 import org.apache.rocketmq.store.queue.BatchConsumeQueue;
+import org.rocksdb.CompressionType;
 
 public class MessageStoreConfig {
 
@@ -106,8 +107,6 @@ public class MessageStoreConfig {
     @ImportantField
     private String storeType = StoreType.DEFAULT.getStoreType();
 
-    private boolean transferMetadataJsonToRocksdb = false;
-
     // ConsumeQueue file size,default is 30W
     private int mappedFileSizeConsumeQueue = 300000 * 
ConsumeQueue.CQ_STORE_UNIT_SIZE;
     // enable consume queue ext
@@ -424,8 +423,6 @@ public class MessageStoreConfig {
 
     private boolean putConsumeQueueDataByFileChannel = true;
 
-    private boolean transferOffsetJsonToRocksdb = false;
-
     private boolean rocksdbCQDoubleWriteEnable = false;
 
     /**
@@ -443,7 +440,17 @@ public class MessageStoreConfig {
      *
      * LZ4 is the recommended one.
      */
-    private String bottomMostCompressionTypeForConsumeQueueStore = "zstd";
+    private String bottomMostCompressionTypeForConsumeQueueStore = 
CompressionType.ZSTD_COMPRESSION.getLibraryName();
+
+    private String rocksdbCompressionType = 
CompressionType.LZ4_COMPRESSION.getLibraryName();
+
+    public String getRocksdbCompressionType() {
+        return rocksdbCompressionType;
+    }
+
+    public void setRocksdbCompressionType(String compressionType) {
+        this.rocksdbCompressionType = compressionType;
+    }
 
     /**
      * Spin number in the retreat strategy of spin lock
@@ -464,13 +471,6 @@ public class MessageStoreConfig {
         this.rocksdbCQDoubleWriteEnable = rocksdbWriteEnable;
     }
 
-    public boolean isTransferOffsetJsonToRocksdb() {
-        return transferOffsetJsonToRocksdb;
-    }
-
-    public void setTransferOffsetJsonToRocksdb(boolean 
transferOffsetJsonToRocksdb) {
-        this.transferOffsetJsonToRocksdb = transferOffsetJsonToRocksdb;
-    }
 
     public boolean isEnabledAppendPropCRC() {
         return enabledAppendPropCRC;
@@ -1894,14 +1894,6 @@ public class MessageStoreConfig {
         this.putConsumeQueueDataByFileChannel = 
putConsumeQueueDataByFileChannel;
     }
 
-    public boolean isTransferMetadataJsonToRocksdb() {
-        return transferMetadataJsonToRocksdb;
-    }
-
-    public void setTransferMetadataJsonToRocksdb(boolean 
transferMetadataJsonToRocksdb) {
-        this.transferMetadataJsonToRocksdb = transferMetadataJsonToRocksdb;
-    }
-
     public String getBottomMostCompressionTypeForConsumeQueueStore() {
         return bottomMostCompressionTypeForConsumeQueueStore;
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index d373ba6249..66f5cbd095 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -67,13 +67,16 @@ public class RocksDBOptionsFactory {
                 setCompressionSizePercent(-1);
         String bottomMostCompressionTypeOpt = 
messageStore.getMessageStoreConfig()
             .getBottomMostCompressionTypeForConsumeQueueStore();
+        String compressionTypeOpt = messageStore.getMessageStoreConfig()
+            .getRocksdbCompressionType();
         CompressionType bottomMostCompressionType = 
CompressionType.getCompressionType(bottomMostCompressionTypeOpt);
+        CompressionType compressionType = 
CompressionType.getCompressionType(compressionTypeOpt);
         return columnFamilyOptions.setMaxWriteBufferNumber(4).
                 setWriteBufferSize(128 * SizeUnit.MB).
                 setMinWriteBufferNumberToMerge(1).
                 setTableFormatConfig(blockBasedTableConfig).
                 setMemTableConfig(new SkipListMemTableConfig()).
-                setCompressionType(CompressionType.LZ4_COMPRESSION).
+                setCompressionType(compressionType).
                 setBottommostCompressionType(bottomMostCompressionType).
                 setNumLevels(7).
                 setCompactionStyle(CompactionStyle.UNIVERSAL).
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 3686bf2644..c5ecdefb52 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
@@ -52,7 +53,6 @@ import 
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
-import 
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -773,9 +773,9 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
     }
 
     @Override
-    public CheckRocksdbCqWriteProgressResponseBody 
checkRocksdbCqWriteProgress(String brokerAddr, String topic)
+    public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String 
brokerAddr, String topic, long checkStoreTime)
         throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException {
-        return 
this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic);
+        return 
this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic, 
checkStoreTime);
     }
 
     @Override
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 883dcbe41d..17f14f23af 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
@@ -90,7 +91,6 @@ import 
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
-import 
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -1819,9 +1819,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
     }
 
     @Override
-    public CheckRocksdbCqWriteProgressResponseBody 
checkRocksdbCqWriteProgress(String brokerAddr, String topic)
+    public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String 
brokerAddr, String topic, long checkStoreTime)
         throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException {
-        return 
this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr,
 topic, timeoutMillis);
+        return 
this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr,
 topic, checkStoreTime, timeoutMillis);
     }
 
     @Override
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 09204ab7be..aea43376ea 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
@@ -48,7 +49,6 @@ import 
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
-import 
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -149,7 +149,8 @@ public interface MQAdminExt extends MQAdmin {
         final String consumerGroup) throws RemotingException, 
MQClientException, InterruptedException,
         MQBrokerException;
 
-    CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String 
brokerAddr, String topic) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQClientException;
+    CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String brokerAddr, 
String topic, long checkStoreTime)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException;
 
     ConsumeStats examineConsumeStats(final String consumerGroup,
         final String topic) throws RemotingException, MQClientException,
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
index d18a24ee1d..a0fc9fce1f 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
@@ -19,12 +19,13 @@ package org.apache.rocketmq.tools.command.queue;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
 import org.apache.rocketmq.remoting.RPCHook;
-import 
org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -53,8 +54,11 @@ public class CheckRocksdbCqWriteProgressCommand implements 
SubCommand {
         options.addOption(opt);
 
         opt = new Option("t", "topic", true, "topic name");
-        opt.setRequired(false);
         options.addOption(opt);
+
+        opt = new Option("cf", "checkFrom", true, "check from time");
+        options.addOption(opt);
+
         return options;
     }
 
@@ -66,6 +70,10 @@ public class CheckRocksdbCqWriteProgressCommand implements 
SubCommand {
         
defaultMQAdminExt.setNamesrvAddr(StringUtils.trim(commandLine.getOptionValue('n')));
         String clusterName = commandLine.hasOption('c') ? 
commandLine.getOptionValue('c').trim() : "";
         String topic = commandLine.hasOption('t') ? 
commandLine.getOptionValue('t').trim() : "";
+        // The default check is 30 days
+        long checkStoreTime = commandLine.hasOption("cf")
+            ? Long.parseLong(commandLine.getOptionValue("cf").trim())
+            : System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30L);
 
         try {
             defaultMQAdminExt.start();
@@ -80,14 +88,13 @@ public class CheckRocksdbCqWriteProgressCommand implements 
SubCommand {
                 String brokerName = entry.getKey();
                 BrokerData brokerData = entry.getValue();
                 String brokerAddr = brokerData.getBrokerAddrs().get(0L);
-                CheckRocksdbCqWriteProgressResponseBody body = 
defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic);
-                if (StringUtils.isNotBlank(topic)) {
-                    System.out.print(body.getDiffResult());
+                CheckRocksdbCqWriteResult result = 
defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic, 
checkStoreTime);
+                if (result.getCheckStatus() == 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue()) {
+                    System.out.print(brokerName + " check error, please check 
log... errInfo: " + result.getCheckResult());
                 } else {
-                    System.out.print(brokerName + " | " + brokerAddr + " | \n" 
+ body.getDiffResult());
+                    System.out.print(brokerName + " check doing, please wait 
and get the result from log... \n");
                 }
             }
-
         } catch (Exception e) {
             throw new RuntimeException(this.getClass().getSimpleName() + " 
command failed", e);
         } finally {

Reply via email to