This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new cb7fa3e0b0 Revise the measurement method of GROUP_GET_LATENCY to 
reveal its intended semantics (#7808)
cb7fa3e0b0 is described below

commit cb7fa3e0b015ec1bf74b95a98e7ef84eaf83b52d
Author: rongtong <jinrongto...@163.com>
AuthorDate: Sun Feb 4 10:27:21 2024 +0800

    Revise the measurement method of GROUP_GET_LATENCY to reveal its intended 
semantics (#7808)
---
 .../apache/rocketmq/broker/plugin/PullMessageResultHandler.java    | 3 ++-
 .../rocketmq/broker/processor/DefaultPullMessageResultHandler.java | 5 ++---
 .../org/apache/rocketmq/broker/processor/PeekMessageProcessor.java | 2 +-
 .../org/apache/rocketmq/broker/processor/PopMessageProcessor.java  | 2 +-
 .../org/apache/rocketmq/broker/processor/PullMessageProcessor.java | 7 +++++--
 5 files changed, 11 insertions(+), 8 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
index 0b9f4295c2..bddb57f150 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
@@ -51,5 +51,6 @@ public interface PullMessageResultHandler {
                            final boolean brokerAllowSuspend,
                            final MessageFilter messageFilter,
                            final RemotingCommand response,
-                           final TopicQueueMappingContext mappingContext);
+                           final TopicQueueMappingContext mappingContext,
+                           final long beginTimeMills);
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index 913e1a96c4..43b66b4c51 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -84,7 +84,8 @@ public class DefaultPullMessageResultHandler implements 
PullMessageResultHandler
         final boolean brokerAllowSuspend,
         final MessageFilter messageFilter,
         RemotingCommand response,
-        TopicQueueMappingContext mappingContext) {
+        TopicQueueMappingContext mappingContext,
+        long beginTimeMills) {
         PullMessageProcessor processor = 
brokerController.getPullMessageProcessor();
         final String clientAddress = 
RemotingHelper.parseChannelRemoteAddr(channel);
         TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
@@ -137,8 +138,6 @@ public class DefaultPullMessageResultHandler implements 
PullMessageResultHandler
                 }
 
                 if 
(this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
-
-                    final long beginTimeMills = 
this.brokerController.getMessageStore().now();
                     final byte[] r = 
this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), 
requestHeader.getTopic(), requestHeader.getQueueId());
                     
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                         requestHeader.getTopic(), requestHeader.getQueueId(),
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index a72759883c..55552003d8 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -81,6 +81,7 @@ public class PeekMessageProcessor implements 
NettyRequestProcessor {
 
     private RemotingCommand processRequest(final Channel channel, 
RemotingCommand request, boolean brokerAllowSuspend)
         throws RemotingCommandException {
+        final long beginTimeMills = 
this.brokerController.getMessageStore().now();
         RemotingCommand response = 
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
         final PopMessageResponseHeader responseHeader = 
(PopMessageResponseHeader) response.readCustomHeader();
         final PeekMessageRequestHeader requestHeader =
@@ -188,7 +189,6 @@ public class PeekMessageProcessor implements 
NettyRequestProcessor {
                 
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(),
 getMessageResult.getMessageCount());
 
                 if 
(this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
-                    final long beginTimeMills = 
this.brokerController.getMessageStore().now();
                     final byte[] r = 
this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), 
requestHeader.getTopic(), requestHeader.getQueueId());
                     
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                         requestHeader.getTopic(), requestHeader.getQueueId(),
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 105e11643f..59ff2e0fd5 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -196,6 +196,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
     @Override
     public RemotingCommand processRequest(final ChannelHandlerContext ctx, 
RemotingCommand request)
         throws RemotingCommandException {
+        final long beginTimeMills = 
this.brokerController.getMessageStore().now();
         request.addExtFieldIfNotExist(BORN_TIME, 
String.valueOf(System.currentTimeMillis()));
         if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) {
             request.addExtField(BORN_TIME, 
String.valueOf(System.currentTimeMillis()));
@@ -435,7 +436,6 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
             switch (finalResponse.getCode()) {
                 case ResponseCode.SUCCESS:
                     if 
(this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
-                        final long beginTimeMills = 
this.brokerController.getMessageStore().now();
                         final byte[] r = 
this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
                             requestHeader.getTopic(), 
requestHeader.getQueueId());
                         
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index ea9c327e98..d53454f215 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -300,6 +300,7 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
 
     private RemotingCommand processRequest(final Channel channel, 
RemotingCommand request, boolean brokerAllowSuspend, boolean 
brokerAllowFlowCtrSuspend)
         throws RemotingCommandException {
+        final long beginTimeMills = 
this.brokerController.getMessageStore().now();
         RemotingCommand response = 
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
         final PullMessageResponseHeader responseHeader = 
(PullMessageResponseHeader) response.readCustomHeader();
         final PullMessageRequestHeader requestHeader =
@@ -555,7 +556,8 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
                             brokerAllowSuspend,
                             messageFilter,
                             finalResponse,
-                            mappingContext
+                            mappingContext,
+                            beginTimeMills
                         );
                     })
                     .thenAccept(result -> 
NettyRemotingAbstract.writeResponse(channel, request, result));
@@ -574,7 +576,8 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
                 brokerAllowSuspend,
                 messageFilter,
                 response,
-                mappingContext
+                mappingContext,
+                beginTimeMills
             );
         }
         return null;

Reply via email to