This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new cee19630ad [ISSUE #9241] RocksDBConsumeQueueStore do not need to 
update StoreCheckpoint (#9242)
cee19630ad is described below

commit cee19630ad1afd036bca54c84f40f1c0d752f800
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Mon Mar 17 17:30:48 2025 +0800

    [ISSUE #9241] RocksDBConsumeQueueStore do not need to update 
StoreCheckpoint (#9242)
---
 .../rocketmq/store/dledger/DLedgerCommitLog.java   | 91 ++++++++++++----------
 .../store/queue/RocksDBConsumeQueueStore.java      |  9 ---
 2 files changed, 48 insertions(+), 52 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 5f4ef08374..29be9e7c61 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -16,13 +16,26 @@
  */
 package org.apache.rocketmq.store.dledger;
 
+import io.openmessaging.storage.dledger.AppendFuture;
+import io.openmessaging.storage.dledger.BatchAppendFuture;
+import io.openmessaging.storage.dledger.DLedgerConfig;
+import io.openmessaging.storage.dledger.DLedgerServer;
+import io.openmessaging.storage.dledger.entry.DLedgerEntry;
+import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
+import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
+import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
+import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
+import io.openmessaging.storage.dledger.store.file.MmapFile;
+import io.openmessaging.storage.dledger.store.file.MmapFileList;
+import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
+import io.openmessaging.storage.dledger.utils.DLedgerUtils;
 import java.net.Inet6Address;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExtBatch;
@@ -43,21 +56,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.MappedFile;
 import org.rocksdb.RocksDBException;
 
-import io.openmessaging.storage.dledger.AppendFuture;
-import io.openmessaging.storage.dledger.BatchAppendFuture;
-import io.openmessaging.storage.dledger.DLedgerConfig;
-import io.openmessaging.storage.dledger.DLedgerServer;
-import io.openmessaging.storage.dledger.entry.DLedgerEntry;
-import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
-import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
-import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
-import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
-import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
-import io.openmessaging.storage.dledger.store.file.MmapFile;
-import io.openmessaging.storage.dledger.store.file.MmapFileList;
-import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
-import io.openmessaging.storage.dledger.utils.DLedgerUtils;
-
 /**
  * Store all metadata downtime for recovery, data protection reliability
  */
@@ -428,7 +426,7 @@ public class DLedgerCommitLog extends CommitLog {
         log.info("Will set the initial commitlog offset={} for dledger", 
dividedCommitlogOffset);
     }
 
-    private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) {
+    private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) throws 
RocksDBException {
         ByteBuffer byteBuffer = mmapFile.sliceByteBuffer();
 
         int magicCode = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + 
MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
@@ -436,39 +434,46 @@ public class DLedgerCommitLog extends CommitLog {
             return false;
         }
 
-        int storeTimestampPosition;
-        int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + 
MessageDecoder.SYSFLAG_POSITION);
-        if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
-            storeTimestampPosition = 
MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION;
+        if 
(this.defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()) {
+            final long maxPhyOffsetInConsumeQueue = 
this.defaultMessageStore.getQueueStore().getMaxPhyOffsetInConsumeQueue();
+            long phyOffset = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + 
MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);
+            if (phyOffset <= maxPhyOffsetInConsumeQueue) {
+                log.info("find check. beginPhyOffset: {}, 
maxPhyOffsetInConsumeQueue: {}", phyOffset, maxPhyOffsetInConsumeQueue);
+                return true;
+            }
         } else {
-            // v6 address is 12 byte larger than v4
-            storeTimestampPosition = 
MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12;
-        }
+            int storeTimestampPosition;
+            int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + 
MessageDecoder.SYSFLAG_POSITION);
+            if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
+                storeTimestampPosition = 
MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION;
+            } else {
+                // v6 address is 12 byte larger than v4
+                storeTimestampPosition = 
MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12;
+            }
 
-        long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + 
storeTimestampPosition);
-        if (storeTimestamp == 0) {
-            return false;
-        }
+            long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET 
+ storeTimestampPosition);
+            if (storeTimestamp == 0) {
+                return false;
+            }
 
-        if 
(this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
+            if 
(this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
                 && 
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
-            if (storeTimestamp <= 
this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
-                log.info("dledger find check timestamp, {} {}",
-                    storeTimestamp,
-                    UtilAll.timeMillisToHumanString(storeTimestamp));
-                return true;
-            }
-        } else {
-            if (storeTimestamp <= 
this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
-                log.info("dledger find check timestamp, {} {}",
-                    storeTimestamp,
-                    UtilAll.timeMillisToHumanString(storeTimestamp));
-                return true;
+                if (storeTimestamp <= 
this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
+                    log.info("dledger find check timestamp, {} {}",
+                        storeTimestamp,
+                        UtilAll.timeMillisToHumanString(storeTimestamp));
+                    return true;
+                }
+            } else {
+                if (storeTimestamp <= 
this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
+                    log.info("dledger find check timestamp, {} {}",
+                        storeTimestamp,
+                        UtilAll.timeMillisToHumanString(storeTimestamp));
+                    return true;
+                }
             }
         }
-
         return false;
-
     }
 
     @Override
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 7e3aa70d02..94ed0c926a 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
@@ -46,7 +45,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.DispatchRequest;
-import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.exception.StoreException;
@@ -265,13 +263,6 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
             this.rocksDBStorage.batchPut(writeBatch);
 
             
this.rocksDBConsumeQueueOffsetTable.putHeapMaxCqOffset(tempTopicQueueMaxOffsetMap);
-
-            long storeTimeStamp = requests.get(size - 1).getStoreTimestamp();
-            if (this.messageStore.getMessageStoreConfig().getBrokerRole() == 
BrokerRole.SLAVE
-                || 
this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
-                
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimeStamp);
-            }
-            
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimeStamp);
             notifyMessageArriveAndClear(requests);
             return true;
         } catch (Exception e) {

Reply via email to