This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha by this push:
     new 7be401e  [ISSUE #3708] add CorrectLogicOffsetService to periodically 
correct min logic offset (#3722)
7be401e is described below

commit 7be401e7774ac09d07edef6edb79a3b9c0636392
Author: Hongjian Fei <[email protected]>
AuthorDate: Mon Jan 10 18:44:12 2022 +0800

    [ISSUE #3708] add CorrectLogicOffsetService to periodically correct min 
logic offset (#3722)
    
    * [ISSUE #3708] add CorrectLogicOffsetService to periodically correct min 
logic offset; refactor QueueOffsetAssigner.
    
    * Mock getDiskSpaceWarningLevelRatio and getDiskSpaceCleanForciblyRatio to 
get around configuration protection in unit-test.
    
    * Fix check style.
---
 .../apache/rocketmq/common/attribute/CQType.java   |   3 +-
 .../org/apache/rocketmq/store/ConsumeQueue.java    |   9 +-
 .../apache/rocketmq/store/DefaultMessageStore.java | 124 +++++++++++++++++++++
 .../rocketmq/store/queue/BatchConsumeQueue.java    |   9 +-
 .../rocketmq/store/queue/ConsumeQueueStore.java    |   2 +-
 .../rocketmq/store/queue/QueueOffsetAssigner.java  |  30 +++--
 .../store/DefaultMessageStoreCleanFilesTest.java   |  21 ++++
 7 files changed, 173 insertions(+), 25 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java 
b/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
index 6bd6ad2..73ef218 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
@@ -19,6 +19,5 @@ package org.apache.rocketmq.common.attribute;
 
 public enum CQType {
     SimpleCQ,
-    BatchCQ,
-    MillionCQ;
+    BatchCQ
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 7763a0f..a1fc870 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.store;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.List;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -443,12 +442,8 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
     @Override
     public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, 
MessageExtBrokerInner msg, short messageNum) {
         String topicQueueKey = getTopic() + "-" + getQueueId();
-        HashMap<String, Long> topicQueueTable = 
queueOffsetAssigner.getTopicQueueTable();
-
-        long topicOffset = topicQueueTable.computeIfAbsent(topicQueueKey, k -> 
0L);
-        topicQueueTable.put(topicQueueKey, topicOffset + messageNum);
-
-        msg.setQueueOffset(topicOffset);
+        long queueOffset = 
queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
+        msg.setQueueOffset(queueOffset);
     }
 
     private boolean putMessagePositionInfo(final long offset, final int size, 
final long tagsCode,
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index da47be6..76165d9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -32,6 +32,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -49,6 +50,7 @@ import org.apache.rocketmq.common.SystemClock;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -94,6 +96,8 @@ public class DefaultMessageStore implements MessageStore {
 
     private final CleanConsumeQueueService cleanConsumeQueueService;
 
+    private final CorrectLogicOffsetService correctLogicOffsetService;
+
     private final IndexService indexService;
 
     private final AllocateMappedFileService allocateMappedFileService;
@@ -156,6 +160,7 @@ public class DefaultMessageStore implements MessageStore {
         this.flushConsumeQueueService = new FlushConsumeQueueService();
         this.cleanCommitLogService = new CleanCommitLogService();
         this.cleanConsumeQueueService = new CleanConsumeQueueService();
+        this.correctLogicOffsetService = new CorrectLogicOffsetService();
         this.storeStatsService = new StoreStatsService();
         this.indexService = new IndexService(this);
         if (!messageStoreConfig.isEnableDLegerCommitLog()) {
@@ -1351,6 +1356,8 @@ public class DefaultMessageStore implements MessageStore {
         long deleteCount = 0L;
         deleteCount += this.cleanCommitLogService.run();
         deleteCount += this.cleanConsumeQueueService.run();
+
+        this.correctLogicOffsetService.run();
         return deleteCount;
     }
 
@@ -1879,6 +1886,123 @@ public class DefaultMessageStore implements 
MessageStore {
         }
     }
 
+    class CorrectLogicOffsetService {
+        private long lastForceCorrectTime = -1L;
+
+        public void run() {
+            try {
+                this.correctLogicMinOffset();
+            } catch (Throwable e) {
+                log.warn(this.getServiceName() + " service has exception. ", 
e);
+            }
+        }
+
+        private boolean needCorrect(ConsumeQueueInterface logic, long 
minPhyOffset, long lastForeCorrectTimeCurRun) {
+            if (logic == null) {
+                return false;
+            }
+            // If the first file exists but is not available, its earlier 
destruction may have failed, so delete it.
+            if 
(DefaultMessageStore.this.consumeQueueStore.isFirstFileExist(logic) && 
!DefaultMessageStore.this.consumeQueueStore.isFirstFileAvailable(logic)) {
+                log.error("CorrectLogicOffsetService.needCorrect. first file 
not available, trigger correct." +
+                                " topic:{}, queue:{}, maxPhyOffset in 
queue:{}, minPhyOffset " +
+                                "in commit log:{}, minOffset in queue:{}, 
maxOffset in queue:{}, cqType:{}"
+                        , logic.getTopic(), logic.getQueueId(), 
logic.getMaxPhysicOffset()
+                        , minPhyOffset, logic.getMinOffsetInQueue(), 
logic.getMaxOffsetInQueue(), logic.getCQType());
+                return true;
+            }
+
+            // If logic.getMaxPhysicOffset() or minPhyOffset is -1, there is
+            // no message in the current queue, so there is nothing to 
correct.
+            if (logic.getMaxPhysicOffset() == -1 || minPhyOffset == -1) {
+                return false;
+            }
+
+            if (logic.getMaxPhysicOffset() < minPhyOffset) {
+                if (logic.getMinOffsetInQueue() < logic.getMaxOffsetInQueue()) 
{
+                    log.error("CorrectLogicOffsetService.needCorrect. logic 
max phy offset: {} is less than min phy offset: {}, " +
+                                    "but min offset: {} is less than max 
offset: {}. topic:{}, queue:{}, cqType:{}."
+                            , logic.getMaxPhysicOffset(), minPhyOffset, 
logic.getMinOffsetInQueue()
+                            , logic.getMaxOffsetInQueue(), logic.getTopic(), 
logic.getQueueId(), logic.getCQType());
+                    return true;
+                } else if (logic.getMinOffsetInQueue() == 
logic.getMaxOffsetInQueue()) {
+                    return false;
+                } else {
+                    log.error("CorrectLogicOffsetService.needCorrect. It 
should not happen, logic max phy offset: {} is less than min phy offset: {}," +
+                                    " but min offset: {} is larger than max 
offset: {}. topic:{}, queue:{}, cqType:{}"
+                            , logic.getMaxPhysicOffset(), minPhyOffset, 
logic.getMinOffsetInQueue()
+                            , logic.getMaxOffsetInQueue(), logic.getTopic(), 
logic.getQueueId(), logic.getCQType());
+                    return false;
+                }
+            }
+            // From here on, logic.getMaxPhysicOffset() >= minPhyOffset.
+            int forceCorrectInterval = 
DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetForceInterval();
+            if ((System.currentTimeMillis() - lastForeCorrectTimeCurRun) > 
forceCorrectInterval) {
+                lastForceCorrectTime = System.currentTimeMillis();
+                CqUnit cqUnit = logic.getEarliestUnit();
+                if (cqUnit == null) {
+                    if (logic.getMinOffsetInQueue() == 
logic.getMaxOffsetInQueue()) {
+                        return false;
+                    } else {
+                        log.error("CorrectLogicOffsetService.needCorrect. 
cqUnit is null, logic max phy offset: {} is greater than min phy offset: {}, " +
+                                        "but min offset: {} is not equal to 
max offset: {}. topic:{}, queue:{}, cqType:{}."
+                                , logic.getMaxPhysicOffset(), minPhyOffset, 
logic.getMinOffsetInQueue()
+                                , logic.getMaxOffsetInQueue(), 
logic.getTopic(), logic.getQueueId(), logic.getCQType());
+                        return true;
+                    }
+                }
+
+                if (cqUnit.getPos() < minPhyOffset) {
+                    log.error("CorrectLogicOffsetService.needCorrect. logic 
max phy offset: {} is greater than min phy offset: {}, " +
+                                    "but minPhyPos in cq is: {}. min offset in 
queue: {}, max offset in queue: {}, topic:{}, queue:{}, cqType:{}."
+                            , logic.getMaxPhysicOffset(), minPhyOffset, 
cqUnit.getPos(), logic.getMinOffsetInQueue()
+                            , logic.getMaxOffsetInQueue(), logic.getTopic(), 
logic.getQueueId(), logic.getCQType());
+                    return true;
+                }
+
+                if (cqUnit.getPos() >= minPhyOffset) {
+
+                    // Normal case, do not need correct.
+                    return false;
+                }
+            }
+
+            return false;
+        }
+
+        private void correctLogicMinOffset() {
+
+            long lastForeCorrectTimeCurRun = lastForceCorrectTime;
+            long minPhyOffset = getMinPhyOffset();
+            ConcurrentMap<String, ConcurrentMap<Integer, 
ConsumeQueueInterface>> tables = 
DefaultMessageStore.this.getConsumeQueueTable();
+            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : 
tables.values()) {
+                for (ConsumeQueueInterface logic : maps.values()) {
+                    if (Objects.equals(CQType.SimpleCQ, logic.getCQType())) {
+                        // SimpleCQ is not supported for now; skip it.
+                        continue;
+                    }
+                    if (needCorrect(logic, minPhyOffset, 
lastForeCorrectTimeCurRun)) {
+                        doCorrect(logic, minPhyOffset);
+                    }
+                }
+            }
+        }
+
+        private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) 
{
+            
DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, 
minPhyOffset);
+            int sleepIntervalWhenCorrectMinOffset = 
DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetSleepInterval();
+            if (sleepIntervalWhenCorrectMinOffset > 0) {
+                try {
+                    Thread.sleep(sleepIntervalWhenCorrectMinOffset);
+                } catch (InterruptedException ignored) {
+                }
+            }
+        }
+
+        public String getServiceName() {
+            return CorrectLogicOffsetService.class.getSimpleName();
+        }
+    }
+
     class FlushConsumeQueueService extends ServiceThread {
         private static final int RETRY_TIMES_OVER = 3;
         private long lastFlushTimestamp = 0;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 648a472..3400120 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -35,7 +35,6 @@ import org.apache.rocketmq.store.logfile.MappedFile;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -482,17 +481,15 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCy
 
     @Override
     public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, 
MessageExtBrokerInner msg, short messageNum) {
-        HashMap<String, Long> batchTopicQueueTable = 
queueOffsetAssigner.getBatchTopicQueueTable();
         String topicQueueKey = getTopic() + "-" + getQueueId();
 
-        Long topicOffset = batchTopicQueueTable.computeIfAbsent(topicQueueKey, 
k -> 0L);
+        long queueOffset = 
queueOffsetAssigner.assignBatchQueueOffset(topicQueueKey, messageNum);
 
         if (MessageSysFlag.check(msg.getSysFlag(), 
MessageSysFlag.INNER_BATCH_FLAG)) {
-            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, 
String.valueOf(topicOffset));
+            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, 
String.valueOf(queueOffset));
             
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
         }
-        msg.setQueueOffset(topicOffset);
-        batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
+        msg.setQueueOffset(queueOffset);
     }
 
     boolean putBatchMessagePositionInfo(final long offset, final int size, 
final long tagsCode, final long storeTime,
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index d3bfe75..d2d147c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -295,7 +295,7 @@ public class ConsumeQueueStore {
     }
 
     public Long getMaxOffset(String topic, int queueId) {
-        return this.queueOffsetAssigner.getTopicQueueTable().get(topic + "-" + 
queueId);
+        return this.queueOffsetAssigner.currentQueueOffset(topic + "-" + 
queueId);
     }
 
     public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
index 09e18ec..4ca1126 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import java.util.HashMap;
 
 /**
- * QueueOffsetAssigner is a component for assigning queue.
+ * QueueOffsetAssigner is a component for assigning offsets for queues.
  *
  */
 public class QueueOffsetAssigner {
@@ -33,20 +33,24 @@ public class QueueOffsetAssigner {
     private HashMap<String, Long> topicQueueTable = new HashMap<>(1024);
     private HashMap<String, Long> batchTopicQueueTable = new HashMap<>(1024);
 
-    public HashMap<String, Long> getTopicQueueTable() {
-        return topicQueueTable;
+    public long assignQueueOffset(String topicQueueKey, short messageNum) {
+        long queueOffset = this.topicQueueTable.computeIfAbsent(topicQueueKey, 
k -> 0L);
+        this.topicQueueTable.put(topicQueueKey, queueOffset + messageNum);
+        return queueOffset;
     }
 
-    public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
-        this.topicQueueTable = topicQueueTable;
+    public long assignBatchQueueOffset(String topicQueueKey, short messageNum) 
{
+        Long topicOffset = 
this.batchTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+        this.batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
+        return topicOffset;
     }
 
-    public HashMap<String, Long> getBatchTopicQueueTable() {
-        return batchTopicQueueTable;
+    public long currentQueueOffset(String topicQueueKey) {
+        return this.topicQueueTable.get(topicQueueKey);
     }
 
-    public void setBatchTopicQueueTable(HashMap<String, Long> 
batchTopicQueueTable) {
-        this.batchTopicQueueTable = batchTopicQueueTable;
+    public long currentBatchQueueOffset(String topicQueueKey) {
+        return this.batchTopicQueueTable.get(topicQueueKey);
     }
 
     public synchronized void remove(String topic, Integer queueId) {
@@ -57,4 +61,12 @@ public class QueueOffsetAssigner {
 
         log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", 
topic, queueId);
     }
+
+    public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
+        this.topicQueueTable = topicQueueTable;
+    }
+
+    public void setBatchTopicQueueTable(HashMap<String, Long> 
batchTopicQueueTable) {
+        this.batchTopicQueueTable = batchTopicQueueTable;
+    }
 }
\ No newline at end of file
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
index 9dad5ea..356e653 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@ -336,6 +336,17 @@ public class DefaultMessageStoreCleanFilesTest {
         }
     }
 
+    private DefaultMessageStore.CleanCommitLogService 
getCleanCommitLogService()
+            throws Exception {
+        Field serviceField = 
messageStore.getClass().getDeclaredField("cleanCommitLogService");
+        serviceField.setAccessible(true);
+        DefaultMessageStore.CleanCommitLogService cleanCommitLogService =
+                (DefaultMessageStore.CleanCommitLogService) 
serviceField.get(messageStore);
+        serviceField.setAccessible(false);
+
+        return cleanCommitLogService;
+    }
+
     private DefaultMessageStore.CleanConsumeQueueService 
getCleanConsumeQueueService()
             throws Exception {
         Field serviceField = 
messageStore.getClass().getDeclaredField("cleanConsumeQueueService");
@@ -472,6 +483,7 @@ public class DefaultMessageStoreCleanFilesTest {
         messageStore = new DefaultMessageStore(messageStoreConfig,
                 new BrokerStatsManager("test"), new 
MyMessageArrivingListener(), new BrokerConfig());
 
+        cleanCommitLogService = getCleanCommitLogService();
         cleanConsumeQueueService = getCleanConsumeQueueService();
 
         assertTrue(messageStore.load());
@@ -481,6 +493,15 @@ public class DefaultMessageStoreCleanFilesTest {
         cleanCommitLogService = spy(cleanCommitLogService);
         
when(cleanCommitLogService.getDiskSpaceWarningLevelRatio()).thenReturn(diskSpaceCleanForciblyRatio);
         
when(cleanCommitLogService.getDiskSpaceCleanForciblyRatio()).thenReturn(diskSpaceCleanForciblyRatio);
+
+        putFiledBackToMessageStore(cleanCommitLogService);
+    }
+
+    private void 
putFiledBackToMessageStore(DefaultMessageStore.CleanCommitLogService 
cleanCommitLogService) throws Exception {
+        Field cleanCommitLogServiceField = 
DefaultMessageStore.class.getDeclaredField("cleanCommitLogService");
+        cleanCommitLogServiceField.setAccessible(true);
+        cleanCommitLogServiceField.set(messageStore, cleanCommitLogService);
+        cleanCommitLogServiceField.setAccessible(false);
     }
 
     private class MyMessageArrivingListener implements MessageArrivingListener 
{

Reply via email to