This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new f4c498433d [ISSUE #7480] Fix the offset in the timerCheckPoint will not be corrected when the commitlog and consumeQueue are truncated (#7488) f4c498433d is described below commit f4c498433d19e83510d4181ea9a63fbc7e3115eb Author: rongtong <jinrongton...@mails.ucas.ac.cn> AuthorDate: Wed Dec 4 15:46:56 2024 +0800 [ISSUE #7480] Fix the offset in the timerCheckPoint will not be corrected when the commitlog and consumeQueue are truncated (#7488) --- .../apache/rocketmq/store/timer/TimerMessageStore.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 071b1c0219..fb166678e6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -293,6 +293,19 @@ public class TimerMessageStore { } currQueueOffset = Math.min(currQueueOffset, timerCheckpoint.getMasterTimerQueueOffset()); + ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, 0); + + // Correction based consume queue + if (cq != null && currQueueOffset < cq.getMinOffsetInQueue()) { + LOGGER.warn("Timer currQueueOffset:{} is smaller than minOffsetInQueue:{}", + currQueueOffset, cq.getMinOffsetInQueue()); + currQueueOffset = cq.getMinOffsetInQueue(); + } else if (cq != null && currQueueOffset > cq.getMaxOffsetInQueue()) { + LOGGER.warn("Timer currQueueOffset:{} is larger than maxOffsetInQueue:{}", + currQueueOffset, cq.getMaxOffsetInQueue()); + currQueueOffset = cq.getMaxOffsetInQueue(); + } + //check timer wheel currReadTimeMs = timerCheckpoint.getLastReadTimeMs(); long nextReadTimeMs = formatTimeMs( @@ -614,7 +627,7 @@ public class TimerMessageStore { return; } if (msg.getProperty(TIMER_ENQUEUE_MS) != null - && NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) == Long.MAX_VALUE) { + && NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) == Long.MAX_VALUE) { return; } // pass msg into addAndGet, for further more judgement extension.