This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 6d7513425c [RIP-46] Enhanced metrics for timing and transactional messages (#7500) 6d7513425c is described below commit 6d7513425c2aeb17e527be9d0d98d47f7251927d Author: Ji Juntao <juntao....@alibaba-inc.com> AuthorDate: Mon Jan 22 16:56:05 2024 +0800 [RIP-46] Enhanced metrics for timing and transactional messages (#7500) * add request codes' distribution and timing messages' distribution * remove the requestCode distribution. * add delay message latency distribution. * add transaction metrics * transaction metric of topics finished, v1. * add the transaction metrics, to be tested. * fix the judgement of putMessageResult * optimize. * add config. * fix test case. * add unit tests for transactionMetrics. * remove chinese character * add rocksdb metrics. * add more rocksdb metrics. * fix NPE * avoid the total time is 0. * add license * remove useless import. --- .../apache/rocketmq/broker/BrokerController.java | 9 + .../rocketmq/broker/BrokerPathConfigHelper.java | 3 + .../broker/metrics/BrokerMetricsConstant.java | 5 + .../broker/metrics/BrokerMetricsManager.java | 72 +++++- .../broker/processor/EndTransactionProcessor.java | 18 ++ .../broker/processor/SendMessageProcessor.java | 26 ++- .../broker/transaction/TransactionMetrics.java | 259 +++++++++++++++++++++ .../TransactionMetricsFlushService.java | 55 +++++ .../transaction/TransactionalMessageService.java | 5 + .../DefaultTransactionalMessageCheckListener.java | 2 + .../queue/TransactionalMessageServiceImpl.java | 19 ++ .../processor/EndTransactionProcessorTest.java | 5 + .../transaction/queue/TransactionMetricsTest.java | 83 +++++++ .../util/TransactionalMessageServiceImpl.java | 11 + .../org/apache/rocketmq/common/BrokerConfig.java | 10 + .../org/apache/rocketmq/common/ConfigManager.java | 11 +- .../common/config/RocksDBConfigManager.java | 13 +- .../common/metrics/NopObservableDoubleGauge.java | 22 ++ 
.../remoting/metrics/RemotingMetricsConstant.java | 1 - .../apache/rocketmq/store/RocksDBMessageStore.java | 12 + .../store/metrics/DefaultStoreMetricsConstant.java | 12 + .../store/metrics/DefaultStoreMetricsManager.java | 63 ++++- .../store/metrics/RocksDBStoreMetricsManager.java | 154 ++++++++++++ .../store/queue/RocksDBConsumeQueueStore.java | 4 + .../rocketmq/store/timer/TimerMessageStore.java | 5 + 25 files changed, 856 insertions(+), 23 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 8d29d44383..af90e5f87e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -104,6 +104,7 @@ import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService; import org.apache.rocketmq.broker.topic.TopicQueueMappingManager; import org.apache.rocketmq.broker.topic.TopicRouteInfoManager; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; +import org.apache.rocketmq.broker.transaction.TransactionMetricsFlushService; import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener; @@ -277,6 +278,7 @@ public class BrokerController { private BrokerMetricsManager brokerMetricsManager; private ColdDataPullRequestHoldService coldDataPullRequestHoldService; private ColdDataCgCtrService coldDataCgCtrService; + private TransactionMetricsFlushService transactionMetricsFlushService; public BrokerController( final BrokerConfig brokerConfig, @@ -963,6 +965,9 @@ public class BrokerController { } this.transactionalMessageCheckListener.setBrokerController(this); this.transactionalMessageCheckService = new 
TransactionalMessageCheckService(this); + this.transactionMetricsFlushService = new TransactionMetricsFlushService(this); + this.transactionMetricsFlushService.start(); + } private void initialAcl() { @@ -1440,6 +1445,10 @@ public class BrokerController { this.endTransactionExecutor.shutdown(); } + if (this.transactionMetricsFlushService != null) { + this.transactionMetricsFlushService.shutdown(); + } + if (this.escapeBridge != null) { escapeBridge.shutdown(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java index cea321ef78..0b2f52f32e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java @@ -60,6 +60,9 @@ public class BrokerPathConfigHelper { public static String getTimerMetricsPath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "timermetrics"; } + public static String getTransactionMetricsPath(final String rootDir) { + return rootDir + File.separator + "config" + File.separator + "transactionMetrics"; + } public static String getConsumerFilterPath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "consumerFilter.json"; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java index 73b40f6ba5..5733aa40ba 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java @@ -38,6 +38,11 @@ public class BrokerMetricsConstant { public static final String GAUGE_CONSUMER_READY_MESSAGES = "rocketmq_consumer_ready_messages"; public static final String COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL = 
"rocketmq_send_to_dlq_messages_total"; + public static final String COUNTER_COMMIT_MESSAGES_TOTAL = "rocketmq_commit_messages_total"; + public static final String COUNTER_ROLLBACK_MESSAGES_TOTAL = "rocketmq_rollback_messages_total"; + public static final String HISTOGRAM_FINISH_MSG_LATENCY = "rocketmq_finish_message_latency"; + public static final String GAUGE_HALF_MESSAGES = "rocketmq_half_messages"; + public static final String LABEL_CLUSTER_NAME = "cluster"; public static final String LABEL_NODE_TYPE = "node_type"; public static final String NODE_TYPE_BROKER = "broker"; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java index 307fc02ef0..fc7e97bda9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java @@ -40,13 +40,6 @@ import io.opentelemetry.sdk.metrics.export.MetricExporter; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; import io.opentelemetry.sdk.resources.Resource; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ConsumerManager; @@ -68,12 +61,23 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant; import org.slf4j.bridge.SLF4JBridgeHandler; +import 
java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_COMMIT_MESSAGES_TOTAL; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_ROLLBACK_MESSAGES_TOTAL; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION; @@ -83,8 +87,10 @@ import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CON import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_LAG_MESSAGES; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_QUEUEING_LATENCY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_READY_MESSAGES; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_HALF_MESSAGES; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_FINISH_MSG_LATENCY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE; 
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME; @@ -141,6 +147,10 @@ public class BrokerMetricsManager { public static ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge(); public static ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge(); public static LongCounter sendToDlqMessages = new NopLongCounter(); + public static ObservableLongGauge halfMessages = new NopObservableLongGauge(); + public static LongCounter commitMessagesTotal = new NopLongCounter(); + public static LongCounter rollBackMessagesTotal = new NopLongCounter(); + public static LongHistogram transactionFinishLatency = new NopLongHistogram(); public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new ArrayList<String>() { { @@ -348,6 +358,7 @@ public class BrokerMetricsManager { initRequestMetrics(); initConnectionMetrics(); initLagAndDlqMetrics(); + initTransactionMetrics(); initOtherMetrics(); } @@ -361,6 +372,15 @@ public class BrokerMetricsManager { 2d * 1024 * 1024, //2MB 4d * 1024 * 1024 //4MB ); + + List<Double> commitLatencyBuckets = Arrays.asList( + 1d * 1 * 1 * 5, //5s + 1d * 1 * 1 * 60, //1min + 1d * 1 * 10 * 60, //10min + 1d * 1 * 60 * 60, //1h + 1d * 12 * 60 * 60, //12h + 1d * 24 * 60 * 60 //24h + ); InstrumentSelector messageSizeSelector = InstrumentSelector.builder() .setType(InstrumentType.HISTOGRAM) .setName(HISTOGRAM_MESSAGE_SIZE) @@ -371,6 +391,16 @@ public class BrokerMetricsManager { SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); providerBuilder.registerView(messageSizeSelector, messageSizeViewBuilder.build()); + InstrumentSelector commitLatencySelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_FINISH_MSG_LATENCY) + .build(); + ViewBuilder commitLatencyViewBuilder = View.builder() + 
.setAggregation(Aggregation.explicitBucketHistogram(commitLatencyBuckets)); + // To config the cardinalityLimit for openTelemetry metrics exporting. + SdkMeterProviderUtil.setCardinalityLimit(commitLatencyViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(commitLatencySelector, commitLatencyViewBuilder.build()); + for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : RemotingMetricsManager.getMetricsView()) { ViewBuilder viewBuilder = selectorViewPair.getObject2(); SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); @@ -560,6 +590,34 @@ public class BrokerMetricsManager { .build(); } + private void initTransactionMetrics() { + commitMessagesTotal = brokerMeter.counterBuilder(COUNTER_COMMIT_MESSAGES_TOTAL) + .setDescription("Total number of commit messages") + .build(); + + rollBackMessagesTotal = brokerMeter.counterBuilder(COUNTER_ROLLBACK_MESSAGES_TOTAL) + .setDescription("Total number of rollback messages") + .build(); + + transactionFinishLatency = brokerMeter.histogramBuilder(HISTOGRAM_FINISH_MSG_LATENCY) + .setDescription("Transaction finish latency") + .ofLongs() + .setUnit("ms") + .build(); + + halfMessages = brokerMeter.gaugeBuilder(GAUGE_HALF_MESSAGES) + .setDescription("Half messages of all topics") + .ofLongs() + .buildWithCallback(measurement -> { + brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts() + .forEach((topic, metric) -> { + measurement.record( + metric.getCount().get(), + newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, topic).build() + ); + }); + }); + } private void initOtherMetrics() { RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder); messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index f6aa0d48c9..e812a53ba7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.transaction.OperationResult; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.common.TopicFilterType; @@ -40,6 +41,8 @@ import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.BrokerRole; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC; + /** * EndTransaction processor: process commit and rollback message */ @@ -144,6 +147,16 @@ public class EndTransactionProcessor implements NettyRequestProcessor { RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); + // successful committed, then total num of half-messages minus 1 + this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(), -1); + BrokerMetricsManager.commitMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder() + .put(LABEL_TOPIC, msgInner.getTopic()) + .build()); + // record the commit latency. 
+ Long commitLatency = (System.currentTimeMillis() - result.getPrepareMessage().getBornTimestamp()) / 1000; + BrokerMetricsManager.transactionFinishLatency.record(commitLatency, BrokerMetricsManager.newAttributesBuilder() + .put(LABEL_TOPIC, msgInner.getTopic()) + .build()); } return sendResult; } @@ -161,6 +174,11 @@ public class EndTransactionProcessor implements NettyRequestProcessor { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); + // roll back, then total num of half-messages minus 1 + this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC), -1); + BrokerMetricsManager.rollBackMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder() + .put(LABEL_TOPIC, result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC)) + .build()); } return res; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 4ec84c1461..912d502eab 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -18,11 +18,6 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; import io.opentelemetry.api.common.Attributes; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; @@ -65,10 +60,17 @@ import 
org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE; @@ -300,7 +302,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement // Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); - boolean sendTransactionPrepareMessage = false; + boolean sendTransactionPrepareMessage; if (Boolean.parseBoolean(traFlag) && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1 if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { @@ -311,6 +313,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return response; } sendTransactionPrepareMessage = true; + } else { + sendTransactionPrepareMessage = false; } long beginTimeMillis = this.brokerController.getMessageStore().now(); @@ -332,6 +336,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (responseFuture != null) { doResponse(ctx, request, responseFuture); } + + // record the transaction metrics, responseFuture == null means put successfully + if 
(sendTransactionPrepareMessage && (responseFuture == null || responseFuture.getCode() == ResponseCode.SUCCESS)) { + this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC), 1); + } + sendMessageCallback.onComplete(sendMessageContext, response); }, this.brokerController.getPutMessageFutureExecutor()); // Returns null to release the send message thread @@ -344,6 +354,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader)); + // record the transaction metrics + if (putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK && putMessageResult.getAppendMessageResult().isOk()) { + this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC), 1); + } sendMessageCallback.onComplete(sendMessageContext, response); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java new file mode 100644 index 0000000000..ad30c73c60 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.transaction; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.google.common.io.Files; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class TransactionMetrics extends ConfigManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + private ConcurrentMap<String, Metric> transactionCounts = + new ConcurrentHashMap<>(1024); + + private DataVersion dataVersion = new DataVersion(); + + private final String configPath; + + public TransactionMetrics(String configPath) { + this.configPath = configPath; + } + + public long addAndGet(String topic, int value) { + Metric pair = getTopicPair(topic); + getDataVersion().nextVersion(); 
+ pair.setTimeStamp(System.currentTimeMillis()); + return pair.getCount().addAndGet(value); + } + + public Metric getTopicPair(String topic) { + Metric pair = transactionCounts.get(topic); + if (null != pair) { + return pair; + } + pair = new Metric(); + final Metric previous = transactionCounts.putIfAbsent(topic, pair); + if (null != previous) { + return previous; + } + return pair; + } + public long getTransactionCount(String topic) { + Metric pair = transactionCounts.get(topic); + if (null == pair) { + return 0; + } else { + return pair.getCount().get(); + } + } + + public Map<String, Metric> getTransactionCounts() { + return transactionCounts; + } + public void setTransactionCounts(ConcurrentMap<String, Metric> transactionCounts) { + this.transactionCounts = transactionCounts; + } + + protected void write0(Writer writer) { + TransactionMetricsSerializeWrapper wrapper = new TransactionMetricsSerializeWrapper(); + wrapper.setTransactionCount(transactionCounts); + wrapper.setDataVersion(dataVersion); + JSON.writeJSONString(writer, wrapper, SerializerFeature.BrowserCompatible); + } + + @Override + public String encode() { + return encode(false); + } + + @Override + public String configFilePath() { + return configPath; + } + + @Override + public void decode(String jsonString) { + if (jsonString != null) { + TransactionMetricsSerializeWrapper transactionMetricsSerializeWrapper = + TransactionMetricsSerializeWrapper.fromJson(jsonString, TransactionMetricsSerializeWrapper.class); + if (transactionMetricsSerializeWrapper != null) { + this.transactionCounts.putAll(transactionMetricsSerializeWrapper.getTransactionCount()); + this.dataVersion.assignNewOne(transactionMetricsSerializeWrapper.getDataVersion()); + } + } + } + + @Override + public String encode(boolean prettyFormat) { + TransactionMetricsSerializeWrapper metricsSerializeWrapper = new TransactionMetricsSerializeWrapper(); + metricsSerializeWrapper.setDataVersion(this.dataVersion); + 
metricsSerializeWrapper.setTransactionCount(this.transactionCounts); + return metricsSerializeWrapper.toJson(prettyFormat); + } + + public DataVersion getDataVersion() { + return dataVersion; + } + + public void setDataVersion(DataVersion dataVersion) { + this.dataVersion = dataVersion; + } + + public void cleanMetrics(Set<String> topics) { + if (topics == null || topics.isEmpty()) { + return; + } + Iterator<Map.Entry<String, Metric>> iterator = transactionCounts.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, Metric> entry = iterator.next(); + final String topic = entry.getKey(); + if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX)) { + continue; + } + if (!topics.contains(topic)) { + continue; + } + // in the input topics set, then remove it. + iterator.remove(); + } + } + + public static class TransactionMetricsSerializeWrapper extends RemotingSerializable { + private ConcurrentMap<String, Metric> transactionCount = + new ConcurrentHashMap<>(1024); + private DataVersion dataVersion = new DataVersion(); + + public ConcurrentMap<String, Metric> getTransactionCount() { + return transactionCount; + } + + public void setTransactionCount( + ConcurrentMap<String, Metric> transactionCount) { + this.transactionCount = transactionCount; + } + + public DataVersion getDataVersion() { + return dataVersion; + } + + public void setDataVersion(DataVersion dataVersion) { + this.dataVersion = dataVersion; + } + } + + @Override + public synchronized void persist() { + String config = configFilePath(); + String temp = config + ".tmp"; + String backup = config + ".bak"; + BufferedWriter bufferedWriter = null; + try { + File tmpFile = new File(temp); + File parentDirectory = tmpFile.getParentFile(); + if (!parentDirectory.exists()) { + if (!parentDirectory.mkdirs()) { + log.error("Failed to create directory: {}", parentDirectory.getCanonicalPath()); + return; + } + } + + if (!tmpFile.exists()) { + if (!tmpFile.createNewFile()) { + log.error("Failed to 
create file: {}", tmpFile.getCanonicalPath()); + return; + } + } + bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false), + StandardCharsets.UTF_8)); + write0(bufferedWriter); + bufferedWriter.flush(); + bufferedWriter.close(); + log.debug("Finished writing tmp file: {}", temp); + + File configFile = new File(config); + if (configFile.exists()) { + Files.copy(configFile, new File(backup)); + configFile.delete(); + } + + tmpFile.renameTo(configFile); + } catch (IOException e) { + log.error("Failed to persist {}", temp, e); + } finally { + if (null != bufferedWriter) { + try { + bufferedWriter.close(); + } catch (IOException ignore) { + } + } + } + } + + public static class Metric { + private AtomicLong count; + private long timeStamp; + + public Metric() { + count = new AtomicLong(0); + timeStamp = System.currentTimeMillis(); + } + + public AtomicLong getCount() { + return count; + } + + public void setCount(AtomicLong count) { + this.count = count; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + @Override + public String toString() { + return String.format("[%d,%d]", count.get(), timeStamp); + } + } + +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java new file mode 100644 index 0000000000..948f9fbc8e --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.transaction; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + +public class TransactionMetricsFlushService extends ServiceThread { + private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); + private BrokerController brokerController; + public TransactionMetricsFlushService(BrokerController brokerController) { + this.brokerController = brokerController; + } + + @Override + public String getServiceName() { + return "TransactionFlushService"; + } + + @Override + public void run() { + log.info(this.getServiceName() + " service start"); + long start = System.currentTimeMillis(); + while (!this.isStopped()) { + try { + if (System.currentTimeMillis() - start > brokerController.getBrokerConfig().getTransactionMetricFlushInterval()) { + start = System.currentTimeMillis(); + brokerController.getTransactionalMessageService().getTransactionMetrics().persist(); + waitForRunning(brokerController.getBrokerConfig().getTransactionMetricFlushInterval()); + } + } catch (Throwable e) { + log.error("Error occurred in " + getServiceName(), e); + } + } + log.info(this.getServiceName() + " service end"); + } 
+} \ No newline at end of file diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java index 8dbbf980eb..849e64024b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.transaction; import java.util.concurrent.CompletableFuture; + import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader; @@ -87,4 +88,8 @@ public interface TransactionalMessageService { * Close transaction service. */ void close(); + + TransactionMetrics getTransactionMetrics(); + + void setTransactionMetrics(TransactionMetrics transactionMetrics); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java index ad02ae4270..8e2b679b40 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java @@ -49,6 +49,8 @@ public class DefaultTransactionalMessageCheckListener extends AbstractTransactio if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. 
Restored in queueOffset={}, " + "commitLogOffset={}, real topic={}", msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); + // discarded, then the num of half-messages minus 1 + this.getBrokerController().getTransactionalMessageService().getTransactionMetrics().addAndGet(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC), -1); } else { log.error("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(), msgExt.getMsgId()); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 48db828e0a..9fdfd0a710 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -27,8 +27,11 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; + +import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.OperationResult; +import org.apache.rocketmq.broker.transaction.TransactionMetrics; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; @@ -70,10 +73,25 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ private ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>(); + private TransactionMetrics transactionMetrics; + public TransactionalMessageServiceImpl(TransactionalMessageBridge 
transactionBridge) { this.transactionalMessageBridge = transactionBridge; transactionalOpBatchService = new TransactionalOpBatchService(transactionalMessageBridge.getBrokerController(), this); transactionalOpBatchService.start(); + transactionMetrics = new TransactionMetrics(BrokerPathConfigHelper.getTransactionMetricsPath( + transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getStorePathRootDir())); + transactionMetrics.load(); + } + + @Override + public TransactionMetrics getTransactionMetrics() { + return transactionMetrics; + } + + @Override + public void setTransactionMetrics(TransactionMetrics transactionMetrics) { + this.transactionMetrics = transactionMetrics; } @@ -632,6 +650,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ if (this.transactionalOpBatchService != null) { this.transactionalOpBatchService.shutdown(); } + this.getTransactionMetrics().persist(); } public Message getOpMessage(int queueId, String moreData) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java index 72b339ae73..a364a1bbee 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.transaction.OperationResult; +import org.apache.rocketmq.broker.transaction.TransactionMetrics; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.message.MessageAccessor; @@ -71,8 +72,12 @@ public class EndTransactionProcessorTest { @Mock private 
TransactionalMessageService transactionMsgService; + @Mock + private TransactionMetrics transactionMetrics; + @Before public void init() { + when(transactionMsgService.getTransactionMetrics()).thenReturn(transactionMetrics); brokerController.setMessageStore(messageStore); brokerController.setTransactionalMessageService(transactionMsgService); endTransactionProcessor = new EndTransactionProcessor(brokerController); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java new file mode 100644 index 0000000000..690b4eabb5 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.rocketmq.broker.transaction.queue;
+
+import org.apache.rocketmq.broker.transaction.TransactionMetrics;
+import org.apache.rocketmq.broker.transaction.TransactionMetrics.Metric;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Collections;
+
+
+/**
+ * Unit tests for {@link TransactionMetrics}.
+ *
+ * <p>Checks use {@link Assert} rather than the {@code assert} keyword, because
+ * {@code assert} statements are skipped when the JVM runs without {@code -ea}
+ * and the tests would then pass without verifying anything.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TransactionMetricsTest {
+    private TransactionMetrics transactionMetrics;
+    private String configPath;
+
+    @Before
+    public void setUp() throws Exception {
+        configPath = "configPath";
+        transactionMetrics = new TransactionMetrics(configPath);
+    }
+
+    /**
+     * test addAndGet method
+     */
+    @Test
+    public void testAddAndGet() {
+        String topic = "testAddAndGet";
+        int value = 10;
+        long result = transactionMetrics.addAndGet(topic, value);
+
+        Assert.assertEquals((long) value, result);
+    }
+
+    /**
+     * a metric entry is handed out even for a topic that was never counted
+     */
+    @Test
+    public void testGetTopicPair() {
+        String topic = "getTopicPair";
+        Metric result = transactionMetrics.getTopicPair(topic);
+        Assert.assertNotNull(result);
+    }
+
+    /**
+     * a counted topic reports its count, an unknown topic reports zero
+     */
+    @Test
+    public void testGetTransactionCount() {
+        String topicExist = "topicExist";
+        String topicNotExist = "topicNotExist";
+
+        transactionMetrics.addAndGet(topicExist, 10);
+
+        // Go through long locals so the assertEquals(long, long) overload is
+        // selected regardless of whether the getter returns long or Long.
+        long existCount = transactionMetrics.getTransactionCount(topicExist);
+        long notExistCount = transactionMetrics.getTransactionCount(topicNotExist);
+        Assert.assertEquals(10L, existCount);
+        Assert.assertEquals(0L, notExistCount);
+    }
+
+
+    /**
+     * test clean metrics
+     */
+    @Test
+    public void testCleanMetrics() {
+        String topic = "testCleanMetrics";
+        int value = 10;
+        long added = transactionMetrics.addAndGet(topic, value);
+        Assert.assertEquals((long) value, added);
+        transactionMetrics.cleanMetrics(Collections.singleton(topic));
+        long remaining = transactionMetrics.getTransactionCount(topic);
+        Assert.assertEquals(0L, remaining);
+    }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
index 3cbfab2c27..2de4c307e0 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.util; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.OperationResult; +import org.apache.rocketmq.broker.transaction.TransactionMetrics; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageExt; @@ -70,4 +71,14 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ public void close() { } + + @Override + public TransactionMetrics getTransactionMetrics() { + return null; + } + + @Override + public void setTransactionMetrics(TransactionMetrics transactionMetrics) { + + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index bedc7f386b..0a2c528f86 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -269,6 +269,8 @@ public class BrokerConfig extends BrokerIdentity { @ImportantField private long transactionCheckInterval = 30 * 1000; + private long transactionMetricFlushInterval = 3 * 1000; + /** * transaction batch op message */ @@ -1789,4 +1791,12 @@ public class BrokerConfig extends BrokerIdentity { public void setSplitRegistrationSize(int splitRegistrationSize) { this.splitRegistrationSize = splitRegistrationSize; } + + public long getTransactionMetricFlushInterval() { + return transactionMetricFlushInterval; + } + + public void setTransactionMetricFlushInterval(long transactionMetricFlushInterval) { + this.transactionMetricFlushInterval = 
transactionMetricFlushInterval; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 6c3bed47cf..5e99759619 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -16,13 +16,14 @@ */ package org.apache.rocketmq.common; -import java.io.IOException; -import java.util.Map; - import org.apache.rocketmq.common.config.RocksDBConfigManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.rocksdb.Statistics; + +import java.io.IOException; +import java.util.Map; public abstract class ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); @@ -103,4 +104,8 @@ public abstract class ConfigManager { public abstract String encode(final boolean prettyFormat); public abstract void decode(final String jsonString); + + public Statistics getStatistics() { + return rocksDBConfigManager == null ? 
null : rocksDBConfigManager.getStatistics(); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java index f958bbdf0b..d1ec894685 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java @@ -16,15 +16,16 @@ */ package org.apache.rocketmq.common.config; -import java.util.function.BiConsumer; - import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.rocksdb.FlushOptions; import org.rocksdb.RocksIterator; +import org.rocksdb.Statistics; import org.rocksdb.WriteBatch; +import java.util.function.BiConsumer; + public class RocksDBConfigManager { protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -105,4 +106,12 @@ public class RocksDBConfigManager { public void batchPutWithWal(final WriteBatch batch) throws Exception { this.configRocksDBStorage.batchPutWithWal(batch); } + + public Statistics getStatistics() { + if (this.configRocksDBStorage == null) { + return null; + } + + return configRocksDBStorage.getStatistics(); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/metrics/NopObservableDoubleGauge.java b/common/src/main/java/org/apache/rocketmq/common/metrics/NopObservableDoubleGauge.java new file mode 100644 index 0000000000..899ac14a9a --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/metrics/NopObservableDoubleGauge.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.metrics; + +import io.opentelemetry.api.metrics.ObservableDoubleGauge; + +/** Placeholder {@link ObservableDoubleGauge} with an empty body; it declares no members of its own and adds no behavior beyond what the interface provides. */ public class NopObservableDoubleGauge implements ObservableDoubleGauge { +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java index 730469e590..f9b3e4c6fa 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.remoting.metrics; public class RemotingMetricsConstant { public static final String HISTOGRAM_RPC_LATENCY = "rocketmq_rpc_latency"; - public static final String LABEL_PROTOCOL_TYPE = "protocol_type"; public static final String LABEL_REQUEST_CODE = "request_code"; public static final String LABEL_RESPONSE_CODE = "response_code"; diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java index 87ccb5474b..6141b778bf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java @@ -19,12 +19,17 @@ package org.apache.rocketmq.store; import
java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.Meter; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; +import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager; +import org.apache.rocketmq.store.metrics.RocksDBStoreMetricsManager; import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface; import org.apache.rocketmq.store.queue.RocksDBConsumeQueue; @@ -166,4 +171,11 @@ public class RocksDBMessageStore extends DefaultMessageStore { // todo return 0; } + + @Override + public void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) { + DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this); + // Also add some metrics for rocksdb's monitoring. 
+ RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, this); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java index 271604b1e5..956501c64f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java +++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java @@ -30,10 +30,22 @@ public class DefaultStoreMetricsConstant { public static final String COUNTER_TIMER_ENQUEUE_TOTAL = "rocketmq_timer_enqueue_total"; public static final String COUNTER_TIMER_DEQUEUE_TOTAL = "rocketmq_timer_dequeue_total"; + public static final String GAUGE_TIMER_MESSAGE_SNAPSHOT = "rocketmq_timer_message_snapshot"; + public static final String HISTOGRAM_DELAY_MSG_LATENCY = "rocketmq_delay_message_latency"; public static final String LABEL_STORAGE_TYPE = "storage_type"; public static final String DEFAULT_STORAGE_TYPE = "local"; public static final String LABEL_STORAGE_MEDIUM = "storage_medium"; public static final String DEFAULT_STORAGE_MEDIUM = "disk"; public static final String LABEL_TOPIC = "topic"; + public static final String LABEL_TIMING_BOUND = "timer_bound_s"; + public static final String GAUGE_BYTES_ROCKSDB_WRITTEN = "rocketmq_rocksdb_bytes_written"; + public static final String GAUGE_BYTES_ROCKSDB_READ = "rocketmq_rocksdb_bytes_read"; + + public static final String GAUGE_TIMES_ROCKSDB_WRITTEN_SELF = "rocketmq_rocksdb_times_written_self"; + public static final String GAUGE_TIMES_ROCKSDB_WRITTEN_OTHER = "rocketmq_rocksdb_times_written_other"; + public static final String GAUGE_RATE_ROCKSDB_CACHE_HIT = "rocketmq_rocksdb_rate_cache_hit"; + public static final String GAUGE_TIMES_ROCKSDB_COMPRESSED = "rocketmq_rocksdb_times_compressed"; + public static final String GAUGE_BYTES_READ_AMPLIFICATION = "rocketmq_rocksdb_read_amplification_bytes"; + public static 
final String GAUGE_TIMES_ROCKSDB_READ = "rocketmq_rocksdb_times_read"; } diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java index 45a6bbc680..db4c7bb766 100644 --- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java @@ -20,19 +20,29 @@ import com.google.common.collect.Lists; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongGauge; +import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.View; import io.opentelemetry.sdk.metrics.ViewBuilder; -import java.io.File; -import java.util.List; -import java.util.function.Supplier; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.metrics.NopLongCounter; +import org.apache.rocketmq.common.metrics.NopLongHistogram; import org.apache.rocketmq.common.metrics.NopObservableLongGauge; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.timer.Slot; import org.apache.rocketmq.store.timer.TimerMessageStore; +import org.apache.rocketmq.store.timer.TimerMetrics; +import org.apache.rocketmq.store.timer.TimerWheel; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_DEQUEUE_TOTAL; import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_ENQUEUE_TOTAL; @@ -46,9 +56,12 @@ import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUG import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_DEQUEUE_LATENCY; import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LAG; import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LATENCY; +import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_MESSAGE_SNAPSHOT; import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMING_MESSAGES; +import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.HISTOGRAM_DELAY_MSG_LATENCY; import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_MEDIUM; import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_TYPE; +import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TIMING_BOUND; import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC; public class DefaultStoreMetricsManager { @@ -68,9 +81,26 @@ public class DefaultStoreMetricsManager { public static LongCounter timerDequeueTotal = new NopLongCounter(); public static LongCounter timerEnqueueTotal = new NopLongCounter(); + public static ObservableLongGauge timerMessageSnapshot = new NopObservableLongGauge(); + public static LongHistogram timerMessageSetLatency = new NopLongHistogram(); public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() { - return Lists.newArrayList(); + List<Double> rpcCostTimeBuckets = Arrays.asList( + // day * hour * min * second + 1d * 1 * 1 * 60, // 60 second + 1d * 1 * 10 * 60, // 10 min + 1d * 1 * 60 * 60, // 1 hour + 1d * 12 * 60 * 60, // 12 hour + 1d * 24 * 60 * 60, // 1 day + 3d * 24 * 60 * 60 // 3 day + ); + InstrumentSelector 
selector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_DELAY_MSG_LATENCY) + .build(); + ViewBuilder viewBuilder = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)); + return Lists.newArrayList(new Pair<>(selector, viewBuilder)); } public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier, @@ -168,6 +198,31 @@ public class DefaultStoreMetricsManager { timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL) .setDescription("Total number of timer enqueue") .build(); + timerMessageSnapshot = meter.gaugeBuilder(GAUGE_TIMER_MESSAGE_SNAPSHOT) + .setDescription("Timer message distribution snapshot, only count timing messages in 24h.") + .ofLongs() + .buildWithCallback(measurement -> { + TimerMetrics timerMetrics = messageStore.getTimerMessageStore().getTimerMetrics(); + TimerWheel timerWheel = messageStore.getTimerMessageStore().getTimerWheel(); + int precisionMs = messageStoreConfig.getTimerPrecisionMs(); + List<Integer> timerDist = timerMetrics.getTimerDistList(); + long currTime = System.currentTimeMillis() / precisionMs * precisionMs; + for (int i = 0; i < timerDist.size(); i++) { + int slotBeforeNum = i == 0 ? 
0 : timerDist.get(i - 1) * 1000 / precisionMs; + int slotTotalNum = timerDist.get(i) * 1000 / precisionMs; + int periodTotal = 0; + for (int j = slotBeforeNum; j < slotTotalNum; j++) { + Slot slotEach = timerWheel.getSlot(currTime + (long) j * precisionMs); + periodTotal += slotEach.num; + } + measurement.record(periodTotal, newAttributesBuilder().put(LABEL_TIMING_BOUND, timerDist.get(i).toString()).build()); + } + }); + timerMessageSetLatency = meter.histogramBuilder(HISTOGRAM_DELAY_MSG_LATENCY) + .setDescription("Timer message set latency distribution") + .setUnit("seconds") + .ofLongs() + .build(); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java new file mode 100644 index 0000000000..6029488056 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.rocketmq.store.metrics;
+
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopObservableDoubleGauge;
+import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
+import org.apache.rocketmq.store.RocksDBMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.queue.RocksDBConsumeQueueStore;
+import org.rocksdb.TickerType;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_MEDIUM;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_TYPE;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_BYTES_ROCKSDB_READ;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_BYTES_ROCKSDB_WRITTEN;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_MEDIUM;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_TYPE;
+
+/**
+ * Registers OpenTelemetry gauges that expose RocksDB ticker statistics of the
+ * consume queue store of a {@link RocksDBMessageStore}.
+ *
+ * <p>Every gauge is an asynchronous instrument: its callback reads the current
+ * ticker value from the store's statistics each time the meter collects, and
+ * records it with the attribute {@code type=consume_queue}.
+ */
+public class RocksDBStoreMetricsManager {
+    public static Supplier<AttributesBuilder> attributesBuilderSupplier;
+    public static MessageStoreConfig messageStoreConfig;
+
+    // The cumulative number of bytes read from the database.
+    public static ObservableLongGauge bytesRocksdbRead = new NopObservableLongGauge();
+
+    // The cumulative number of bytes written to the database.
+    public static ObservableLongGauge bytesRocksdbWritten = new NopObservableLongGauge();
+
+    // The cumulative number of keys read (RocksDB NUMBER_KEYS_READ ticker).
+    public static ObservableLongGauge timesRocksdbRead = new NopObservableLongGauge();
+
+    // The cumulative number of write operations performed, split by RocksDB into
+    // writes done by the requesting thread itself and writes done on its behalf
+    // by another thread.
+    public static ObservableLongGauge timesRocksdbWrittenSelf = new NopObservableLongGauge();
+    public static ObservableLongGauge timesRocksdbWrittenOther = new NopObservableLongGauge();
+
+    // The cumulative number of compressions that have occurred.
+    public static ObservableLongGauge timesRocksdbCompressed = new NopObservableLongGauge();
+
+    // The total size of data blocks loaded by reads (RocksDB READ_AMP_TOTAL_READ_BYTES
+    // ticker), i.e. the numerator of RocksDB's read-amplification estimate.
+    public static ObservableDoubleGauge bytesRocksdbAmplificationRead = new NopObservableDoubleGauge();
+
+    // The rate at which cache lookups were served from the cache rather than needing to be fetched from disk.
+    public static ObservableDoubleGauge rocksdbCacheHitRate = new NopObservableDoubleGauge();
+
+    // Ticker values seen by the previous cache-hit-rate callback; used to turn the
+    // cumulative hit/miss tickers into a per-collection-interval rate.
+    public static volatile long blockCacheHitTimes = 0;
+    public static volatile long blockCacheMissTimes = 0;
+
+    /**
+     * @return the metric views contributed by this manager; currently none
+     */
+    public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
+        return Lists.newArrayList();
+    }
+
+    /**
+     * Builds all RocksDB gauges on the given meter and stores them in the static
+     * fields of this class.
+     *
+     * @param meter                     meter used to build the gauges
+     * @param attributesBuilderSupplier supplier of the base attributes attached to
+     *                                  every measurement; kept in a static field
+     * @param messageStore              store whose consume queue statistics are read
+     *                                  by the gauge callbacks
+     */
+    public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier,
+        RocksDBMessageStore messageStore) {
+        RocksDBStoreMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
+        bytesRocksdbWritten = meter.gaugeBuilder(GAUGE_BYTES_ROCKSDB_WRITTEN)
+            .setDescription("The cumulative number of bytes written to the database.")
+            .ofLongs()
+            .buildWithCallback(measurement ->
+                measurement.record(tickerCount(messageStore, TickerType.BYTES_WRITTEN), consumeQueueAttributes()));
+        bytesRocksdbRead = meter.gaugeBuilder(GAUGE_BYTES_ROCKSDB_READ)
+            .setDescription("The cumulative number of bytes read from the database.")
+            .ofLongs()
+            .buildWithCallback(measurement ->
+                measurement.record(tickerCount(messageStore, TickerType.BYTES_READ), consumeQueueAttributes()));
+        timesRocksdbWrittenSelf = meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_WRITTEN_SELF)
+            .setDescription("The cumulative number of write operations performed by self.")
+            .ofLongs()
+            .buildWithCallback(measurement ->
+                measurement.record(tickerCount(messageStore, TickerType.WRITE_DONE_BY_SELF), consumeQueueAttributes()));
+        timesRocksdbWrittenOther = meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_WRITTEN_OTHER)
+            .setDescription("The cumulative number of write operations performed by other.")
+            .ofLongs()
+            .buildWithCallback(measurement ->
+                measurement.record(tickerCount(messageStore, TickerType.WRITE_DONE_BY_OTHER), consumeQueueAttributes()));
+        // Description fixed: this gauge reports reads, not "write operations performed by other".
+        timesRocksdbRead = meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_READ)
+            .setDescription("The cumulative number of keys read from the database.")
+            .ofLongs()
+            .buildWithCallback(measurement ->
+                measurement.record(tickerCount(messageStore, TickerType.NUMBER_KEYS_READ), consumeQueueAttributes()));
+        rocksdbCacheHitRate = meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_RATE_ROCKSDB_CACHE_HIT)
+            .setDescription("The rate at which cache lookups were served from the cache rather than needing to be fetched from disk.")
+            .buildWithCallback(measurement -> {
+                long newHitTimes = tickerCount(messageStore, TickerType.BLOCK_CACHE_HIT);
+                long newMissTimes = tickerCount(messageStore, TickerType.BLOCK_CACHE_MISS);
+                // Hits and misses accumulated since the previous callback invocation.
+                long hitPeriod = newHitTimes - blockCacheHitTimes;
+                long totalPeriod = hitPeriod + newMissTimes - blockCacheMissTimes;
+                // No lookups in this interval: report 0 instead of dividing by zero.
+                double hitRate = totalPeriod == 0 ? 0 : (double) hitPeriod / totalPeriod;
+                blockCacheHitTimes = newHitTimes;
+                blockCacheMissTimes = newMissTimes;
+                measurement.record(hitRate, consumeQueueAttributes());
+            });
+        timesRocksdbCompressed = meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_COMPRESSED)
+            .setDescription("The cumulative number of compressions that have occurred.")
+            .ofLongs()
+            .buildWithCallback(measurement ->
+                measurement.record(tickerCount(messageStore, TickerType.NUMBER_BLOCK_COMPRESSED), consumeQueueAttributes()));
+        // Description fixed: it was a copy of the cache-hit-rate text.
+        bytesRocksdbAmplificationRead = meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_BYTES_READ_AMPLIFICATION)
+            .setDescription("The total size of data blocks loaded by reads, used to estimate read amplification.")
+            .buildWithCallback(measurement ->
+                measurement.record(tickerCount(messageStore, TickerType.READ_AMP_TOTAL_READ_BYTES), consumeQueueAttributes()));
+    }
+
+    /**
+     * @return a builder seeded from the configured supplier plus the storage type
+     *         and storage medium labels, or an empty builder when no supplier has
+     *         been set yet
+     */
+    public static AttributesBuilder newAttributesBuilder() {
+        if (attributesBuilderSupplier == null) {
+            return Attributes.builder();
+        }
+        return attributesBuilderSupplier.get()
+            .put(LABEL_STORAGE_TYPE, DEFAULT_STORAGE_TYPE)
+            .put(LABEL_STORAGE_MEDIUM, DEFAULT_STORAGE_MEDIUM);
+    }
+
+    /** Reads one ticker from the statistics of the store's RocksDB consume queue store. */
+    private static long tickerCount(RocksDBMessageStore messageStore, TickerType tickerType) {
+        return ((RocksDBConsumeQueueStore) messageStore.getQueueStore())
+            .getStatistics().getTickerCount(tickerType);
+    }
+
+    /** Attributes shared by every measurement of this manager. */
+    private static Attributes consumeQueueAttributes() {
+        return newAttributesBuilder().put("type", "consume_queue").build();
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index 78456cfcd8..4c66696e3c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage; import org.rocksdb.RocksDBException; +import org.rocksdb.Statistics; import org.rocksdb.WriteBatch; public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { @@ -266,6 +267,9 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { } } + public Statistics getStatistics() { + return rocksDBStorage.getStatistics(); + } @Override public List<ByteBuffer> rangeQuery(final String topic, final int queueId, final long startIndex, final int num) throws RocksDBException { return this.rocksDBConsumeQueueTable.rangeQuery(topic, queueId, startIndex, num); diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 872cd71054..819b3e96a4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import io.opentelemetry.api.common.Attributes; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.rocketmq.common.ServiceThread; @@ -64,6 +65,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.BrokerRole; import 
org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant; import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager; import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.CqUnit; @@ -686,6 +688,9 @@ public class TimerMessageStore { return false; } } + Attributes attributes = DefaultStoreMetricsManager.newAttributesBuilder() + .put(DefaultStoreMetricsConstant.LABEL_TOPIC, msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build(); + DefaultStoreMetricsManager.timerMessageSetLatency.record((delayedTime - msgExt.getBornTimestamp()) / 1000, attributes); } } catch (Exception e) { // here may cause the message loss