This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 88a9d939ce [ISSUE #7381] Fix the problem of inaccurate timer message 
metric (#7382)
88a9d939ce is described below

commit 88a9d939ce110381b3b418370d4711c0c214dc7f
Author: Ji Juntao <juntao....@alibaba-inc.com>
AuthorDate: Sat Sep 23 17:38:27 2023 +0800

    [ISSUE #7381] Fix the problem of inaccurate timer message metric (#7382)
    
    * correct the timerMetrics' result.
    
    * for further extension.
    
    * checkstyle.
    
    * use toLong.
---
 .../rocketmq/store/timer/TimerMessageStore.java      | 20 ++++++++++++++++----
 .../apache/rocketmq/store/timer/TimerMetrics.java    |  5 ++++-
 .../apache/rocketmq/store/timer/TimerRequest.java    |  7 +++++--
 .../rocketmq/store/timer/TimerMetricsTest.java       | 10 ++++++++--
 4 files changed, 33 insertions(+), 9 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 0d50de65ae..ac4c61cd61 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicFilterType;
@@ -599,7 +600,12 @@ public class TimerMessageStore {
             if (null == msg || null == 
msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
                 return;
             }
-            
timerMetrics.addAndGet(msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC), 
value);
+            if (msg.getProperty(TIMER_ENQUEUE_MS) != null
+                    && NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) 
== Long.MAX_VALUE) {
+                return;
+            }
+            // pass msg into addAndGet, for further more judgement extension.
+            timerMetrics.addAndGet(msg, value);
         } catch (Throwable t) {
             if (frequency.incrementAndGet() % 1000 == 0) {
                 LOGGER.error("error in adding metric", t);
@@ -1323,6 +1329,7 @@ public class TimerMessageStore {
                 perfCounterTicks.startTick(ENQUEUE_PUT);
                 
DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg()));
                 if (shouldRunningDequeue && req.getDelayTime() < 
currWriteTimeMs) {
+                    req.setEnqueueTime(Long.MAX_VALUE);
                     dequeuePutQueue.put(req);
                 } else {
                     boolean doEnqueueRes = doEnqueue(
@@ -1452,9 +1459,14 @@ public class TimerMessageStore {
                             }
                             try {
                                 perfCounterTicks.startTick(DEQUEUE_PUT);
-                                
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(tr.getMsg()));
-                                addMetric(tr.getMsg(), -1);
-                                MessageExtBrokerInner msg = 
convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
+                                MessageExt msgExt = tr.getMsg();
+                                
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt));
+                                if (tr.getEnqueueTime() == Long.MAX_VALUE) {
+                                    // never enqueue, mark it.
+                                    MessageAccessor.putProperty(msgExt, 
TIMER_ENQUEUE_MS, String.valueOf(Long.MAX_VALUE));
+                                }
+                                addMetric(msgExt, -1);
+                                MessageExtBrokerInner msg = convert(msgExt, 
tr.getEnqueueTime(), needRoll(tr.getMagic()));
                                 doRes = PUT_NEED_RETRY != doPut(msg, 
needRoll(tr.getMagic()));
                                 while (!doRes && !isStopped()) {
                                     if (!isRunningDequeue()) {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
index e7b00cc073..7f8fedd8a5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
@@ -38,6 +38,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -78,7 +80,8 @@ public class TimerMetrics extends ConfigManager {
         return distPair.getCount().addAndGet(value);
     }
 
-    public long addAndGet(String topic, int value) {
+    public long addAndGet(MessageExt msg, int value) {
+        String topic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
         Metric pair = getTopicPair(topic);
         getDataVersion().nextVersion();
         pair.setTimeStamp(System.currentTimeMillis());
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java
index 1dd64f7592..1b25d355c6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java
@@ -27,8 +27,9 @@ public class TimerRequest {
     private final int sizePy;
     private final long delayTime;
 
-    private final long enqueueTime;
     private final int magic;
+
+    private long enqueueTime;
     private MessageExt msg;
 
 
@@ -94,7 +95,9 @@ public class TimerRequest {
     public void setLatch(CountDownLatch latch) {
         this.latch = latch;
     }
-
+    public void setEnqueueTime(long enqueueTime) {
+        this.enqueueTime = enqueueTime;
+    }
     public void idempotentRelease() {
         idempotentRelease(true);
     }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
index b7392cc455..3c7b9b67fb 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.rocketmq.store.timer;
 
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,8 +34,11 @@ public class TimerMetricsTest {
 
         TimerMetrics first = new TimerMetrics(baseDir);
         Assert.assertTrue(first.load());
-        first.addAndGet("AAA", 1000);
-        first.addAndGet("BBB", 2000);
+        MessageExt msg = new MessageExt();
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, 
"AAA");
+        first.addAndGet(msg, 1000);
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, 
"BBB");
+        first.addAndGet(msg, 2000);
         Assert.assertEquals(1000, first.getTimingCount("AAA"));
         Assert.assertEquals(2000, first.getTimingCount("BBB"));
         long curr = System.currentTimeMillis();

Reply via email to