This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2c898c9b31 [ISSUE #7689] In Controller mode, messages may lost due to 
sharing the same cq offset (#7690)
2c898c9b31 is described below

commit 2c898c9b31bf195174cf1e3a626a7c61f7576381
Author: Ji Juntao <juntao....@alibaba-inc.com>
AuthorDate: Thu Dec 21 14:41:51 2023 +0800

    [ISSUE #7689] In Controller mode, messages may lost due to sharing the same 
cq offset (#7690)
    
    * fix the reput bug.
    
    * add more logs.
    
    * refactor the method of compensating for HA.
    
    * not modify the imports.
    
    * refactor the log.
    
    * refactor the log.
---
 .../rocketmq/store/queue/ConsumeQueueStore.java    | 76 ++++++++++++----------
 1 file changed, 40 insertions(+), 36 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 616511b67f..cbe9b4f5ac 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -455,48 +455,52 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
         }
 
         // Correct unSubmit consumeOffset
-        if (messageStoreConfig.isDuplicationEnable()) {
-            SelectMappedBufferResult lastBuffer = null;
-            long startReadOffset = 
messageStore.getCommitLog().getConfirmOffset() == -1 ? 0 : 
messageStore.getCommitLog().getConfirmOffset();
-            while ((lastBuffer = 
messageStore.selectOneMessageByOffset(startReadOffset)) != null) {
-                try {
-                    if (lastBuffer.getStartOffset() > startReadOffset) {
-                        startReadOffset = lastBuffer.getStartOffset();
-                        continue;
-                    }
+        if (messageStoreConfig.isDuplicationEnable() || 
messageStore.getBrokerConfig().isEnableControllerMode()) {
+            compensateForHA(cqOffsetTable);
+        }
 
-                    ByteBuffer bb = lastBuffer.getByteBuffer();
-                    int magicCode = bb.getInt(bb.position() + 4);
-                    if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
-                        startReadOffset += bb.getInt(bb.position());
-                        continue;
-                    } else if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE) 
{
-                        throw new RuntimeException("Unknown magicCode: " + 
magicCode);
-                    }
+        this.setTopicQueueTable(cqOffsetTable);
+        this.setBatchTopicQueueTable(bcqOffsetTable);
+    }
+    private void compensateForHA(ConcurrentMap<String, Long> cqOffsetTable) {
+        SelectMappedBufferResult lastBuffer = null;
+        long startReadOffset = messageStore.getCommitLog().getConfirmOffset() 
== -1 ? 0 : messageStore.getCommitLog().getConfirmOffset();
+        log.info("Correct unsubmitted offset...StartReadOffset = {}", 
startReadOffset);
+        while ((lastBuffer = 
messageStore.selectOneMessageByOffset(startReadOffset)) != null) {
+            try {
+                if (lastBuffer.getStartOffset() > startReadOffset) {
+                    startReadOffset = lastBuffer.getStartOffset();
+                    continue;
+                }
 
-                    lastBuffer.getByteBuffer().mark();
-                    DispatchRequest dispatchRequest = 
messageStore.getCommitLog().checkMessageAndReturnSize(lastBuffer.getByteBuffer(),
 true, true, true);
-                    if (!dispatchRequest.isSuccess())
-                        break;
-                    lastBuffer.getByteBuffer().reset();
-
-                    MessageExt msg = 
MessageDecoder.decode(lastBuffer.getByteBuffer(), true, false, false, false, 
true);
-                    if (msg == null)
-                        break;
-
-                    String key = msg.getTopic() + "-" + msg.getQueueId();
-                    cqOffsetTable.put(key, msg.getQueueOffset() + 1);
-                    startReadOffset += msg.getStoreSize();
-                } finally {
-                    if (lastBuffer != null)
-                        lastBuffer.release();
+                ByteBuffer bb = lastBuffer.getByteBuffer();
+                int magicCode = bb.getInt(bb.position() + 4);
+                if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
+                    startReadOffset += bb.getInt(bb.position());
+                    continue;
+                } else if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE) {
+                    throw new RuntimeException("Unknown magicCode: " + 
magicCode);
                 }
 
+                lastBuffer.getByteBuffer().mark();
+                DispatchRequest dispatchRequest = 
messageStore.getCommitLog().checkMessageAndReturnSize(lastBuffer.getByteBuffer(),
 true, messageStoreConfig.isDuplicationEnable(), true);
+                if (!dispatchRequest.isSuccess())
+                    break;
+                lastBuffer.getByteBuffer().reset();
+
+                MessageExt msg = 
MessageDecoder.decode(lastBuffer.getByteBuffer(), true, false, false, false, 
true);
+                if (msg == null)
+                    break;
+
+                String key = msg.getTopic() + "-" + msg.getQueueId();
+                cqOffsetTable.put(key, msg.getQueueOffset() + 1);
+                startReadOffset += msg.getStoreSize();
+                log.info("Correcting. Key:{}, start read Offset: {}", key, 
startReadOffset);
+            } finally {
+                if (lastBuffer != null)
+                    lastBuffer.release();
             }
         }
-
-        this.setTopicQueueTable(cqOffsetTable);
-        this.setBatchTopicQueueTable(bcqOffsetTable);
     }
 
     @Override

Reply via email to