This is an automated email from the ASF dual-hosted git repository. huangli pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 5081d15 [ISSUE #3637] Add enableDetailStat in BrokerConfig so we can disable stat of queue level. (#3638) 5081d15 is described below commit 5081d155f8e66ef1b6f22a99bbb6639e8171607a Author: huangli <areyo...@gmail.com> AuthorDate: Wed Dec 15 20:03:23 2021 +0800 [ISSUE #3637] Add enableDetailStat in BrokerConfig so we can disable stat of queue level. (#3638) [ISSUE #3637] Add enableDetailStat in BrokerConfig so we can disable stat of queue level. --- .../apache/rocketmq/broker/BrokerController.java | 2 +- .../broker/processor/PullMessageProcessor.java | 2 +- .../broker/filter/MessageStoreWithFilterTest.java | 2 +- .../org/apache/rocketmq/common/BrokerConfig.java | 10 ++++ .../rocketmq/store/stats/BrokerStatsManager.java | 59 +++++++++++++++------- .../apache/rocketmq/store/BatchPutMessageTest.java | 2 +- .../apache/rocketmq/store/ConsumeQueueTest.java | 2 +- .../store/DefaultMessageStoreCleanFilesTest.java | 2 +- .../store/DefaultMessageStoreShutDownTest.java | 2 +- .../rocketmq/store/DefaultMessageStoreTest.java | 2 +- .../java/org/apache/rocketmq/store/HATest.java | 2 +- .../rocketmq/store/ScheduleMessageServiceTest.java | 2 +- .../store/dledger/MessageStoreTestBase.java | 4 +- .../store/schedule/ScheduleMessageServiceTest.java | 2 +- .../test/java/stats/BrokerStatsManagerTest.java | 2 +- 15 files changed, 65 insertions(+), 32 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index bce21c5..662ec49 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -206,7 +206,7 @@ public class BrokerController { this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity()); - this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); + this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); this.brokerFastFailure = new BrokerFastFailure(this); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 8879a72..20665a3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -91,6 +91,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException { + final long beginTimeMills = this.brokerController.getMessageStore().now(); RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); final PullMessageRequestHeader requestHeader = @@ -379,7 +380,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount()); if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { - final long beginTimeMills = this.brokerController.getMessageStore().now(); final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 34dc640..23b38e2 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -146,7 +146,7 @@ public class MessageStoreWithFilterTest { DefaultMessageStore master = new DefaultMessageStore( messageStoreConfig, - new BrokerStatsManager(brokerConfig.getBrokerClusterName()), + new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat()), new MessageArrivingListener() { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index f710cdb..a45efbe 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -185,6 +185,8 @@ public class BrokerConfig { private boolean storeReplyMessageEnable = true; + private boolean enableDetailStat = true; + private boolean autoDeleteUnusedStats = false; public static String localHostName() { @@ -797,6 +799,14 @@ public class BrokerConfig { this.storeReplyMessageEnable = storeReplyMessageEnable; } + public boolean isEnableDetailStat() { + return enableDetailStat; + } + + public void setEnableDetailStat(boolean enableDetailStat) { + this.enableDetailStat = enableDetailStat; + } + public boolean isAutoDeleteUnusedStats() { return autoDeleteUnusedStats; } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 91de4a2..531d3fd 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -72,16 +72,20 @@ public class BrokerStatsManager { "CommercialStatsThread")); private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>(); private final String clusterName; + private final boolean enableQueueStat; private final MomentStatsItemSet momentStatsItemSetFallSize = new MomentStatsItemSet(GROUP_GET_FALL_SIZE, scheduledExecutorService, log); private final MomentStatsItemSet momentStatsItemSetFallTime = new MomentStatsItemSet(GROUP_GET_FALL_TIME, scheduledExecutorService, log); - public BrokerStatsManager(String clusterName) { + public BrokerStatsManager(String clusterName, boolean enableQueueStat) { this.clusterName = clusterName; + this.enableQueueStat = enableQueueStat; - this.statsTable.put(QUEUE_PUT_NUMS, new StatsItemSet(QUEUE_PUT_NUMS, this.scheduledExecutorService, log)); - this.statsTable.put(QUEUE_PUT_SIZE, new StatsItemSet(QUEUE_PUT_SIZE, this.scheduledExecutorService, log)); - this.statsTable.put(QUEUE_GET_NUMS, new StatsItemSet(QUEUE_GET_NUMS, this.scheduledExecutorService, log)); - this.statsTable.put(QUEUE_GET_SIZE, new StatsItemSet(QUEUE_GET_SIZE, this.scheduledExecutorService, log)); + if (enableQueueStat) { + this.statsTable.put(QUEUE_PUT_NUMS, new StatsItemSet(QUEUE_PUT_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(QUEUE_PUT_SIZE, new StatsItemSet(QUEUE_PUT_SIZE, this.scheduledExecutorService, log)); + this.statsTable.put(QUEUE_GET_NUMS, new StatsItemSet(QUEUE_GET_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(QUEUE_GET_SIZE, new StatsItemSet(QUEUE_GET_SIZE, this.scheduledExecutorService, log)); + } this.statsTable.put(TOPIC_PUT_NUMS, new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, log)); this.statsTable.put(TOPIC_PUT_SIZE, new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, log)); this.statsTable.put(GROUP_GET_NUMS, new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, log)); @@ -132,8 +136,10 @@ public class BrokerStatsManager { public void onTopicDeleted(final String topic) { this.statsTable.get(TOPIC_PUT_NUMS).delValue(topic); this.statsTable.get(TOPIC_PUT_SIZE).delValue(topic); - this.statsTable.get(QUEUE_PUT_NUMS).delValueByPrefixKey(topic, "@"); - this.statsTable.get(QUEUE_PUT_SIZE).delValueByPrefixKey(topic, "@"); + if (enableQueueStat) { + this.statsTable.get(QUEUE_PUT_NUMS).delValueByPrefixKey(topic, "@"); + this.statsTable.get(QUEUE_PUT_SIZE).delValueByPrefixKey(topic, "@"); + } this.statsTable.get(GROUP_GET_NUMS).delValueByPrefixKey(topic, "@"); this.statsTable.get(GROUP_GET_SIZE).delValueByPrefixKey(topic, "@"); this.statsTable.get(QUEUE_GET_NUMS).delValueByPrefixKey(topic, "@"); @@ -147,8 +153,10 @@ public class BrokerStatsManager { public void onGroupDeleted(final String group) { this.statsTable.get(GROUP_GET_NUMS).delValueBySuffixKey(group, "@"); this.statsTable.get(GROUP_GET_SIZE).delValueBySuffixKey(group, "@"); - this.statsTable.get(QUEUE_GET_NUMS).delValueBySuffixKey(group, "@"); - this.statsTable.get(QUEUE_GET_SIZE).delValueBySuffixKey(group, "@"); + if (enableQueueStat) { + this.statsTable.get(QUEUE_GET_NUMS).delValueBySuffixKey(group, "@"); + this.statsTable.get(QUEUE_GET_SIZE).delValueBySuffixKey(group, "@"); + } this.statsTable.get(SNDBCK_PUT_NUMS).delValueBySuffixKey(group, "@"); this.statsTable.get(GROUP_GET_LATENCY).delValueBySuffixKey(group, "@"); this.momentStatsItemSetFallSize.delValueBySuffixKey(group, "@"); @@ -156,25 +164,35 @@ public class BrokerStatsManager { } public void incQueuePutNums(final String topic, final Integer queueId) { - this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, queueId), 1, 1); + if (enableQueueStat) { + this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, queueId), 1, 1); + } } public void incQueuePutNums(final String topic, final Integer queueId, int num, int times) { - this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, queueId), num, times); + if (enableQueueStat) { + this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, queueId), num, times); + } } public void incQueuePutSize(final String topic, final Integer queueId, final int size) { - this.statsTable.get(QUEUE_PUT_SIZE).addValue(buildStatsKey(topic, queueId), size, 1); + if (enableQueueStat) { + this.statsTable.get(QUEUE_PUT_SIZE).addValue(buildStatsKey(topic, queueId), size, 1); + } } public void incQueueGetNums(final String group, final String topic, final Integer queueId, final int incValue) { - final String statsKey = buildStatsKey(topic, queueId, group); - this.statsTable.get(QUEUE_GET_NUMS).addValue(statsKey, incValue, 1); + if (enableQueueStat) { + final String statsKey = buildStatsKey(topic, queueId, group); + this.statsTable.get(QUEUE_GET_NUMS).addValue(statsKey, incValue, 1); + } } public void incQueueGetSize(final String group, final String topic, final Integer queueId, final int incValue) { - final String statsKey = buildStatsKey(topic, queueId, group); - this.statsTable.get(QUEUE_GET_SIZE).addValue(statsKey, incValue, 1); + if (enableQueueStat) { + final String statsKey = buildStatsKey(topic, queueId, group); + this.statsTable.get(QUEUE_GET_SIZE).addValue(statsKey, incValue, 1); + } } public void incTopicPutNums(final String topic) { @@ -244,8 +262,13 @@ public class BrokerStatsManager { } public void incGroupGetLatency(final String group, final String topic, final int queueId, final int incValue) { - final String statsKey = buildStatsKey(queueId, topic, group); - this.statsTable.get(GROUP_GET_LATENCY).addValue(statsKey, incValue, 1); + String statsKey; + if (enableQueueStat) { + statsKey = buildStatsKey(queueId, topic, group); + } else { + statsKey = buildStatsKey(topic, group); + } + this.statsTable.get(GROUP_GET_LATENCY).addRTValue(statsKey, incValue, 1); } public void incBrokerPutNums() { diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java index 3bc52e3..0d1e2f3 100644 --- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java @@ -76,7 +76,7 @@ public class BatchPutMessageTest { messageStoreConfig.setFlushIntervalConsumeQueue(1); messageStoreConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "putmessagesteststore"); messageStoreConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "putmessagesteststore" + File.separator + "commitlog"); - return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig()); + return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new BrokerConfig()); } @Test diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index 7c57813..7654e0a 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -130,7 +130,7 @@ public class ConsumeQueueTest { DefaultMessageStore master = new DefaultMessageStore( messageStoreConfig, - new BrokerStatsManager(brokerConfig.getBrokerClusterName()), + new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat()), new MessageArrivingListener() { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index 69c1673..d8202eb 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -486,7 +486,7 @@ public class DefaultMessageStoreCleanFilesTest { private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception { messageStore = new DefaultMessageStore(messageStoreConfig, - new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); + new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig()); cleanCommitLogService = getCleanCommitLogService(diskSpaceCleanForciblyRatio); cleanConsumeQueueService = getCleanConsumeQueueService(); diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java index db7d367..788bdbd 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java @@ -70,7 +70,7 @@ public class DefaultMessageStoreShutDownTest { messageStoreConfig.setMaxHashSlotNum(10000); messageStoreConfig.setMaxIndexNum(100 * 100); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); - return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig()); + return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig()); } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index f3e619d..96451e3 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -112,7 +112,7 @@ public class DefaultMessageStoreTest { messageStoreConfig.setMaxIndexNum(100 * 100); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); messageStoreConfig.setFlushIntervalConsumeQueue(1); - return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig()); + return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new BrokerConfig()); } @Test diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index a2702a0..c82a237 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -60,7 +60,7 @@ public class HATest { private MessageStore slaveMessageStore; private MessageStoreConfig masterMessageStoreConfig; private MessageStoreConfig slaveStoreConfig; - private BrokerStatsManager brokerStatsManager = new BrokerStatsManager("simpleTest"); + private BrokerStatsManager brokerStatsManager = new BrokerStatsManager("simpleTest", true); private String storePathRootParentDir = System.getProperty("user.home") + File.separator + UUID.randomUUID().toString().replace("-", ""); private String storePathRootDir = storePathRootParentDir + File.separator + "store"; diff --git a/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java index 0eafe4c..bfcebd7 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java @@ -79,6 +79,6 @@ public class ScheduleMessageServiceTest { messageStoreConfig.setMaxIndexNum(100 * 100); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); messageStoreConfig.setFlushIntervalConsumeQueue(1); - return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig()); + return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig()); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index 5864b28..7a77e95 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -56,7 +56,7 @@ public class MessageStoreTestBase extends StoreTestBase { storeConfig.setdLegerGroup(group); storeConfig.setdLegerPeers(peers); storeConfig.setdLegerSelfId(selfId); - DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { + DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest", true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { }, new BrokerConfig()); DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer(); @@ -106,7 +106,7 @@ public class MessageStoreTestBase extends StoreTestBase { storeConfig.setStorePathRootDir(base); storeConfig.setStorePathCommitLog(base + File.separator + "commitlog"); storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); - DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("CommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { + DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("CommitlogTest", true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { }, new BrokerConfig()); diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java index d375fb0..de3cf7f 100644 --- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java @@ -103,7 +103,7 @@ public class ScheduleMessageServiceTest { messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog"); brokerConfig = new BrokerConfig(); - BrokerStatsManager manager = new BrokerStatsManager(brokerConfig.getBrokerClusterName()); + BrokerStatsManager manager = new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat()); messageStore = new DefaultMessageStore(messageStoreConfig, manager, new MyMessageArrivingListener(), new BrokerConfig()); assertThat(messageStore.load()).isTrue(); diff --git a/store/src/test/java/stats/BrokerStatsManagerTest.java b/store/src/test/java/stats/BrokerStatsManagerTest.java index 2b6d0f8..137c37d 100644 --- a/store/src/test/java/stats/BrokerStatsManagerTest.java +++ b/store/src/test/java/stats/BrokerStatsManagerTest.java @@ -47,7 +47,7 @@ public class BrokerStatsManagerTest { @Before public void init() { - brokerStatsManager = new BrokerStatsManager("DefaultCluster"); + brokerStatsManager = new BrokerStatsManager("DefaultCluster", true); brokerStatsManager.start(); }