This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 88a9d939ce [ISSUE #7381] Fix the problem of inaccurate timer message metric (#7382) 88a9d939ce is described below commit 88a9d939ce110381b3b418370d4711c0c214dc7f Author: Ji Juntao <juntao....@alibaba-inc.com> AuthorDate: Sat Sep 23 17:38:27 2023 +0800 [ISSUE #7381] Fix the problem of inaccurate timer message metric (#7382) * correct the timerMetrics' result. * for further extension. * checkstyle. * use toLong. --- .../rocketmq/store/timer/TimerMessageStore.java | 20 ++++++++++++++++---- .../apache/rocketmq/store/timer/TimerMetrics.java | 5 ++++- .../apache/rocketmq/store/timer/TimerRequest.java | 7 +++++-- .../rocketmq/store/timer/TimerMetricsTest.java | 10 ++++++++-- 4 files changed, 33 insertions(+), 9 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 0d50de65ae..ac4c61cd61 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicFilterType; @@ -599,7 +600,12 @@ public class TimerMessageStore { if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) { return; } - timerMetrics.addAndGet(msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC), value); + if (msg.getProperty(TIMER_ENQUEUE_MS) != null + && NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) == Long.MAX_VALUE) { + return; + } + // pass msg into addAndGet, for further more judgement extension. + timerMetrics.addAndGet(msg, value); } catch (Throwable t) { if (frequency.incrementAndGet() % 1000 == 0) { LOGGER.error("error in adding metric", t); @@ -1323,6 +1329,7 @@ public class TimerMessageStore { perfCounterTicks.startTick(ENQUEUE_PUT); DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg())); if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) { + req.setEnqueueTime(Long.MAX_VALUE); dequeuePutQueue.put(req); } else { boolean doEnqueueRes = doEnqueue( @@ -1452,9 +1459,14 @@ public class TimerMessageStore { } try { perfCounterTicks.startTick(DEQUEUE_PUT); - DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(tr.getMsg())); - addMetric(tr.getMsg(), -1); - MessageExtBrokerInner msg = convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic())); + MessageExt msgExt = tr.getMsg(); + DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt)); + if (tr.getEnqueueTime() == Long.MAX_VALUE) { + // never enqueue, mark it. + MessageAccessor.putProperty(msgExt, TIMER_ENQUEUE_MS, String.valueOf(Long.MAX_VALUE)); + } + addMetric(msgExt, -1); + MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic())); doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); while (!doRes && !isStopped()) { if (!isRunningDequeue()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java index e7b00cc073..7f8fedd8a5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java @@ -38,6 +38,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -78,7 +80,8 @@ public class TimerMetrics extends ConfigManager { return distPair.getCount().addAndGet(value); } - public long addAndGet(String topic, int value) { + public long addAndGet(MessageExt msg, int value) { + String topic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC); Metric pair = getTopicPair(topic); getDataVersion().nextVersion(); pair.setTimeStamp(System.currentTimeMillis()); diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java index 1dd64f7592..1b25d355c6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java @@ -27,8 +27,9 @@ public class TimerRequest { private final int sizePy; private final long delayTime; - private final long enqueueTime; private final int magic; + + private long enqueueTime; private MessageExt msg; @@ -94,7 +95,9 @@ public class TimerRequest { public void setLatch(CountDownLatch latch) { this.latch = latch; } - + public void setEnqueueTime(long enqueueTime) { + this.enqueueTime = enqueueTime; + } public void idempotentRelease() { idempotentRelease(true); } diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java index b7392cc455..3c7b9b67fb 100644 --- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java @@ -16,6 +16,9 @@ */ package org.apache.rocketmq.store.timer; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; import org.junit.Assert; import org.junit.Test; @@ -31,8 +34,11 @@ public class TimerMetricsTest { TimerMetrics first = new TimerMetrics(baseDir); Assert.assertTrue(first.load()); - first.addAndGet("AAA", 1000); - first.addAndGet("BBB", 2000); + MessageExt msg = new MessageExt(); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, "AAA"); + first.addAndGet(msg, 1000); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, "BBB"); + first.addAndGet(msg, 2000); Assert.assertEquals(1000, first.getTimingCount("AAA")); Assert.assertEquals(2000, first.getTimingCount("BBB")); long curr = System.currentTimeMillis();