xdkxlk commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055140306
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java:
##########
@@ -451,30 +465,79 @@ protected void mergeAndRevive(ConsumeReviveObj
consumeReviveObj) throws Throwabl
}
this.brokerController.getConsumerOffsetManager().commitOffset(PopAckConstants.LOCAL_HOST,
PopAckConstants.REVIVE_GROUP, reviveTopic, queueId, newOffset);
}
+ reviveOffset = newOffset;
consumeReviveObj.newOffset = newOffset;
}
- private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable
{
+ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
+ if (!shouldRunPopRevive) {
+ POP_LOGGER.info("slave skip retry , revive topic={},
reviveQueueId={}", reviveTopic, queueId);
+ return;
+ }
+ inflightReviveRequestMap.put(popCheckPoint, new
Pair<>(System.currentTimeMillis(), false));
+ List<CompletableFuture<Pair<Long, Boolean>>> futureList = new
ArrayList<>(popCheckPoint.getNum());
for (int j = 0; j < popCheckPoint.getNum(); j++) {
if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {
continue;
}
// retry msg
long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
- MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(),
msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());
- if (messageExt == null) {
- POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is
{}, offset is {} , then continue ",
- queueId, popCheckPoint.getTopic(), msgOffset);
- continue;
- }
- //skip ck from last epoch
- if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
- POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}",
queueId, popCheckPoint);
- continue;
- }
- reviveRetry(popCheckPoint, messageExt);
+ CompletableFuture<Pair<Long, Boolean>> future =
getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(),
popCheckPoint.getBrokerName())
+ .thenApply(messageExt -> {
+ if (messageExt == null) {
+ POP_LOGGER.warn("reviveQueueId={}, can not get biz msg
topic is {}, offset is {}, then continue",
+ queueId, popCheckPoint.getTopic(), msgOffset);
+ return new Pair<>(msgOffset, true);
+ }
+ //skip ck from last epoch
+ if (popCheckPoint.getPopTime() <
messageExt.getStoreTimestamp()) {
+ POP_LOGGER.warn("reviveQueueId={}, skip ck from last
epoch {}", queueId, popCheckPoint);
+ return new Pair<>(msgOffset, false);
Review Comment:
In my opinion, this should return `new Pair<>(msgOffset, true)` here, so that
the ck from the last epoch is skipped.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]