This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f4c498433d [ISSUE #7480] Fix the offset in the timerCheckPoint will not be corrected when the commitlog and consumeQueue are truncated (#7488)
f4c498433d is described below

commit f4c498433d19e83510d4181ea9a63fbc7e3115eb
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Wed Dec 4 15:46:56 2024 +0800

    [ISSUE #7480] Fix the offset in the timerCheckPoint will not be corrected when the commitlog and consumeQueue are truncated (#7488)
---
 .../apache/rocketmq/store/timer/TimerMessageStore.java    | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 071b1c0219..fb166678e6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -293,6 +293,19 @@ public class TimerMessageStore {
         }
         currQueueOffset = Math.min(currQueueOffset, 
timerCheckpoint.getMasterTimerQueueOffset());
 
+        ConsumeQueueInterface cq = 
this.messageStore.getConsumeQueue(TIMER_TOPIC, 0);
+
+        // Correction based consume queue
+        if (cq != null && currQueueOffset < cq.getMinOffsetInQueue()) {
+            LOGGER.warn("Timer currQueueOffset:{} is smaller than 
minOffsetInQueue:{}",
+                currQueueOffset, cq.getMinOffsetInQueue());
+            currQueueOffset = cq.getMinOffsetInQueue();
+        } else if (cq != null && currQueueOffset > cq.getMaxOffsetInQueue()) {
+            LOGGER.warn("Timer currQueueOffset:{} is larger than 
maxOffsetInQueue:{}",
+                currQueueOffset, cq.getMaxOffsetInQueue());
+            currQueueOffset = cq.getMaxOffsetInQueue();
+        }
+
         //check timer wheel
         currReadTimeMs = timerCheckpoint.getLastReadTimeMs();
         long nextReadTimeMs = formatTimeMs(
@@ -614,7 +627,7 @@ public class TimerMessageStore {
                 return;
             }
             if (msg.getProperty(TIMER_ENQUEUE_MS) != null
-                    && NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) 
== Long.MAX_VALUE) {
+                && NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) == 
Long.MAX_VALUE) {
                 return;
             }
             // pass msg into addAndGet, for further more judgement extension.

Reply via email to