This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e2a341e7f Calculate retry message throughput in pop consumption mode (#5954)
e2a341e7f is described below
commit e2a341e7f50d606230105e7e1f5b8e734a984802
Author: SSpirits <[email protected]>
AuthorDate: Tue Jan 31 16:35:44 2023 +0800
Calculate retry message throughput in pop consumption mode (#5954)
---
.../rocketmq/broker/processor/PopMessageProcessor.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index e2d51857e..647d2e8a9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -82,6 +82,7 @@ import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
@@ -600,15 +601,14 @@ public class PopMessageProcessor implements NettyRequestProcessor {
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(),
topic,
result.getBufferTotalSize());
- if (!isRetry) {
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
- .put(LABEL_TOPIC, requestHeader.getTopic())
- .put(LABEL_CONSUMER_GROUP,
requestHeader.getConsumerGroup())
- .put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(requestHeader.getTopic()) ||
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
- .build();
-
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(),
attributes);
-
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
attributes);
- }
+ Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_TOPIC, requestHeader.getTopic())
+ .put(LABEL_CONSUMER_GROUP,
requestHeader.getConsumerGroup())
+ .put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(requestHeader.getTopic()) ||
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
+ .put(LABEL_IS_RETRY, isRetry)
+ .build();
+
BrokerMetricsManager.messagesOutTotal.add(result.getMessageCount(), attributes);
+
BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(),
attributes);
if (isOrder) {
this.brokerController.getConsumerOrderInfoManager().update(isRetry, topic,