This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e0db6545b0 [ISSUE #9112] Speedup revive scan in Pop Consumption and 
support server side reset offset (#9113)
e0db6545b0 is described below

commit e0db6545b00770e8f41bf55cfb560ac79f9a17c9
Author: lizhimins <707364...@qq.com>
AuthorDate: Wed Jan 8 23:00:22 2025 +0800

    [ISSUE #9112] Speedup revive scan in Pop Consumption and support server 
side reset offset (#9113)
---
 .../rocketmq/broker/pop/PopConsumerKVStore.java    | 11 ++-
 .../broker/pop/PopConsumerRocksdbStore.java        | 20 +++--
 .../rocketmq/broker/pop/PopConsumerService.java    | 60 ++++++++-----
 .../broker/pop/PopConsumerRocksdbStoreTest.java    | 97 +++++++++++++++++++++-
 .../broker/pop/PopConsumerServiceTest.java         | 13 +--
 .../common/config/AbstractRocksDBStorage.java      |  6 +-
 6 files changed, 162 insertions(+), 45 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java
index 5569abe3db..33072d699b 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java
@@ -49,10 +49,13 @@ public interface PopConsumerKVStore {
     void deleteRecords(List<PopConsumerRecord> consumerRecordList);
 
     /**
-     * Scans and returns a list of expired consumer records before the current 
time.
-     * @param currentTime The current revive checkpoint timestamp.
+     * Scans and returns a list of expired consumer records within the 
specified time range.
+     * @param lowerTime The start time (inclusive) of the time range to 
search, in milliseconds.
+     * @param upperTime The end time (exclusive) of the time range to search, 
in milliseconds.
      * @param maxCount The maximum number of records to return.
-     * @return A list of expired consumer records.
+     *                 Even if more records match the criteria, only this many 
will be returned.
+     * @return A list of expired consumer records within the specified time 
range.
+     *         If no matching records are found, an empty list is returned.
      */
-    List<PopConsumerRecord> scanExpiredRecords(long currentTime, int maxCount);
+    List<PopConsumerRecord> scanExpiredRecords(long lowerTime, long upperTime, 
int maxCount);
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
index f2a617b408..7ab276a418 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
@@ -28,9 +28,11 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactRangeOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -43,7 +45,7 @@ public class PopConsumerRocksdbStore extends 
AbstractRocksDBStorage implements P
 
     private WriteOptions writeOptions;
     private WriteOptions deleteOptions;
-    private ColumnFamilyHandle columnFamilyHandle;
+    protected ColumnFamilyHandle columnFamilyHandle;
 
     public PopConsumerRocksdbStore(String filePath) {
         super(filePath);
@@ -60,8 +62,7 @@ public class PopConsumerRocksdbStore extends 
AbstractRocksDBStorage implements P
         this.writeOptions.setNoSlowdown(false);
 
         this.deleteOptions = new WriteOptions();
-        this.deleteOptions.setSync(false);
-        this.deleteOptions.setLowPri(true);
+        this.deleteOptions.setSync(true);
         this.deleteOptions.setDisableWAL(false);
         this.deleteOptions.setNoSlowdown(false);
 
@@ -135,18 +136,19 @@ public class PopConsumerRocksdbStore extends 
AbstractRocksDBStorage implements P
     }
 
     @Override
-    public List<PopConsumerRecord> scanExpiredRecords(long currentTime, int 
maxCount) {
+    // https://github.com/facebook/rocksdb/issues/10300
+    public List<PopConsumerRecord> scanExpiredRecords(long lower, long upper, 
int maxCount) {
         // In RocksDB, we can use SstPartitionerFixedPrefixFactory in cfOptions
         // and new ColumnFamilyOptions().useFixedLengthPrefixExtractor() to
         // configure prefix indexing to improve the performance of scans.
         // However, in the current implementation, this is not the bottleneck.
         List<PopConsumerRecord> consumerRecordList = new ArrayList<>();
-        try (RocksIterator iterator = db.newIterator(this.columnFamilyHandle)) 
{
-            iterator.seekToFirst();
+        try (ReadOptions scanOptions = new ReadOptions()
+            .setIterateLowerBound(new 
Slice(ByteBuffer.allocate(Long.BYTES).putLong(lower).array()))
+            .setIterateUpperBound(new 
Slice(ByteBuffer.allocate(Long.BYTES).putLong(upper).array()));
+             RocksIterator iterator = db.newIterator(this.columnFamilyHandle, 
scanOptions)) {
+            
iterator.seek(ByteBuffer.allocate(Long.BYTES).putLong(lower).array());
             while (iterator.isValid() && consumerRecordList.size() < maxCount) 
{
-                if (ByteBuffer.wrap(iterator.key()).getLong() > currentTime) {
-                    break;
-                }
                 
consumerRecordList.add(PopConsumerRecord.decode(iterator.value()));
                 iterator.next();
             }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 647e3d6ff7..1f0125412a 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -75,6 +75,7 @@ public class PopConsumerService extends ServiceThread {
     private final AtomicBoolean consumerRunning;
     private final BrokerConfig brokerConfig;
     private final BrokerController brokerController;
+    private final AtomicLong currentTime;
     private final AtomicLong lastCleanupLockTime;
     private final PopConsumerCache popConsumerCache;
     private final PopConsumerKVStore popConsumerStore;
@@ -88,6 +89,7 @@ public class PopConsumerService extends ServiceThread {
 
         this.consumerRunning = new AtomicBoolean(false);
         this.requestCountTable = new ConcurrentHashMap<>();
+        this.currentTime = new AtomicLong(TimeUnit.SECONDS.toMillis(3));
         this.lastCleanupLockTime = new AtomicLong(System.currentTimeMillis());
         this.consumerLockService = new 
PopConsumerLockService(TimeUnit.MINUTES.toMillis(2));
         this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get(
@@ -195,12 +197,27 @@ public class PopConsumerService extends ServiceThread {
         return context;
     }
 
+    public Long getPopOffset(String groupId, String topicId, int queueId) {
+        Long resetOffset =
+            
this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId,
 groupId, queueId);
+        if (resetOffset != null) {
+            this.clearCache(groupId, topicId, queueId);
+            
this.brokerController.getConsumerOrderInfoManager().clearBlock(topicId, 
groupId, queueId);
+            this.brokerController.getConsumerOffsetManager()
+                .commitOffset("ResetPopOffset", groupId, topicId, queueId, 
resetOffset);
+        }
+        return resetOffset;
+    }
+
     public CompletableFuture<GetMessageResult> getMessageAsync(String 
clientHost,
         String groupId, String topicId, int queueId, long offset, int 
batchSize, MessageFilter filter) {
 
         log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, 
queueId={}, offset={}, batchSize={}, filter={}",
             groupId, topicId, offset, queueId, batchSize, filter != null);
 
+        Long resetOffset = this.getPopOffset(groupId, topicId, queueId);
+        final long currentOffset = resetOffset != null ? resetOffset : offset;
+
         CompletableFuture<GetMessageResult> getMessageFuture =
             brokerController.getMessageStore().getMessageAsync(groupId, 
topicId, queueId, offset, batchSize, filter);
 
@@ -223,7 +240,7 @@ public class PopConsumerService extends ServiceThread {
 
                 log.warn("PopConsumerService getMessageAsync, initial offset 
because store is no correct, " +
                         "groupId={}, topicId={}, queueId={}, batchSize={}, 
offset={}->{}",
-                    groupId, topicId, queueId, batchSize, offset, 
result.getNextBeginOffset());
+                    groupId, topicId, queueId, batchSize, currentOffset, 
result.getNextBeginOffset());
 
                 return brokerController.getMessageStore().getMessageAsync(
                     groupId, topicId, queueId, result.getNextBeginOffset(), 
batchSize, filter);
@@ -482,10 +499,12 @@ public class PopConsumerService extends ServiceThread {
         }
     }
 
-    public long revive(long currentTime, int maxCount) {
+    public long revive(AtomicLong currentTime, int maxCount) {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        List<PopConsumerRecord> consumerRecords =
-            this.popConsumerStore.scanExpiredRecords(currentTime, maxCount);
+        long upperTime = System.currentTimeMillis() - 50L;
+        List<PopConsumerRecord> consumerRecords = 
this.popConsumerStore.scanExpiredRecords(
+                currentTime.get() - TimeUnit.SECONDS.toMillis(3), upperTime, 
maxCount);
+        long scanCostTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
         Queue<PopConsumerRecord> failureList = new LinkedBlockingQueue<>();
         List<CompletableFuture<?>> futureList = new 
ArrayList<>(consumerRecords.size());
 
@@ -497,9 +516,9 @@ public class PopConsumerService extends ServiceThread {
                         long backoffInterval = 1000L * 
REWRITE_INTERVALS_IN_SECONDS[
                             Math.min(REWRITE_INTERVALS_IN_SECONDS.length, 
record.getAttemptTimes())];
                         long nextInvisibleTime = record.getInvisibleTime() + 
backoffInterval;
-                        PopConsumerRecord retryRecord = new 
PopConsumerRecord(record.getPopTime(), record.getGroupId(),
-                            record.getTopicId(), record.getQueueId(), 
record.getRetryFlag(), nextInvisibleTime,
-                            record.getOffset(), record.getAttemptId());
+                        PopConsumerRecord retryRecord = new 
PopConsumerRecord(System.currentTimeMillis(),
+                            record.getGroupId(), record.getTopicId(), 
record.getQueueId(),
+                            record.getRetryFlag(), nextInvisibleTime, 
record.getOffset(), record.getAttemptId());
                         retryRecord.setAttemptTimes(record.getAttemptTimes() + 
1);
                         failureList.add(retryRecord);
                         log.warn("PopConsumerService revive backoff retry, 
record={}", retryRecord);
@@ -513,14 +532,20 @@ public class PopConsumerService extends ServiceThread {
         CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).join();
         this.popConsumerStore.writeRecords(new ArrayList<>(failureList));
         this.popConsumerStore.deleteRecords(consumerRecords);
+        currentTime.set(consumerRecords.isEmpty() ?
+            upperTime : consumerRecords.get(consumerRecords.size() - 
1).getVisibilityTimeout());
 
         if (brokerConfig.isEnablePopBufferMerge()) {
-            log.info("PopConsumerService, key size={}, cache size={}, revive 
count={}, failure count={}, cost={}ms",
-                popConsumerCache.getCacheKeySize(), 
popConsumerCache.getCacheSize(), consumerRecords.size(),
-                failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            log.info("PopConsumerService, key size={}, cache size={}, revive 
count={}, failure count={}, " +
+                    "behindInMillis={}, scanInMillis={}, costInMillis={}",
+                popConsumerCache.getCacheKeySize(), 
popConsumerCache.getCacheSize(),
+                consumerRecords.size(), failureList.size(), upperTime - 
currentTime.get(),
+                scanCostTime, stopwatch.elapsed(TimeUnit.MILLISECONDS));
         } else {
-            log.info("PopConsumerService, revive count={}, failure count={}, 
cost={}ms",
-                consumerRecords.size(), failureList.size(), 
stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            log.info("PopConsumerService, revive count={}, failure count={}, " 
+
+                    "behindInMillis={}, scanInMillis={}, costInMillis={}",
+                consumerRecords.size(), failureList.size(), upperTime - 
currentTime.get(),
+                scanCostTime, stopwatch.elapsed(TimeUnit.MILLISECONDS));
         }
 
         return consumerRecords.size();
@@ -588,11 +613,6 @@ public class PopConsumerService extends ServiceThread {
         PutMessageResult putMessageResult =
             
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
 
-        if (brokerConfig.isEnablePopLog()) {
-            log.debug("PopConsumerService revive retry msg, put status={}, 
ck={}, delay={}ms",
-                putMessageResult, JSON.toJSONString(record), 
System.currentTimeMillis() - record.getVisibilityTimeout());
-        }
-
         if (putMessageResult.getAppendMessageResult() == null ||
             putMessageResult.getAppendMessageResult().getStatus() != 
AppendMessageStatus.PUT_OK) {
             log.error("PopConsumerService revive retry msg error, put 
status={}, ck={}, delay={}ms",
@@ -616,7 +636,7 @@ public class PopConsumerService extends ServiceThread {
         while (true) {
             try {
                 List<PopConsumerRecord> consumerRecords = 
this.popConsumerStore.scanExpiredRecords(
-                    Long.MAX_VALUE, 
brokerConfig.getPopReviveMaxReturnSizePerRead());
+                    0, Long.MAX_VALUE, 
brokerConfig.getPopReviveMaxReturnSizePerRead());
                 if (consumerRecords == null || consumerRecords.isEmpty()) {
                     break;
                 }
@@ -695,7 +715,7 @@ public class PopConsumerService extends ServiceThread {
         while (!isStopped()) {
             try {
                 // to prevent concurrency issues during read and write 
operations
-                long reviveCount = this.revive(System.currentTimeMillis() - 
50L,
+                long reviveCount = this.revive(this.currentTime,
                     brokerConfig.getPopReviveMaxReturnSizePerRead());
 
                 long current = System.currentTimeMillis();
@@ -704,7 +724,7 @@ public class PopConsumerService extends ServiceThread {
                     this.lastCleanupLockTime.set(current);
                 }
 
-                if (reviveCount == 0) {
+                if (reviveCount < 
brokerConfig.getPopReviveMaxReturnSizePerRead()) {
                     this.waitForRunning(500);
                 }
             } catch (Exception e) {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java
index 5facaeb55f..3c2b190d1c 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java
@@ -16,18 +16,26 @@
  */
 package org.apache.rocketmq.broker.pop;
 
+import com.google.common.base.Stopwatch;
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,18 +93,101 @@ public class PopConsumerRocksdbStoreTest {
             .collect(Collectors.toList()));
 
         List<PopConsumerRecord> consumerRecords =
-            consumerStore.scanExpiredRecords(20002, 2);
+            consumerStore.scanExpiredRecords(0, 20002, 2);
         Assert.assertEquals(2, consumerRecords.size());
         consumerStore.deleteRecords(consumerRecords);
 
-        consumerRecords = consumerStore.scanExpiredRecords(20002, 2);
+        consumerRecords = consumerStore.scanExpiredRecords(0, 20003, 2);
         Assert.assertEquals(1, consumerRecords.size());
         consumerStore.deleteRecords(consumerRecords);
 
-        consumerRecords = consumerStore.scanExpiredRecords(20004, 3);
+        consumerRecords = consumerStore.scanExpiredRecords(0, 20005, 3);
         Assert.assertEquals(2, consumerRecords.size());
 
         consumerStore.shutdown();
         deleteStoreDirectory(filePath);
     }
+
+    private long getDirectorySizeRecursive(File directory) {
+        long size = 0;
+        File[] files = directory.listFiles();
+        if (files != null) {
+            for (File file : files) {
+                if (file.isFile()) {
+                    size += file.length();
+                } else if (file.isDirectory()) {
+                    size += getDirectorySizeRecursive(file);
+                }
+            }
+        }
+        return size;
+    }
+
+    @Test
+    @Ignore
+    @SuppressWarnings("ConstantValue")
+    public void tombstoneDeletionTest() throws IllegalAccessException, 
NoSuchFieldException {
+        PopConsumerRocksdbStore rocksdbStore = new 
PopConsumerRocksdbStore(getRandomStorePath());
+        rocksdbStore.start();
+
+        int iterCount = 1000 * 1000;
+        boolean useSeekFirstDelete = false;
+        Field dbField = AbstractRocksDBStorage.class.getDeclaredField("db");
+        dbField.setAccessible(true);
+        RocksDB rocksDB = (RocksDB) dbField.get(rocksdbStore);
+
+        long currentTime = 0L;
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        for (int i = 0; i < iterCount; i++) {
+            List<PopConsumerRecord> records = new ArrayList<>();
+            for (int j = 0; j < 1000; j++) {
+                PopConsumerRecord record = getConsumerRecord();
+                record.setPopTime((long) i * iterCount + j);
+                record.setGroupId("GroupTest");
+                record.setTopicId("TopicTest");
+                record.setQueueId(i % 10);
+                record.setRetryFlag(0);
+                record.setInvisibleTime(TimeUnit.SECONDS.toMillis(30));
+                record.setOffset(i);
+                records.add(record);
+            }
+            rocksdbStore.writeRecords(records);
+
+            long start = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+            List<PopConsumerRecord> deleteList = new ArrayList<>();
+            if (useSeekFirstDelete) {
+                try (RocksIterator iterator = 
rocksDB.newIterator(rocksdbStore.columnFamilyHandle)) {
+                    iterator.seekToFirst();
+                    if (i % 10 == 0) {
+                        long fileSize = getDirectorySizeRecursive(new 
File(rocksdbStore.getFilePath()));
+                        log.info("DirectorySize={}, Cost={}ms",
+                            MessageStoreUtil.toHumanReadable(fileSize), 
stopwatch.elapsed(TimeUnit.MILLISECONDS) - start);
+                    }
+                    while (iterator.isValid() && deleteList.size() < 1024) {
+                        
deleteList.add(PopConsumerRecord.decode(iterator.value()));
+                        iterator.next();
+                    }
+                }
+            } else {
+                long upper = System.currentTimeMillis();
+                deleteList = rocksdbStore.scanExpiredRecords(currentTime, 
upper, 800);
+                if (!deleteList.isEmpty()) {
+                    currentTime = deleteList.get(deleteList.size() - 
1).getVisibilityTimeout();
+                }
+                long scanCost = stopwatch.elapsed(TimeUnit.MILLISECONDS) - 
start;
+                if (i % 100 == 0) {
+                    long fileSize = getDirectorySizeRecursive(new 
File(rocksdbStore.getFilePath()));
+                    long seekTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+                    try (RocksIterator iterator = 
rocksDB.newIterator(rocksdbStore.columnFamilyHandle)) {
+                        iterator.seekToFirst();
+                    }
+                    log.info("DirectorySize={}, Cost={}ms, 
SeekFirstCost={}ms", MessageStoreUtil.toHumanReadable(fileSize),
+                        scanCost, stopwatch.elapsed(TimeUnit.MILLISECONDS) - 
seekTime);
+                }
+            }
+            rocksdbStore.deleteRecords(deleteList);
+        }
+        rocksdbStore.shutdown();
+        deleteStoreDirectory(rocksdbStore.getFilePath());
+    }
 }
\ No newline at end of file
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index b77c170c8c..2b930d5852 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
@@ -371,17 +372,17 @@ public class PopConsumerServiceTest {
 
         Mockito.doReturn(CompletableFuture.completedFuture(null))
             
.when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class));
-        consumerServiceSpy.revive(20 * 1000, 1);
+        consumerServiceSpy.revive(new AtomicLong(20 * 1000), 1);
 
         Mockito.doReturn(CompletableFuture.completedFuture(
                 Triple.of(null, "GetMessageResult is null", false)))
             
.when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class));
-        consumerServiceSpy.revive(20 * 1000, 1);
+        consumerServiceSpy.revive(new AtomicLong(20 * 1000), 1);
 
         Mockito.doReturn(CompletableFuture.completedFuture(
                 Triple.of(Mockito.mock(MessageExt.class), null, false)))
             
.when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class));
-        consumerServiceSpy.revive(20 * 1000, 1);
+        consumerServiceSpy.revive(new AtomicLong(20 * 1000), 1);
         consumerService.shutdown();
     }
 
@@ -412,11 +413,11 @@ public class PopConsumerServiceTest {
         long visibleTimestamp = popTime + invisibleTime;
 
         // revive fails
-        Assert.assertEquals(1, consumerServiceSpy.revive(visibleTimestamp, 1));
+        Assert.assertEquals(1, consumerServiceSpy.revive(new 
AtomicLong(visibleTimestamp), 1));
         // should be invisible now
-        Assert.assertEquals(0, 
consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp, 
1).size());
+        Assert.assertEquals(0, 
consumerService.getPopConsumerStore().scanExpiredRecords(0, visibleTimestamp, 
1).size());
         // will be visible again in 10 seconds
-        Assert.assertEquals(1, 
consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp + 10 
* 1000, 1).size());
+        Assert.assertEquals(1, 
consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp, 
System.currentTimeMillis() + visibleTimestamp + 10 * 1000, 1).size());
 
         consumerService.shutdown();
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 48ba4b8086..347d92304d 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -365,7 +365,7 @@ public abstract class AbstractRocksDBStorage {
         }
         if (postLoad()) {
             this.loaded = true;
-            LOGGER.info("RocksDB[{}] starts OK", this.dbPath);
+            LOGGER.info("RocksDB [{}] starts OK", this.dbPath);
             this.closed = false;
             return true;
         } else {
@@ -437,9 +437,9 @@ public abstract class AbstractRocksDBStorage {
             this.options = null;
 
             this.loaded = false;
-            LOGGER.info("shutdown OK. {}", this.dbPath);
+            LOGGER.info("RocksDB shutdown OK. {}", this.dbPath);
         } catch (Exception e) {
-            LOGGER.error("shutdown Failed. {}", this.dbPath, e);
+            LOGGER.error("RocksDB shutdown failed. {}", this.dbPath, e);
             return false;
         }
         return true;

Reply via email to