This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d60198f621 [ISSUE #8239] Fix the issue of potential message loss after 
a crash under synchronous disk flushing configuration. (#8240)
d60198f621 is described below

commit d60198f621baac54bffb981d7a797a04dfa0bb5a
Author: rongtong <jinrongto...@163.com>
AuthorDate: Fri Jun 7 13:44:13 2024 +0800

    [ISSUE #8239] Fix the issue of potential message loss after a crash under 
synchronous disk flushing configuration. (#8240)
---
 .../java/org/apache/rocketmq/store/CommitLog.java  |   2 +-
 .../apache/rocketmq/store/DefaultMessageStore.java |  12 +-
 .../rocketmq/store/config/MessageStoreConfig.java  |  15 ++-
 .../rocketmq/store/ReputMessageServiceTest.java    | 148 +++++++++++++++++++++
 4 files changed, 171 insertions(+), 6 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 1174eca1ba..c2150d7a32 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -651,7 +651,7 @@ public class CommitLog implements Swappable {
         } else if 
(this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
             return this.confirmOffset;
         } else {
-            return getMaxOffset();
+            return this.defaultMessageStore.isSyncDiskFlush()  ? 
getFlushedWhere() : getMaxOffset();
         }
     }
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 97833351d1..a901e850ed 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2814,7 +2814,11 @@ public class DefaultMessageStore implements MessageStore 
{
         }
 
         public boolean isCommitLogAvailable() {
-            return this.reputFromOffset < 
DefaultMessageStore.this.getConfirmOffset();
+            return this.reputFromOffset < getReputEndOffset();
+        }
+
+        protected long getReputEndOffset() {
+            return 
DefaultMessageStore.this.getMessageStoreConfig().isReadUnCommitted() ? 
DefaultMessageStore.this.commitLog.getMaxOffset() : 
DefaultMessageStore.this.commitLog.getConfirmOffset();
         }
 
         public void doReput() {
@@ -2834,12 +2838,12 @@ public class DefaultMessageStore implements 
MessageStore {
                 try {
                     this.reputFromOffset = result.getStartOffset();
 
-                    for (int readSize = 0; readSize < result.getSize() && 
reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                    for (int readSize = 0; readSize < result.getSize() && 
reputFromOffset < getReputEndOffset() && doNext; ) {
                         DispatchRequest dispatchRequest =
                             
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(),
 false, false, false);
                         int size = dispatchRequest.getBufferSize() == -1 ? 
dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
 
-                        if (reputFromOffset + size > 
DefaultMessageStore.this.getConfirmOffset()) {
+                        if (reputFromOffset + size > getReputEndOffset()) {
                             doNext = false;
                             break;
                         }
@@ -3127,7 +3131,7 @@ public class DefaultMessageStore implements MessageStore {
                 try {
                     this.reputFromOffset = result.getStartOffset();
 
-                    for (int readSize = 0; readSize < result.getSize() && 
reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                    for (int readSize = 0; readSize < result.getSize() && 
reputFromOffset < getReputEndOffset() && doNext; ) {
                         ByteBuffer byteBuffer = result.getByteBuffer();
 
                         int totalSize = 
preCheckMessageAndReturnSize(byteBuffer);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 9afc02a0c9..0060b144cf 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -413,6 +413,12 @@ public class MessageStoreConfig {
 
     private int topicQueueLockNum = 32;
 
+    /**
+     * If readUnCommitted is true, the dispatch of the consume queue will 
exceed the confirmOffset, which may cause the client to read uncommitted 
messages.
+     * For example, reput offset exceeding the flush offset during synchronous 
disk flushing.
+     */
+    private boolean readUnCommitted = false;
+
     public boolean isEnabledAppendPropCRC() {
         return enabledAppendPropCRC;
     }
@@ -672,7 +678,6 @@ public class MessageStoreConfig {
         this.forceVerifyPropCRC = forceVerifyPropCRC;
     }
 
-
     public String getStorePathCommitLog() {
         if (storePathCommitLog == null) {
             return storePathRootDir + File.separator + "commitlog";
@@ -1819,4 +1824,12 @@ public class MessageStoreConfig {
     public void setTopicQueueLockNum(int topicQueueLockNum) {
         this.topicQueueLockNum = topicQueueLockNum;
     }
+
+    public boolean isReadUnCommitted() {
+        return readUnCommitted;
+    }
+
+    public void setReadUnCommitted(boolean readUnCommitted) {
+        this.readUnCommitted = readUnCommitted;
+    }
 }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ReputMessageServiceTest.java 
b/store/src/test/java/org/apache/rocketmq/store/ReputMessageServiceTest.java
new file mode 100644
index 0000000000..d1ce095333
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ReputMessageServiceTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.UUID;
+import java.io.File;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.any;
+
+public class ReputMessageServiceTest {
+    private DefaultMessageStore syncFlushMessageStore;
+    private DefaultMessageStore asyncFlushMessageStore;
+    private final String topic = "FooBar";
+    private final String tmpdir = System.getProperty("java.io.tmpdir");
+    private final String storePathRootParentDir = 
(StringUtils.endsWith(tmpdir, File.separator) ? tmpdir : tmpdir + 
File.separator) + UUID.randomUUID();
+    private SocketAddress bornHost;
+    private SocketAddress storeHost;
+
+    @Before
+    public void init() throws Exception {
+        File file = new File(storePathRootParentDir);
+        UtilAll.deleteFile(file);
+        syncFlushMessageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
+        asyncFlushMessageStore = buildMessageStore(FlushDiskType.ASYNC_FLUSH);
+        assertTrue(syncFlushMessageStore.load());
+        assertTrue(asyncFlushMessageStore.load());
+        syncFlushMessageStore.start();
+        asyncFlushMessageStore.start();
+        storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 
0);
+    }
+
+    private DefaultMessageStore buildMessageStore(FlushDiskType flushDiskType) 
throws Exception {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setHaListenPort(0);
+        messageStoreConfig.setFlushDiskType(flushDiskType);
+        messageStoreConfig.setStorePathRootDir(storePathRootParentDir + 
File.separator + flushDiskType);
+        messageStoreConfig.setStorePathCommitLog(storePathRootParentDir + 
File.separator + flushDiskType + File.separator + "commitlog");
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setLongPollingEnable(false);
+        DefaultMessageStore messageStore = new 
DefaultMessageStore(messageStoreConfig, mock(BrokerStatsManager.class), null, 
brokerConfig, null);
+        // Mock flush disk service
+        Field field = CommitLog.class.getDeclaredField("flushManager");
+        field.setAccessible(true);
+        FlushManager flushManager = mock(FlushManager.class);
+        CompletableFuture<PutMessageStatus> completableFuture = new 
CompletableFuture<>();
+        completableFuture.complete(PutMessageStatus.PUT_OK);
+        when(flushManager.handleDiskFlush(any(AppendMessageResult.class), 
any(MessageExt.class))).thenReturn(completableFuture);
+        field.set(messageStore.getCommitLog(), flushManager);
+        return messageStore;
+    }
+
+    @Test
+    public void testReputEndOffset_whenSyncFlush() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            assertEquals(PutMessageStatus.PUT_OK, 
syncFlushMessageStore.putMessage(buildMessage()).getPutMessageStatus());
+        }
+        assertEquals(1580, syncFlushMessageStore.getMaxPhyOffset());
+        assertEquals(0, 
syncFlushMessageStore.getCommitLog().getFlushedWhere());
+        // wait for cq dispatch
+        Thread.sleep(3000);
+        assertEquals(0, 
syncFlushMessageStore.getCommitLog().getFlushedWhere());
+        assertEquals(0, syncFlushMessageStore.getMaxOffsetInQueue(topic, 0));
+        GetMessageResult getMessageResult = 
syncFlushMessageStore.getMessage("testGroup", topic, 0, 0, 32, null);
+        assertEquals(GetMessageStatus.NO_MESSAGE_IN_QUEUE, 
getMessageResult.getStatus());
+    }
+
+    @Test
+    public void testReputEndOffset_whenAsyncFlush() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            assertEquals(PutMessageStatus.PUT_OK, 
asyncFlushMessageStore.putMessage(buildMessage()).getPutMessageStatus());
+        }
+        assertEquals(1580, asyncFlushMessageStore.getMaxPhyOffset());
+        assertEquals(0, 
asyncFlushMessageStore.getCommitLog().getFlushedWhere());
+        // wait for cq dispatch
+        Thread.sleep(3000);
+        assertEquals(0, 
asyncFlushMessageStore.getCommitLog().getFlushedWhere());
+        assertEquals(10, asyncFlushMessageStore.getMaxOffsetInQueue(topic, 0));
+        GetMessageResult getMessageResult = 
asyncFlushMessageStore.getMessage("testGroup", topic, 0, 0, 32, null);
+        assertEquals(10, getMessageResult.getMessageCount());
+    }
+
+    private MessageExtBrokerInner buildMessage() {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic(topic);
+        msg.setTags("TAG1");
+        msg.setBody("Once, there was a chance for me!".getBytes());
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        msg.setQueueId(0);
+        msg.setSysFlag(0);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(storeHost);
+        msg.setBornHost(bornHost);
+        
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+        return msg;
+    }
+
+    @After
+    public void destroy() throws Exception {
+        if (this.syncFlushMessageStore != null) {
+            syncFlushMessageStore.shutdown();
+            syncFlushMessageStore.destroy();
+        }
+        if (this.asyncFlushMessageStore != null) {
+            asyncFlushMessageStore.shutdown();
+            asyncFlushMessageStore.destroy();
+        }
+        File file = new File(storePathRootParentDir);
+        UtilAll.deleteFile(file);
+    }
+}

Reply via email to