This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new a3afb05cb3 [ISSUE #9025] [RIP-73] Modify Pop Consumption rocksdb init config (#9100) a3afb05cb3 is described below commit a3afb05cb32f6c63fe8af5aef7a86ad1a4d5797f Author: lizhimins <707364...@qq.com> AuthorDate: Fri Jan 3 19:39:35 2025 +0800 [ISSUE #9025] [RIP-73] Modify Pop Consumption rocksdb init config (#9100) --- .../broker/pop/PopConsumerRocksdbStore.java | 11 ++--- .../store/rocksdb/RocksDBOptionsFactory.java | 51 ++++++++++++++++++++++ 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java index 9c940034a9..f2a617b408 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java @@ -28,7 +28,6 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactRangeOptions; -import org.rocksdb.DBOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; @@ -63,7 +62,7 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P this.deleteOptions = new WriteOptions(); this.deleteOptions.setSync(false); this.deleteOptions.setLowPri(true); - this.deleteOptions.setDisableWAL(true); + this.deleteOptions.setDisableWAL(false); this.deleteOptions.setNoSlowdown(false); this.compactRangeOptions = new CompactRangeOptions(); @@ -83,15 +82,11 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P initOptions(); // init column family here - ColumnFamilyOptions defaultOptions = new ColumnFamilyOptions().optimizeForSmallDb(); - ColumnFamilyOptions popStateOptions = new ColumnFamilyOptions().optimizeForSmallDb(); + 
ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createPopCFOptions(); + ColumnFamilyOptions popStateOptions = RocksDBOptionsFactory.createPopCFOptions(); this.cfOptions.add(defaultOptions); this.cfOptions.add(popStateOptions); - this.options = new DBOptions() - .setCreateIfMissing(true) - .setCreateMissingColumnFamilies(true); - List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>(); cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions)); cfDescriptors.add(new ColumnFamilyDescriptor(COLUMN_FAMILY_NAME, popStateOptions)); diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java index 2fac3bf485..5687d6a222 100644 --- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java +++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java @@ -131,6 +131,57 @@ public class RocksDBOptionsFactory { setInplaceUpdateSupport(true); } + public static ColumnFamilyOptions createPopCFOptions() { + BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig() + .setFormatVersion(5) + .setIndexType(IndexType.kBinarySearch) + .setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash) + .setDataBlockHashTableUtilRatio(0.75) + .setBlockSize(32 * SizeUnit.KB) + .setMetadataBlockSize(4 * SizeUnit.KB) + .setFilterPolicy(new BloomFilter(16, false)) + .setCacheIndexAndFilterBlocks(false) + .setCacheIndexAndFilterBlocksWithHighPriority(true) + .setPinL0FilterAndIndexBlocksInCache(false) + .setPinTopLevelIndexAndFilter(true) + .setBlockCache(new LRUCache(1024 * SizeUnit.MB, 8, false)) + .setWholeKeyFiltering(true); + + CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal() + .setSizeRatio(100) + .setMaxSizeAmplificationPercent(25) + .setAllowTrivialMove(true) + .setMinMergeWidth(2) + .setMaxMergeWidth(Integer.MAX_VALUE) + 
.setStopStyle(CompactionStopStyle.CompactionStopStyleTotalSize) + .setCompressionSizePercent(-1); + + //noinspection resource + return new ColumnFamilyOptions() + .setMaxWriteBufferNumber(4) + .setWriteBufferSize(128 * SizeUnit.MB) + .setMinWriteBufferNumberToMerge(1) + .setTableFormatConfig(blockBasedTableConfig) + .setMemTableConfig(new SkipListMemTableConfig()) + .setCompressionType(CompressionType.NO_COMPRESSION) + .setBottommostCompressionType(CompressionType.NO_COMPRESSION) + .setNumLevels(7) + .setCompactionPriority(CompactionPriority.MinOverlappingRatio) + .setCompactionStyle(CompactionStyle.UNIVERSAL) + .setCompactionOptionsUniversal(compactionOption) + .setMaxCompactionBytes(100 * SizeUnit.GB) + .setSoftPendingCompactionBytesLimit(100 * SizeUnit.GB) + .setHardPendingCompactionBytesLimit(256 * SizeUnit.GB) + .setLevel0FileNumCompactionTrigger(2) + .setLevel0SlowdownWritesTrigger(8) + .setLevel0StopWritesTrigger(10) + .setTargetFileSizeBase(256 * SizeUnit.MB) + .setTargetFileSizeMultiplier(2) + .setMergeOperator(new StringAppendOperator()) + .setReportBgIoStats(true) + .setOptimizeFiltersForHits(true); + } + /** * Create a rocksdb db options, the user must take care to close it after closing db. * @return