lollipopjin commented on code in PR #8823: URL: https://github.com/apache/rocketmq/pull/8823#discussion_r1814814545
########## broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java: ########## @@ -3418,6 +3376,115 @@ private boolean validateBlackListConfigExist(Properties properties) { return false; } + private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class); + String requestTopic = requestHeader.getTopic(); + MessageStore messageStore = brokerController.getMessageStore(); + DefaultMessageStore defaultMessageStore; + if (messageStore instanceof AbstractPluginMessageStore) { + defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext(); + } else { + defaultMessageStore = (DefaultMessageStore) messageStore; + } + RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore(); + CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); + + if (defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType())) { + result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need check"); + result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue()); + return result; + } + + if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { + result.setCheckResult("rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid"); + result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); + return result; + } + + ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable(); + StringBuilder diffResult = new StringBuilder(); + try { + if (StringUtils.isNotBlank(requestTopic)) { + boolean checkResult = processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime()); + result.setCheckResult(diffResult.toString()); + result.setCheckStatus(checkResult ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); + return result; + } + boolean checkResult = true; + int successNum = 0; + for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) { + String topic = topicEntry.getKey(); + checkResult = processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime()); + if (checkResult) { + successNum++; + } + } + // check all topic finish, all topic is ready -> ready + // check all topic finish, success/all : 89/100 -> not ready + String checkResultString = successNum == cqTable.size() ? "all topic is ready" : String.format("success/all : %s/%s", successNum, cqTable.size()); + diffResult.append("check all topic finish, ").append(checkResultString); + result.setCheckResult(diffResult.toString()); + result.setCheckStatus(checkResult ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); + } catch (Exception e) { + LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e); + result.setCheckResult(e.getMessage() + Arrays.toString(e.getStackTrace())); + result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue()); + } + return result; + } + + private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long checkpointByStoreTime) { + for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) { + Integer queueId = queueEntry.getKey(); + ConsumeQueueInterface jsonCq = queueEntry.getValue(); + ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId); + if (printDetail) { + String format = String.format("[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ", + topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit()); + diffResult.append(format).append("\n"); + } + + long minOffsetByTime = 0L; + try { + minOffsetByTime = rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, queueId, checkpointByStoreTime, BoundaryType.UPPER); + } catch (Exception e) { + // ignore + } + long minOffsetInQueue = kvCq.getMinOffsetInQueue(); + long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime); + long checkTo = jsonCq.getMaxOffsetInQueue(); + // The latest message is earlier than the check time + Pair<CqUnit, Long> fileLatestCq = jsonCq.getCqUnitAndStoreTime(checkTo); + if (fileLatestCq != null) { + if (fileLatestCq.getObject2() < checkpointByStoreTime) { + continue; + } + } + + for (long i = checkFrom; i < checkTo; i++) { + Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i); + Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i); + if (fileCqUnit == null || kvCqUnit == null) { + String format = String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file : %s \n", + topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"); + LOGGER.error(format); + break; Review Comment: break here, the check result is true? ########## broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java: ########## @@ -469,74 +475,26 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class); - String requestTopic = requestHeader.getTopic(); - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - response.setCode(ResponseCode.SUCCESS); - MessageStore messageStore = brokerController.getMessageStore(); - DefaultMessageStore defaultMessageStore; - if (messageStore instanceof AbstractPluginMessageStore) { - defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext(); + CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); + result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue()); + if (requestHeader.isAsync()) { Review Comment: Remove the async switch, by default, all check request should use async way to avoid admin threads blocked. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org