This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 343ed4ff46 [ISSUE #8127] Optimize the metric calculation logic of the time wheel (#8128)
343ed4ff46 is described below

commit 343ed4ff468debed8cbb766af0d0906b697facc4
Author: hqbfz <125714719+3424672...@users.noreply.github.com>
AuthorDate: Tue Mar 4 15:53:28 2025 +0800

    [ISSUE #8127] Optimize the metric calculation logic of the time wheel (#8128)

    * Fix the metric of the time wheel was incorrectly calculated

    * Fix the metric of the time wheel was incorrectly calculated

    ---------

    Co-authored-by: wanghuaiyuan <wanghuaiy...@xiaomi.com>
---
 .../rocketmq/store/timer/TimerMessageStore.java | 23 ++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 4287ce78ab..d6af7b84e7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1571,6 +1571,9 @@ public class TimerMessageStore {
         public void run() {
             setState(AbstractStateService.START);
             TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");
+            //Mark different rounds
+            boolean isRound = true;
+            Map<String ,MessageExt> avoidDeleteLose = new HashMap<>();
             while (!this.isStopped()) {
                 try {
                     setState(AbstractStateService.WAITING);
@@ -1587,9 +1590,18 @@ public class TimerMessageStore {
                             MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
                             if (null != msgExt) {
                                 if (needDelete(tr.getMagic()) && !needRoll(tr.getMagic())) {
+                                    //Clearing is performed once in each round.
+                                    //The deletion message is received first and the common message is received once
+                                    if (!isRound) {
+                                        isRound = true;
+                                        for (MessageExt messageExt: avoidDeleteLose.values()) {
+                                            addMetric(messageExt, 1);
+                                        }
+                                        avoidDeleteLose.clear();
+                                    }
                                     if (msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null && tr.getDeleteList() != null) {
-                                        //Execute metric plus one for messages that fail to be deleted
-                                        addMetric(msgExt, 1);
+
+                                        avoidDeleteLose.put(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY), msgExt);
                                         tr.getDeleteList().add(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
                                     }
                                     tr.idempotentRelease();
@@ -1599,10 +1611,13 @@ public class TimerMessageStore {
                                     if (null == uniqueKey) {
                                         LOGGER.warn("No uniqueKey for msg:{}", msgExt);
                                     }
+                                    //Mark ready for next round
+                                    if (isRound) {
+                                        isRound = false;
+                                    }
                                     if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0
                                         && tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) {
-                                        //Normally, it cancels out with the +1 above
-                                        addMetric(msgExt, -1);
+                                        avoidDeleteLose.remove(uniqueKey);
                                         doRes = true;
                                         tr.idempotentRelease();
                                         perfCounterTicks.getCounter("dequeue_delete").flow(1);