lollipopjin commented on code in PR #8823:
URL: https://github.com/apache/rocketmq/pull/8823#discussion_r1814167555


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -3418,6 +3375,103 @@ private boolean validateBlackListConfigExist(Properties 
properties) {
         return false;
     }
 
+    private CheckRocksdbCqWriteResult 
doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand 
request) throws RemotingCommandException {
+        CheckRocksdbCqWriteProgressRequestHeader requestHeader = 
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
+        String requestTopic = requestHeader.getTopic();
+        MessageStore messageStore = brokerController.getMessageStore();
+        DefaultMessageStore defaultMessageStore;
+        if (messageStore instanceof AbstractPluginMessageStore) {
+            defaultMessageStore = (DefaultMessageStore) 
((AbstractPluginMessageStore) messageStore).getNext();
+        } else {
+            defaultMessageStore = (DefaultMessageStore) messageStore;
+        }
+        RocksDBMessageStore rocksDBMessageStore = 
defaultMessageStore.getRocksDBMessageStore();
+        CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
+
+        if 
(defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType()))
 {
+            result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need 
check");
+            
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue());
+            return result;
+        }
+
+        if 
(!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
+            result.setCheckResult("rocksdbCQWriteEnable is false, 
checkRocksdbCqWriteProgressCommand is invalid");
+            
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+            return result;
+        }
+
+        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> 
cqTable = defaultMessageStore.getConsumeQueueTable();
+        StringBuilder diffResult = new StringBuilder();
+        try {
+            if (StringUtils.isNotBlank(requestTopic)) {
+                boolean checkResult = 
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, 
rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime());
+                result.setCheckResult(diffResult.toString());
+                result.setCheckStatus(checkResult ? 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+                return result;
+            }
+            boolean checkResult = true;
+            for (Map.Entry<String, ConcurrentMap<Integer, 
ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
+                String topic = topicEntry.getKey();
+                checkResult = 
processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, 
diffResult, true, requestHeader.getCheckStoreTime());
+                if (!checkResult) {
+                    break;
+                }
+            }
+            diffResult.append("check all topic successful, 
size:").append(cqTable.size());
+            result.setCheckResult(diffResult.toString());
+            result.setCheckStatus(checkResult ? 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+        } catch (Exception e) {
+            LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
+            result.setCheckResult(e.getMessage() + 
Arrays.toString(e.getStackTrace()));
+            
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue());
+        }
+        return result;
+    }
+
+    private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, 
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore 
rocksDBMessageStore, StringBuilder diffResult, boolean checkAll, long 
checkStoreTime) {
+        for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : 
queueMap.entrySet()) {
+            Integer queueId = queueEntry.getKey();
+            ConsumeQueueInterface jsonCq = queueEntry.getValue();
+            ConsumeQueueInterface kvCq = 
rocksDBMessageStore.getConsumeQueue(topic, queueId);
+            if (!checkAll) {
+                String format = String.format("[topic: %s, queue:  %s] \n  
kvEarliest : %s |  kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
+                    topic, queueId, kvCq.getEarliestUnit(), 
kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
+                diffResult.append(format).append("\n");
+            }
+            long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue();
+            long minOffsetInQueue = kvCq.getMinOffsetInQueue();
+
+            // The latest message is earlier than the check time
+            Pair<CqUnit, Long> fileLatestCq = 
jsonCq.getCqUnitAndStoreTime(maxFileOffsetInQueue);
+            if (fileLatestCq != null) {
+                if (fileLatestCq.getObject2() < checkStoreTime) {
+                    continue;
+                }
+            }
+
+            for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) {

Review Comment:
   Should start from the checkpointByStoreTime to reduce unnecessary Iterate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to