This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 1b42515093 [ISSUE #8129] Support topic reserved time in tiered storage (#8130) 1b42515093 is described below commit 1b42515093fb56a2cabfa754564397e343a357be Author: yuz10 <845238...@qq.com> AuthorDate: Wed May 22 14:09:00 2024 +0800 [ISSUE #8129] Support topic reserved time in tiered storage (#8130) Co-authored-by: yuzhou <yuzh...@huawei.com> --- broker/BUILD.bazel | 2 ++ .../rocketmq/broker/topic/TopicConfigManager.java | 31 ++++++++++++++++++++++ .../apache/rocketmq/common/TopicAttributes.java | 9 +++++++ tieredstore/README.md | 4 +-- .../rocketmq/tieredstore/file/FlatFileStore.java | 4 +-- .../rocketmq/tieredstore/file/FlatMessageFile.java | 7 +++++ .../metrics/TieredStoreMetricsManager.java | 5 ++-- 7 files changed, 56 insertions(+), 6 deletions(-) diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel index 785b765774..0dbc85f945 100644 --- a/broker/BUILD.bazel +++ b/broker/BUILD.bazel @@ -29,6 +29,7 @@ java_library( "//remoting", "//srvutil", "//store", + "//tieredstore", "@maven//:ch_qos_logback_logback_classic", "@maven//:com_alibaba_fastjson", "@maven//:com_alibaba_fastjson2_fastjson2", @@ -81,6 +82,7 @@ java_library( "//filter", "//remoting", "//store", + "//tieredstore", "@maven//:com_alibaba_fastjson", "@maven//:com_alibaba_fastjson2_fastjson2", "@maven//:com_google_guava_guava", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 511d29e12a..1ed9cbab5f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -51,6 +51,9 @@ import org.apache.rocketmq.remoting.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo; +import org.apache.rocketmq.tieredstore.TieredMessageStore; +import org.apache.rocketmq.tieredstore.metadata.MetadataStore; +import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata; import static com.google.common.base.Preconditions.checkNotNull; @@ -501,6 +504,7 @@ public class TopicConfigManager extends ConfigManager { ImmutableMap.copyOf(newAttributes)); topicConfig.setAttributes(finalAttributes); + updateTieredStoreTopicMetadata(topicConfig, newAttributes); TopicConfig old = putTopicConfig(topicConfig); if (old != null) { @@ -515,6 +519,33 @@ public class TopicConfigManager extends ConfigManager { this.persist(topicConfig.getTopicName(), topicConfig); } + private synchronized void updateTieredStoreTopicMetadata(final TopicConfig topicConfig, Map<String, String> newAttributes) { + if (!(brokerController.getMessageStore() instanceof TieredMessageStore)) { + if (newAttributes.get(TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getName()) != null) { + throw new IllegalArgumentException("Update topic reserveTime not supported"); + } + return; + } + + String topic = topicConfig.getTopicName(); + long reserveTime = TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getDefaultValue(); + String attr = topicConfig.getAttributes().get(TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getName()); + if (attr != null) { + reserveTime = Long.parseLong(attr); + } + + log.info("Update tiered storage metadata, topic {}, reserveTime {}", topic, reserveTime); + TieredMessageStore tieredMessageStore = (TieredMessageStore) brokerController.getMessageStore(); + MetadataStore metadataStore = tieredMessageStore.getMetadataStore(); + TopicMetadata topicMetadata = metadataStore.getTopic(topic); + if (topicMetadata == null) { + metadataStore.addTopic(topic, reserveTime); + } else if (topicMetadata.getReserveTime() != reserveTime) { + topicMetadata.setReserveTime(reserveTime); + metadataStore.updateTopic(topicMetadata); + } + } + public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) { diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java index 1f26866e5b..c507748c67 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.rocketmq.common.attribute.Attribute; import org.apache.rocketmq.common.attribute.EnumAttribute; +import org.apache.rocketmq.common.attribute.LongRangeAttribute; import org.apache.rocketmq.common.attribute.TopicMessageType; import static com.google.common.collect.Sets.newHashSet; @@ -43,6 +44,13 @@ public class TopicAttributes { TopicMessageType.topicMessageTypeSet(), TopicMessageType.NORMAL.getValue() ); + public static final LongRangeAttribute TOPIC_RESERVE_TIME_ATTRIBUTE = new LongRangeAttribute( + "reserve.time", + true, + -1, + Long.MAX_VALUE, + -1 + ); public static final Map<String, Attribute> ALL; @@ -51,5 +59,6 @@ public class TopicAttributes { ALL.put(QUEUE_TYPE_ATTRIBUTE.getName(), QUEUE_TYPE_ATTRIBUTE); ALL.put(CLEANUP_POLICY_ATTRIBUTE.getName(), CLEANUP_POLICY_ATTRIBUTE); ALL.put(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(), TOPIC_MESSAGE_TYPE_ATTRIBUTE); + ALL.put(TOPIC_RESERVE_TIME_ATTRIBUTE.getName(), TOPIC_RESERVE_TIME_ATTRIBUTE); } } diff --git a/tieredstore/README.md b/tieredstore/README.md index baeb56acc3..41e7458a2a 100644 --- a/tieredstore/README.md +++ b/tieredstore/README.md @@ -57,8 +57,8 @@ Tiered storage provides some useful metrics, see [RIP-46](https://github.com/apa ## How to contribute -We need community participation to add more backend service providers for tiered storage. [PosixFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java), the implementation provided by default is just an example. People who want to contribute can follow it to implement their own providers, such as S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines: +We need community participation to add more backend service providers for tiered storage. [PosixFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java), the implementation provided by default is just an example. People who want to contribute can follow it to implement their own providers, such as S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines: -1. Extend [TieredFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java) and implement the methods of [TieredStoreProvider](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java) interface. +1. Extend [FileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java) and implement the methods of [FileSegmentProvider](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentProvider.java) interface. 2. Record metrics where appropriate. See `rocketmq_tiered_store_provider_rpc_latency`, `rocketmq_tiered_store_provider_upload_bytes`, and `rocketmq_tiered_store_provider_download_bytes` 3. No need to maintain your own cache and avoid polluting the page cache. It is already having the read-ahead cache. diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java index 0d7044a544..f782d099de 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java @@ -60,9 +60,9 @@ public class FlatFileStore { this.flatFileConcurrentMap.clear(); this.recover(); this.executor.commonExecutor.scheduleWithFixedDelay(() -> { - long expiredTimeStamp = System.currentTimeMillis() - - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); for (FlatMessageFile flatFile : deepCopyFlatFileToList()) { + long expiredTimeStamp = System.currentTimeMillis() - + TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours()); flatFile.destroyExpiredFile(expiredTimeStamp); if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) { this.destroyFile(flatFile.getMessageQueue()); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java index a214059442..d5675976cb 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java @@ -399,4 +399,11 @@ public class FlatMessageFile implements FlatFileInterface { fileLock.unlock(); } } + + public long getFileReservedHours() { + if (topicMetadata.getReserveTime() > 0) { + return topicMetadata.getReserveTime(); + } + return storeConfig.getTieredStoreFileReservedTime(); + } } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java index e76c86d79b..8b5a9e63c0 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.message.MessageQueue; @@ -180,7 +181,7 @@ public class TieredStoreMetricsManager { MessageQueue mq = flatFile.getMessageQueue(); long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId()); long maxTimestamp = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1); - if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > (long) storeConfig.getTieredStoreFileReservedTime() * 60 * 60 * 1000) { + if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours())) { continue; } @@ -209,7 +210,7 @@ public class TieredStoreMetricsManager { MessageQueue mq = flatFile.getMessageQueue(); long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId()); long maxTimestamp = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1); - if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > (long) storeConfig.getTieredStoreFileReservedTime() * 60 * 60 * 1000) { + if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours())) { continue; }