This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 2c898c9b31 [ISSUE #7689] In Controller mode, messages may lost due to sharing the same cq offset (#7690) 2c898c9b31 is described below commit 2c898c9b31bf195174cf1e3a626a7c61f7576381 Author: Ji Juntao <juntao....@alibaba-inc.com> AuthorDate: Thu Dec 21 14:41:51 2023 +0800 [ISSUE #7689] In Controller mode, messages may lost due to sharing the same cq offset (#7690) * fix the reput bug. * add more logs. * refactor the method of compensating for HA. * not modify the imports. * refactor the log. * refactor the log. --- .../rocketmq/store/queue/ConsumeQueueStore.java | 76 ++++++++++++---------- 1 file changed, 40 insertions(+), 36 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index 616511b67f..cbe9b4f5ac 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -455,48 +455,52 @@ public class ConsumeQueueStore extends AbstractConsumeQueueStore { } // Correct unSubmit consumeOffset - if (messageStoreConfig.isDuplicationEnable()) { - SelectMappedBufferResult lastBuffer = null; - long startReadOffset = messageStore.getCommitLog().getConfirmOffset() == -1 ? 0 : messageStore.getCommitLog().getConfirmOffset(); - while ((lastBuffer = messageStore.selectOneMessageByOffset(startReadOffset)) != null) { - try { - if (lastBuffer.getStartOffset() > startReadOffset) { - startReadOffset = lastBuffer.getStartOffset(); - continue; - } + if (messageStoreConfig.isDuplicationEnable() || messageStore.getBrokerConfig().isEnableControllerMode()) { + compensateForHA(cqOffsetTable); + } - ByteBuffer bb = lastBuffer.getByteBuffer(); - int magicCode = bb.getInt(bb.position() + 4); - if (magicCode == CommitLog.BLANK_MAGIC_CODE) { - startReadOffset += bb.getInt(bb.position()); - continue; - } else if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE) { - throw new RuntimeException("Unknown magicCode: " + magicCode); - } + this.setTopicQueueTable(cqOffsetTable); + this.setBatchTopicQueueTable(bcqOffsetTable); + } + private void compensateForHA(ConcurrentMap<String, Long> cqOffsetTable) { + SelectMappedBufferResult lastBuffer = null; + long startReadOffset = messageStore.getCommitLog().getConfirmOffset() == -1 ? 0 : messageStore.getCommitLog().getConfirmOffset(); + log.info("Correct unsubmitted offset...StartReadOffset = {}", startReadOffset); + while ((lastBuffer = messageStore.selectOneMessageByOffset(startReadOffset)) != null) { + try { + if (lastBuffer.getStartOffset() > startReadOffset) { + startReadOffset = lastBuffer.getStartOffset(); + continue; + } - lastBuffer.getByteBuffer().mark(); - DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(lastBuffer.getByteBuffer(), true, true, true); - if (!dispatchRequest.isSuccess()) - break; - lastBuffer.getByteBuffer().reset(); - - MessageExt msg = MessageDecoder.decode(lastBuffer.getByteBuffer(), true, false, false, false, true); - if (msg == null) - break; - - String key = msg.getTopic() + "-" + msg.getQueueId(); - cqOffsetTable.put(key, msg.getQueueOffset() + 1); - startReadOffset += msg.getStoreSize(); - } finally { - if (lastBuffer != null) - lastBuffer.release(); + ByteBuffer bb = lastBuffer.getByteBuffer(); + int magicCode = bb.getInt(bb.position() + 4); + if (magicCode == CommitLog.BLANK_MAGIC_CODE) { + startReadOffset += bb.getInt(bb.position()); + continue; + } else if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE) { + throw new RuntimeException("Unknown magicCode: " + magicCode); } + lastBuffer.getByteBuffer().mark(); + DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(lastBuffer.getByteBuffer(), true, messageStoreConfig.isDuplicationEnable(), true); + if (!dispatchRequest.isSuccess()) + break; + lastBuffer.getByteBuffer().reset(); + + MessageExt msg = MessageDecoder.decode(lastBuffer.getByteBuffer(), true, false, false, false, true); + if (msg == null) + break; + + String key = msg.getTopic() + "-" + msg.getQueueId(); + cqOffsetTable.put(key, msg.getQueueOffset() + 1); + startReadOffset += msg.getStoreSize(); + log.info("Correcting. Key:{}, start read Offset: {}", key, startReadOffset); + } finally { + if (lastBuffer != null) + lastBuffer.release(); } } - - this.setTopicQueueTable(cqOffsetTable); - this.setBatchTopicQueueTable(bcqOffsetTable); } @Override