This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 9a891f1d49 fix: multiple patches during long running tests for LMQ over RocksDB (#8915) 9a891f1d49 is described below commit 9a891f1d49af0cfe4e384bc28a0bf0eeed02587f Author: Zhanhui Li <lizhan...@apache.org> AuthorDate: Mon Dec 2 10:07:54 2024 +0800 fix: multiple patches during long running tests for LMQ over RocksDB (#8915) * fix: multiple patches during long running tests for LMQ over RocksDB Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: fix a bug in RocksGroupCommitService; remove RocksDBConsumeQueueStore#findConsumeQueueMap override Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: async fsync on RocksDB WAL flush Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: use a dedicated thread to flush and sync RocksDB WAL Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: trigger WAL rolling according to estimated WAL file size Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * chore: add doc, explaining config RocksDB instance flush/sync strategy Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: data-version should be per table Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: test case: RocksdbTransferOffsetAndCqTest Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --------- Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --- .../apache/rocketmq/broker/BrokerController.java | 2 +- .../rocketmq/broker/config/v2/ConfigHelper.java | 4 +- .../rocketmq/broker/config/v2/ConfigStorage.java | 156 ++++++++++++++++++++- .../broker/config/v2/ConsumerOffsetManagerV2.java | 8 +- .../config/v2/SubscriptionGroupManagerV2.java | 6 +- .../broker/config/v2/TopicConfigManagerV2.java | 6 +- .../config/v2/ConsumerOffsetManagerV2Test.java | 19 ++- .../config/v2/SubscriptionGroupManagerV2Test.java | 15 +- .../broker/config/v2/TopicConfigManagerV2Test.java | 25 +++- .../offset/RocksdbTransferOffsetAndCqTest.java | 16 ++- .../common/config/AbstractRocksDBStorage.java | 34 ++--- 
.../rocketmq/common/config/ConfigHelper.java | 32 +++-- .../common/config/ConfigRocksDBStorage.java | 2 +- .../java/org/apache/rocketmq/store/CommitLog.java | 16 ++- .../rocketmq/store/config/MessageStoreConfig.java | 24 ++++ .../store/queue/RocksDBConsumeQueueStore.java | 95 +++++++++---- .../store/queue/RocksGroupCommitService.java | 103 ++++++++++++++ .../store/rocksdb/RocksDBOptionsFactory.java | 7 +- 18 files changed, 474 insertions(+), 96 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 99e5b85d2e..e1edd2f512 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -344,7 +344,7 @@ public class BrokerController { this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()); this.broadcastOffsetManager = new BroadcastOffsetManager(this); if (ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion())) { - this.configStorage = new ConfigStorage(messageStoreConfig.getStorePathRootDir()); + this.configStorage = new ConfigStorage(messageStoreConfig); this.topicConfigManager = new TopicConfigManagerV2(this, configStorage); this.subscriptionGroupManager = new SubscriptionGroupManagerV2(this, configStorage); this.consumerOffsetManager = new ConsumerOffsetManagerV2(this, configStorage); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java index 8183a1f835..29a7c313ba 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java @@ -64,7 +64,7 @@ 
public class ConfigHelper { return Optional.empty(); } - public static void stampDataVersion(WriteBatch writeBatch, DataVersion dataVersion, long stateMachineVersion) + public static void stampDataVersion(WriteBatch writeBatch, TableId table, DataVersion dataVersion, long stateMachineVersion) throws RocksDBException { // Increase data version dataVersion.nextVersion(stateMachineVersion); @@ -75,7 +75,7 @@ public class ConfigHelper { ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(Long.BYTES * 3); try { keyBuf.writeByte(TablePrefix.TABLE.getValue()); - keyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue()); + keyBuf.writeShort(table.getValue()); keyBuf.writeByte(RecordPrefix.DATA_VERSION.getValue()); keyBuf.writeBytes(ConfigStorage.DATA_VERSION_KEY_BYTES); valueBuf.writeLong(dataVersion.getStateVersion()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java index 6bc62957a8..c4056d142f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java @@ -16,17 +16,29 @@ */ package org.apache.rocketmq.broker.config.v2; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.buffer.PooledByteBufAllocatorMetric; import io.netty.util.internal.PlatformDependent; import java.io.File; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.common.ServiceThread; import 
org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.config.AbstractRocksDBStorage; import org.apache.rocketmq.common.config.ConfigHelper; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.FlushOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -43,8 +55,48 @@ public class ConfigStorage extends AbstractRocksDBStorage { public static final String DATA_VERSION_KEY = "data_version"; public static final byte[] DATA_VERSION_KEY_BYTES = DATA_VERSION_KEY.getBytes(StandardCharsets.UTF_8); - public ConfigStorage(String storePath) { - super(storePath + File.separator + "config" + File.separator + "rdb"); + private final ScheduledExecutorService scheduledExecutorService; + + /** + * Number of write ops since previous flush. + */ + private final AtomicInteger writeOpsCounter; + + private final AtomicLong estimateWalFileSize = new AtomicLong(0L); + + private final MessageStoreConfig messageStoreConfig; + + private final FlushSyncService flushSyncService; + + public ConfigStorage(MessageStoreConfig messageStoreConfig) { + super(messageStoreConfig.getStorePathRootDir() + File.separator + "config" + File.separator + "rdb"); + this.messageStoreConfig = messageStoreConfig; + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("config-storage-%d") + .build(); + scheduledExecutorService = new ScheduledThreadPoolExecutor(1, threadFactory); + writeOpsCounter = new AtomicInteger(0); + this.flushSyncService = new FlushSyncService(); + this.flushSyncService.setDaemon(true); + } + + private void statNettyMemory() { + PooledByteBufAllocatorMetric metric = AbstractRocksDBStorage.POOLED_ALLOCATOR.metric(); + LOGGER.info("Netty Memory Usage: {}", metric); + } + + @Override + public synchronized boolean start() { + boolean started = super.start(); + if (started) { + 
scheduledExecutorService.scheduleWithFixedDelay(() -> statRocksdb(LOGGER), 1, 10, TimeUnit.SECONDS); + scheduledExecutorService.scheduleWithFixedDelay(this::statNettyMemory, 10, 10, TimeUnit.SECONDS); + this.flushSyncService.start(); + } else { + LOGGER.error("Failed to start config storage"); + } + return started; } @Override @@ -58,7 +110,7 @@ public class ConfigStorage extends AbstractRocksDBStorage { initOptions(); List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>(); - ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigOptions(); + ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigColumnFamilyOptions(); this.cfOptions.add(defaultOptions); cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions)); @@ -66,7 +118,7 @@ public class ConfigStorage extends AbstractRocksDBStorage { open(cfDescriptors); this.defaultCFHandle = cfHandles.get(0); - } catch (final Exception e) { + } catch (Exception e) { AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}", this.dbPath, e); return false; } @@ -75,7 +127,8 @@ public class ConfigStorage extends AbstractRocksDBStorage { @Override protected void preShutdown() { - + scheduledExecutorService.shutdown(); + flushSyncService.shutdown(); } protected void initOptions() { @@ -105,6 +158,12 @@ public class ConfigStorage extends AbstractRocksDBStorage { public void write(WriteBatch writeBatch) throws RocksDBException { db.write(ableWalWriteOptions, writeBatch); + accountWriteOps(writeBatch.getDataSize()); + } + + private void accountWriteOps(long dataSize) { + writeOpsCounter.incrementAndGet(); + estimateWalFileSize.addAndGet(dataSize); } public RocksIterator iterate(ByteBuffer beginKey, ByteBuffer endKey) { @@ -125,4 +184,91 @@ public class ConfigStorage extends AbstractRocksDBStorage { return iterator; } } + + /** + * RocksDB writes contain 3 stages: application memory buffer --> OS Page Cache --> Disk. 
+ * Given that we are having DBOptions::manual_wal_flush, we need to manually call DB::FlushWAL and DB::SyncWAL + * Note: DB::FlushWAL(true) will internally call DB::SyncWAL. + * <p> + * See <a href="https://rocksdb.org/blog/2017/08/25/flushwal.html">Flush And Sync WAL</a> + */ + class FlushSyncService extends ServiceThread { + + private long lastSyncTime = 0; + + private static final long MAX_SYNC_INTERVAL_IN_MILLIS = 100; + + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + + private final FlushOptions flushOptions = new FlushOptions(); + + @Override + public String getServiceName() { + return "FlushSyncService"; + } + + @Override + public void run() { + flushOptions.setAllowWriteStall(false); + flushOptions.setWaitForFlush(true); + log.info("{} service started", this.getServiceName()); + while (!this.isStopped()) { + try { + this.waitForRunning(10); + this.flushAndSyncWAL(false); + } catch (Exception e) { + log.warn("{} service has exception. ", this.getServiceName(), e); + } + } + try { + flushAndSyncWAL(true); + } catch (Exception e) { + log.warn("{} raised an exception while performing flush-and-sync WAL on exit", + this.getServiceName(), e); + } + flushOptions.close(); + log.info("{} service end", this.getServiceName()); + } + + private void flushAndSyncWAL(boolean onExit) throws RocksDBException { + int writeOps = writeOpsCounter.get(); + if (0 == writeOps) { + // No write ops to flush + return; + } + + /* + * Normally, when MemTables become full then immutable, RocksDB threads will automatically flush them to L0 + * SST files. The use case here is different: the MemTable may never get full and immutable given that the + * volume of data involved is relatively small. Further, we are constantly modifying the key-value pairs and + * generating WAL entries. The WAL file size can grow up to dozens of gigabytes without manual triggering of + * flush. 
+ */ + if (ConfigStorage.this.estimateWalFileSize.get() >= messageStoreConfig.getRocksdbWalFileRollingThreshold()) { + ConfigStorage.this.flush(flushOptions); + estimateWalFileSize.set(0L); + } + + // Flush and Sync WAL if we have committed enough writes + if (writeOps >= messageStoreConfig.getRocksdbFlushWalFrequency() || onExit) { + stopwatch.reset().start(); + ConfigStorage.this.db.flushWal(true); + long elapsed = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); + writeOpsCounter.getAndAdd(-writeOps); + lastSyncTime = System.currentTimeMillis(); + LOGGER.debug("Flush and Sync WAL of RocksDB[{}] costs {}ms, write-ops={}", dbPath, elapsed, writeOps); + return; + } + // Flush and Sync WAL if some writes are out there for a period of time + long elapsedTime = System.currentTimeMillis() - lastSyncTime; + if (elapsedTime > MAX_SYNC_INTERVAL_IN_MILLIS) { + stopwatch.reset().start(); + ConfigStorage.this.db.flushWal(true); + long elapsed = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); + LOGGER.debug("Flush and Sync WAL of RocksDB[{}] costs {}ms, write-ops={}", dbPath, elapsed, writeOps); + writeOpsCounter.getAndAdd(-writeOps); + lastSyncTime = System.currentTimeMillis(); + } + } + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java index 2c5d3677d8..1821c801cb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -97,7 +97,7 @@ public class ConsumerOffsetManagerV2 extends ConsumerOffsetManager { // TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey)); long stateMachineVersion = brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0; - ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET, dataVersion, stateMachineVersion); configStorage.write(writeBatch); } catch (RocksDBException e) { LOG.error("Failed to removeConsumerOffset, topicAtGroup={}", topicAtGroup, e); @@ -138,7 +138,7 @@ public class ConsumerOffsetManagerV2 extends ConsumerOffsetManager { writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey)); MessageStore messageStore = brokerController.getMessageStore(); long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0; - ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET, dataVersion, stateMachineVersion); configStorage.write(writeBatch); } catch (RocksDBException e) { LOG.error("Failed to consumer offsets by group={}", group, e); @@ -194,7 +194,7 @@ public class ConsumerOffsetManagerV2 extends ConsumerOffsetManager { writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); MessageStore messageStore = brokerController.getMessageStore(); long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0; - ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET, dataVersion, stateMachineVersion); configStorage.write(writeBatch); } catch (RocksDBException e) { LOG.error("Failed to commit consumer offset", e); @@ -394,7 +394,7 @@ public class ConsumerOffsetManagerV2 extends ConsumerOffsetManager { try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); long stateMachineVersion = brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0; - ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + ConfigHelper.stampDataVersion(writeBatch, TableId.PULL_OFFSET, dataVersion, stateMachineVersion); configStorage.write(writeBatch); } catch (RocksDBException e) { LOG.error("Failed to commit pull offset. group={}, topic={}, queueId={}, offset={}", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java index dea8a2d2c1..dd67871f18 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java @@ -137,8 +137,10 @@ public class SubscriptionGroupManagerV2 extends SubscriptionGroupManager { try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); long stateMachineVersion = brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0; - ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + ConfigHelper.stampDataVersion(writeBatch, TableId.SUBSCRIPTION_GROUP, dataVersion, stateMachineVersion); configStorage.write(writeBatch); + // fdatasync on core metadata change + persist(); } catch (RocksDBException e) { log.error("update subscription group config error", e); } finally { @@ -163,7 +165,7 @@ public class SubscriptionGroupManagerV2 extends SubscriptionGroupManager { try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.delete(ConfigHelper.readBytes(keyBuf)); long stateMachineVersion = brokerController.getMessageStore().getStateMachineVersion(); - ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + ConfigHelper.stampDataVersion(writeBatch, TableId.SUBSCRIPTION_GROUP, dataVersion, stateMachineVersion); configStorage.write(writeBatch); } catch (RocksDBException e) { log.error("Failed to remove subscription group config by group-name={}", groupName, e); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java index 4e36b08727..7991d70445 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java @@ -151,8 +151,10 @@ public class TopicConfigManagerV2 extends TopicConfigManager { try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); long stateMachineVersion = brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0; - ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + ConfigHelper.stampDataVersion(writeBatch, TableId.TOPIC, dataVersion, stateMachineVersion); configStorage.write(writeBatch); + // fdatasync on core metadata change + this.persist(); } catch (RocksDBException e) { log.error("Failed to update topic config", e); } finally { @@ -167,7 +169,7 @@ public class TopicConfigManagerV2 extends TopicConfigManager { try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.delete(keyBuf.nioBuffer()); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + ConfigHelper.stampDataVersion(writeBatch, TableId.TOPIC, dataVersion, stateMachineVersion); configStorage.write(writeBatch); } catch (RocksDBException e) { log.error("Failed to delete topic config by topicName={}", topicName, e); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java index d7f46855e1..132bd5c1a5 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java @@ -23,6 +23,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -44,6 +45,8 @@ public class ConsumerOffsetManagerV2Test { @Mock private BrokerController controller; + private MessageStoreConfig messageStoreConfig; + @Rule public 
TemporaryFolder tf = new TemporaryFolder(); @@ -60,7 +63,9 @@ public class ConsumerOffsetManagerV2Test { Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); File configStoreDir = tf.newFolder(); - configStorage = new ConfigStorage(configStoreDir.getAbsolutePath()); + messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath()); + configStorage = new ConfigStorage(messageStoreConfig); configStorage.start(); consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage); } @@ -84,7 +89,9 @@ public class ConsumerOffsetManagerV2Test { consumerOffsetManagerV2.getOffsetTable().clear(); Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + configStorage = new ConfigStorage(messageStoreConfig); configStorage.start(); + consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage); consumerOffsetManagerV2.load(); Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); } @@ -106,7 +113,9 @@ public class ConsumerOffsetManagerV2Test { configStorage.shutdown(); + configStorage = new ConfigStorage(messageStoreConfig); configStorage.start(); + consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage); consumerOffsetManagerV2.load(); Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); } @@ -129,7 +138,9 @@ public class ConsumerOffsetManagerV2Test { configStorage.shutdown(); + configStorage = new ConfigStorage(messageStoreConfig); configStorage.start(); + consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage); consumerOffsetManagerV2.load(); Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryPullOffset(group, topic, queueId)); } @@ -157,7 +168,10 @@ public class ConsumerOffsetManagerV2Test { Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic2, queueId)); 
configStorage.shutdown(); + + configStorage = new ConfigStorage(messageStoreConfig); configStorage.start(); + consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage); consumerOffsetManagerV2.load(); Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic2, queueId)); @@ -184,7 +198,10 @@ public class ConsumerOffsetManagerV2Test { Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic2, queueId)); configStorage.shutdown(); + + configStorage = new ConfigStorage(messageStoreConfig); configStorage.start(); + consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage); consumerOffsetManagerV2.load(); Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic2, queueId)); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java index 6d436a7c4d..4ff8a81e60 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy; import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -38,6 +39,9 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class SubscriptionGroupManagerV2Test { + + private 
MessageStoreConfig messageStoreConfig; + private ConfigStorage configStorage; private SubscriptionGroupManagerV2 subscriptionGroupManagerV2; @@ -68,7 +72,9 @@ public class SubscriptionGroupManagerV2Test { Mockito.doReturn(1L).when(messageStore).getStateMachineVersion(); File configStoreDir = tf.newFolder(); - configStorage = new ConfigStorage(configStoreDir.getAbsolutePath()); + messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath()); + configStorage = new ConfigStorage(messageStoreConfig); configStorage.start(); subscriptionGroupManagerV2 = new SubscriptionGroupManagerV2(controller, configStorage); } @@ -98,7 +104,10 @@ public class SubscriptionGroupManagerV2Test { subscriptionGroupManagerV2.getSubscriptionGroupTable().clear(); configStorage.shutdown(); + + configStorage = new ConfigStorage(messageStoreConfig); configStorage.start(); + subscriptionGroupManagerV2 = new SubscriptionGroupManagerV2(controller, configStorage); subscriptionGroupManagerV2.load(); found = subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName()); Assert.assertEquals(subscriptionGroupConfig, found); @@ -132,7 +141,11 @@ public class SubscriptionGroupManagerV2Test { Assert.assertNull(found); configStorage.shutdown(); + + configStorage = new ConfigStorage(messageStoreConfig); configStorage.start(); + + subscriptionGroupManagerV2 = new SubscriptionGroupManagerV2(controller, configStorage); subscriptionGroupManagerV2.load(); found = subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName()); Assert.assertNull(found); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java index 92c936b110..731a1f538f 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java +++ 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; @@ -35,17 +36,19 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; - @RunWith(value = MockitoJUnitRunner.class) public class TopicConfigManagerV2Test { - private ConfigStorage configStorage; + private MessageStoreConfig messageStoreConfig; - private TopicConfigManagerV2 topicConfigManagerV2; + private ConfigStorage configStorage; @Mock private BrokerController controller; + @Mock + private MessageStore messageStore; + @Rule public TemporaryFolder tf = new TemporaryFolder(); @@ -61,17 +64,22 @@ public class TopicConfigManagerV2Test { BrokerConfig brokerConfig = new BrokerConfig(); Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig = new MessageStoreConfig(); Mockito.doReturn(messageStoreConfig).when(controller).getMessageStoreConfig(); + Mockito.doReturn(messageStore).when(controller).getMessageStore(); File configStoreDir = tf.newFolder(); - configStorage = new ConfigStorage(configStoreDir.getAbsolutePath()); + messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath()); + + configStorage = new ConfigStorage(messageStoreConfig); configStorage.start(); - topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage); } @Test public void testUpdateTopicConfig() { + TopicConfigManagerV2 topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage); + topicConfigManagerV2.load(); + TopicConfig topicConfig = new TopicConfig(); String topicName = 
"T1"; topicConfig.setTopicName(topicName); @@ -86,7 +94,9 @@ public class TopicConfigManagerV2Test { topicConfigManagerV2.getTopicConfigTable().clear(); + configStorage = new ConfigStorage(messageStoreConfig); Assert.assertTrue(configStorage.start()); + topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage); Assert.assertTrue(topicConfigManagerV2.load()); TopicConfig loaded = topicConfigManagerV2.selectTopicConfig(topicName); @@ -111,12 +121,15 @@ public class TopicConfigManagerV2Test { topicConfig.setWriteQueueNums(4); topicConfig.setOrder(true); topicConfig.setTopicSysFlag(4); + TopicConfigManagerV2 topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage); topicConfigManagerV2.updateTopicConfig(topicConfig); topicConfigManagerV2.removeTopicConfig(topicName); Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName)); Assert.assertTrue(configStorage.shutdown()); + configStorage = new ConfigStorage(messageStoreConfig); Assert.assertTrue(configStorage.start()); + topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage); Assert.assertTrue(topicConfigManagerV2.load()); Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName)); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java index 4b320eb53f..6a805b0434 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java @@ -23,11 +23,11 @@ import java.util.HashMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import org.apache.commons.collections.MapUtils; import org.apache.rocketmq.broker.BrokerController; import 
org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.store.DefaultMessageStore; @@ -38,6 +38,7 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface; import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.awaitility.Awaitility; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -135,6 +136,7 @@ public class RocksdbTransferOffsetAndCqTest { } RocksDBMessageStore kvStore = defaultMessageStore.getRocksDBMessageStore(); ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore(); + store.start(); ConsumeQueueInterface rocksdbCq = defaultMessageStore.getRocksDBMessageStore().findConsumeQueue(topic, queueId); ConsumeQueueInterface fileCq = defaultMessageStore.findConsumeQueue(topic, queueId); for (int i = 0; i < 200; i++) { @@ -142,13 +144,21 @@ public class RocksdbTransferOffsetAndCqTest { fileCq.putMessagePositionInfoWrapper(request); store.putMessagePositionInfoWrapper(request); } + Awaitility.await() + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(3, TimeUnit.SECONDS) + .until(() -> rocksdbCq.getMaxOffsetInQueue() == 200); Pair<CqUnit, Long> unit = rocksdbCq.getCqUnitAndStoreTime(100); Pair<CqUnit, Long> unit1 = fileCq.getCqUnitAndStoreTime(100); - Assert.assertTrue(unit.getObject1().getPos() == unit1.getObject1().getPos()); + Assert.assertEquals(unit.getObject1().getPos(), unit1.getObject1().getPos()); } + /** + * No need to skip macOS platform. + * @return true if some platform is NOT a good fit for this test case. 
+ */ private boolean notToBeExecuted() { - return MixAll.isMac(); + return false; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index 28ed4e924c..48ba4b8086 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -121,14 +121,16 @@ public abstract class AbstractRocksDBStorage { this.writeOptions = new WriteOptions(); this.writeOptions.setSync(false); this.writeOptions.setDisableWAL(true); - this.writeOptions.setNoSlowdown(true); + // https://github.com/facebook/rocksdb/wiki/Write-Stalls + this.writeOptions.setNoSlowdown(false); } protected void initAbleWalWriteOptions() { this.ableWalWriteOptions = new WriteOptions(); this.ableWalWriteOptions.setSync(false); this.ableWalWriteOptions.setDisableWAL(false); - this.ableWalWriteOptions.setNoSlowdown(true); + // https://github.com/facebook/rocksdb/wiki/Write-Stalls + this.ableWalWriteOptions.setNoSlowdown(false); } protected void initReadOptions() { @@ -363,7 +365,7 @@ public abstract class AbstractRocksDBStorage { } if (postLoad()) { this.loaded = true; - LOGGER.info("start OK. 
{}", this.dbPath); + LOGGER.info("RocksDB[{}] starts OK", this.dbPath); this.closed = false; return true; } else { @@ -560,7 +562,15 @@ public abstract class AbstractRocksDBStorage { public void statRocksdb(Logger logger) { try { + // Log Memory Usage + String blockCacheMemUsage = this.db.getProperty("rocksdb.block-cache-usage"); + String indexesAndFilterBlockMemUsage = this.db.getProperty("rocksdb.estimate-table-readers-mem"); + String memTableMemUsage = this.db.getProperty("rocksdb.cur-size-all-mem-tables"); + String blocksPinnedByIteratorMemUsage = this.db.getProperty("rocksdb.block-cache-pinned-usage"); + logger.info("RocksDB Memory Usage: BlockCache: {}, IndexesAndFilterBlock: {}, MemTable: {}, BlocksPinnedByIterator: {}", + blockCacheMemUsage, indexesAndFilterBlockMemUsage, memTableMemUsage, blocksPinnedByIteratorMemUsage); + // Log file metadata by level List<LiveFileMetaData> liveFileMetaDataList = this.getCompactionStatus(); if (liveFileMetaDataList == null || liveFileMetaDataList.isEmpty()) { return; @@ -570,21 +580,13 @@ public abstract class AbstractRocksDBStorage { StringBuilder sb = map.computeIfAbsent(metaData.level(), k -> new StringBuilder(256)); sb.append(new String(metaData.columnFamilyName(), StandardCharsets.UTF_8)).append(SPACE). append(metaData.fileName()).append(SPACE). - append("s: ").append(metaData.size()).append(SPACE). - append("a: ").append(metaData.numEntries()).append(SPACE). - append("r: ").append(metaData.numReadsSampled()).append(SPACE). - append("d: ").append(metaData.numDeletions()).append(SPACE). - append(metaData.beingCompacted()).append("\n"); + append("file-size: ").append(metaData.size()).append(SPACE). + append("number-of-entries: ").append(metaData.numEntries()).append(SPACE). + append("file-read-times: ").append(metaData.numReadsSampled()).append(SPACE). + append("deletions: ").append(metaData.numDeletions()).append(SPACE). 
+ append("being-compacted: ").append(metaData.beingCompacted()).append("\n"); } - map.forEach((key, value) -> logger.info("level: {}\n{}", key, value.toString())); - - String blockCacheMemUsage = this.db.getProperty("rocksdb.block-cache-usage"); - String indexesAndFilterBlockMemUsage = this.db.getProperty("rocksdb.estimate-table-readers-mem"); - String memTableMemUsage = this.db.getProperty("rocksdb.cur-size-all-mem-tables"); - String blocksPinnedByIteratorMemUsage = this.db.getProperty("rocksdb.block-cache-pinned-usage"); - logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, MemTable: {}, blocksPinnedByIterator: {}", - blockCacheMemUsage, indexesAndFilterBlockMemUsage, memTableMemUsage, blocksPinnedByIteratorMemUsage); } catch (Exception ignored) { } } diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java index a4ba35bd5a..e3f6f22002 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java @@ -38,7 +38,7 @@ import org.rocksdb.WALRecoveryMode; import org.rocksdb.util.SizeUnit; public class ConfigHelper { - public static ColumnFamilyOptions createConfigOptions() { + public static ColumnFamilyOptions createConfigColumnFamilyOptions() { BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig(). setFormatVersion(5). setIndexType(IndexType.kBinarySearch). @@ -46,7 +46,7 @@ public class ConfigHelper { setBlockSize(32 * SizeUnit.KB). setFilterPolicy(new BloomFilter(16, false)). // Indicating if we'd put index/filter blocks to the block cache. - setCacheIndexAndFilterBlocks(false). + setCacheIndexAndFilterBlocks(true). setCacheIndexAndFilterBlocksWithHighPriority(true). setPinL0FilterAndIndexBlocksInCache(false). setPinTopLevelIndexAndFilter(true). 
@@ -54,9 +54,8 @@ public class ConfigHelper { setWholeKeyFiltering(true); ColumnFamilyOptions options = new ColumnFamilyOptions(); - return options.setMaxWriteBufferNumber(2). - // MemTable size, MemTable(cache) -> immutable MemTable(cache) -> SST(disk) - setWriteBufferSize(8 * SizeUnit.MB). + return options.setMaxWriteBufferNumber(4). + setWriteBufferSize(64 * SizeUnit.MB). setMinWriteBufferNumberToMerge(1). setTableFormatConfig(blockBasedTableConfig). setMemTableConfig(new SkipListMemTableConfig()). @@ -67,17 +66,17 @@ public class ConfigHelper { setLevel0SlowdownWritesTrigger(8). setLevel0StopWritesTrigger(12). // The target file size for compaction. - setTargetFileSizeBase(64 * SizeUnit.MB). + setTargetFileSizeBase(64 * SizeUnit.MB). setTargetFileSizeMultiplier(2). // The upper-bound of the total size of L1 files in bytes - setMaxBytesForLevelBase(256 * SizeUnit.MB). + setMaxBytesForLevelBase(256 * SizeUnit.MB). setMaxBytesForLevelMultiplier(2). setMergeOperator(new StringAppendOperator()). setInplaceUpdateSupport(true); } public static DBOptions createConfigDBOptions() { - //Turn based on https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide + // Tune based on https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide // and http://gitlab.alibaba-inc.com/aloha/aloha/blob/branch_2_5_0/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbOptionsFactory.java DBOptions options = new DBOptions(); Statistics statistics = new Statistics(); @@ -86,10 +85,20 @@ public class ConfigHelper { setDbLogDir(getDBLogDir()). setInfoLogLevel(InfoLogLevel.INFO_LEVEL). setWalRecoveryMode(WALRecoveryMode.SkipAnyCorruptedRecords). 
+ /* + * We use manual WAL flush to achieve the desired balance between reliability and performance: + * for metadata that matters, including {topic, subscription}-config changes, each write incurs a + * flush-and-sync to ensure reliability; for {commit, pull}-offset advancements, a group flush is issued + * every N (configurable, 1024 by default) writes or once pending writes have aged, similar to the OS + * page-cache flush mechanism. + */ setManualWalFlush(true). - setMaxTotalWalSize(500 * SizeUnit.MB). - setWalSizeLimitMB(0). - setWalTtlSeconds(0). + // This option takes effect only when we have multiple column families + // https://github.com/facebook/rocksdb/issues/4180 + // setMaxTotalWalSize(1024 * SizeUnit.MB). + setDbWriteBufferSize(128 * SizeUnit.MB). + setBytesPerSync(SizeUnit.MB). + setWalBytesPerSync(SizeUnit.MB). setCreateIfMissing(true). setCreateMissingColumnFamilies(true). setMaxOpenFiles(-1). @@ -99,7 +108,6 @@ public class ConfigHelper { setAllowConcurrentMemtableWrite(false). setStatistics(statistics). setStatsDumpPeriodSec(600). - setAtomicFlush(true). setMaxBackgroundJobs(32). setMaxSubcompactions(4). setParanoidChecks(true). 
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java index 3b924a6a0d..5fd9bab2d7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java @@ -70,7 +70,7 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage { final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>(); - ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigOptions(); + ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigColumnFamilyOptions(); this.cfOptions.add(defaultOptions); cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions)); cfDescriptors.add(new ColumnFamilyDescriptor(KV_DATA_VERSION_COLUMN_FAMILY_NAME, defaultOptions)); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 7cf9746551..d30691908b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -326,22 +326,26 @@ public class CommitLog implements Swappable { boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { - // Began to recover from the last third file - int index = mappedFiles.size() - 3; - if (index < 0) { - index = 0; + int index = mappedFiles.size() - 1; + while (index > 0) { + MappedFile mappedFile = mappedFiles.get(index); + if (mappedFile.getFileFromOffset() <= maxPhyOffsetOfConsumeQueue) { + // It's safe to recover from this mapped file + break; + } + index--; } + // TODO: Discuss if we need to load more commit-log mapped files into memory. 
MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; long lastValidMsgPhyOffset = this.getConfirmOffset(); - // normal recover doesn't require dispatching - boolean doDispatch = false; while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo); int size = dispatchRequest.getMsgSize(); + boolean doDispatch = dispatchRequest.getCommitLogOffset() > maxPhyOffsetOfConsumeQueue; // Normal data if (dispatchRequest.isSuccess() && size > 0) { lastValidMsgPhyOffset = processOffset + mappedFileOffset; diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index fe090e3fa2..6dfdc0b1c8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -23,6 +23,7 @@ import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.StoreType; import org.apache.rocketmq.store.queue.BatchConsumeQueue; import org.rocksdb.CompressionType; +import org.rocksdb.util.SizeUnit; public class MessageStoreConfig { @@ -444,6 +445,13 @@ public class MessageStoreConfig { private String rocksdbCompressionType = CompressionType.LZ4_COMPRESSION.getLibraryName(); + /** + * RocksDB WAL flush frequency, i.e. flush the WAL every N write operations. 
+ */ + private int rocksdbFlushWalFrequency = 1024; + + private long rocksdbWalFileRollingThreshold = SizeUnit.GB; + public String getRocksdbCompressionType() { return rocksdbCompressionType; } @@ -1902,6 +1910,22 @@ public class MessageStoreConfig { this.bottomMostCompressionTypeForConsumeQueueStore = bottomMostCompressionTypeForConsumeQueueStore; } + public int getRocksdbFlushWalFrequency() { + return rocksdbFlushWalFrequency; + } + + public void setRocksdbFlushWalFrequency(int rocksdbFlushWalFrequency) { + this.rocksdbFlushWalFrequency = rocksdbFlushWalFrequency; + } + + public long getRocksdbWalFileRollingThreshold() { + return rocksdbWalFileRollingThreshold; + } + + public void setRocksdbWalFileRollingThreshold(long rocksdbWalFileRollingThreshold) { + this.rocksdbWalFileRollingThreshold = rocksdbWalFileRollingThreshold; + } + public int getSpinLockCollisionRetreatOptimalDegree() { return spinLockCollisionRetreatOptimalDegree; } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index 0242ec2309..7e3aa70d02 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -30,12 +30,14 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import 
org.apache.rocketmq.common.message.MessageConst; @@ -84,6 +86,10 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { private final OffsetInitializer offsetInitializer; + private final RocksGroupCommitService groupCommitService; + + private final AtomicReference<ServiceState> serviceState = new AtomicReference<>(ServiceState.CREATE_JUST); + public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) { super(messageStore); @@ -93,6 +99,7 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore); this.offsetInitializer = new OffsetInitializerRocksDBImpl(this); + this.groupCommitService = new RocksGroupCommitService(this); this.cqBBPairList = new ArrayList<>(16); this.offsetBBPairList = new ArrayList<>(DEFAULT_BYTE_BUFFER_CAPACITY); for (int i = 0; i < DEFAULT_BYTE_BUFFER_CAPACITY; i++) { @@ -123,14 +130,17 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { @Override public void start() { - log.info("RocksDB ConsumeQueueStore start!"); - this.scheduledExecutorService.scheduleAtFixedRate(() -> { - this.rocksDBStorage.statRocksdb(ROCKSDB_LOG); - }, 10, this.messageStoreConfig.getStatRocksDBCQIntervalSec(), TimeUnit.SECONDS); - - this.scheduledExecutorService.scheduleWithFixedDelay(() -> { - cleanDirty(messageStore.getTopicConfigs().keySet()); - }, 10, this.messageStoreConfig.getCleanRocksDBDirtyCQIntervalMin(), TimeUnit.MINUTES); + if (serviceState.compareAndSet(ServiceState.CREATE_JUST, ServiceState.RUNNING)) { + log.info("RocksDB ConsumeQueueStore start!"); + this.groupCommitService.start(); + this.scheduledExecutorService.scheduleAtFixedRate(() -> { + this.rocksDBStorage.statRocksdb(ROCKSDB_LOG); + }, 10, this.messageStoreConfig.getStatRocksDBCQIntervalSec(), TimeUnit.SECONDS); + + this.scheduledExecutorService.scheduleWithFixedDelay(() -> { + 
cleanDirty(messageStore.getTopicConfigs().keySet()); + }, 10, this.messageStoreConfig.getCleanRocksDBDirtyCQIntervalMin(), TimeUnit.MINUTES); + } } private void cleanDirty(final Set<String> existTopicSet) { @@ -165,18 +175,23 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { @Override public void recover() { - // ignored + start(); } @Override public boolean recoverConcurrently() { + start(); return true; } @Override public boolean shutdown() { - this.scheduledExecutorService.shutdown(); - return shutdownInner(); + if (serviceState.compareAndSet(ServiceState.RUNNING, ServiceState.SHUTDOWN_ALREADY)) { + this.groupCommitService.shutdown(); + this.scheduledExecutorService.shutdown(); + return shutdownInner(); + } + return true; } private boolean shutdownInner() { @@ -188,23 +203,25 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { if (null == request) { return; } - // We are taking advantage of Atomic Flush, this operation is purely memory-based. - // batch and cache in Java heap does not make sense, instead, we should put the metadata into RocksDB immediately - // to optimized overall end-to-end latency. - putMessagePosition(request); + + try { + groupCommitService.putRequest(request); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } - public void putMessagePosition(DispatchRequest request) throws RocksDBException { + public void putMessagePosition(List<DispatchRequest> requests) throws RocksDBException { final int maxRetries = 30; for (int i = 0; i < maxRetries; i++) { - if (putMessagePosition0(request)) { + if (putMessagePosition0(requests)) { if (this.isCQError) { this.messageStore.getRunningFlags().clearLogicsQueueError(); this.isCQError = false; } return; } else { - ERROR_LOG.warn("{} put cq Failed. retryTime: {}", i); + ERROR_LOG.warn("Put cq Failed. 
retryTime: {}", i); try { Thread.sleep(100); } catch (InterruptedException ignored) { @@ -219,34 +236,43 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { throw new RocksDBException("put CQ Failed"); } - private boolean putMessagePosition0(DispatchRequest request) { + private boolean putMessagePosition0(List<DispatchRequest> requests) { if (!this.rocksDBStorage.hold()) { return false; } try (WriteBatch writeBatch = new WriteBatch()) { + final int size = requests.size(); + if (size == 0) { + return true; + } long maxPhyOffset = 0; - DispatchEntry entry = DispatchEntry.from(request); - dispatch(entry, writeBatch); - dispatchLMQ(request, writeBatch); - - final int msgSize = request.getMsgSize(); - final long phyOffset = request.getCommitLogOffset(); - if (phyOffset + msgSize >= maxPhyOffset) { - maxPhyOffset = phyOffset + msgSize; + for (int i = size - 1; i >= 0; i--) { + final DispatchRequest request = requests.get(i); + DispatchEntry entry = DispatchEntry.from(request); + dispatch(entry, writeBatch); + dispatchLMQ(request, writeBatch); + + final int msgSize = request.getMsgSize(); + final long phyOffset = request.getCommitLogOffset(); + if (phyOffset + msgSize >= maxPhyOffset) { + maxPhyOffset = phyOffset + msgSize; + } } this.rocksDBConsumeQueueOffsetTable.putMaxPhyAndCqOffset(tempTopicQueueMaxOffsetMap, writeBatch, maxPhyOffset); this.rocksDBStorage.batchPut(writeBatch); + this.rocksDBConsumeQueueOffsetTable.putHeapMaxCqOffset(tempTopicQueueMaxOffsetMap); - long storeTimeStamp = request.getStoreTimestamp(); + + long storeTimeStamp = requests.get(size - 1).getStoreTimestamp(); if (this.messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE || this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) { this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimeStamp); } this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimeStamp); - notifyMessageArrival(request); + 
notifyMessageArriveAndClear(requests); return true; } catch (Exception e) { ERROR_LOG.error("putMessagePosition0 failed.", e); @@ -311,9 +337,12 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { } } - private void notifyMessageArrival(DispatchRequest request) { + private void notifyMessageArriveAndClear(List<DispatchRequest> requests) { try { - this.messageStore.notifyMessageArriveIfNecessary(request); + for (DispatchRequest dp : requests) { + this.messageStore.notifyMessageArriveIfNecessary(dp); + } + requests.clear(); } catch (Exception e) { ERROR_LOG.error("notifyMessageArriveAndClear Failed.", e); } @@ -538,4 +567,8 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { } return super.getMaxOffset(topic, queueId); } + + public boolean isStopped() { + return ServiceState.SHUTDOWN_ALREADY == serviceState.get(); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java new file mode 100644 index 0000000000..e2f2c9ee2c --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store.queue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.store.DispatchRequest; +import org.rocksdb.RocksDBException; + +public class RocksGroupCommitService extends ServiceThread { + + private static final int MAX_BUFFER_SIZE = 100_000; + + private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256; + + private final LinkedBlockingQueue<DispatchRequest> buffer; + + private final RocksDBConsumeQueueStore store; + + private final List<DispatchRequest> requests = new ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT); + + public RocksGroupCommitService(RocksDBConsumeQueueStore store) { + this.store = store; + this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE); + } + + @Override + public String getServiceName() { + return "RocksGroupCommit"; + } + + @Override + public void run() { + log.info("{} service started", this.getServiceName()); + while (!this.isStopped()) { + try { + this.waitForRunning(10); + this.doCommit(); + } catch (Exception e) { + log.warn("{} service has exception. 
", this.getServiceName(), e); + } + } + log.info("{} service end", this.getServiceName()); + } + + public void putRequest(final DispatchRequest request) throws InterruptedException { + while (!buffer.offer(request, 3, TimeUnit.SECONDS)) { + log.warn("RocksGroupCommitService#buffer is full, 3s elapsed before space becomes available"); + } + this.wakeup(); + } + + private void doCommit() { + while (!buffer.isEmpty()) { + while (true) { + DispatchRequest dispatchRequest = buffer.poll(); + if (null != dispatchRequest) { + requests.add(dispatchRequest); + } + + if (requests.isEmpty()) { + // buffer has been drained + break; + } + + if (null == dispatchRequest || requests.size() >= PREFERRED_DISPATCH_REQUEST_COUNT) { + groupCommit(); + } + } + } + } + + private void groupCommit() { + while (!store.isStopped()) { + try { + // putMessagePosition will clear requests after consume queue building completion + store.putMessagePosition(requests); + break; + } catch (RocksDBException e) { + log.error("Failed to build consume queue in RocksDB", e); + } + } + } + +} diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java index 66f5cbd095..2fac3bf485 100644 --- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java +++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java @@ -22,6 +22,7 @@ import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionOptionsUniversal; +import org.rocksdb.CompactionPriority; import org.rocksdb.CompactionStopStyle; import org.rocksdb.CompactionStyle; import org.rocksdb.CompressionType; @@ -79,6 +80,7 @@ public class RocksDBOptionsFactory { setCompressionType(compressionType). setBottommostCompressionType(bottomMostCompressionType). setNumLevels(7). 
+ setCompactionPriority(CompactionPriority.MinOverlappingRatio). setCompactionStyle(CompactionStyle.UNIVERSAL). setCompactionOptionsUniversal(compactionOption). setMaxCompactionBytes(100 * SizeUnit.GB). @@ -144,10 +146,8 @@ public class RocksDBOptionsFactory { setInfoLogLevel(InfoLogLevel.INFO_LEVEL). setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery). setManualWalFlush(true). - setMaxTotalWalSize(0). - setWalSizeLimitMB(0). - setWalTtlSeconds(0). setCreateIfMissing(true). + setBytesPerSync(SizeUnit.MB). setCreateMissingColumnFamilies(true). setMaxOpenFiles(-1). setMaxLogFileSize(SizeUnit.GB). @@ -156,6 +156,7 @@ public class RocksDBOptionsFactory { setAllowConcurrentMemtableWrite(false). setStatistics(statistics). setAtomicFlush(true). + setCompactionReadaheadSize(4 * SizeUnit.MB). setMaxBackgroundJobs(32). setMaxSubcompactions(8). setParanoidChecks(true).