This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 646e2a4942 [ISSUE #7355] fix: abnormal dledger recovery may lose the 
tail of the consume queue (#7599)
646e2a4942 is described below

commit 646e2a4942c62eb36b1601a41ebc0828e8580804
Author: bxfjb <48467309+bx...@users.noreply.github.com>
AuthorDate: Thu Jan 18 15:43:01 2024 +0800

    [ISSUE #7355] fix: abnormal dledger recovery may lose the tail of the 
consume queue (#7599)
    
    * fix: abnormal dledger recovery may lose the tail of the consume queue
    
    * fix: use the correct storeTimestampPosition when bornhost is v6
    
    * fix: correct the SYSFLAG offset
    
    ---------
    
    Co-authored-by: 赵宇晗 <zhaoyu...@xiaomi.com>
---
 .../rocketmq/store/dledger/DLedgerCommitLog.java   | 137 ++++++++++++++++++++-
 .../store/dledger/DLedgerCommitlogTest.java        |  40 ++++++
 2 files changed, 172 insertions(+), 5 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 70371d83b8..27a18abc9d 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -290,9 +290,9 @@ public class DLedgerCommitLog extends CommitLog {
         return false;
     }
 
-    private void recover(long maxPhyOffsetOfConsumeQueue) throws 
RocksDBException {
+    private void dledgerRecoverNormally(long maxPhyOffsetOfConsumeQueue) 
throws RocksDBException {
         dLedgerFileStore.load();
-        if (dLedgerFileList.getMappedFiles().size() > 0) {
+        if (!dLedgerFileList.getMappedFiles().isEmpty()) {
             dLedgerFileStore.recover();
             dividedCommitlogOffset = 
dLedgerFileList.getFirstMappedFile().getFileFromOffset();
             MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
@@ -309,9 +309,93 @@ public class DLedgerCommitLog extends CommitLog {
         }
         //Indicate that, it is the first time to load mixed commitlog, need to 
recover the old commitlog
         isInrecoveringOldCommitlog = true;
-        //No need the abnormal recover
         super.recoverNormally(maxPhyOffsetOfConsumeQueue);
         isInrecoveringOldCommitlog = false;
+
+        setRecoverPosition();
+
+    }
+
+    private void dledgerRecoverAbnormally(long maxPhyOffsetOfConsumeQueue) 
throws RocksDBException {
+        boolean checkCRCOnRecover = 
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
+        dLedgerFileStore.load();
+        if (!dLedgerFileList.getMappedFiles().isEmpty()) {
+            dLedgerFileStore.recover();
+            dividedCommitlogOffset = 
dLedgerFileList.getFirstMappedFile().getFileFromOffset();
+            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+            if (mappedFile != null) {
+                disableDeleteDledger();
+            }
+            List<MmapFile> mmapFiles = dLedgerFileList.getMappedFiles();
+            int index = mmapFiles.size() - 1;
+            MmapFile mmapFile = null;
+            for (; index >= 0; index--) {
+                mmapFile = mmapFiles.get(index);
+                if (isMmapFileMatchedRecover(mmapFile)) {
+                    log.info("dledger recover from this mappFile " + 
mmapFile.getFileName());
+                    break;
+                }
+            }
+
+            if (index < 0) {
+                index = 0;
+                mmapFile = mmapFiles.get(index);
+            }
+
+            ByteBuffer byteBuffer = mmapFile.sliceByteBuffer();
+            long processOffset = mmapFile.getFileFromOffset();
+            long mmapFileOffset = 0;
+            while (true) {
+                DispatchRequest dispatchRequest = 
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, true);
+                int size = dispatchRequest.getMsgSize();
+
+                if (dispatchRequest.isSuccess()) {
+                    if (size > 0) {
+                        mmapFileOffset += size;
+                        if 
(this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+                            if (dispatchRequest.getCommitLogOffset() < 
this.defaultMessageStore.getConfirmOffset()) {
+                                
this.defaultMessageStore.doDispatch(dispatchRequest);
+                            }
+                        } else {
+                            
this.defaultMessageStore.doDispatch(dispatchRequest);
+                        }
+                    } else if (size == 0) {
+                        index++;
+                        if (index >= mmapFiles.size()) {
+                            log.info("dledger recover physics file over, last 
mapped file " + mmapFile.getFileName());
+                            break;
+                        } else {
+                            mmapFile = mmapFiles.get(index);
+                            byteBuffer = mmapFile.sliceByteBuffer();
+                            processOffset = mmapFile.getFileFromOffset();
+                            mmapFileOffset = 0;
+                            log.info("dledger recover next physics file, " + 
mmapFile.getFileName());
+                        }
+                    }
+                } else {
+                    log.info("dledger recover physics file end, " + 
mmapFile.getFileName() + " pos=" + byteBuffer.position());
+                    break;
+                }
+            }
+
+            processOffset += mmapFileOffset;
+
+            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
+                log.warn("dledger maxPhyOffsetOfConsumeQueue({}) >= 
processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, 
processOffset);
+                
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+            }
+            return;
+        }
+        isInrecoveringOldCommitlog = true;
+        super.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
+
+        isInrecoveringOldCommitlog = false;
+
+        setRecoverPosition();
+
+    }
+
+    private void setRecoverPosition() {
         MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
         if (mappedFile == null) {
             return;
@@ -343,14 +427,57 @@ public class DLedgerCommitLog extends CommitLog {
         log.info("Will set the initial commitlog offset={} for dledger", 
dividedCommitlogOffset);
     }
 
+    private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) {
+        ByteBuffer byteBuffer = mmapFile.sliceByteBuffer();
+
+        int magicCode = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + 
MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
+        if (magicCode != MESSAGE_MAGIC_CODE) {
+            return false;
+        }
+
+        int storeTimestampPosition;
+        int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + 
MessageDecoder.SYSFLAG_POSITION);
+        if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
+            storeTimestampPosition = 
MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION;
+        } else {
+            // v6 address is 12 byte larger than v4
+            storeTimestampPosition = 
MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12;
+        }
+
+        long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + 
storeTimestampPosition);
+        if (storeTimestamp == 0) {
+            return false;
+        }
+
+        if 
(this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
+                && 
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
+            if (storeTimestamp <= 
this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
+                log.info("dledger find check timestamp, {} {}",
+                    storeTimestamp,
+                    UtilAll.timeMillisToHumanString(storeTimestamp));
+                return true;
+            }
+        } else {
+            if (storeTimestamp <= 
this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
+                log.info("dledger find check timestamp, {} {}",
+                    storeTimestamp,
+                    UtilAll.timeMillisToHumanString(storeTimestamp));
+                return true;
+            }
+        }
+
+        return false;
+
+    }
+
     @Override
     public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws 
RocksDBException {
-        recover(maxPhyOffsetOfConsumeQueue);
+        dledgerRecoverNormally(maxPhyOffsetOfConsumeQueue);
     }
 
     @Override
     public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws 
RocksDBException {
-        recover(maxPhyOffsetOfConsumeQueue);
+        dledgerRecoverAbnormally(maxPhyOffsetOfConsumeQueue);
     }
 
     @Override
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index 234273b6af..1e4bbf21bd 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.store.dledger;
 import io.openmessaging.storage.dledger.DLedgerServer;
 import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
 import io.openmessaging.storage.dledger.store.file.MmapFileList;
+
+import java.io.File;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -36,6 +38,8 @@ import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.StoreCheckpoint;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Assume;
@@ -146,6 +150,42 @@ public class DLedgerCommitlogTest extends 
MessageStoreTestBase {
             messageStore.shutdown();
         }
     }
+    @Test
+    public void testDLedgerAbnormallyRecover() throws Exception {
+        String base = createBaseDir();
+        String peers = String.format("n0-localhost:%d", nextPort());
+        String group = UUID.randomUUID().toString();
+        String topic = UUID.randomUUID().toString();
+
+        int messageNumPerQueue = 100;
+
+        DefaultMessageStore messageStore = createDledgerMessageStore(base, 
group, "n0", peers, null, false, 0);
+        Thread.sleep(1000);
+        doPutMessages(messageStore, topic, 0, messageNumPerQueue, 0);
+        doPutMessages(messageStore, topic, 1, messageNumPerQueue, 0);
+        Thread.sleep(1000);
+        Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+        Assert.assertEquals(messageNumPerQueue, 
messageStore.getMaxOffsetInQueue(topic, 0));
+        Assert.assertEquals(0, messageStore.dispatchBehindBytes());
+        doGetMessages(messageStore, topic, 0, messageNumPerQueue, 0);
+        StoreCheckpoint storeCheckpoint = messageStore.getStoreCheckpoint();
+        storeCheckpoint.setPhysicMsgTimestamp(0);
+        storeCheckpoint.setLogicsMsgTimestamp(0);
+        messageStore.shutdown();
+
+        String fileName = StorePathConfigHelper.getAbortFile(base);
+        makeSureFileExists(fileName);
+
+        File file = new File(base + File.separator + "consumequeue" + 
File.separator + topic + File.separator + "0" + File.separator + 
"00000000000000001040");
+        file.delete();
+//        truncateAllConsumeQueue(base + File.separator + "consumequeue" + 
File.separator + topic + File.separator);
+        messageStore = createDledgerMessageStore(base, group, "n0", peers, 
null, false, 0);
+        Thread.sleep(1000);
+        doGetMessages(messageStore, topic, 0, messageNumPerQueue, 0);
+        doGetMessages(messageStore, topic, 1, messageNumPerQueue, 0);
+        messageStore.shutdown();
+
+    }
 
     @Test
     public void testPutAndGetMessage() throws Exception {

Reply via email to