lollipopjin commented on code in PR #9256: URL: https://github.com/apache/rocketmq/pull/9256#discussion_r2053700375
########## store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java: ########## @@ -480,4 +486,21 @@ public CqUnit nextAndRelease() { } } } + + public void initializeWithOffset(long offset) { + log.info("RocksDBConsumeQueue initializeWithOffset topic={}, queueId={}, offset={}, oldMax={}, oldMin={}", + topic, queueId, offset, getMaxOffsetInQueue(), getMinOffsetInQueue()); + try { + // destroy the consume queue will clear the cqUnit and offset + consumeQueueStore.destroy(this); + + // update the max and min offset + if (offset > 0) { + this.consumeQueueStore.updateCqOffset(topic, queueId, 0L, offset - 1, true); Review Comment: How about using two separate methods for updating the CQ min offset and max offset? ########## store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java: ########## @@ -480,4 +486,21 @@ public CqUnit nextAndRelease() { } } } + + public void initializeWithOffset(long offset) { Review Comment: Is this used for mount --> main in container mode? If so, please add some Javadoc here. 
########## store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java: ########## @@ -562,4 +546,74 @@ public Long getMaxOffset(String topic, int queueId) throws ConsumeQueueException public boolean isStopped() { return ServiceState.SHUTDOWN_ALREADY == serviceState.get(); } + + public void updateCqOffset(final String topic, final int queueId, final long phyOffset, + final long cqOffset, boolean max) throws RocksDBException { + this.rocksDBConsumeQueueOffsetTable.updateCqOffset(topic, queueId, phyOffset, cqOffset, max); + } + + class RocksDBCleanConsumeQueueService { + protected long lastPhysicalMinOffset = 0; + + private final double diskSpaceWarningLevelRatio = + Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); + + private final double diskSpaceCleanForciblyRatio = + Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); + + public void run() { + try { + this.deleteExpiredFiles(); + } catch (Throwable e) { + log.warn(this.getServiceName() + " service has exception. ", e); + } + } + + public String getServiceName() { + return messageStore.getBrokerConfig().getIdentifier() + ConsumeQueueStore.CleanConsumeQueueService.class.getSimpleName(); + } + + protected void deleteExpiredFiles() { + + long minOffset = messageStore.getCommitLog().getMinOffset(); + if (minOffset > this.lastPhysicalMinOffset) { + this.lastPhysicalMinOffset = minOffset; + + boolean spaceFull = isSpaceToDelete(); + boolean timeUp = messageStore.isTimeToDelete(); + if (spaceFull || timeUp) { + cleanExpired(minOffset); + } + Review Comment: Please add some Javadoc here explaining that this deletes the CQ units whose offset is less than the min physical offset in the commitLog. 
########## store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java: ########## @@ -562,4 +546,74 @@ public Long getMaxOffset(String topic, int queueId) throws ConsumeQueueException public boolean isStopped() { return ServiceState.SHUTDOWN_ALREADY == serviceState.get(); } + + public void updateCqOffset(final String topic, final int queueId, final long phyOffset, + final long cqOffset, boolean max) throws RocksDBException { + this.rocksDBConsumeQueueOffsetTable.updateCqOffset(topic, queueId, phyOffset, cqOffset, max); + } + + class RocksDBCleanConsumeQueueService { + protected long lastPhysicalMinOffset = 0; + + private final double diskSpaceWarningLevelRatio = + Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); + + private final double diskSpaceCleanForciblyRatio = + Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); + + public void run() { + try { + this.deleteExpiredFiles(); + } catch (Throwable e) { + log.warn(this.getServiceName() + " service has exception. 
", e); + } + } + + public String getServiceName() { + return messageStore.getBrokerConfig().getIdentifier() + ConsumeQueueStore.CleanConsumeQueueService.class.getSimpleName(); + } + + protected void deleteExpiredFiles() { + + long minOffset = messageStore.getCommitLog().getMinOffset(); + if (minOffset > this.lastPhysicalMinOffset) { + this.lastPhysicalMinOffset = minOffset; + + boolean spaceFull = isSpaceToDelete(); + boolean timeUp = messageStore.isTimeToDelete(); + if (spaceFull || timeUp) { + cleanExpired(minOffset); + } + + messageStore.getIndexService().deleteExpiredFile(minOffset); + } + } + + private boolean isSpaceToDelete() { + double ratio = messageStoreConfig.getDiskMaxUsedSpaceRatio() / 100.0; + + String storePathLogics = StorePathConfigHelper + .getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()); + double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); + if (logicsRatio > diskSpaceWarningLevelRatio) { + boolean diskOk = messageStore.getRunningFlags().getAndMakeLogicDiskFull(); Review Comment: How about renaming it to diskMayBeFullSoon? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org