This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new cb7fa3e0b0 Revise the measurement method of GROUP_GET_LATENCY to reveal its intended semantics (#7808) cb7fa3e0b0 is described below commit cb7fa3e0b015ec1bf74b95a98e7ef84eaf83b52d Author: rongtong <jinrongto...@163.com> AuthorDate: Sun Feb 4 10:27:21 2024 +0800 Revise the measurement method of GROUP_GET_LATENCY to reveal its intended semantics (#7808) --- .../apache/rocketmq/broker/plugin/PullMessageResultHandler.java | 3 ++- .../rocketmq/broker/processor/DefaultPullMessageResultHandler.java | 5 ++--- .../org/apache/rocketmq/broker/processor/PeekMessageProcessor.java | 2 +- .../org/apache/rocketmq/broker/processor/PopMessageProcessor.java | 2 +- .../org/apache/rocketmq/broker/processor/PullMessageProcessor.java | 7 +++++-- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java index 0b9f4295c2..bddb57f150 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java @@ -51,5 +51,6 @@ public interface PullMessageResultHandler { final boolean brokerAllowSuspend, final MessageFilter messageFilter, final RemotingCommand response, - final TopicQueueMappingContext mappingContext); + final TopicQueueMappingContext mappingContext, + final long beginTimeMills); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java index 913e1a96c4..43b66b4c51 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java @@ -84,7 +84,8 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler final boolean brokerAllowSuspend, final MessageFilter messageFilter, RemotingCommand response, - TopicQueueMappingContext mappingContext) { + TopicQueueMappingContext mappingContext, + long beginTimeMills) { PullMessageProcessor processor = brokerController.getPullMessageProcessor(); final String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); @@ -137,8 +138,6 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler } if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { - - final long beginTimeMills = this.brokerController.getMessageStore().now(); final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java index a72759883c..55552003d8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java @@ -81,6 +81,7 @@ public class PeekMessageProcessor implements NettyRequestProcessor { private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException { + final long beginTimeMills = this.brokerController.getMessageStore().now(); RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class); final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader(); final PeekMessageRequestHeader requestHeader = @@ -188,7 +189,6 @@ public class PeekMessageProcessor implements NettyRequestProcessor { this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), getMessageResult.getMessageCount()); if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { - final long beginTimeMills = this.brokerController.getMessageStore().now(); final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 105e11643f..59ff2e0fd5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -196,6 +196,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { @Override public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final long beginTimeMills = this.brokerController.getMessageStore().now(); request.addExtFieldIfNotExist(BORN_TIME, String.valueOf(System.currentTimeMillis())); if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) { request.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis())); @@ -435,7 +436,6 @@ public class PopMessageProcessor implements NettyRequestProcessor { switch (finalResponse.getCode()) { case ResponseCode.SUCCESS: if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { - final long beginTimeMills = this.brokerController.getMessageStore().now(); final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index ea9c327e98..d53454f215 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -300,6 +300,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend, boolean brokerAllowFlowCtrSuspend) throws RemotingCommandException { + final long beginTimeMills = this.brokerController.getMessageStore().now(); RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); final PullMessageRequestHeader requestHeader = @@ -555,7 +556,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { brokerAllowSuspend, messageFilter, finalResponse, - mappingContext + mappingContext, + beginTimeMills ); }) .thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result)); @@ -574,7 +576,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { brokerAllowSuspend, messageFilter, response, - mappingContext + mappingContext, + beginTimeMills ); } return null;