lollipopjin commented on code in PR #9256:
URL: https://github.com/apache/rocketmq/pull/9256#discussion_r2053700375


##########
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java:
##########
@@ -480,4 +486,21 @@ public CqUnit nextAndRelease() {
             }
         }
     }
+
+    public void initializeWithOffset(long offset) {
+        log.info("RocksDBConsumeQueue initializeWithOffset topic={}, queueId={}, offset={}, oldMax={}, oldMin={}",
+            topic, queueId, offset, getMaxOffsetInQueue(), getMinOffsetInQueue());
+        try {
+            // destroy the consume queue will clear the cqUnit and offset
+            consumeQueueStore.destroy(this);
+
+            // update the max and min offset
+            if (offset > 0) {
+                this.consumeQueueStore.updateCqOffset(topic, queueId, 0L, offset - 1, true);

Review Comment:
   How about using two separate methods to update the CQ offset, one for the min offset and one for the max offset, instead of a boolean flag?
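
   A minimal sketch of what that split could look like. `OffsetTableSketch` and its in-memory maps are hypothetical stand-ins for the real offset table, and the method names `updateCqMaxOffset` / `updateCqMinOffset` are assumptions for illustration, not names taken from this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the offset table; not RocketMQ code.
class OffsetTableSketch {
    // Each value holds {phyOffset, cqOffset} for one topic/queue pair.
    private final Map<String, long[]> maxOffsets = new HashMap<>();
    private final Map<String, long[]> minOffsets = new HashMap<>();

    private static String key(String topic, int queueId) {
        return topic + "@" + queueId;
    }

    // Replaces updateCqOffset(..., true): the intent is visible in the name.
    void updateCqMaxOffset(String topic, int queueId, long phyOffset, long cqOffset) {
        maxOffsets.put(key(topic, queueId), new long[] {phyOffset, cqOffset});
    }

    // Replaces updateCqOffset(..., false).
    void updateCqMinOffset(String topic, int queueId, long phyOffset, long cqOffset) {
        minOffsets.put(key(topic, queueId), new long[] {phyOffset, cqOffset});
    }

    // Returns the stored max CQ offset, or null if none was recorded.
    Long getMaxCqOffset(String topic, int queueId) {
        long[] v = maxOffsets.get(key(topic, queueId));
        return v == null ? null : v[1];
    }

    // Returns the stored min CQ offset, or null if none was recorded.
    Long getMinCqOffset(String topic, int queueId) {
        long[] v = minOffsets.get(key(topic, queueId));
        return v == null ? null : v[1];
    }
}
```

   With something like this, the call site would read `updateCqMaxOffset(topic, queueId, 0L, offset - 1)`, so a reader does not have to look up what the trailing `true` means.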



##########
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java:
##########
@@ -480,4 +486,21 @@ public CqUnit nextAndRelease() {
             }
         }
     }
+
+    public void initializeWithOffset(long offset) {

Review Comment:
   Is this used for mount --> main in container mode? If so, please add some Javadoc here.



##########
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java:
##########
@@ -562,4 +546,74 @@ public Long getMaxOffset(String topic, int queueId) throws ConsumeQueueException
     public boolean isStopped() {
         return ServiceState.SHUTDOWN_ALREADY == serviceState.get();
     }
+
+    public void updateCqOffset(final String topic, final int queueId, final long phyOffset,
+        final long cqOffset, boolean max) throws RocksDBException {
+        this.rocksDBConsumeQueueOffsetTable.updateCqOffset(topic, queueId, phyOffset, cqOffset, max);
+    }
+
+    class RocksDBCleanConsumeQueueService {
+        protected long lastPhysicalMinOffset = 0;
+
+        private final double diskSpaceWarningLevelRatio =
+            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
+
+        private final double diskSpaceCleanForciblyRatio =
+            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
+
+        public void run() {
+            try {
+                this.deleteExpiredFiles();
+            } catch (Throwable e) {
+                log.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        public String getServiceName() {
+            return messageStore.getBrokerConfig().getIdentifier() + ConsumeQueueStore.CleanConsumeQueueService.class.getSimpleName();
+        }
+
+        protected void deleteExpiredFiles() {
+
+            long minOffset = messageStore.getCommitLog().getMinOffset();
+            if (minOffset > this.lastPhysicalMinOffset) {
+                this.lastPhysicalMinOffset = minOffset;
+
+                boolean spaceFull = isSpaceToDelete();
+                boolean timeUp = messageStore.isTimeToDelete();
+                if (spaceFull || timeUp) {
+                    cleanExpired(minOffset);
+                }
+

Review Comment:
   Please add some Javadoc here explaining that this deletes the CQ units whose offset is less than the min physical offset in the commitLog.
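
   For illustration, a possible shape for that Javadoc, attached to a toy model of the behavior it describes. `CqCleanSketch` and its `TreeMap` are hypothetical; the actual cleanup in this PR goes through `cleanExpired(minOffset)`, whose internals are not shown in this hunk.

```java
import java.util.TreeMap;

// Hypothetical toy model of a consume queue; not RocketMQ code.
class CqCleanSketch {
    // Maps cqOffset -> physical offset of the message in the commit log.
    private final TreeMap<Long, Long> units = new TreeMap<>();

    void put(long cqOffset, long phyOffset) {
        units.put(cqOffset, phyOffset);
    }

    /**
     * Deletes the CQ units whose physical offset is less than the minimum
     * physical offset still retained in the commit log. Such units point at
     * commit log data that has already been removed.
     *
     * @param minPhyOffset the current min physical offset of the commit log
     * @return the number of CQ units removed
     */
    int cleanExpired(long minPhyOffset) {
        int before = units.size();
        units.values().removeIf(phyOffset -> phyOffset < minPhyOffset);
        return before - units.size();
    }

    int size() {
        return units.size();
    }
}
```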



##########
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java:
##########
@@ -562,4 +546,74 @@ public Long getMaxOffset(String topic, int queueId) throws ConsumeQueueException
     public boolean isStopped() {
         return ServiceState.SHUTDOWN_ALREADY == serviceState.get();
     }
+
+    public void updateCqOffset(final String topic, final int queueId, final long phyOffset,
+        final long cqOffset, boolean max) throws RocksDBException {
+        this.rocksDBConsumeQueueOffsetTable.updateCqOffset(topic, queueId, phyOffset, cqOffset, max);
+    }
+
+    class RocksDBCleanConsumeQueueService {
+        protected long lastPhysicalMinOffset = 0;
+
+        private final double diskSpaceWarningLevelRatio =
+            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
+
+        private final double diskSpaceCleanForciblyRatio =
+            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
+
+        public void run() {
+            try {
+                this.deleteExpiredFiles();
+            } catch (Throwable e) {
+                log.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        public String getServiceName() {
+            return messageStore.getBrokerConfig().getIdentifier() + ConsumeQueueStore.CleanConsumeQueueService.class.getSimpleName();
+        }
+
+        protected void deleteExpiredFiles() {
+
+            long minOffset = messageStore.getCommitLog().getMinOffset();
+            if (minOffset > this.lastPhysicalMinOffset) {
+                this.lastPhysicalMinOffset = minOffset;
+
+                boolean spaceFull = isSpaceToDelete();
+                boolean timeUp = messageStore.isTimeToDelete();
+                if (spaceFull || timeUp) {
+                    cleanExpired(minOffset);
+                }
+
+                messageStore.getIndexService().deleteExpiredFile(minOffset);
+            }
+        }
+
+        private boolean isSpaceToDelete() {
+            double ratio = messageStoreConfig.getDiskMaxUsedSpaceRatio() / 100.0;
+
+            String storePathLogics = StorePathConfigHelper
+                .getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir());
+            double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
+            if (logicsRatio > diskSpaceWarningLevelRatio) {
+                boolean diskOk = messageStore.getRunningFlags().getAndMakeLogicDiskFull();

Review Comment:
   How about renaming this to `diskMayBeFullSoon`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
