lollipopjin commented on code in PR #8823:
URL: https://github.com/apache/rocketmq/pull/8823#discussion_r1814814545


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -3418,6 +3376,115 @@ private boolean validateBlackListConfigExist(Properties 
properties) {
         return false;
     }
 
+    private CheckRocksdbCqWriteResult 
doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand 
request) throws RemotingCommandException {
+        CheckRocksdbCqWriteProgressRequestHeader requestHeader = 
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
+        String requestTopic = requestHeader.getTopic();
+        MessageStore messageStore = brokerController.getMessageStore();
+        DefaultMessageStore defaultMessageStore;
+        if (messageStore instanceof AbstractPluginMessageStore) {
+            defaultMessageStore = (DefaultMessageStore) 
((AbstractPluginMessageStore) messageStore).getNext();
+        } else {
+            defaultMessageStore = (DefaultMessageStore) messageStore;
+        }
+        RocksDBMessageStore rocksDBMessageStore = 
defaultMessageStore.getRocksDBMessageStore();
+        CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
+
+        if 
(defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType()))
 {
+            result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need 
check");
+            
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue());
+            return result;
+        }
+
+        if 
(!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
+            result.setCheckResult("rocksdbCQWriteEnable is false, 
checkRocksdbCqWriteProgressCommand is invalid");
+            
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+            return result;
+        }
+
+        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> 
cqTable = defaultMessageStore.getConsumeQueueTable();
+        StringBuilder diffResult = new StringBuilder();
+        try {
+            if (StringUtils.isNotBlank(requestTopic)) {
+                boolean checkResult = 
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, 
rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime());
+                result.setCheckResult(diffResult.toString());
+                result.setCheckStatus(checkResult ? 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+                return result;
+            }
+            boolean checkResult = true;
+            int successNum = 0;
+            for (Map.Entry<String, ConcurrentMap<Integer, 
ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
+                String topic = topicEntry.getKey();
+                checkResult = 
processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, 
diffResult, false, requestHeader.getCheckStoreTime());
+                if (checkResult) {
+                    successNum++;
+                }
+            }
+            // check all topic finish, all topic is ready   -> ready
+            // check all topic finish, success/all : 89/100 -> not ready
+            String checkResultString = successNum == cqTable.size() ? "all 
topic is ready" : String.format("success/all : %s/%s", successNum, 
cqTable.size());
+            diffResult.append("check all topic finish, 
").append(checkResultString);
+            result.setCheckResult(diffResult.toString());
+            result.setCheckStatus(checkResult ? 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : 
CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
+        } catch (Exception e) {
+            LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
+            result.setCheckResult(e.getMessage() + 
Arrays.toString(e.getStackTrace()));
+            
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue());
+        }
+        return result;
+    }
+
+    private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, 
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore 
rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long 
checkpointByStoreTime) {
+        for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : 
queueMap.entrySet()) {
+            Integer queueId = queueEntry.getKey();
+            ConsumeQueueInterface jsonCq = queueEntry.getValue();
+            ConsumeQueueInterface kvCq = 
rocksDBMessageStore.getConsumeQueue(topic, queueId);
+            if (printDetail) {
+                String format = String.format("[topic: %s, queue:  %s] \n  
kvEarliest : %s |  kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
+                    topic, queueId, kvCq.getEarliestUnit(), 
kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
+                diffResult.append(format).append("\n");
+            }
+
+            long minOffsetByTime = 0L;
+            try {
+                minOffsetByTime = 
rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, 
queueId, checkpointByStoreTime, BoundaryType.UPPER);
+            } catch (Exception e) {
+                // ignore
+            }
+            long minOffsetInQueue = kvCq.getMinOffsetInQueue();
+            long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime);
+            long checkTo = jsonCq.getMaxOffsetInQueue();
+            // The latest message is earlier than the check time
+            Pair<CqUnit, Long> fileLatestCq = 
jsonCq.getCqUnitAndStoreTime(checkTo);
+            if (fileLatestCq != null) {
+                if (fileLatestCq.getObject2() < checkpointByStoreTime) {
+                    continue;
+                }
+            }
+
+            for (long i = checkFrom; i < checkTo; i++) {
+                Pair<CqUnit, Long> fileCqUnit = 
jsonCq.getCqUnitAndStoreTime(i);
+                Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
+                if (fileCqUnit == null || kvCqUnit == null) {
+                    String format = String.format("[topic: %s, queue: %s, 
offset: %s] \n kv   : %s  \n file : %s  \n",
+                        topic, queueId, i, kvCqUnit != null ? 
kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : 
"null");
+                    LOGGER.error(format);
+                    break;

Review Comment:
   The loop breaks here when a CQ unit is missing. Is the check result still
   true in that case?



##########
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java:
##########
@@ -469,74 +475,26 @@ private RemotingCommand 
updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
 
     private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext 
ctx, RemotingCommand request) throws RemotingCommandException {
         CheckRocksdbCqWriteProgressRequestHeader requestHeader = 
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
-        String requestTopic = requestHeader.getTopic();
-        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-        response.setCode(ResponseCode.SUCCESS);
-        MessageStore messageStore = brokerController.getMessageStore();
-        DefaultMessageStore defaultMessageStore;
-        if (messageStore instanceof AbstractPluginMessageStore) {
-            defaultMessageStore = (DefaultMessageStore) 
((AbstractPluginMessageStore) messageStore).getNext();
+        CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
+        
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue());
+        if (requestHeader.isAsync()) {

Review Comment:
   Remove the async switch. By default, all check requests should run
   asynchronously to avoid blocking the admin threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to