This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 80c0330f75 ConfirmOffset directly takes the max offset when allAckInSyncStateSet is false (#7657)
80c0330f75 is described below

commit 80c0330f752dcf3219cb8631f3004c1725bedf1e
Author: rongtong <jinrongto...@163.com>
AuthorDate: Mon Dec 18 10:10:28 2023 +0800

    ConfirmOffset directly takes the max offset when allAckInSyncStateSet is false (#7657)
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 23 ++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 35c1d0e2d7..cc29cca5d9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -314,6 +314,7 @@ public class CommitLog implements Swappable {
 
     /**
      * When the normal exit, data recovery, all memory data have been flush
+     *
      * @throws RocksDBException only in rocksdb mode
      */
     public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws 
RocksDBException {
@@ -636,7 +637,8 @@ public class CommitLog implements Swappable {
     public long getConfirmOffset() {
         if 
(this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
             if 
(this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != 
BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
-                if (((AutoSwitchHAService) 
this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
+                if (((AutoSwitchHAService) 
this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1
+                    || 
!this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
                     return this.defaultMessageStore.getMaxPhyOffset();
                 }
                 // First time it will compute the confirmOffset.
@@ -1214,7 +1216,7 @@ public class CommitLog implements Swappable {
             }
         } catch (RocksDBException e) {
             return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
-        }  finally {
+        } finally {
             topicQueueLock.unlock(topicQueueKey);
         }
 
@@ -1840,7 +1842,8 @@ public class CommitLog implements Swappable {
             this.messageStoreConfig = messageStoreConfig;
         }
 
-        public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer 
preEncodeBuffer, final MessageExtBrokerInner msgInner) {
+        public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer 
preEncodeBuffer,
+            final MessageExtBrokerInner msgInner) {
             if (msgInner.isEncodeCompleted()) {
                 return null;
             }
@@ -1850,10 +1853,10 @@ public class CommitLog implements Swappable {
             
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
 
             final byte[] propertiesData =
-                    msgInner.getPropertiesString() == null ? null : 
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+                msgInner.getPropertiesString() == null ? null : 
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
 
             boolean needAppendLastPropertySeparator = enabledAppendPropCRC && 
propertiesData != null && propertiesData.length > 0
-                    && propertiesData[propertiesData.length - 1] != 
MessageDecoder.PROPERTY_SEPARATOR;
+                && propertiesData[propertiesData.length - 1] != 
MessageDecoder.PROPERTY_SEPARATOR;
 
             final int propertiesLength = (propertiesData == null ? 0 : 
propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + 
crc32ReservedLength;
 
@@ -2312,7 +2315,7 @@ public class CommitLog implements Swappable {
                 return true;
             }
 
-            int pos = (int)(offset % 
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+            int pos = (int) (offset % 
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
             int realIndex = pos / pageSize / sampleSteps;
             return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
         }
@@ -2356,8 +2359,8 @@ public class CommitLog implements Swappable {
 
         private byte[] checkFileInPageCache(MappedFile mappedFile) {
             long fileSize = mappedFile.getFileSize();
-            final long address = 
((DirectBuffer)mappedFile.getMappedByteBuffer()).address();
-            int pageNums = (int)(fileSize + this.pageSize - 1) / this.pageSize;
+            final long address = ((DirectBuffer) 
mappedFile.getMappedByteBuffer()).address();
+            int pageNums = (int) (fileSize + this.pageSize - 1) / 
this.pageSize;
             byte[] pageCacheRst = new byte[pageNums];
             int mincore = LibC.INSTANCE.mincore(new Pointer(address), new 
NativeLong(fileSize), pageCacheRst);
             if (mincore != 0) {
@@ -2395,7 +2398,7 @@ public class CommitLog implements Swappable {
                 return false;
             }
             try {
-                ConsumeQueue consumeQueue = 
(ConsumeQueue)defaultMessageStore.findConsumeQueue(topic, queueId);
+                ConsumeQueue consumeQueue = (ConsumeQueue) 
defaultMessageStore.findConsumeQueue(topic, queueId);
                 if (null == consumeQueue) {
                     return false;
                 }
@@ -2433,7 +2436,7 @@ public class CommitLog implements Swappable {
             log.error("setFileReadMode mappedFile is null");
             return -1;
         }
-        final long address = 
((DirectBuffer)mappedFile.getMappedByteBuffer()).address();
+        final long address = ((DirectBuffer) 
mappedFile.getMappedByteBuffer()).address();
         int madvise = LibC.INSTANCE.madvise(new Pointer(address), new 
NativeLong(mappedFile.getFileSize()), mode);
         if (madvise != 0) {
             log.error("setFileReadMode error fileName: {}, madvise: {}, 
mode:{}", mappedFile.getFileName(), madvise, mode);

Reply via email to