This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new e0db6545b0 [ISSUE #9112] Speedup revive scan in Pop Consumption and support server side reset offset (#9113) e0db6545b0 is described below commit e0db6545b00770e8f41bf55cfb560ac79f9a17c9 Author: lizhimins <707364...@qq.com> AuthorDate: Wed Jan 8 23:00:22 2025 +0800 [ISSUE #9112] Speedup revive scan in Pop Consumption and support server side reset offset (#9113) --- .../rocketmq/broker/pop/PopConsumerKVStore.java | 11 ++- .../broker/pop/PopConsumerRocksdbStore.java | 20 +++-- .../rocketmq/broker/pop/PopConsumerService.java | 60 ++++++++----- .../broker/pop/PopConsumerRocksdbStoreTest.java | 97 +++++++++++++++++++++- .../broker/pop/PopConsumerServiceTest.java | 13 +-- .../common/config/AbstractRocksDBStorage.java | 6 +- 6 files changed, 162 insertions(+), 45 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java index 5569abe3db..33072d699b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java @@ -49,10 +49,13 @@ public interface PopConsumerKVStore { void deleteRecords(List<PopConsumerRecord> consumerRecordList); /** - * Scans and returns a list of expired consumer records before the current time. - * @param currentTime The current revive checkpoint timestamp. + * Scans and returns a list of expired consumer records within the specified time range. + * @param lowerTime The start time (inclusive) of the time range to search, in milliseconds. + * @param upperTime The end time (exclusive) of the time range to search, in milliseconds. * @param maxCount The maximum number of records to return. - * @return A list of expired consumer records. + * Even if more records match the criteria, only this many will be returned. + * @return A list of expired consumer records within the specified time range. + * If no matching records are found, an empty list is returned. */ - List<PopConsumerRecord> scanExpiredRecords(long currentTime, int maxCount); + List<PopConsumerRecord> scanExpiredRecords(long lowerTime, long upperTime, int maxCount); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java index f2a617b408..7ab276a418 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java @@ -28,9 +28,11 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactRangeOptions; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.rocksdb.Slice; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import org.slf4j.Logger; @@ -43,7 +45,7 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P private WriteOptions writeOptions; private WriteOptions deleteOptions; - private ColumnFamilyHandle columnFamilyHandle; + protected ColumnFamilyHandle columnFamilyHandle; public PopConsumerRocksdbStore(String filePath) { super(filePath); @@ -60,8 +62,7 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P this.writeOptions.setNoSlowdown(false); this.deleteOptions = new WriteOptions(); - this.deleteOptions.setSync(false); - this.deleteOptions.setLowPri(true); + this.deleteOptions.setSync(true); this.deleteOptions.setDisableWAL(false); this.deleteOptions.setNoSlowdown(false); @@ -135,18 +136,19 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P } @Override - public List<PopConsumerRecord> scanExpiredRecords(long currentTime, int maxCount) { + // https://github.com/facebook/rocksdb/issues/10300 + public List<PopConsumerRecord> scanExpiredRecords(long lower, long upper, int maxCount) { // In RocksDB, we can use SstPartitionerFixedPrefixFactory in cfOptions // and new ColumnFamilyOptions().useFixedLengthPrefixExtractor() to // configure prefix indexing to improve the performance of scans. // However, in the current implementation, this is not the bottleneck. List<PopConsumerRecord> consumerRecordList = new ArrayList<>(); - try (RocksIterator iterator = db.newIterator(this.columnFamilyHandle)) { - iterator.seekToFirst(); + try (ReadOptions scanOptions = new ReadOptions() + .setIterateLowerBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(lower).array())) + .setIterateUpperBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(upper).array())); + RocksIterator iterator = db.newIterator(this.columnFamilyHandle, scanOptions)) { + iterator.seek(ByteBuffer.allocate(Long.BYTES).putLong(lower).array()); while (iterator.isValid() && consumerRecordList.size() < maxCount) { - if (ByteBuffer.wrap(iterator.key()).getLong() > currentTime) { - break; - } consumerRecordList.add(PopConsumerRecord.decode(iterator.value())); iterator.next(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index 647e3d6ff7..1f0125412a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -75,6 +75,7 @@ public class PopConsumerService extends ServiceThread { private final AtomicBoolean consumerRunning; private final BrokerConfig brokerConfig; private final BrokerController brokerController; + private final AtomicLong currentTime; private final AtomicLong lastCleanupLockTime; private final PopConsumerCache popConsumerCache; private final PopConsumerKVStore popConsumerStore; @@ -88,6 +89,7 @@ public class PopConsumerService extends ServiceThread { this.consumerRunning = new AtomicBoolean(false); this.requestCountTable = new ConcurrentHashMap<>(); + this.currentTime = new AtomicLong(TimeUnit.SECONDS.toMillis(3)); this.lastCleanupLockTime = new AtomicLong(System.currentTimeMillis()); this.consumerLockService = new PopConsumerLockService(TimeUnit.MINUTES.toMillis(2)); this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get( @@ -195,12 +197,27 @@ public class PopConsumerService extends ServiceThread { return context; } + public Long getPopOffset(String groupId, String topicId, int queueId) { + Long resetOffset = + this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId, groupId, queueId); + if (resetOffset != null) { + this.clearCache(groupId, topicId, queueId); + this.brokerController.getConsumerOrderInfoManager().clearBlock(topicId, groupId, queueId); + this.brokerController.getConsumerOffsetManager() + .commitOffset("ResetPopOffset", groupId, topicId, queueId, resetOffset); + } + return resetOffset; + } + public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost, String groupId, String topicId, int queueId, long offset, int batchSize, MessageFilter filter) { log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}", groupId, topicId, offset, queueId, batchSize, filter != null); + Long resetOffset = this.getPopOffset(groupId, topicId, queueId); + final long currentOffset = resetOffset != null ? resetOffset : offset; + CompletableFuture<GetMessageResult> getMessageFuture = brokerController.getMessageStore().getMessageAsync(groupId, topicId, queueId, offset, batchSize, filter); @@ -223,7 +240,7 @@ public class PopConsumerService extends ServiceThread { log.warn("PopConsumerService getMessageAsync, initial offset because store is no correct, " + "groupId={}, topicId={}, queueId={}, batchSize={}, offset={}->{}", - groupId, topicId, queueId, batchSize, offset, result.getNextBeginOffset()); + groupId, topicId, queueId, batchSize, currentOffset, result.getNextBeginOffset()); return brokerController.getMessageStore().getMessageAsync( groupId, topicId, queueId, result.getNextBeginOffset(), batchSize, filter); @@ -482,10 +499,12 @@ public class PopConsumerService extends ServiceThread { } } - public long revive(long currentTime, int maxCount) { + public long revive(AtomicLong currentTime, int maxCount) { Stopwatch stopwatch = Stopwatch.createStarted(); - List<PopConsumerRecord> consumerRecords = - this.popConsumerStore.scanExpiredRecords(currentTime, maxCount); + long upperTime = System.currentTimeMillis() - 50L; + List<PopConsumerRecord> consumerRecords = this.popConsumerStore.scanExpiredRecords( + currentTime.get() - TimeUnit.SECONDS.toMillis(3), upperTime, maxCount); + long scanCostTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); Queue<PopConsumerRecord> failureList = new LinkedBlockingQueue<>(); List<CompletableFuture<?>> futureList = new ArrayList<>(consumerRecords.size()); @@ -497,9 +516,9 @@ public class PopConsumerService extends ServiceThread { long backoffInterval = 1000L * REWRITE_INTERVALS_IN_SECONDS[ Math.min(REWRITE_INTERVALS_IN_SECONDS.length, record.getAttemptTimes())]; long nextInvisibleTime = record.getInvisibleTime() + backoffInterval; - PopConsumerRecord retryRecord = new PopConsumerRecord(record.getPopTime(), record.getGroupId(), - record.getTopicId(), record.getQueueId(), record.getRetryFlag(), nextInvisibleTime, - record.getOffset(), record.getAttemptId()); + PopConsumerRecord retryRecord = new PopConsumerRecord(System.currentTimeMillis(), + record.getGroupId(), record.getTopicId(), record.getQueueId(), + record.getRetryFlag(), nextInvisibleTime, record.getOffset(), record.getAttemptId()); retryRecord.setAttemptTimes(record.getAttemptTimes() + 1); failureList.add(retryRecord); log.warn("PopConsumerService revive backoff retry, record={}", retryRecord); @@ -513,14 +532,20 @@ public class PopConsumerService extends ServiceThread { CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); this.popConsumerStore.writeRecords(new ArrayList<>(failureList)); this.popConsumerStore.deleteRecords(consumerRecords); + currentTime.set(consumerRecords.isEmpty() ? + upperTime : consumerRecords.get(consumerRecords.size() - 1).getVisibilityTimeout()); if (brokerConfig.isEnablePopBufferMerge()) { - log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, cost={}ms", - popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(), consumerRecords.size(), - failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, " + + "behindInMillis={}, scanInMillis={}, costInMillis={}", + popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(), + consumerRecords.size(), failureList.size(), upperTime - currentTime.get(), + scanCostTime, stopwatch.elapsed(TimeUnit.MILLISECONDS)); } else { - log.info("PopConsumerService, revive count={}, failure count={}, cost={}ms", - consumerRecords.size(), failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + log.info("PopConsumerService, revive count={}, failure count={}, " + + "behindInMillis={}, scanInMillis={}, costInMillis={}", + consumerRecords.size(), failureList.size(), upperTime - currentTime.get(), + scanCostTime, stopwatch.elapsed(TimeUnit.MILLISECONDS)); } return consumerRecords.size(); @@ -588,11 +613,6 @@ public class PopConsumerService extends ServiceThread { PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); - if (brokerConfig.isEnablePopLog()) { - log.debug("PopConsumerService revive retry msg, put status={}, ck={}, delay={}ms", - putMessageResult, JSON.toJSONString(record), System.currentTimeMillis() - record.getVisibilityTimeout()); - } - if (putMessageResult.getAppendMessageResult() == null || putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) { log.error("PopConsumerService revive retry msg error, put status={}, ck={}, delay={}ms", @@ -616,7 +636,7 @@ public class PopConsumerService extends ServiceThread { while (true) { try { List<PopConsumerRecord> consumerRecords = this.popConsumerStore.scanExpiredRecords( - Long.MAX_VALUE, brokerConfig.getPopReviveMaxReturnSizePerRead()); + 0, Long.MAX_VALUE, brokerConfig.getPopReviveMaxReturnSizePerRead()); if (consumerRecords == null || consumerRecords.isEmpty()) { break; } @@ -695,7 +715,7 @@ public class PopConsumerService extends ServiceThread { while (!isStopped()) { try { // to prevent concurrency issues during read and write operations - long reviveCount = this.revive(System.currentTimeMillis() - 50L, + long reviveCount = this.revive(this.currentTime, brokerConfig.getPopReviveMaxReturnSizePerRead()); long current = System.currentTimeMillis(); @@ -704,7 +724,7 @@ public class PopConsumerService extends ServiceThread { this.lastCleanupLockTime.set(current); } - if (reviveCount == 0) { + if (reviveCount < brokerConfig.getPopReviveMaxReturnSizePerRead()) { this.waitForRunning(500); } } catch (Exception e) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java index 5facaeb55f..3c2b190d1c 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java @@ -16,18 +16,26 @@ */ package org.apache.rocketmq.broker.pop; +import com.google.common.base.Stopwatch; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.io.FileUtils; +import org.apache.rocketmq.common.config.AbstractRocksDBStorage; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.tieredstore.util.MessageStoreUtil; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,18 +93,101 @@ public class PopConsumerRocksdbStoreTest { .collect(Collectors.toList())); List<PopConsumerRecord> consumerRecords = - consumerStore.scanExpiredRecords(20002, 2); + consumerStore.scanExpiredRecords(0, 20002, 2); Assert.assertEquals(2, consumerRecords.size()); consumerStore.deleteRecords(consumerRecords); - consumerRecords = consumerStore.scanExpiredRecords(20002, 2); + consumerRecords = consumerStore.scanExpiredRecords(0, 20003, 2); Assert.assertEquals(1, consumerRecords.size()); consumerStore.deleteRecords(consumerRecords); - consumerRecords = consumerStore.scanExpiredRecords(20004, 3); + consumerRecords = consumerStore.scanExpiredRecords(0, 20005, 3); Assert.assertEquals(2, consumerRecords.size()); consumerStore.shutdown(); deleteStoreDirectory(filePath); } + + private long getDirectorySizeRecursive(File directory) { + long size = 0; + File[] files = directory.listFiles(); + if (files != null) { + for (File file : files) { + if (file.isFile()) { + size += file.length(); + } else if (file.isDirectory()) { + size += getDirectorySizeRecursive(file); + } + } + } + return size; + } + + @Test + @Ignore + @SuppressWarnings("ConstantValue") + public void tombstoneDeletionTest() throws IllegalAccessException, NoSuchFieldException { + PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(getRandomStorePath()); + rocksdbStore.start(); + + int iterCount = 1000 * 1000; + boolean useSeekFirstDelete = false; + Field dbField = AbstractRocksDBStorage.class.getDeclaredField("db"); + dbField.setAccessible(true); + RocksDB rocksDB = (RocksDB) dbField.get(rocksdbStore); + + long currentTime = 0L; + Stopwatch stopwatch = Stopwatch.createStarted(); + for (int i = 0; i < iterCount; i++) { + List<PopConsumerRecord> records = new ArrayList<>(); + for (int j = 0; j < 1000; j++) { + PopConsumerRecord record = getConsumerRecord(); + record.setPopTime((long) i * iterCount + j); + record.setGroupId("GroupTest"); + record.setTopicId("TopicTest"); + record.setQueueId(i % 10); + record.setRetryFlag(0); + record.setInvisibleTime(TimeUnit.SECONDS.toMillis(30)); + record.setOffset(i); + records.add(record); + } + rocksdbStore.writeRecords(records); + + long start = stopwatch.elapsed(TimeUnit.MILLISECONDS); + List<PopConsumerRecord> deleteList = new ArrayList<>(); + if (useSeekFirstDelete) { + try (RocksIterator iterator = rocksDB.newIterator(rocksdbStore.columnFamilyHandle)) { + iterator.seekToFirst(); + if (i % 10 == 0) { + long fileSize = getDirectorySizeRecursive(new File(rocksdbStore.getFilePath())); + log.info("DirectorySize={}, Cost={}ms", + MessageStoreUtil.toHumanReadable(fileSize), stopwatch.elapsed(TimeUnit.MILLISECONDS) - start); + } + while (iterator.isValid() && deleteList.size() < 1024) { + deleteList.add(PopConsumerRecord.decode(iterator.value())); + iterator.next(); + } + } + } else { + long upper = System.currentTimeMillis(); + deleteList = rocksdbStore.scanExpiredRecords(currentTime, upper, 800); + if (!deleteList.isEmpty()) { + currentTime = deleteList.get(deleteList.size() - 1).getVisibilityTimeout(); + } + long scanCost = stopwatch.elapsed(TimeUnit.MILLISECONDS) - start; + if (i % 100 == 0) { + long fileSize = getDirectorySizeRecursive(new File(rocksdbStore.getFilePath())); + long seekTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); + try (RocksIterator iterator = rocksDB.newIterator(rocksdbStore.columnFamilyHandle)) { + iterator.seekToFirst(); + } + log.info("DirectorySize={}, Cost={}ms, SeekFirstCost={}ms", MessageStoreUtil.toHumanReadable(fileSize), + scanCost, stopwatch.elapsed(TimeUnit.MILLISECONDS) - seekTime); + } + } + rocksdbStore.deleteRecords(deleteList); + } + rocksdbStore.shutdown(); + deleteStoreDirectory(rocksdbStore.getFilePath()); + } } \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java index b77c170c8c..2b930d5852 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.io.FileUtils; @@ -371,17 +372,17 @@ public class PopConsumerServiceTest { Mockito.doReturn(CompletableFuture.completedFuture(null)) .when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class)); - consumerServiceSpy.revive(20 * 1000, 1); + consumerServiceSpy.revive(new AtomicLong(20 * 1000), 1); Mockito.doReturn(CompletableFuture.completedFuture( Triple.of(null, "GetMessageResult is null", false))) .when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class)); - consumerServiceSpy.revive(20 * 1000, 1); + consumerServiceSpy.revive(new AtomicLong(20 * 1000), 1); Mockito.doReturn(CompletableFuture.completedFuture( Triple.of(Mockito.mock(MessageExt.class), null, false))) .when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class)); - consumerServiceSpy.revive(20 * 1000, 1); + consumerServiceSpy.revive(new AtomicLong(20 * 1000), 1); consumerService.shutdown(); } @@ -412,11 +413,11 @@ public class PopConsumerServiceTest { long visibleTimestamp = popTime + invisibleTime; // revive fails - Assert.assertEquals(1, consumerServiceSpy.revive(visibleTimestamp, 1)); + Assert.assertEquals(1, consumerServiceSpy.revive(new AtomicLong(visibleTimestamp), 1)); // should be invisible now - Assert.assertEquals(0, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp, 1).size()); + Assert.assertEquals(0, consumerService.getPopConsumerStore().scanExpiredRecords(0, visibleTimestamp, 1).size()); // will be visible again in 10 seconds - Assert.assertEquals(1, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp + 10 * 1000, 1).size()); + Assert.assertEquals(1, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp, System.currentTimeMillis() + visibleTimestamp + 10 * 1000, 1).size()); consumerService.shutdown(); } diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index 48ba4b8086..347d92304d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -365,7 +365,7 @@ public abstract class AbstractRocksDBStorage { } if (postLoad()) { this.loaded = true; - LOGGER.info("RocksDB[{}] starts OK", this.dbPath); + LOGGER.info("RocksDB [{}] starts OK", this.dbPath); this.closed = false; return true; } else { @@ -437,9 +437,9 @@ public abstract class AbstractRocksDBStorage { this.options = null; this.loaded = false; - LOGGER.info("shutdown OK. {}", this.dbPath); + LOGGER.info("RocksDB shutdown OK. {}", this.dbPath); } catch (Exception e) { - LOGGER.error("shutdown Failed. {}", this.dbPath, e); + LOGGER.error("RocksDB shutdown failed. {}", this.dbPath, e); return false; } return true;