This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

The following commit(s) were added to refs/heads/develop by this push:
     new cee19630ad [ISSUE #9241] RocksDBConsumeQueueStore do not need to update StoreCheckpoint (#9242)
cee19630ad is described below

commit cee19630ad1afd036bca54c84f40f1c0d752f800
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Mon Mar 17 17:30:48 2025 +0800

    [ISSUE #9241] RocksDBConsumeQueueStore do not need to update StoreCheckpoint (#9242)
---
 .../rocketmq/store/dledger/DLedgerCommitLog.java   | 91 ++++++++++++----------
 .../store/queue/RocksDBConsumeQueueStore.java      |  9 ---
 2 files changed, 48 insertions(+), 52 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 5f4ef08374..29be9e7c61 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -16,13 +16,26 @@
  */
 package org.apache.rocketmq.store.dledger;
 
+import io.openmessaging.storage.dledger.AppendFuture;
+import io.openmessaging.storage.dledger.BatchAppendFuture;
+import io.openmessaging.storage.dledger.DLedgerConfig;
+import io.openmessaging.storage.dledger.DLedgerServer;
+import io.openmessaging.storage.dledger.entry.DLedgerEntry;
+import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
+import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
+import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
+import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
+import io.openmessaging.storage.dledger.store.file.MmapFile;
+import io.openmessaging.storage.dledger.store.file.MmapFileList;
+import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
+import io.openmessaging.storage.dledger.utils.DLedgerUtils;
 import java.net.Inet6Address;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExtBatch;
@@ -43,21 +56,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.MappedFile;
 import org.rocksdb.RocksDBException;
 
-import io.openmessaging.storage.dledger.AppendFuture;
-import io.openmessaging.storage.dledger.BatchAppendFuture;
-import io.openmessaging.storage.dledger.DLedgerConfig;
-import io.openmessaging.storage.dledger.DLedgerServer;
-import io.openmessaging.storage.dledger.entry.DLedgerEntry;
-import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
-import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
-import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
-import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
-import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
-import io.openmessaging.storage.dledger.store.file.MmapFile;
-import io.openmessaging.storage.dledger.store.file.MmapFileList;
-import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
-import io.openmessaging.storage.dledger.utils.DLedgerUtils;
-
 /**
  * Store all metadata downtime for recovery, data protection reliability
  */
@@ -428,7 +426,7 @@ public class DLedgerCommitLog extends CommitLog {
         log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
     }
 
-    private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) {
+    private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) throws RocksDBException {
         ByteBuffer byteBuffer = mmapFile.sliceByteBuffer();
 
         int magicCode = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
@@ -436,39 +434,46 @@ public class DLedgerCommitLog extends CommitLog {
             return false;
         }
 
-        int storeTimestampPosition;
-        int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.SYSFLAG_POSITION);
-        if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
-            storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION;
+        if (this.defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()) {
+            final long maxPhyOffsetInConsumeQueue = this.defaultMessageStore.getQueueStore().getMaxPhyOffsetInConsumeQueue();
+            long phyOffset = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);
+            if (phyOffset <= maxPhyOffsetInConsumeQueue) {
+                log.info("find check. beginPhyOffset: {}, maxPhyOffsetInConsumeQueue: {}", phyOffset, maxPhyOffsetInConsumeQueue);
+                return true;
+            }
         } else {
-            // v6 address is 12 byte larger than v4
-            storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12;
-        }
+            int storeTimestampPosition;
+            int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.SYSFLAG_POSITION);
+            if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
+                storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION;
+            } else {
+                // v6 address is 12 byte larger than v4
+                storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12;
+            }
 
-        long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + storeTimestampPosition);
-        if (storeTimestamp == 0) {
-            return false;
-        }
+            long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + storeTimestampPosition);
+            if (storeTimestamp == 0) {
+                return false;
+            }
 
-        if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
+            if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
             && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
-            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
-                log.info("dledger find check timestamp, {} {}",
-                    storeTimestamp,
-                    UtilAll.timeMillisToHumanString(storeTimestamp));
-                return true;
-            }
-        } else {
-            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
-                log.info("dledger find check timestamp, {} {}",
-                    storeTimestamp,
-                    UtilAll.timeMillisToHumanString(storeTimestamp));
-                return true;
+                if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
+                    log.info("dledger find check timestamp, {} {}",
+                        storeTimestamp,
+                        UtilAll.timeMillisToHumanString(storeTimestamp));
+                    return true;
+                }
+            } else {
+                if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
+                    log.info("dledger find check timestamp, {} {}",
+                        storeTimestamp,
+                        UtilAll.timeMillisToHumanString(storeTimestamp));
+                    return true;
+                }
             }
         }
-
         return false;
-
     }
 
     @Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 7e3aa70d02..94ed0c926a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
@@ -46,7 +45,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.DispatchRequest;
-import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.exception.StoreException;
@@ -265,13 +263,6 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
             this.rocksDBStorage.batchPut(writeBatch);
 
             this.rocksDBConsumeQueueOffsetTable.putHeapMaxCqOffset(tempTopicQueueMaxOffsetMap);
-
-            long storeTimeStamp = requests.get(size - 1).getStoreTimestamp();
-            if (this.messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE
-                || this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
-                this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimeStamp);
-            }
-            this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimeStamp);
             notifyMessageArriveAndClear(requests);
             return true;
         } catch (Exception e) {