This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 80c0330f75 ConfirmOffset directly takes the max offset when allAckInSyncStateSet is false (#7657) 80c0330f75 is described below commit 80c0330f752dcf3219cb8631f3004c1725bedf1e Author: rongtong <jinrongto...@163.com> AuthorDate: Mon Dec 18 10:10:28 2023 +0800 ConfirmOffset directly takes the max offset when allAckInSyncStateSet is false (#7657) --- .../java/org/apache/rocketmq/store/CommitLog.java | 23 ++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 35c1d0e2d7..cc29cca5d9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -314,6 +314,7 @@ public class CommitLog implements Swappable { /** * When the normal exit, data recovery, all memory data have been flush + * * @throws RocksDBException only in rocksdb mode */ public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException { @@ -636,7 +637,8 @@ public class CommitLog implements Swappable { public long getConfirmOffset() { if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) { if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) { - if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) { + if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1 + || !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) { return this.defaultMessageStore.getMaxPhyOffset(); } // First time it will compute the confirmOffset. @@ -1214,7 +1216,7 @@ public class CommitLog implements Swappable { } } catch (RocksDBException e) { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); - } finally { + } finally { topicQueueLock.unlock(topicQueueKey); } @@ -1840,7 +1842,8 @@ public class CommitLog implements Swappable { this.messageStoreConfig = messageStoreConfig; } - public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner) { + public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, + final MessageExtBrokerInner msgInner) { if (msgInner.isEncodeCompleted()) { return null; } @@ -1850,10 +1853,10 @@ public class CommitLog implements Swappable { msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); final byte[] propertiesData = - msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); boolean needAppendLastPropertySeparator = enabledAppendPropCRC && propertiesData != null && propertiesData.length > 0 - && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; + && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength; @@ -2312,7 +2315,7 @@ public class CommitLog implements Swappable { return true; } - int pos = (int)(offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()); + int pos = (int) (offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()); int realIndex = pos / pageSize / sampleSteps; return bytes.length - 1 >= realIndex && bytes[realIndex] != 0; } @@ -2356,8 +2359,8 @@ public class CommitLog implements Swappable { private byte[] checkFileInPageCache(MappedFile mappedFile) { long fileSize = mappedFile.getFileSize(); - final long address = ((DirectBuffer)mappedFile.getMappedByteBuffer()).address(); - int pageNums = (int)(fileSize + this.pageSize - 1) / this.pageSize; + final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address(); + int pageNums = (int) (fileSize + this.pageSize - 1) / this.pageSize; byte[] pageCacheRst = new byte[pageNums]; int mincore = LibC.INSTANCE.mincore(new Pointer(address), new NativeLong(fileSize), pageCacheRst); if (mincore != 0) { @@ -2395,7 +2398,7 @@ public class CommitLog implements Swappable { return false; } try { - ConsumeQueue consumeQueue = (ConsumeQueue)defaultMessageStore.findConsumeQueue(topic, queueId); + ConsumeQueue consumeQueue = (ConsumeQueue) defaultMessageStore.findConsumeQueue(topic, queueId); if (null == consumeQueue) { return false; } @@ -2433,7 +2436,7 @@ public class CommitLog implements Swappable { log.error("setFileReadMode mappedFile is null"); return -1; } - final long address = ((DirectBuffer)mappedFile.getMappedByteBuffer()).address(); + final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address(); int madvise = LibC.INSTANCE.madvise(new Pointer(address), new NativeLong(mappedFile.getFileSize()), mode); if (madvise != 0) { log.error("setFileReadMode error fileName: {}, madvise: {}, mode:{}", mappedFile.getFileName(), madvise, mode);