This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1b42515093 [ISSUE #8129] Support topic reserved time in tiered storage (#8130)
1b42515093 is described below

commit 1b42515093fb56a2cabfa754564397e343a357be
Author: yuz10 <845238...@qq.com>
AuthorDate: Wed May 22 14:09:00 2024 +0800

    [ISSUE #8129] Support topic reserved time in tiered storage (#8130)
    
    Co-authored-by: yuzhou <yuzh...@huawei.com>
---
 broker/BUILD.bazel                                 |  2 ++
 .../rocketmq/broker/topic/TopicConfigManager.java  | 31 ++++++++++++++++++++++
 .../apache/rocketmq/common/TopicAttributes.java    |  9 +++++++
 tieredstore/README.md                              |  4 +--
 .../rocketmq/tieredstore/file/FlatFileStore.java   |  4 +--
 .../rocketmq/tieredstore/file/FlatMessageFile.java |  7 +++++
 .../metrics/TieredStoreMetricsManager.java         |  5 ++--
 7 files changed, 56 insertions(+), 6 deletions(-)

diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel
index 785b765774..0dbc85f945 100644
--- a/broker/BUILD.bazel
+++ b/broker/BUILD.bazel
@@ -29,6 +29,7 @@ java_library(
         "//remoting",
         "//srvutil",
         "//store",
+        "//tieredstore",
         "@maven//:ch_qos_logback_logback_classic",
         "@maven//:com_alibaba_fastjson",
         "@maven//:com_alibaba_fastjson2_fastjson2",
@@ -81,6 +82,7 @@ java_library(
         "//filter",
         "//remoting",
         "//store",
+        "//tieredstore",
         "@maven//:com_alibaba_fastjson",
         "@maven//:com_alibaba_fastjson2_fastjson2",
         "@maven//:com_google_guava_guava",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 511d29e12a..1ed9cbab5f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -51,6 +51,9 @@ import org.apache.rocketmq.remoting.protocol.body.KVTable;
 import 
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;
+import org.apache.rocketmq.tieredstore.TieredMessageStore;
+import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
+import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -501,6 +504,7 @@ public class TopicConfigManager extends ConfigManager {
             ImmutableMap.copyOf(newAttributes));
 
         topicConfig.setAttributes(finalAttributes);
+        updateTieredStoreTopicMetadata(topicConfig, newAttributes);
 
         TopicConfig old = putTopicConfig(topicConfig);
         if (old != null) {
@@ -515,6 +519,33 @@ public class TopicConfigManager extends ConfigManager {
         this.persist(topicConfig.getTopicName(), topicConfig);
     }
 
+    private synchronized void updateTieredStoreTopicMetadata(final TopicConfig 
topicConfig, Map<String, String> newAttributes) {
+        if (!(brokerController.getMessageStore() instanceof 
TieredMessageStore)) {
+            if 
(newAttributes.get(TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getName()) != 
null) {
+                throw new IllegalArgumentException("Update topic reserveTime 
not supported");
+            }
+            return;
+        }
+
+        String topic = topicConfig.getTopicName();
+        long reserveTime = 
TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getDefaultValue();
+        String attr = 
topicConfig.getAttributes().get(TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getName());
+        if (attr != null) {
+            reserveTime = Long.parseLong(attr);
+        }
+
+        log.info("Update tiered storage metadata, topic {}, reserveTime {}", 
topic, reserveTime);
+        TieredMessageStore tieredMessageStore = (TieredMessageStore) 
brokerController.getMessageStore();
+        MetadataStore metadataStore = tieredMessageStore.getMetadataStore();
+        TopicMetadata topicMetadata = metadataStore.getTopic(topic);
+        if (topicMetadata == null) {
+            metadataStore.addTopic(topic, reserveTime);
+        } else if (topicMetadata.getReserveTime() != reserveTime) {
+            topicMetadata.setReserveTime(reserveTime);
+            metadataStore.updateTopic(topicMetadata);
+        }
+    }
+
     public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
 
         if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != 
null) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
index 1f26866e5b..c507748c67 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.rocketmq.common.attribute.Attribute;
 import org.apache.rocketmq.common.attribute.EnumAttribute;
+import org.apache.rocketmq.common.attribute.LongRangeAttribute;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
 
 import static com.google.common.collect.Sets.newHashSet;
@@ -43,6 +44,13 @@ public class TopicAttributes {
         TopicMessageType.topicMessageTypeSet(),
         TopicMessageType.NORMAL.getValue()
     );
+    public static final LongRangeAttribute TOPIC_RESERVE_TIME_ATTRIBUTE = new 
LongRangeAttribute(
+        "reserve.time",
+        true,
+        -1,
+        Long.MAX_VALUE,
+        -1
+    );
 
     public static final Map<String, Attribute> ALL;
 
@@ -51,5 +59,6 @@ public class TopicAttributes {
         ALL.put(QUEUE_TYPE_ATTRIBUTE.getName(), QUEUE_TYPE_ATTRIBUTE);
         ALL.put(CLEANUP_POLICY_ATTRIBUTE.getName(), CLEANUP_POLICY_ATTRIBUTE);
         ALL.put(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(), 
TOPIC_MESSAGE_TYPE_ATTRIBUTE);
+        ALL.put(TOPIC_RESERVE_TIME_ATTRIBUTE.getName(), 
TOPIC_RESERVE_TIME_ATTRIBUTE);
     }
 }
diff --git a/tieredstore/README.md b/tieredstore/README.md
index baeb56acc3..41e7458a2a 100644
--- a/tieredstore/README.md
+++ b/tieredstore/README.md
@@ -57,8 +57,8 @@ Tiered storage provides some useful metrics, see 
[RIP-46](https://github.com/apa
 
 ## How to contribute
 
-We need community participation to add more backend service providers for 
tiered storage. 
[PosixFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java),
 the implementation provided by default is just an example. People who want to 
contribute can follow it to implement their own providers, such as 
S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines:
+We need community participation to add more backend service providers for 
tiered storage. 
[PosixFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java),
 the implementation provided by default is just an example. People who want to 
contribute can follow it to implement their own providers, such as 
S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines:
 
-1. Extend 
[TieredFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java)
 and implement the methods of 
[TieredStoreProvider](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java)
 interface.
+1. Extend 
[FileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java)
 and implement the methods of 
[FileSegmentProvider](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentProvider.java)
 interface.
 2. Record metrics where appropriate. See 
`rocketmq_tiered_store_provider_rpc_latency`, 
`rocketmq_tiered_store_provider_upload_bytes`, and 
`rocketmq_tiered_store_provider_download_bytes`
 3. No need to maintain your own cache and avoid polluting the page cache. It 
is already having the read-ahead cache.
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
index 0d7044a544..f782d099de 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
@@ -60,9 +60,9 @@ public class FlatFileStore {
             this.flatFileConcurrentMap.clear();
             this.recover();
             this.executor.commonExecutor.scheduleWithFixedDelay(() -> {
-                long expiredTimeStamp = System.currentTimeMillis() -
-                    
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
                 for (FlatMessageFile flatFile : deepCopyFlatFileToList()) {
+                    long expiredTimeStamp = System.currentTimeMillis() -
+                        
TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours());
                     flatFile.destroyExpiredFile(expiredTimeStamp);
                     if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
                         this.destroyFile(flatFile.getMessageQueue());
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index a214059442..d5675976cb 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -399,4 +399,11 @@ public class FlatMessageFile implements FlatFileInterface {
             fileLock.unlock();
         }
     }
+
+    public long getFileReservedHours() {
+        if (topicMetadata.getReserveTime() > 0) {
+            return topicMetadata.getReserveTime();
+        }
+        return storeConfig.getTieredStoreFileReservedTime();
+    }
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index e76c86d79b..8b5a9e63c0 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -180,7 +181,7 @@ public class TieredStoreMetricsManager {
                     MessageQueue mq = flatFile.getMessageQueue();
                     long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), 
mq.getQueueId());
                     long maxTimestamp = 
next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1);
-                    if (maxTimestamp > 0 && System.currentTimeMillis() - 
maxTimestamp > (long) storeConfig.getTieredStoreFileReservedTime() * 60 * 60 * 
1000) {
+                    if (maxTimestamp > 0 && System.currentTimeMillis() - 
maxTimestamp > TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours())) {
                         continue;
                     }
 
@@ -209,7 +210,7 @@ public class TieredStoreMetricsManager {
                     MessageQueue mq = flatFile.getMessageQueue();
                     long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), 
mq.getQueueId());
                     long maxTimestamp = 
next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1);
-                    if (maxTimestamp > 0 && System.currentTimeMillis() - 
maxTimestamp > (long) storeConfig.getTieredStoreFileReservedTime() * 60 * 60 * 
1000) {
+                    if (maxTimestamp > 0 && System.currentTimeMillis() - 
maxTimestamp > TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours())) {
                         continue;
                     }
 

Reply via email to