This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6bbb5d99d [INLONG-6035][TubeMQ] Add Broker's message append and file refresh delay statistics (#6036) 6bbb5d99d is described below commit 6bbb5d99d09f4de5f26f79f2b04ab9905014743f Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Sep 27 14:17:04 2022 +0800 [INLONG-6035][TubeMQ] Add Broker's message append and file refresh delay statistics (#6036) --- .../inlong/tubemq/server/broker/TubeBroker.java | 63 +++++++++--------- .../server/broker/metadata/TopicMetadata.java | 2 +- .../server/broker/msgstore/MessageStore.java | 47 +++++++++----- .../broker/msgstore/MessageStoreManager.java | 23 +++---- .../server/broker/msgstore/disk/MsgFileStore.java | 75 +++++++++++++--------- .../server/broker/msgstore/mem/MsgMemStore.java | 17 ++--- .../server/broker/offset/DefaultOffsetManager.java | 10 +-- .../server/broker/stats/MsgStoreStatsHolder.java | 45 +++++++------ .../broker/stats/MsgStoreStatsHolderTest.java | 58 +++++++++-------- 9 files changed, 191 insertions(+), 149 deletions(-) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java index fb248059f..397ccfd1f 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java @@ -320,24 +320,24 @@ public class TubeBroker implements Stoppable { .append(TubeServerVersion.SERVER_VERSION).toString(); } - private void procConfigFromHeartBeat(StringBuilder sBuilder, + private void procConfigFromHeartBeat(StringBuilder strBuff, HeartResponseM2B response) { // process service status ServiceStatusHolder .setReadWriteServiceStatus(response.getStopRead(), response.getStopWrite(), "Master"); - // process flow controller rules - FlowCtrlRuleHandler flowCtrlRuleHandler = + // update default flow controller rules + FlowCtrlRuleHandler defFlowCtrlHandler = metadataManager.getFlowCtrlRuleHandler(); - long flowCheckId = flowCtrlRuleHandler.getFlowCtrlId(); - int qryPriorityId = flowCtrlRuleHandler.getQryPriorityId(); + long flowCheckId = defFlowCtrlHandler.getFlowCtrlId(); + int qryPriorityId = defFlowCtrlHandler.getQryPriorityId(); if (response.hasFlowCheckId()) { qryPriorityId = response.hasQryPriorityId() ? response.getQryPriorityId() : qryPriorityId; if (response.getFlowCheckId() != flowCheckId) { flowCheckId = response.getFlowCheckId(); try { - flowCtrlRuleHandler + defFlowCtrlHandler .updateFlowCtrlInfo(qryPriorityId, flowCheckId, response.getFlowControlInfo()); } catch (Exception e1) { @@ -345,8 +345,8 @@ public class TubeBroker implements Stoppable { "[HeartBeat response] found parse flowCtrl rules failure", e1); } } - if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) { - flowCtrlRuleHandler.setQryPriorityId(qryPriorityId); + if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) { + defFlowCtrlHandler.setQryPriorityId(qryPriorityId); } } // update configure report requirement @@ -356,36 +356,37 @@ public class TubeBroker implements Stoppable { long configId = response.getClsConfig().getConfigId(); if (configId != ClusterConfigHolder.getConfigId()) { ClusterConfigHolder.updClusterSetting(response.getClsConfig()); - logger.info(sBuilder - .append("[HeartBeat response] received cluster configure changed,") + logger.info(strBuff + .append("[HeartBeat response] received cluster configure changed") .append(",hasClsConfig=").append(response.hasClsConfig()) .append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId()) .append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize()) .append(",minMemCacheSize=") .append(ClusterConfigHolder.getMinMemCacheSize()) .toString()); - sBuilder.delete(0, sBuilder.length()); + strBuff.delete(0, strBuff.length()); } } if (response.getTakeConfInfo()) { - logger.info(sBuilder + logger.info(strBuff .append("[HeartBeat response] received broker metadata info: brokerConfId=") .append(response.getCurBrokerConfId()) .append(",stopWrite=").append(response.getStopWrite()) .append(",stopRead=").append(response.getStopRead()) .append(",configCheckSumId=").append(response.getConfCheckSumId()) .append(",hasFlowCtrl=").append(response.hasFlowCheckId()) - .append(",curFlowCtrlId=").append(flowCheckId) - .append(",curQryPriorityId=").append(qryPriorityId) + .append(",curFlowCtrlId=").append(defFlowCtrlHandler.getFlowCtrlId()) + .append(",curQryPriorityId=").append(defFlowCtrlHandler.getQryPriorityId()) + .append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize()) .append(",brokerDefaultConfInfo=") .append(response.getBrokerDefaultConfInfo()) .append(",brokerTopicSetConfList=") - .append(response.getBrokerTopicSetConfInfoList().toString()).toString()); - sBuilder.delete(0, sBuilder.length()); + .append(response.getBrokerTopicSetConfInfoList()).toString()); + strBuff.delete(0, strBuff.length()); metadataManager .updateBrokerTopicConfigMap(response.getCurBrokerConfId(), response.getConfCheckSumId(), response.getBrokerDefaultConfInfo(), - response.getBrokerTopicSetConfInfoList(), false, sBuilder); + response.getBrokerTopicSetConfInfoList(), false, strBuff); } // update auth info if (response.hasBrokerAuthorizedInfo()) { @@ -395,7 +396,7 @@ public class TubeBroker implements Stoppable { boolean needProcess = metadataManager.updateBrokerRemoveTopicMap( response.getTakeRemoveTopicInfo(), - response.getRemoveTopicConfInfoList(), sBuilder); + response.getRemoveTopicConfInfoList(), strBuff); if (needProcess) { new Thread() { @Override @@ -440,29 +441,29 @@ public class TubeBroker implements Stoppable { } } - private void procConfigFromRegister(StringBuilder sBuilder, + private void procConfigFromRegister(StringBuilder strBuff, final RegisterResponseM2B response) { // process service status ServiceStatusHolder .setReadWriteServiceStatus(response.getStopRead(), response.getStopWrite(), "Master"); - // process flow controller rules - FlowCtrlRuleHandler flowCtrlRuleHandler = + // process default flow controller rules + FlowCtrlRuleHandler defFlowCtrlHandler = metadataManager.getFlowCtrlRuleHandler(); if (response.hasFlowCheckId()) { int qryPriorityId = response.hasQryPriorityId() - ? response.getQryPriorityId() : flowCtrlRuleHandler.getQryPriorityId(); - if (response.getFlowCheckId() != flowCtrlRuleHandler.getFlowCtrlId()) { + ? response.getQryPriorityId() : defFlowCtrlHandler.getQryPriorityId(); + if (response.getFlowCheckId() != defFlowCtrlHandler.getFlowCtrlId()) { try { - flowCtrlRuleHandler + defFlowCtrlHandler .updateFlowCtrlInfo(response.getQryPriorityId(), response.getFlowCheckId(), response.getFlowControlInfo()); } catch (Exception e1) { logger.warn("[Register response] found parse flowCtrl rules failure", e1); } } - if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) { - flowCtrlRuleHandler.setQryPriorityId(qryPriorityId); + if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) { + defFlowCtrlHandler.setQryPriorityId(qryPriorityId); } } // update auth info @@ -477,14 +478,14 @@ public class TubeBroker implements Stoppable { ClusterConfigHolder.updClusterSetting(response.getClsConfig()); } } - sBuilder.append("[Register response] received broker metadata info: brokerConfId=") + strBuff.append("[Register response] received broker metadata info: brokerConfId=") .append(response.getCurBrokerConfId()) .append(",stopWrite=").append(response.getStopWrite()) .append(",stopRead=").append(response.getStopRead()) .append(",configCheckSumId=").append(response.getConfCheckSumId()) .append(",hasFlowCtrl=").append(response.hasFlowCheckId()) - .append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId()) - .append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId()) + .append(",curFlowCtrlId=").append(defFlowCtrlHandler.getFlowCtrlId()) + .append(",curQryPriorityId=").append(defFlowCtrlHandler.getQryPriorityId()) .append(",hasClsConfig=").append(response.hasClsConfig()) .append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId()) .append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize()) @@ -501,10 +502,10 @@ public class TubeBroker implements Stoppable { .append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo()) .append(",brokerTopicSetConfList=") .append(response.getBrokerTopicSetConfInfoList().toString()).toString(); - sBuilder.delete(0, sBuilder.length()); + strBuff.delete(0, strBuff.length()); metadataManager.updateBrokerTopicConfigMap(response.getCurBrokerConfId(), response.getConfCheckSumId(), response.getBrokerDefaultConfInfo(), - response.getBrokerTopicSetConfInfoList(), true, sBuilder); + response.getBrokerTopicSetConfInfoList(), true, strBuff); } // build cluster configure info diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java index 77116a3f2..c4ed4b32a 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java @@ -78,7 +78,7 @@ public class TopicMetadata { return; } String[] topicConfInfoArr = - topicMetaConfInfo.split(TokenConstants.ATTR_SEP); + topicMetaConfInfo.split(TokenConstants.ATTR_SEP, -1); this.topic = topicConfInfoArr[0]; if (TStringUtils.isBlank(topicConfInfoArr[1])) { this.numPartitions = brokerDefMetadata.getNumPartitions(); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java index 8cc968a93..c147f4858 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java @@ -333,7 +333,8 @@ public class MessageStore implements Closeable { .append("[Data Store] Closed MessageStore for storeKey ") .append(this.storeKey).toString()); } - if (timestamp <= this.msgFileStore.getIndexMaxAppendTime()) { + if (timestamp <= this.msgFileStore.getIndexMaxAppendTime() + || !tubeConfig.isEnableMemStore()) { return this.msgFileStore.getStartOffsetByTimeStamp(timestamp); } this.writeCacheMutex.readLock().lock(); @@ -428,20 +429,27 @@ public class MessageStore implements Closeable { indexBuffer.putLong(receivedTime); indexBuffer.flip(); appendResult.putReceivedInfo(messageId, receivedTime); + boolean appendSuss = true; + long startTime = System.currentTimeMillis(); if (this.tubeConfig.isEnableMemStore()) { do { this.writeCacheMutex.readLock().lock(); try { - if (this.msgMemStore.appendMsg(msgStoreStatsHolder, + appendSuss = this.msgMemStore.appendMsg(msgStoreStatsHolder, partitionId, msgTypeCode, receivedTime, indexBuffer, - msgBufLen, dataBuffer, appendResult)) { - return true; - } + msgBufLen, dataBuffer, appendResult); } finally { this.writeCacheMutex.readLock().unlock(); } + if (appendSuss) { + msgStoreStatsHolder.addMsgWriteSuccess(msgBufLen, + System.currentTimeMillis() - startTime); + return true; + } if (triggerFlushAndAddMsg(true, false, partitionId, msgTypeCode, receivedTime, indexBuffer, msgBufLen, dataBuffer, appendResult)) { + msgStoreStatsHolder.addMsgWriteSuccess(msgBufLen, + System.currentTimeMillis() - startTime); return true; } ThreadUtils.sleep(waitRetryMs); @@ -452,11 +460,17 @@ public class MessageStore implements Closeable { StringBuilder strBuffer = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE); Tuple3<Boolean, Long, Long> appendRet = - this.msgFileStore.appendMsg(strBuffer, 1, + this.msgFileStore.appendMsg(false, startTime, strBuffer, 1, DataStoreUtils.STORE_INDEX_HEAD_LEN, indexBuffer, - msgBufLen, dataBuffer,receivedTime, receivedTime); + msgBufLen, dataBuffer, receivedTime, receivedTime); appendResult.putAppendResult(appendRet.getF1(), appendRet.getF2()); - return true; + if (appendRet.getF0()) { + msgStoreStatsHolder.addMsgWriteSuccess(msgBufLen, + System.currentTimeMillis() - startTime); + } else { + msgStoreStatsHolder.addMsgWriteFailure(); + } + return appendRet.getF0(); } } @@ -562,7 +576,7 @@ public class MessageStore implements Closeable { strBuffer.delete(0, strBuffer.length()); if (tubeConfig.isEnableMemStore()) { ThreadUtils.sleep(100); - flush(System.currentTimeMillis(), strBuffer); + flush(strBuffer); this.msgMemStore.close(); this.msgMemStoreBeingFlush.close(); this.executor.shutdown(); @@ -672,7 +686,7 @@ public class MessageStore implements Closeable { */ public long getIndexStoreSize() { long totalSize = 0L; - if (!tubeConfig.isEnableMemStore()) { + if (tubeConfig.isEnableMemStore()) { this.writeCacheMutex.readLock().lock(); try { if (this.msgMemStore.getCurMsgCount() > 0) { @@ -698,7 +712,7 @@ public class MessageStore implements Closeable { */ public long getDataStoreSize() { long totalSize = 0L; - if (!tubeConfig.isEnableMemStore()) { + if (tubeConfig.isEnableMemStore()) { this.writeCacheMutex.readLock().lock(); try { if (this.msgMemStore.getCurMsgCount() > 0) { @@ -777,15 +791,15 @@ public class MessageStore implements Closeable { this.executor.execute(new Runnable() { @Override public void run() { - long startTime2 = System.currentTimeMillis(); try { final StringBuilder strBuffer = new StringBuilder(512); - flush(startTime2, strBuffer); + flush(strBuffer); } catch (Throwable e) { logger.error("[Data Store] Error during flush", e); } finally { - msgStoreStatsHolder.addCacheTimeoutFlush( - (System.currentTimeMillis() - startTime2), isTimeTrigger); + if (isTimeTrigger) { + msgStoreStatsHolder.addCacheTimeoutFlush(); + } } } }); @@ -819,7 +833,8 @@ public class MessageStore implements Closeable { return false; } - private void flush(long startTime, StringBuilder strBuffer) throws IOException { + private void flush(StringBuilder strBuffer) throws IOException { + long startTime = System.currentTimeMillis(); flushMutex.lock(); this.lastMemFlushTime.set(System.currentTimeMillis()); try { diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java index 1539c9294..5d9299058 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java @@ -421,16 +421,18 @@ public class MessageStoreManager implements StoreService { if (qryTopicSet.isEmpty()) { return topicPubStoreInfoMap; } + Map<Integer, MessageStore> storeMap; + Map<Integer, TopicPubStoreInfo> storeInfoMap; for (String topic : qryTopicSet) { topicMetadata = confTopicInfo.get(topic); if (topicMetadata == null) { continue; } - Map<Integer, MessageStore> storeMap = dataStores.get(topic); + storeMap = dataStores.get(topic); if (storeMap == null) { continue; } - Map<Integer, TopicPubStoreInfo> storeInfoMap = new HashMap<>(); + storeInfoMap = new HashMap<>(); for (Map.Entry<Integer, MessageStore> entry : storeMap.entrySet()) { if (entry == null || entry.getKey() == null @@ -439,11 +441,10 @@ public class MessageStoreManager implements StoreService { } store = entry.getValue(); for (Integer partitionId : topicMetadata.getPartIdsByStoreId(entry.getKey())) { - TopicPubStoreInfo storeInfo = - new TopicPubStoreInfo(topic, entry.getKey(), partitionId, - store.getIndexMinOffset(), store.getIndexMaxOffset(), - store.getDataMinOffset(), store.getDataMaxOffset()); - storeInfoMap.put(partitionId, storeInfo); + storeInfoMap.put(partitionId, new TopicPubStoreInfo(topic, + entry.getKey(), partitionId, store.getIndexMinOffset(), + store.getIndexMaxOffset(), store.getDataMinOffset(), + store.getDataMaxOffset())); } } topicPubStoreInfoMap.put(topic, storeInfoMap); @@ -460,12 +461,13 @@ public class MessageStoreManager implements StoreService { @Override public void getTopicPublishInfos(Map<String, OffsetRecordInfo> groupOffsetMap) { MessageStore store = null; + Map<Integer, MessageStore> storeMap; + Map<String, Map<Integer, RecordItem>> topicOffsetMap; for (Map.Entry<String, OffsetRecordInfo> entry : groupOffsetMap.entrySet()) { if (entry == null || entry.getKey() == null || entry.getValue() == null) { continue; } - Map<String, Map<Integer, RecordItem>> topicOffsetMap = - entry.getValue().getOffsetMap(); + topicOffsetMap = entry.getValue().getOffsetMap(); // Get offset records by topic for (Map.Entry<String, Map<Integer, RecordItem>> entryTopic : topicOffsetMap.entrySet()) { @@ -475,8 +477,7 @@ public class MessageStoreManager implements StoreService { continue; } // Get message store instance - Map<Integer, MessageStore> storeMap = - dataStores.get(entryTopic.getKey()); + storeMap = dataStores.get(entryTopic.getKey()); if (storeMap == null) { continue; } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java index 9c9723418..f7fe541eb 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java @@ -118,6 +118,8 @@ public class MsgFileStore implements Closeable { /** * Append message to file segment * + * @param fromMem whether data from memory directly + * @param currTime the current time * @param sb string buffer * @param msgCnt the record count to append * @param indexSize the index buffer length @@ -128,7 +130,8 @@ public class MsgFileStore implements Closeable { * @param rightTime the latest record timestamp * @return file storage status, the index and data offsets of the added message */ - public Tuple3<Boolean, Long, Long> appendMsg(StringBuilder sb, int msgCnt, + public Tuple3<Boolean, Long, Long> appendMsg(boolean fromMem, long currTime, + StringBuilder sb, int msgCnt, int indexSize, ByteBuffer indexBuffer, int dataSize, ByteBuffer dataBuffer, long leftTime, long rightTime) { @@ -145,7 +148,7 @@ public class MsgFileStore implements Closeable { boolean pendingMsgSizeExceed = false; boolean pendingMsgTimeExceed = false; boolean isForceMetadata = false; - // flushed message message count and data size info + // flushed message count and data size info long flushedMsgCnt = 0; long flushedDataSize = 0; // Temporary variables in calculations @@ -155,19 +158,29 @@ public class MsgFileStore implements Closeable { long inDataOffset; Segment curIndexSeg; long indexOffset = -1; - long currTime; // new file paths of creating String newDataFilePath = null; String newIndexFilePath = null; boolean fileStoreOK = false; this.writeLock.lock(); try { - inIndexOffset = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF); - // filling data segment. + // position last segments curDataSeg = this.dataSegments.last(); + curIndexSeg = this.indexSegments.last(); + // get inputted offsets + if (fromMem) { + inIndexOffset = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF); + inDataOffset = indexBuffer.getLong(DataStoreUtils.INDEX_POS_DATAOFFSET); + } else { + inIndexOffset = curIndexSeg.getLast(); + inDataOffset = curDataSeg.getLast(); + indexBuffer.putLong(DataStoreUtils.INDEX_POS_DATAOFFSET, inDataOffset); + dataBuffer.putLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF, inIndexOffset); + } + // filling data segment. this.curUnflushSize.addAndGet(dataSize); dataOffset = curDataSeg.append(dataBuffer, leftTime, rightTime); - // judge whether need to create a new data segment. + // judge whether we need to create a new data segment. if (curDataSeg.getCachedSize() >= this.tubeConfig.getMaxSegmentSize()) { isDataSegFlushed = true; long newDataOffset = curDataSeg.flush(true); @@ -179,10 +192,8 @@ public class MsgFileStore implements Closeable { this.dataSegments.append(new FileSegment(newDataOffset, newDataFile, SegmentType.DATA)); } // filling index data. - inDataOffset = indexBuffer.getLong(DataStoreUtils.INDEX_POS_DATAOFFSET); - curIndexSeg = this.indexSegments.last(); indexOffset = curIndexSeg.append(indexBuffer, leftTime, rightTime); - // judge whether need to create a new index segment. + // judge whether we need to create a new index segment. if (curIndexSeg.getCachedSize() >= this.tubeConfig.getMaxIndexSegmentSize()) { isIndexSegFlushed = true; @@ -195,16 +206,17 @@ public class MsgFileStore implements Closeable { this.indexSegments.append(new FileSegment(newIndexOffset, newIndexFile, SegmentType.INDEX)); } - // check whether need to flush to disk. - currTime = System.currentTimeMillis(); + // check whether we need to flush to disk. pendingMsgSizeExceed = (messageStore.getUnflushDataHold() > 0) && (curUnflushSize.get() >= messageStore.getUnflushDataHold()); - pendingMsgCntExceed = this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold(); - pendingMsgTimeExceed = currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval(); - boolean isSegmentRollOver = isDataSegFlushed || isIndexSegFlushed; - - if (pendingMsgCntExceed || pendingMsgTimeExceed || pendingMsgSizeExceed || isSegmentRollOver) { - isForceMetadata = isSegmentRollOver || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR); + pendingMsgCntExceed = + (this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold()); + pendingMsgTimeExceed = + (currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval()); + if (pendingMsgCntExceed || pendingMsgTimeExceed + || pendingMsgSizeExceed || isDataSegFlushed || isIndexSegFlushed) { + isForceMetadata = (isDataSegFlushed || isIndexSegFlushed + || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR)); if (!isDataSegFlushed) { curDataSeg.flush(isForceMetadata); } @@ -213,7 +225,7 @@ public class MsgFileStore implements Closeable { } flushedMsgCnt = this.curUnflushed.getAndSet(0); flushedDataSize = this.curUnflushSize.getAndSet(0); - this.lastFlushTime.set(System.currentTimeMillis()); + this.lastFlushTime.set(currTime); if (isForceMetadata) { this.lastMetaFlushTime.set(this.lastFlushTime.get()); } @@ -243,18 +255,21 @@ public class MsgFileStore implements Closeable { } finally { this.writeLock.unlock(); // add statistics. - msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize, - flushedMsgCnt, flushedDataSize, isDataSegFlushed, isIndexSegFlushed, - pendingMsgSizeExceed, pendingMsgCntExceed, pendingMsgTimeExceed, isForceMetadata); - if (isDataSegFlushed) { - logger.info(sb.append("[File Store] Created data segment ") - .append(newDataFilePath).toString()); - sb.delete(0, sb.length()); - } - if (isIndexSegFlushed) { - logger.info(sb.append("[File Store] Created index segment ") - .append(newIndexFilePath).toString()); - sb.delete(0, sb.length()); + if (fileStoreOK) { + msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize, + flushedMsgCnt, flushedDataSize, isDataSegFlushed, isIndexSegFlushed, + pendingMsgSizeExceed, pendingMsgCntExceed, pendingMsgTimeExceed, + isForceMetadata, System.currentTimeMillis() - currTime); + if (isDataSegFlushed) { + logger.info(sb.append("[File Store] Created data segment ") + .append(newDataFilePath).toString()); + sb.delete(0, sb.length()); + } + if (isIndexSegFlushed) { + logger.info(sb.append("[File Store] Created index segment ") + .append(newIndexFilePath).toString()); + sb.delete(0, sb.length()); + } } } return new Tuple3<>(fileStoreOK, indexOffset, dataOffset); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java index e8a432b73..40bba25c2 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java @@ -138,10 +138,13 @@ public class MsgMemStore implements Closeable { this.writeLock.lock(); try { // judge whether can write to memory or not. - if ((fullDataSize = (this.cacheDataOffset.get() + dataEntryLength > this.maxDataCacheSize)) - || (fullCount = (this.curMessageCount.get() + 1 > maxAllowedMsgCount)) - || (fullIndexSize = - (this.cacheIndexOffset.get() + DataStoreUtils.STORE_INDEX_HEAD_LEN > this.maxIndexCacheSize))) { + fullDataSize = + (this.cacheDataOffset.get() + dataEntryLength > this.maxDataCacheSize); + fullCount = + (this.curMessageCount.get() + 1 > maxAllowedMsgCount); + fullIndexSize = + (this.cacheIndexOffset.get() + DataStoreUtils.STORE_INDEX_HEAD_LEN > this.maxIndexCacheSize); + if (fullDataSize || fullCount || fullIndexSize) { isAppended = false; return false; } @@ -163,9 +166,7 @@ public class MsgMemStore implements Closeable { } } finally { this.writeLock.unlock(); - if (isAppended) { - memStatsHolder.addMsgWriteSuccess(dataEntryLength); - } else { + if (!isAppended) { memStatsHolder.addCacheFullType(fullDataSize, fullIndexSize, fullCount); } } @@ -329,7 +330,7 @@ public class MsgMemStore implements Closeable { tmpIndexBuffer.flip(); tmpDataReadBuf.flip(); long startTime = System.currentTimeMillis(); - msgFileStore.appendMsg(strBuffer, curMessageCount.get(), + msgFileStore.appendMsg(true, startTime, strBuffer, curMessageCount.get(), cacheIndexOffset.get(), tmpIndexBuffer, cacheDataOffset.get(), tmpDataReadBuf, leftAppendTime.get(), rightAppendTime.get()); BrokerSrvStatsHolder.updDiskSyncDataDlt(System.currentTimeMillis() - startTime); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java index cb467528c..cb9cdb353 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java @@ -255,8 +255,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse OffsetStorageInfo regInfo = loadOrCreateOffset(group, topic, partitionId, offsetCacheKey, 0); if ((tmpOffset == 0) && (!regInfo.isFirstCreate())) { - updatedOffset = regInfo.getOffset(); - return updatedOffset; + return regInfo.getOffset(); } updatedOffset = regInfo.addAndGetOffset(tmpOffset); if (logger.isDebugEnabled()) { @@ -462,6 +461,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse */ @Override public Map<String, OffsetRecordInfo> getOnlineGroupOffsetInfo() { + OffsetRecordInfo recordInfo; + Map<String, OffsetStorageInfo> storeMap; Map<String, OffsetRecordInfo> result = new HashMap<>(); for (Map.Entry<String, ConcurrentHashMap<String, OffsetStorageInfo>> entry : cfmOffsetMap.entrySet()) { @@ -469,7 +470,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse continue; } // read offset map information - Map<String, OffsetStorageInfo> storeMap = entry.getValue(); + storeMap = entry.getValue(); if (storeMap.isEmpty()) { continue; } @@ -477,7 +478,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse if (storageInfo == null) { continue; } - OffsetRecordInfo recordInfo = result.get(entry.getKey()); + recordInfo = result.get(entry.getKey()); if (recordInfo == null) { recordInfo = new OffsetRecordInfo( brokerConfig.getBrokerId(), entry.getKey()); @@ -740,5 +741,4 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse return (lagValue > TServerConstants.CFG_OFFSET_RESET_MID_ALARM_CHECK) ? 2 : (lagValue > TServerConstants.CFG_OFFSET_RESET_MIN_ALARM_CHECK) ? 1 : 0; } - } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java index ae8ac4ca3..a238d93d8 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java @@ -67,12 +67,15 @@ public class MsgStoreStatsHolder { * Add write message success statistics. * * @param msgSize the message size + * @param writeDlt write duration */ - public void addMsgWriteSuccess(int msgSize) { + public void addMsgWriteSuccess(int msgSize, long writeDlt) { if (isClosed) { return; } - msgStoreStatsSets[getIndex()].msgAppendSizeStats.update(msgSize); + MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()]; + tmStatsSet.msgAppendSizeStats.update(msgSize); + tmStatsSet.msgAppendDurStats.update(writeDlt); } /** @@ -133,18 +136,12 @@ public class MsgStoreStatsHolder { /** * Add cache timeout flush statistic. * - * @param flushTime the flush time - * @param isTimeoutFlush whether is timeout flush */ - public void addCacheTimeoutFlush(long flushTime, boolean isTimeoutFlush) { + public void addCacheTimeoutFlush() { if (isClosed) { return; } - MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()]; - tmStatsSet.cacheSyncStats.update(flushTime); - if (isTimeoutFlush) { - tmStatsSet.cacheTimeFullCnt.incValue(); - } + msgStoreStatsSets[getIndex()].cacheTimeFullCnt.incValue(); } /** @@ -161,12 +158,14 @@ public class MsgStoreStatsHolder { * @param isMsgCntFull whether the cached message count is full * @param isCacheTimeFull whether the cached time is full * @param isForceMetadata whether force push metadata + * @param dltAppendTime the duration of the append operation */ public void addFileFlushStatsInfo(int msgCnt, int msgIndexSize, int msgDataSize, long flushedMsgCnt, long flushedDataSize, boolean isDataSegFlush, boolean isIndexSegFlush, boolean isDataSizeFull, boolean isMsgCntFull, - boolean isCacheTimeFull, boolean isForceMetadata) { + boolean isCacheTimeFull, boolean isForceMetadata, + long dltAppendTime) { if (isClosed) { return; } @@ -174,6 +173,7 @@ public class MsgStoreStatsHolder { tmStatsSet.fileAccumMsgCnt.addValue(msgCnt); tmStatsSet.fileAccumMsgIndexSize.addValue(msgIndexSize); tmStatsSet.fileAccumMsgDataSize.addValue(msgDataSize); + tmStatsSet.fileFlusheDurStats.update(dltAppendTime); if (flushedDataSize > 0) { tmStatsSet.fileFlushedDataSize.update(flushedDataSize); } @@ -401,6 +401,7 @@ public class MsgStoreStatsHolder { statsMap.put("isClosed", (isStatsClosed() ? 1L : 0L)); // for memory store statsSet.msgAppendSizeStats.getValue(statsMap, false); + statsSet.msgAppendDurStats.getValue(statsMap, false); statsMap.put(statsSet.msgAppendFailCnt.getFullName(), statsSet.msgAppendFailCnt.getValue()); statsMap.put(statsSet.cacheDataSizeFullCnt.getFullName(), @@ -415,7 +416,6 @@ public class MsgStoreStatsHolder { statsSet.cacheFlushPendingCnt.getValue()); statsMap.put(statsSet.cacheReAllocCnt.getFullName(), statsSet.cacheReAllocCnt.getValue()); - statsSet.cacheSyncStats.getValue(statsMap, false); // for file store statsMap.put(statsSet.fileAccumMsgCnt.getFullName(), statsSet.fileAccumMsgCnt.getValue()); @@ -423,6 +423,7 @@ public class MsgStoreStatsHolder { statsSet.fileAccumMsgDataSize.getValue()); statsMap.put(statsSet.fileAccumMsgIndexSize.getFullName(), statsSet.fileAccumMsgIndexSize.getValue()); + statsSet.fileFlusheDurStats.getValue(statsMap, false); statsSet.fileFlushedDataSize.getValue(statsMap, false); statsSet.fileFlushedMsgCnt.getValue(statsMap, false); statsMap.put(statsSet.fileDataSegAddCnt.getFullName(), @@ -460,6 +461,8 @@ public class MsgStoreStatsHolder { .append("\":\"").append(statsSet.resetTime.getStrSinceTime()) .append("\",\"isClosed\":").append(isStatsClosed()).append(","); statsSet.msgAppendSizeStats.getValue(strBuff, false); + strBuff.append(","); + statsSet.msgAppendDurStats.getValue(strBuff, false); strBuff.append(",\"").append(statsSet.msgAppendFailCnt.getFullName()) .append("\":").append(statsSet.msgAppendFailCnt.getValue()) .append(",\"").append(statsSet.cacheDataSizeFullCnt.getFullName()) @@ -474,15 +477,15 @@ public class MsgStoreStatsHolder { .append("\":").append(statsSet.cacheReAllocCnt.getValue()) .append(",\"").append(statsSet.cacheDataSizeFullCnt.getFullName()) .append("\":").append(statsSet.cacheDataSizeFullCnt.getValue()) - .append(","); - statsSet.cacheSyncStats.getValue(strBuff, false); - strBuff.append(",\"").append(statsSet.fileAccumMsgCnt.getFullName()) + .append(",\"").append(statsSet.fileAccumMsgCnt.getFullName()) .append("\":").append(statsSet.fileAccumMsgCnt.getValue()) .append(",\"").append(statsSet.fileAccumMsgDataSize.getFullName()) .append("\":").append(statsSet.fileAccumMsgDataSize.getValue()) .append(",\"").append(statsSet.fileAccumMsgIndexSize.getFullName()) .append("\":").append(statsSet.fileAccumMsgIndexSize.getValue()) .append(","); + statsSet.fileFlusheDurStats.getValue(strBuff, false); + strBuff.append(","); statsSet.fileFlushedDataSize.getValue(strBuff, false); strBuff.append(","); statsSet.fileFlushedMsgCnt.getValue(strBuff, false); @@ -519,6 +522,9 @@ public class MsgStoreStatsHolder { // The message size received protected final SimpleHistogram msgAppendSizeStats = new SimpleHistogram("msg_append_size", null); + // The duration of message written + protected final ESTHistogram msgAppendDurStats = + new ESTHistogram("msg_append_dlt", null); // The count of message append failures protected final LongStatsCounter msgAppendFailCnt = new LongStatsCounter("msg_append_fail", null); @@ -540,9 +546,6 @@ public class MsgStoreStatsHolder { // The cache re-alloc count protected final LongStatsCounter cacheReAllocCnt = new LongStatsCounter("cache_realloc", null); - // The cache persistence duration statistics - protected final ESTHistogram cacheSyncStats = - new ESTHistogram("cache_flush_dlt", null); // for file store // The accumulate message count statistics protected final LongStatsCounter fileAccumMsgCnt = @@ -553,6 +556,9 @@ public class MsgStoreStatsHolder { // The accumulate message index statistics protected final LongStatsCounter fileAccumMsgIndexSize = new LongStatsCounter("file_total_index_size", null); + // statistics on file flush time + protected final ESTHistogram fileFlusheDurStats = + new ESTHistogram("file_flush_dlt", null); // The data flushed statistics protected final SimpleHistogram fileFlushedDataSize = new SimpleHistogram("file_flush_data_size", null); @@ -594,6 +600,7 @@ public class MsgStoreStatsHolder { // for file metric items this.fileAccumMsgCnt.clear(); this.fileAccumMsgDataSize.clear(); + this.fileFlusheDurStats.clear(); this.fileFlushedDataSize.clear(); this.fileAccumMsgIndexSize.clear(); this.fileFlushedMsgCnt.clear(); @@ -605,6 +612,7 @@ public class MsgStoreStatsHolder { this.fileCachedTimeFullCnt.clear(); // for message metric items this.msgAppendSizeStats.clear(); + this.msgAppendDurStats.clear(); this.msgAppendFailCnt.clear(); // for cache metric items this.cacheDataSizeFullCnt.clear(); @@ -613,7 +621,6 @@ public class MsgStoreStatsHolder { this.cacheFlushPendingCnt.clear(); this.cacheReAllocCnt.clear(); this.cacheTimeFullCnt.clear(); - this.cacheSyncStats.clear(); this.resetTime.reset(); } } diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java index c7a81312f..cea7777bb 100644 --- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java +++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java @@ -32,12 +32,11 @@ public class MsgStoreStatsHolderTest { public void testMemPartStats() { MsgStoreStatsHolder msgStoreStatsHolder = new MsgStoreStatsHolder(); // case 1, not started - msgStoreStatsHolder.addMsgWriteSuccess(50); + msgStoreStatsHolder.addMsgWriteSuccess(50, 2); msgStoreStatsHolder.addCacheFullType(true, false, false); msgStoreStatsHolder.addCacheFullType(false, true, false); msgStoreStatsHolder.addCacheFullType(false, false, true); - msgStoreStatsHolder.addCacheTimeoutFlush(50, false); - msgStoreStatsHolder.addCacheTimeoutFlush(10, true); + msgStoreStatsHolder.addCacheTimeoutFlush(); msgStoreStatsHolder.addMsgWriteFailure(); Map<String, Long> retMap = new LinkedHashMap<>(); msgStoreStatsHolder.getValue(retMap); @@ -52,9 +51,6 @@ public class MsgStoreStatsHolderTest { Assert.assertEquals(0, retMap.get("cache_time_full").longValue()); Assert.assertEquals(0, retMap.get("cache_flush_pending").longValue()); Assert.assertEquals(0, retMap.get("cache_realloc").longValue()); - Assert.assertEquals(0, retMap.get("cache_flush_dlt_count").longValue()); - Assert.assertEquals(Long.MIN_VALUE, retMap.get("cache_flush_dlt_max").longValue()); - Assert.assertEquals(Long.MAX_VALUE, retMap.get("cache_flush_dlt_min").longValue()); Assert.assertNotNull(retMap.get("end_time")); retMap.clear(); // get content by StringBuilder @@ -66,16 +62,14 @@ public class MsgStoreStatsHolderTest { // System.out.println("getAllMemStatsInfo : " + strBuff); strBuff.delete(0, strBuff.length()); // case 2 started - msgStoreStatsHolder.addMsgWriteSuccess(50); - msgStoreStatsHolder.addMsgWriteSuccess(500); - msgStoreStatsHolder.addMsgWriteSuccess(5); + msgStoreStatsHolder.addMsgWriteSuccess(50, 10); + msgStoreStatsHolder.addMsgWriteSuccess(500, 20); + msgStoreStatsHolder.addMsgWriteSuccess(5, 3); msgStoreStatsHolder.addCacheFullType(true, false, false); msgStoreStatsHolder.addCacheFullType(false, true, false); msgStoreStatsHolder.addCacheFullType(false, false, true); - msgStoreStatsHolder.addCacheTimeoutFlush(50, false); - msgStoreStatsHolder.addCacheTimeoutFlush(10, true); - msgStoreStatsHolder.addCacheTimeoutFlush(100, true); - msgStoreStatsHolder.addCacheTimeoutFlush(1, false); + msgStoreStatsHolder.addCacheTimeoutFlush(); + msgStoreStatsHolder.addCacheTimeoutFlush(); msgStoreStatsHolder.addMsgWriteFailure(); msgStoreStatsHolder.addMsgWriteFailure(); msgStoreStatsHolder.addCacheReAlloc(); @@ -95,16 +89,9 @@ public class MsgStoreStatsHolderTest { Assert.assertEquals(2, retMap.get("cache_time_full").longValue()); Assert.assertEquals(3, retMap.get("cache_flush_pending").longValue()); Assert.assertEquals(2, retMap.get("cache_realloc").longValue()); - Assert.assertEquals(4, retMap.get("cache_flush_dlt_count").longValue()); - Assert.assertEquals(100, retMap.get("cache_flush_dlt_max").longValue()); - Assert.assertEquals(1, retMap.get("cache_flush_dlt_min").longValue()); - Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_0t2").longValue()); - Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_8t16").longValue()); - Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_32t64").longValue()); - Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_64t128").longValue()); Assert.assertNotNull(retMap.get("end_time")); msgStoreStatsHolder.getMsgStoreStatsInfo(false, strBuff); - // System.out.println("\n the second is : " + strBuff.toString()); + System.out.println("\n the second is : " + strBuff.toString()); strBuff.delete(0, strBuff.length()); } @@ -114,7 +101,7 @@ public class MsgStoreStatsHolderTest { // case 1, not started msgStoreStatsHolder.addFileFlushStatsInfo(2, 30, 500, 0, 0, true, true, - true, true, true, true); + true, true, true, true, 5); msgStoreStatsHolder.addFileTimeoutFlushStats(1, 500, false); Map<String, Long> retMap = new LinkedHashMap<>(); msgStoreStatsHolder.getValue(retMap); @@ -133,6 +120,9 @@ public class MsgStoreStatsHolderTest { Assert.assertEquals(0, retMap.get("file_data_full").longValue()); Assert.assertEquals(0, retMap.get("file_count_full").longValue()); Assert.assertEquals(0, retMap.get("file_time_full").longValue()); + Assert.assertEquals(0, retMap.get("file_flush_dlt_count").longValue()); + Assert.assertEquals(Long.MAX_VALUE, retMap.get("file_flush_dlt_min").longValue()); + Assert.assertEquals(Long.MIN_VALUE, retMap.get("file_flush_dlt_max").longValue()); Assert.assertNotNull(retMap.get("end_time")); retMap.clear(); // get content by StringBuilder @@ -159,27 +149,36 @@ public class MsgStoreStatsHolderTest { Assert.assertEquals(0, retMap.get("file_data_full").longValue()); Assert.assertEquals(0, retMap.get("file_count_full").longValue()); Assert.assertEquals(1, retMap.get("file_time_full").longValue()); + Assert.assertEquals(0, retMap.get("file_flush_dlt_count").longValue()); + Assert.assertEquals(Long.MAX_VALUE, retMap.get("file_flush_dlt_min").longValue()); + Assert.assertEquals(Long.MIN_VALUE, retMap.get("file_flush_dlt_max").longValue()); Assert.assertNotNull(retMap.get("end_time")); retMap.clear(); // get value when started msgStoreStatsHolder.addFileFlushStatsInfo(1, 1, 1, 1, 1, true, false, - false, false, false, false); + false, false, false, false, + 6); msgStoreStatsHolder.addFileFlushStatsInfo(6, 6, 6, 6, 6, false, false, - false, false, false, true); + false, false, false, true, + 100); msgStoreStatsHolder.addFileFlushStatsInfo(2, 2, 2, 2, 2, false, true, - false, false, false, false); + false, false, false, false, + 10); msgStoreStatsHolder.addFileFlushStatsInfo(5, 5, 5, 5, 5, false, false, - false, false, true, false); + false, false, true, false, + 200); msgStoreStatsHolder.addFileFlushStatsInfo(4, 4, 4, 4, 4, false, false, - false, true, false, false); + false, true, false, false, + 50); msgStoreStatsHolder.addFileFlushStatsInfo(3, 3, 3, 3, 3, false, false, - true, false, false, false); + true, false, false, false, + 150); msgStoreStatsHolder.snapShort(retMap); Assert.assertNotNull(retMap.get("reset_time")); Assert.assertEquals(21, retMap.get("file_total_msg_cnt").longValue()); @@ -197,6 +196,9 @@ public class MsgStoreStatsHolderTest { Assert.assertEquals(2, retMap.get("file_meta_flush").longValue()); Assert.assertEquals(1, retMap.get("file_count_full").longValue()); Assert.assertEquals(2, retMap.get("file_time_full").longValue()); + Assert.assertEquals(6, retMap.get("file_flush_dlt_count").longValue()); + Assert.assertEquals(6, retMap.get("file_flush_dlt_min").longValue()); + Assert.assertEquals(200, retMap.get("file_flush_dlt_max").longValue()); Assert.assertNotNull(retMap.get("end_time")); retMap.clear(); msgStoreStatsHolder.getMsgStoreStatsInfo(true, strBuff);