This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 0b24768500 [ISSUE #8822] Double write cq, reduce unnecessary switches (#8823) 0b24768500 is described below commit 0b247685007258d1502fa992434402fd40b92cea Author: LetLetMe <43874697+letle...@users.noreply.github.com> AuthorDate: Tue Oct 29 19:09:35 2024 +0800 [ISSUE #8822] Double write cq, reduce unnecessary switches (#8823) * Reduce unnecessary switches --- .../rocketmq/broker/RocksDBConfigManager.java | 14 +- .../config/v1/RocksDBConsumerOffsetManager.java | 9 +- .../config/v1/RocksDBSubscriptionGroupManager.java | 8 +- .../config/v1/RocksDBTopicConfigManager.java | 8 +- .../broker/processor/AdminBrokerProcessor.java | 198 ++++++++++++++------- .../offset/RocksdbTransferOffsetAndCqTest.java | 1 - .../RocksdbGroupConfigTransferTest.java | 1 - .../topic/RocksdbTopicConfigManagerTest.java | 1 - .../topic/RocksdbTopicConfigTransferTest.java | 1 - .../rocketmq/client/impl/MQClientAPIImpl.java | 7 +- .../rocketmq/common/CheckRocksdbCqWriteResult.java | 38 +++- .../common/config/AbstractRocksDBStorage.java | 3 +- .../common/config/ConfigRocksDBStorage.java | 12 +- .../CheckRocksdbCqWriteProgressRequestHeader.java | 11 ++ .../apache/rocketmq/store/RocksDBMessageStore.java | 4 + .../rocketmq/store/config/MessageStoreConfig.java | 32 ++-- .../store/rocksdb/RocksDBOptionsFactory.java | 5 +- .../rocketmq/tools/admin/DefaultMQAdminExt.java | 6 +- .../tools/admin/DefaultMQAdminExtImpl.java | 6 +- .../apache/rocketmq/tools/admin/MQAdminExt.java | 5 +- .../queue/CheckRocksdbCqWriteProgressCommand.java | 21 ++- 21 files changed, 246 insertions(+), 145 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java index 20358c4707..ee2d4e54a6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java @@ -17,40 +17,40 @@ package org.apache.rocketmq.broker; import com.alibaba.fastjson.JSON; +import java.nio.charset.StandardCharsets; +import java.util.function.BiConsumer; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.config.ConfigRocksDBStorage; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.rocksdb.CompressionType; import org.rocksdb.FlushOptions; import org.rocksdb.RocksIterator; import org.rocksdb.Statistics; import org.rocksdb.WriteBatch; -import java.nio.charset.StandardCharsets; -import java.util.function.BiConsumer; - public class RocksDBConfigManager { protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); public volatile boolean isStop = false; public ConfigRocksDBStorage configRocksDBStorage = null; private FlushOptions flushOptions = null; private volatile long lastFlushMemTableMicroSecond = 0; - private final String filePath; private final long memTableFlushInterval; + private final CompressionType compressionType; private DataVersion kvDataVersion = new DataVersion(); - - public RocksDBConfigManager(String filePath, long memTableFlushInterval) { + public RocksDBConfigManager(String filePath, long memTableFlushInterval, CompressionType compressionType) { this.filePath = filePath; this.memTableFlushInterval = memTableFlushInterval; + this.compressionType = compressionType; } public boolean init() { this.isStop = false; - this.configRocksDBStorage = new ConfigRocksDBStorage(filePath); + this.configRocksDBStorage = new ConfigRocksDBStorage(filePath, compressionType); return this.configRocksDBStorage.start(); } public boolean loadDataVersion() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java index 8066fe769a..824fc0fee3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.rocksdb.CompressionType; import org.rocksdb.WriteBatch; public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager { @@ -41,7 +42,9 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager { public RocksDBConsumerOffsetManager(BrokerController brokerController) { super(brokerController); - this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); + this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(), + CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType())); + } @Override @@ -61,10 +64,6 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager { } private boolean merge() { - if (!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) { - log.info("the switch transferOffsetJsonToRocksdb is off, no merge offset operation is needed."); - return true; - } if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { log.info("consumerOffset json file does not exist, so skip merge"); return true; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java index 8175d63cce..8fc7a4d6ed 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.rocksdb.CompressionType; import org.rocksdb.RocksIterator; public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { @@ -40,7 +41,8 @@ public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { public RocksDBSubscriptionGroupManager(BrokerController brokerController) { super(brokerController, false); - this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); + this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(), + CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType())); } @Override @@ -78,10 +80,6 @@ public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { private boolean merge() { - if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) { - log.info("the switch transferMetadataJsonToRocksdb is off, no merge subGroup operation is needed."); - return true; - } if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { log.info("subGroup json file does not exist, so skip merge"); return true; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java index bce67392f6..18e633d348 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java @@ -28,6 +28,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.rocksdb.CompressionType; public class RocksDBTopicConfigManager extends TopicConfigManager { @@ -35,7 +36,8 @@ public class RocksDBTopicConfigManager extends TopicConfigManager { public RocksDBTopicConfigManager(BrokerController brokerController) { super(brokerController, false); - this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); + this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(), + CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType())); } @Override @@ -59,10 +61,6 @@ public class RocksDBTopicConfigManager extends TopicConfigManager { } private boolean merge() { - if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) { - log.info("the switch transferMetadataJsonToRocksdb is off, no merge topic operation is needed."); - return true; - } if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { log.info("topic json file does not exist, so skip merge"); return true; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index aa962513df..381889c624 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.broker.processor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -39,6 +38,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -65,7 +67,9 @@ import org.apache.rocketmq.broker.metrics.InvocationStatus; import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MQVersion; @@ -214,6 +218,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.RocksDBMessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.StoreType; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.exception.ConsumeQueueException; import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore; @@ -232,6 +237,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); protected final BrokerController brokerController; protected Set<String> configBlackList = new HashSet<>(); + private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); public AdminBrokerProcessor(final BrokerController brokerController) { this.brokerController = brokerController; @@ -467,76 +473,23 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class); - String requestTopic = requestHeader.getTopic(); - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - response.setCode(ResponseCode.SUCCESS); - MessageStore messageStore = brokerController.getMessageStore(); - DefaultMessageStore defaultMessageStore; - if (messageStore instanceof AbstractPluginMessageStore) { - defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext(); - } else { - defaultMessageStore = (DefaultMessageStore) messageStore; - } - RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore(); - if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { - response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid"))); - return response; - } - - ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable(); - StringBuilder diffResult = new StringBuilder(); - try { - if (StringUtils.isNotBlank(requestTopic)) { - processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult,false); - response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString()))); - return response; - } - for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) { - String topic = topicEntry.getKey(); - processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, diffResult,true); + private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) { + CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); + result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue()); + Runnable runnable = () -> { + try { + CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request); + LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult)); + } catch (Exception e) { + LOGGER.error("checkRocksdbCqWriteProgress error", e); } - diffResult.append("check all topic successful, size:").append(cqTable.size()); - response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString()))); - - } catch (Exception e) { - LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e); - response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", e.getMessage()))); - } + }; + asyncExecuteWorker.submit(runnable); + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setBody(JSON.toJSONBytes(result)); return response; } - - private void processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean checkAll) { - for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) { - Integer queueId = queueEntry.getKey(); - ConsumeQueueInterface jsonCq = queueEntry.getValue(); - ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId); - if (!checkAll) { - String format = String.format("\n[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ", - topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit()); - diffResult.append(format).append("\n"); - } - long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue(); - long minOffsetInQueue = kvCq.getMinOffsetInQueue(); - for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) { - Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i); - Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i); - if (fileCqUnit == null || kvCqUnit == null) { - diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file : %s \n", - topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null")); - return; - } - if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) { - String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file : %s \n kv : %s \n", - topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1()); - LOGGER.error(diffInfo); - diffResult.append(diffInfo).append(System.lineSeparator()); - return; - } - } - } - } @Override public boolean rejectRequest() { return false; @@ -3418,6 +3371,115 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return false; } + private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class); + String requestTopic = requestHeader.getTopic(); + MessageStore messageStore = brokerController.getMessageStore(); + DefaultMessageStore defaultMessageStore; + if (messageStore instanceof AbstractPluginMessageStore) { + defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext(); + } else { + defaultMessageStore = (DefaultMessageStore) messageStore; + } + RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore(); + CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); + + if (defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType())) { + result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need check"); + result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue()); + return result; + } + + if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { + result.setCheckResult("rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid"); + result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); + return result; + } + + ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable(); + StringBuilder diffResult = new StringBuilder(); + try { + if (StringUtils.isNotBlank(requestTopic)) { + boolean checkResult = processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime()); + result.setCheckResult(diffResult.toString()); + result.setCheckStatus(checkResult ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); + return result; + } + int successNum = 0; + int checkSize = 0; + for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) { + boolean checkResult = processConsumeQueuesForTopic(topicEntry.getValue(), topicEntry.getKey(), rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime()); + successNum += checkResult ? 1 : 0; + checkSize++; + } + // check all topic finish, all topic is ready, checkSize: 100, currentQueueNum: 110 -> ready (The currentQueueNum means when we do checking, new topics are added.) + // check all topic finish, success/all : 89/100, currentQueueNum: 110 -> not ready + boolean checkReady = successNum == checkSize; + String checkResultString = checkReady ? String.format("all topic is ready, checkSize: %s, currentQueueNum: %s", checkSize, cqTable.size()) : + String.format("success/all : %s/%s, currentQueueNum: %s", successNum, checkSize, cqTable.size()); + diffResult.append("check all topic finish, ").append(checkResultString); + result.setCheckResult(diffResult.toString()); + result.setCheckStatus(checkReady ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); + } catch (Exception e) { + LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e); + result.setCheckResult(e.getMessage() + Arrays.toString(e.getStackTrace())); + result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue()); + } + return result; + } + + private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long checkpointByStoreTime) { + boolean processResult = true; + for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) { + Integer queueId = queueEntry.getKey(); + ConsumeQueueInterface jsonCq = queueEntry.getValue(); + ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId); + if (printDetail) { + String format = String.format("[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ", + topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit()); + diffResult.append(format).append("\n"); + } + + long minOffsetByTime = 0L; + try { + minOffsetByTime = rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, queueId, checkpointByStoreTime, BoundaryType.UPPER); + } catch (Exception e) { + // ignore + } + long minOffsetInQueue = kvCq.getMinOffsetInQueue(); + long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime); + long checkTo = jsonCq.getMaxOffsetInQueue() - 1; + /* + checkTo(maxOffsetInQueue - 1) + v + fileCq +------------------------------------------------------+ + kvCq +----------------------------------------------+ + ^ ^ + minOffsetInQueue minOffsetByTime + ^ + checkFrom = max(minOffsetInQueue, minOffsetByTime) + */ + // The latest message is earlier than the check time + Pair<CqUnit, Long> fileLatestCq = jsonCq.getCqUnitAndStoreTime(checkTo); + if (fileLatestCq != null) { + if (fileLatestCq.getObject2() < checkpointByStoreTime) { + continue; + } + } + for (long i = checkFrom; i <= checkTo; i++) { + Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i); + Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i); + if (fileCqUnit == null || kvCqUnit == null || !checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) { + LOGGER.error(String.format("[topic: %s, queue: %s, offset: %s] \n file : %s \n kv : %s \n", + topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null")); + processResult = false; + break; + } + } + } + return processResult; + } + private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) { if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) { return false; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java index 64c505eb77..4b320eb53f 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java @@ -76,7 +76,6 @@ public class RocksdbTransferOffsetAndCqTest { brokerConfig.setConsumerOffsetUpdateVersionStep(10); MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setStorePathRootDir(basePath); - messageStoreConfig.setTransferOffsetJsonToRocksdb(true); messageStoreConfig.setRocksdbCQDoubleWriteEnable(true); Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java index 26017af8a6..c75fe0d6a0 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java @@ -68,7 +68,6 @@ public class RocksdbGroupConfigTransferTest { Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setStorePathRootDir(basePath); - messageStoreConfig.setTransferMetadataJsonToRocksdb(true); Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java index 080e1dd5a3..fa3ef95f55 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java @@ -72,7 +72,6 @@ public class RocksdbTopicConfigManagerTest { when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setStorePathRootDir(basePath); - messageStoreConfig.setTransferMetadataJsonToRocksdb(true); when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); Mockito.lenient().when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java index fb345548e4..e925ed4bd8 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java @@ -69,7 +69,6 @@ public class RocksdbTopicConfigTransferTest { when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setStorePathRootDir(basePath); - messageStoreConfig.setTransferMetadataJsonToRocksdb(true); Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 716d081ef4..554b1efa52 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -56,6 +56,7 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.rpchook.NamespaceRpcHook; import org.apache.rocketmq.common.BoundaryType; +import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; @@ -113,7 +114,6 @@ import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody; -import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; @@ -3019,15 +3019,16 @@ public class MQClientAPIImpl implements NameServerUpdateCallback, StartAndShutdo throw new MQClientException(response.getCode(), response.getRemark()); } - public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(final String brokerAddr, final String topic, final long timeoutMillis) throws InterruptedException, + public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(final String brokerAddr, final String topic, final long checkStoreTime, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { CheckRocksdbCqWriteProgressRequestHeader header = new CheckRocksdbCqWriteProgressRequestHeader(); header.setTopic(topic); + header.setCheckStoreTime(checkStoreTime); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, header); RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis); assert response != null; if (ResponseCode.SUCCESS == response.getCode()) { - return CheckRocksdbCqWriteProgressResponseBody.decode(response.getBody(), CheckRocksdbCqWriteProgressResponseBody.class); + return JSON.parseObject(response.getBody(), CheckRocksdbCqWriteResult.class); } throw new MQClientException(response.getCode(), response.getRemark()); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java similarity index 52% rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java rename to common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java index 76719ac1a2..fc67df86c2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java +++ b/common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java @@ -15,21 +15,43 @@ * limitations under the License. */ -package org.apache.rocketmq.remoting.protocol.body; +package org.apache.rocketmq.common; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +public class CheckRocksdbCqWriteResult { + String checkResult; -public class CheckRocksdbCqWriteProgressResponseBody extends RemotingSerializable { + int checkStatus; - String diffResult; + public enum CheckStatus { + CHECK_OK(0), + CHECK_NOT_OK(1), + CHECK_IN_PROGRESS(2), + CHECK_ERROR(3); - public String getDiffResult() { - return diffResult; + private int value; + + CheckStatus(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + } + + public String getCheckResult() { + return checkResult; } - public void setDiffResult(String diffResult) { - this.diffResult = diffResult; + public void setCheckResult(String checkResult) { + this.checkResult = checkResult; } + public int getCheckStatus() { + return checkStatus; + } + public void setCheckStatus(int checkStatus) { + this.checkStatus = checkStatus; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index 42ddbdc728..d434cce745 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -86,6 +86,7 @@ public abstract class AbstractRocksDBStorage { protected final List<ColumnFamilyHandle> cfHandles = new ArrayList<>(); protected volatile boolean loaded; + protected CompressionType compressionType = CompressionType.LZ4_COMPRESSION; private volatile boolean closed; private final Semaphore reloadPermit = new Semaphore(1); @@ -156,7 +157,7 @@ public abstract class AbstractRocksDBStorage { protected void initCompactionOptions() { this.compactionOptions = new CompactionOptions(); - this.compactionOptions.setCompression(CompressionType.LZ4_COMPRESSION); + this.compactionOptions.setCompression(compressionType); this.compactionOptions.setMaxSubcompactions(4); this.compactionOptions.setOutputFileSizeLimit(4 * 1024 * 1024 * 1024L); } diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java index 36da6834ff..3b924a6a0d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.common.UtilAll; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompressionType; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -37,12 +38,17 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage { protected ColumnFamilyHandle kvDataVersionFamilyHandle; protected ColumnFamilyHandle forbiddenFamilyHandle; - public static final byte[] KV_DATA_VERSION_KEY = "kvDataVersionKey".getBytes(StandardCharsets.UTF_8); + + public ConfigRocksDBStorage(final String dbPath) { - super(dbPath); - this.readOnly = false; + this(dbPath, false); + } + + public ConfigRocksDBStorage(final String dbPath, CompressionType compressionType) { + this(dbPath, false); + this.compressionType = compressionType; } public ConfigRocksDBStorage(final String dbPath, boolean readOnly) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java index fee158b497..f679077fdd 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java @@ -32,6 +32,8 @@ public class CheckRocksdbCqWriteProgressRequestHeader implements CommandCustomHe @RocketMQResource(ResourceType.TOPIC) private String topic; + private long checkStoreTime; + @Override public void checkFields() throws RemotingCommandException { @@ -44,4 +46,13 @@ public class CheckRocksdbCqWriteProgressRequestHeader implements CommandCustomHe public void setTopic(String topic) { this.topic = topic; } + + public long getCheckStoreTime() { + return checkStoreTime; + } + + public void setCheckStoreTime(long checkStoreTime) { + this.checkStoreTime = checkStoreTime; + } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java index 0a7119cab1..321689ac8f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java @@ -173,6 +173,10 @@ public class RocksDBMessageStore extends DefaultMessageStore { class CommitLogDispatcherBuildRocksdbConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) throws RocksDBException { + boolean enable = getMessageStoreConfig().isRocksdbCQDoubleWriteEnable(); + if (!enable) { + return; + } final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index e31c03dd22..fe090e3fa2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -22,6 +22,7 @@ import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.StoreType; import org.apache.rocketmq.store.queue.BatchConsumeQueue; +import org.rocksdb.CompressionType; public class MessageStoreConfig { @@ -106,8 +107,6 @@ public class MessageStoreConfig { @ImportantField private String storeType = StoreType.DEFAULT.getStoreType(); - private boolean transferMetadataJsonToRocksdb = false; - // ConsumeQueue file size,default is 30W private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE; // enable consume queue ext @@ -424,8 +423,6 @@ public class MessageStoreConfig { private boolean putConsumeQueueDataByFileChannel = true; - private boolean transferOffsetJsonToRocksdb = false; - private boolean rocksdbCQDoubleWriteEnable = false; /** @@ -443,7 +440,17 @@ public class MessageStoreConfig { * * LZ4 is the recommended one. */ - private String bottomMostCompressionTypeForConsumeQueueStore = "zstd"; + private String bottomMostCompressionTypeForConsumeQueueStore = CompressionType.ZSTD_COMPRESSION.getLibraryName(); + + private String rocksdbCompressionType = CompressionType.LZ4_COMPRESSION.getLibraryName(); + + public String getRocksdbCompressionType() { + return rocksdbCompressionType; + } + + public void setRocksdbCompressionType(String compressionType) { + this.rocksdbCompressionType = compressionType; + } /** * Spin number in the retreat strategy of spin lock @@ -464,13 +471,6 @@ public class MessageStoreConfig { this.rocksdbCQDoubleWriteEnable = rocksdbWriteEnable; } - public boolean isTransferOffsetJsonToRocksdb() { - return transferOffsetJsonToRocksdb; - } - - public void setTransferOffsetJsonToRocksdb(boolean transferOffsetJsonToRocksdb) { - this.transferOffsetJsonToRocksdb = transferOffsetJsonToRocksdb; - } public boolean isEnabledAppendPropCRC() { return enabledAppendPropCRC; @@ -1894,14 +1894,6 @@ public class MessageStoreConfig { this.putConsumeQueueDataByFileChannel = putConsumeQueueDataByFileChannel; } - public boolean isTransferMetadataJsonToRocksdb() { - return transferMetadataJsonToRocksdb; - } - - public void setTransferMetadataJsonToRocksdb(boolean transferMetadataJsonToRocksdb) { - this.transferMetadataJsonToRocksdb = transferMetadataJsonToRocksdb; - } - public String getBottomMostCompressionTypeForConsumeQueueStore() { return bottomMostCompressionTypeForConsumeQueueStore; } diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java index d373ba6249..66f5cbd095 100644 --- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java +++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java @@ -67,13 +67,16 @@ public class RocksDBOptionsFactory { setCompressionSizePercent(-1); String bottomMostCompressionTypeOpt = messageStore.getMessageStoreConfig() .getBottomMostCompressionTypeForConsumeQueueStore(); + String compressionTypeOpt = messageStore.getMessageStoreConfig() + .getRocksdbCompressionType(); CompressionType bottomMostCompressionType = CompressionType.getCompressionType(bottomMostCompressionTypeOpt); + CompressionType compressionType = CompressionType.getCompressionType(compressionTypeOpt); return columnFamilyOptions.setMaxWriteBufferNumber(4). setWriteBufferSize(128 * SizeUnit.MB). setMinWriteBufferNumberToMerge(1). setTableFormatConfig(blockBasedTableConfig). setMemTableConfig(new SkipListMemTableConfig()). - setCompressionType(CompressionType.LZ4_COMPRESSION). + setCompressionType(compressionType). setBottommostCompressionType(bottomMostCompressionType). setNumLevels(7). setCompactionStyle(CompactionStyle.UNIVERSAL). diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 3686bf2644..c5ecdefb52 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.BoundaryType; +import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; @@ -52,7 +53,6 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -773,9 +773,9 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) + public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String brokerAddr, String topic, long checkStoreTime) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { - return this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic); + return this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic, checkStoreTime); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 883dcbe41d..17f14f23af 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -46,6 +46,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.BoundaryType; +import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; @@ -90,7 +91,6 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -1819,9 +1819,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) + public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String brokerAddr, String topic, long checkStoreTime) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { - return this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr, topic, timeoutMillis); + return this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr, topic, checkStoreTime, timeoutMillis); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 09204ab7be..aea43376ea 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; @@ -48,7 +49,6 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -149,7 +149,8 @@ public interface MQAdminExt extends MQAdmin { final String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; - CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String brokerAddr, String topic, long checkStoreTime) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException, diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java index d18a24ee1d..a0fc9fce1f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java @@ -19,12 +19,13 @@ package org.apache.rocketmq.tools.command.queue; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; @@ -53,8 +54,11 @@ public class CheckRocksdbCqWriteProgressCommand implements SubCommand { options.addOption(opt); opt = new Option("t", "topic", true, "topic name"); - opt.setRequired(false); options.addOption(opt); + + opt = new Option("cf", "checkFrom", true, "check from time"); + options.addOption(opt); + return options; } @@ -66,6 +70,10 @@ public class CheckRocksdbCqWriteProgressCommand implements SubCommand { defaultMQAdminExt.setNamesrvAddr(StringUtils.trim(commandLine.getOptionValue('n'))); String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : ""; String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : ""; + // The default check is 30 days + long checkStoreTime = commandLine.hasOption("cf") + ? Long.parseLong(commandLine.getOptionValue("cf").trim()) + : System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30L); try { defaultMQAdminExt.start(); @@ -80,14 +88,13 @@ public class CheckRocksdbCqWriteProgressCommand implements SubCommand { String brokerName = entry.getKey(); BrokerData brokerData = entry.getValue(); String brokerAddr = brokerData.getBrokerAddrs().get(0L); - CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic); - if (StringUtils.isNotBlank(topic)) { - System.out.print(body.getDiffResult()); + CheckRocksdbCqWriteResult result = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic, checkStoreTime); + if (result.getCheckStatus() == CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue()) { + System.out.print(brokerName + " check error, please check log... errInfo: " + result.getCheckResult()); } else { - System.out.print(brokerName + " | " + brokerAddr + " | \n" + body.getDiffResult()); + System.out.print(brokerName + " check doing, please wait and get the result from log... \n"); } } - } catch (Exception e) { throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e); } finally {