This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9aa081b8ac [ISSUE #8988] Support dispatchBehindMilliseconds (#8989)
9aa081b8ac is described below

commit 9aa081b8acfd01a40f50bd9c3face3c0d2c530b1
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Tue Dec 10 20:05:44 2024 +0800

    [ISSUE #8988] Support dispatchBehindMilliseconds (#8989)
    
    * support dispatchBehindMilliseconds
    
    * Modify the initial value of currentReputTimestamp
    
    ---------
    
    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
---
 .../apache/rocketmq/store/DefaultMessageStore.java | 25 +++++++++++++++++++++-
 .../org/apache/rocketmq/store/MessageStore.java    |  7 ++++++
 .../store/plugin/AbstractPluginMessageStore.java   |  5 +++++
 3 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6b8ea0ee8a..9d3c46a438 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1556,6 +1556,10 @@ public class DefaultMessageStore implements MessageStore 
{
     public long dispatchBehindBytes() {
         return this.reputMessageService.behind();
     }
+    @Override
+    public long dispatchBehindMilliseconds() {
+        return this.reputMessageService.behindMs();
+    }
 
     public long flushBehindBytes() {
         if (this.messageStoreConfig.isTransientStorePoolEnable()) {
@@ -2793,6 +2797,7 @@ public class DefaultMessageStore implements MessageStore {
     class ReputMessageService extends ServiceThread {
 
         protected volatile long reputFromOffset = 0;
+        protected volatile long currentReputTimestamp = 
System.currentTimeMillis();
 
         public long getReputFromOffset() {
             return reputFromOffset;
@@ -2802,6 +2807,10 @@ public class DefaultMessageStore implements MessageStore 
{
             this.reputFromOffset = reputFromOffset;
         }
 
+        public long getCurrentReputTimestamp() {
+            return currentReputTimestamp;
+        }
+
         @Override
         public void shutdown() {
             for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
@@ -2824,6 +2833,15 @@ public class DefaultMessageStore implements MessageStore 
{
             return DefaultMessageStore.this.getConfirmOffset() - 
this.reputFromOffset;
         }
 
+        public long behindMs() {
+            long lastCommitLogFileTimeStamp = System.currentTimeMillis();
+            MappedFile lastMappedFile = 
DefaultMessageStore.this.commitLog.getMappedFileQueue().getLastMappedFile();
+            if (lastMappedFile != null) {
+                lastCommitLogFileTimeStamp = 
lastMappedFile.getStoreTimestamp();
+            }
+            return Math.max(0, lastCommitLogFileTimeStamp - 
this.currentReputTimestamp);
+        }
+
         public boolean isCommitLogAvailable() {
             return this.reputFromOffset < getReputEndOffset();
         }
@@ -2838,7 +2856,11 @@ public class DefaultMessageStore implements MessageStore 
{
                     this.reputFromOffset, 
DefaultMessageStore.this.commitLog.getMinOffset());
                 this.reputFromOffset = 
DefaultMessageStore.this.commitLog.getMinOffset();
             }
-            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; 
) {
+            boolean isCommitLogAvailable = isCommitLogAvailable();
+            if (!isCommitLogAvailable) {
+                currentReputTimestamp = System.currentTimeMillis();
+            }
+            for (boolean doNext = true; isCommitLogAvailable && doNext; ) {
 
                 SelectMappedBufferResult result = 
DefaultMessageStore.this.commitLog.getData(reputFromOffset);
 
@@ -2861,6 +2883,7 @@ public class DefaultMessageStore implements MessageStore {
 
                         if (dispatchRequest.isSuccess()) {
                             if (size > 0) {
+                                currentReputTimestamp = 
dispatchRequest.getStoreTimestamp();
                                 
DefaultMessageStore.this.doDispatch(dispatchRequest);
 
                                 if (!notifyMessageArriveInBatch) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 5c3984e5b2..4bbee142a1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -511,6 +511,13 @@ public interface MessageStore {
      */
     long dispatchBehindBytes();
 
+    /**
+     * Get the dispatch lag in milliseconds, i.e. how far the store timestamp of the message most recently dispatched 
to the consume queue trails the latest store timestamp in the commit log.
+     *
+     * @return the dispatch lag in milliseconds; never negative.
+     */
+    long dispatchBehindMilliseconds();
+
     /**
      * Flush the message store to persist all data.
      *
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 0f57a17d46..d5d6236458 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -293,6 +293,11 @@ public abstract class AbstractPluginMessageStore 
implements MessageStore {
         return next.dispatchBehindBytes();
     }
 
+    @Override
+    public long dispatchBehindMilliseconds() {
+        return next.dispatchBehindMilliseconds();
+    }
+
     @Override
     public long flush() {
         return next.flush();

Reply via email to