This is an automated email from the ASF dual-hosted git repository. yuzhou pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 646e2a4942 [ISSUE #7355] fix dledger recover abnormally may lost consume queue of tail (#7599) 646e2a4942 is described below commit 646e2a4942c62eb36b1601a41ebc0828e8580804 Author: bxfjb <48467309+bx...@users.noreply.github.com> AuthorDate: Thu Jan 18 15:43:01 2024 +0800 [ISSUE #7355] fix dledger recover abnormally may lost consume queue of tail (#7599) * fix dledger recover abnormally may lost consume queue of tail * fix correct storeTimestampPosition when bornhost is v6 * fix correct SYSFLAG offset --------- Co-authored-by: 赵宇晗 <zhaoyu...@xiaomi.com> --- .../rocketmq/store/dledger/DLedgerCommitLog.java | 137 ++++++++++++++++++++- .../store/dledger/DLedgerCommitlogTest.java | 40 ++++++ 2 files changed, 172 insertions(+), 5 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 70371d83b8..27a18abc9d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -290,9 +290,9 @@ public class DLedgerCommitLog extends CommitLog { return false; } - private void recover(long maxPhyOffsetOfConsumeQueue) throws RocksDBException { + private void dledgerRecoverNormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException { dLedgerFileStore.load(); - if (dLedgerFileList.getMappedFiles().size() > 0) { + if (!dLedgerFileList.getMappedFiles().isEmpty()) { dLedgerFileStore.recover(); dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); @@ -309,9 +309,93 @@ public class DLedgerCommitLog extends CommitLog { } //Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog isInrecoveringOldCommitlog = true; - //No need the abnormal recover super.recoverNormally(maxPhyOffsetOfConsumeQueue); isInrecoveringOldCommitlog = false; + + setRecoverPosition(); + + } + + private void dledgerRecoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException { + boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); + dLedgerFileStore.load(); + if (!dLedgerFileList.getMappedFiles().isEmpty()) { + dLedgerFileStore.recover(); + dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + if (mappedFile != null) { + disableDeleteDledger(); + } + List<MmapFile> mmapFiles = dLedgerFileList.getMappedFiles(); + int index = mmapFiles.size() - 1; + MmapFile mmapFile = null; + for (; index >= 0; index--) { + mmapFile = mmapFiles.get(index); + if (isMmapFileMatchedRecover(mmapFile)) { + log.info("dledger recover from this mappFile " + mmapFile.getFileName()); + break; + } + } + + if (index < 0) { + index = 0; + mmapFile = mmapFiles.get(index); + } + + ByteBuffer byteBuffer = mmapFile.sliceByteBuffer(); + long processOffset = mmapFile.getFileFromOffset(); + long mmapFileOffset = 0; + while (true) { + DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, true); + int size = dispatchRequest.getMsgSize(); + + if (dispatchRequest.isSuccess()) { + if (size > 0) { + mmapFileOffset += size; + if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { + if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { + this.defaultMessageStore.doDispatch(dispatchRequest); + } + } else { + this.defaultMessageStore.doDispatch(dispatchRequest); + } + } else if (size == 0) { + index++; + if (index >= mmapFiles.size()) { + log.info("dledger recover physics file over, last mapped file " + mmapFile.getFileName()); + break; + } else { + mmapFile = mmapFiles.get(index); + byteBuffer = mmapFile.sliceByteBuffer(); + processOffset = mmapFile.getFileFromOffset(); + mmapFileOffset = 0; + log.info("dledger recover next physics file, " + mmapFile.getFileName()); + } + } + } else { + log.info("dledger recover physics file end, " + mmapFile.getFileName() + " pos=" + byteBuffer.position()); + break; + } + } + + processOffset += mmapFileOffset; + + if (maxPhyOffsetOfConsumeQueue >= processOffset) { + log.warn("dledger maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset); + this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); + } + return; + } + isInrecoveringOldCommitlog = true; + super.recoverAbnormally(maxPhyOffsetOfConsumeQueue); + + isInrecoveringOldCommitlog = false; + + setRecoverPosition(); + + } + + private void setRecoverPosition() { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile == null) { return; @@ -343,14 +427,57 @@ public class DLedgerCommitLog extends CommitLog { log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset); } + private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) { + ByteBuffer byteBuffer = mmapFile.sliceByteBuffer(); + + int magicCode = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.MESSAGE_MAGIC_CODE_POSITION); + if (magicCode != MESSAGE_MAGIC_CODE) { + return false; + } + + int storeTimestampPosition; + int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.SYSFLAG_POSITION); + if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) { + storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION; + } else { + // v6 address is 12 byte larger than v4 + storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12; + } + + long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + storeTimestampPosition); + if (storeTimestamp == 0) { + return false; + } + + if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() + && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { + if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) { + log.info("dledger find check timestamp, {} {}", + storeTimestamp, + UtilAll.timeMillisToHumanString(storeTimestamp)); + return true; + } + } else { + if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) { + log.info("dledger find check timestamp, {} {}", + storeTimestamp, + UtilAll.timeMillisToHumanString(storeTimestamp)); + return true; + } + } + + return false; + + } + @Override public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException { - recover(maxPhyOffsetOfConsumeQueue); + dledgerRecoverNormally(maxPhyOffsetOfConsumeQueue); } @Override public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException { - recover(maxPhyOffsetOfConsumeQueue); + dledgerRecoverAbnormally(maxPhyOffsetOfConsumeQueue); } @Override diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 234273b6af..1e4bbf21bd 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.store.dledger; import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; import io.openmessaging.storage.dledger.store.file.MmapFileList; + +import java.io.File; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; @@ -36,6 +38,8 @@ import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.StoreCheckpoint; +import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.junit.Assert; import org.junit.Test; import org.junit.Assume; @@ -146,6 +150,42 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { messageStore.shutdown(); } } + @Test + public void testDLedgerAbnormallyRecover() throws Exception { + String base = createBaseDir(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + String topic = UUID.randomUUID().toString(); + + int messageNumPerQueue = 100; + + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); + Thread.sleep(1000); + doPutMessages(messageStore, topic, 0, messageNumPerQueue, 0); + doPutMessages(messageStore, topic, 1, messageNumPerQueue, 0); + Thread.sleep(1000); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(messageNumPerQueue, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + doGetMessages(messageStore, topic, 0, messageNumPerQueue, 0); + StoreCheckpoint storeCheckpoint = messageStore.getStoreCheckpoint(); + storeCheckpoint.setPhysicMsgTimestamp(0); + storeCheckpoint.setLogicsMsgTimestamp(0); + messageStore.shutdown(); + + String fileName = StorePathConfigHelper.getAbortFile(base); + makeSureFileExists(fileName); + + File file = new File(base + File.separator + "consumequeue" + File.separator + topic + File.separator + "0" + File.separator + "00000000000000001040"); + file.delete(); +// truncateAllConsumeQueue(base + File.separator + "consumequeue" + File.separator + topic + File.separator); + messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); + Thread.sleep(1000); + doGetMessages(messageStore, topic, 0, messageNumPerQueue, 0); + doGetMessages(messageStore, topic, 1, messageNumPerQueue, 0); + messageStore.shutdown(); + + } @Test public void testPutAndGetMessage() throws Exception {