This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 9aa081b8ac [ISSUE #8988] Support dispatchBehindMilliseconds (#8989)
9aa081b8ac is described below

commit 9aa081b8acfd01a40f50bd9c3face3c0d2c530b1
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Tue Dec 10 20:05:44 2024 +0800

    [ISSUE #8988] Support dispatchBehindMilliseconds (#8989)

    * support dispatchBehindMilliseconds

    * Modify the initial value of currentReputTimestamp

    ---------

    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
---
 .../apache/rocketmq/store/DefaultMessageStore.java | 25 +++++++++++++++++++++-
 .../org/apache/rocketmq/store/MessageStore.java    |  7 ++++++
 .../store/plugin/AbstractPluginMessageStore.java   |  5 +++++
 3 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6b8ea0ee8a..9d3c46a438 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1556,6 +1556,10 @@ public class DefaultMessageStore implements MessageStore {
     public long dispatchBehindBytes() {
         return this.reputMessageService.behind();
     }
+    @Override
+    public long dispatchBehindMilliseconds() {
+        return this.reputMessageService.behindMs();
+    }

     public long flushBehindBytes() {
         if (this.messageStoreConfig.isTransientStorePoolEnable()) {
@@ -2793,6 +2797,7 @@ public class DefaultMessageStore implements MessageStore {
     class ReputMessageService extends ServiceThread {

         protected volatile long reputFromOffset = 0;
+        protected volatile long currentReputTimestamp = System.currentTimeMillis();

         public long getReputFromOffset() {
             return reputFromOffset;
@@ -2802,6 +2807,10 @@ public class DefaultMessageStore implements MessageStore {
             this.reputFromOffset = reputFromOffset;
         }

+        public long getCurrentReputTimestamp() {
+            return currentReputTimestamp;
+        }
+
         @Override
         public void shutdown() {
             for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
@@ -2824,6 +2833,15 @@ public class DefaultMessageStore implements MessageStore {
             return DefaultMessageStore.this.getConfirmOffset() - this.reputFromOffset;
         }

+        public long behindMs() {
+            long lastCommitLogFileTimeStamp = System.currentTimeMillis();
+            MappedFile lastMappedFile = DefaultMessageStore.this.commitLog.getMappedFileQueue().getLastMappedFile();
+            if (lastMappedFile != null) {
+                lastCommitLogFileTimeStamp = lastMappedFile.getStoreTimestamp();
+            }
+            return Math.max(0, lastCommitLogFileTimeStamp - this.currentReputTimestamp);
+        }
+
         public boolean isCommitLogAvailable() {
             return this.reputFromOffset < getReputEndOffset();
         }
@@ -2838,7 +2856,11 @@ public class DefaultMessageStore implements MessageStore {
                     this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                 this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
             }
-            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+            boolean isCommitLogAvailable = isCommitLogAvailable();
+            if (!isCommitLogAvailable) {
+                currentReputTimestamp = System.currentTimeMillis();
+            }
+            for (boolean doNext = true; isCommitLogAvailable && doNext; ) {

                 SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);

@@ -2861,6 +2883,7 @@ public class DefaultMessageStore implements MessageStore {

                         if (dispatchRequest.isSuccess()) {
                             if (size > 0) {
+                                currentReputTimestamp = dispatchRequest.getStoreTimestamp();
                                 DefaultMessageStore.this.doDispatch(dispatchRequest);

                                 if (!notifyMessageArriveInBatch) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 5c3984e5b2..4bbee142a1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -511,6 +511,13 @@ public interface MessageStore {
      */
     long dispatchBehindBytes();

+    /**
+     * Get number of the milliseconds that have been stored in commit log and not yet dispatched to consume queue.
+     *
+     * @return number of the milliseconds to dispatch.
+     */
+    long dispatchBehindMilliseconds();
+
     /**
      * Flush the message store to persist all data.
      *
diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 0f57a17d46..d5d6236458 100644
--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -293,6 +293,11 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
         return next.dispatchBehindBytes();
     }

+    @Override
+    public long dispatchBehindMilliseconds() {
+        return next.dispatchBehindMilliseconds();
+    }
+
     @Override
     public long flush() {
         return next.flush();