This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 79967c00b2 [ISSUE #6933] Optimize delete topic in tiered storage (#6973)
79967c00b2 is described below
commit 79967c00b2028acf0a707fe09435848f0acf8e6d
Author: lizhimins <[email protected]>
AuthorDate: Fri Jun 30 15:54:32 2023 +0800
[ISSUE #6933] Optimize delete topic in tiered storage (#6973)
---
.../rocketmq/tieredstore/TieredMessageStore.java | 51 +++++++---------------
.../tieredstore/file/TieredFlatFileManager.java | 7 +++
2 files changed, 23 insertions(+), 35 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index f0026cf934..115d9640d6 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PopAckConstants;
@@ -50,7 +51,6 @@ import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
-import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -394,12 +394,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
MixAll.isLmq(topic)) {
return;
}
- logger.info("TieredMessageStore#cleanUnusedTopic: start deleting topic {}", topic);
- try {
- destroyCompositeFlatFile(topicMetadata);
- } catch (Exception e) {
- logger.error("TieredMessageStore#cleanUnusedTopic: delete topic {} failed", topic, e);
- }
+ this.destroyCompositeFlatFile(topicMetadata.getTopic());
});
} catch (Exception e) {
logger.error("TieredMessageStore#cleanUnusedTopic: iterate topic metadata failed", e);
@@ -410,38 +405,24 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
@Override
public int deleteTopics(Set<String> deleteTopics) {
for (String topic : deleteTopics) {
- logger.info("TieredMessageStore#deleteTopics: start deleting topic {}", topic);
- try {
- TopicMetadata topicMetadata = metadataStore.getTopic(topic);
- if (topicMetadata != null) {
- destroyCompositeFlatFile(topicMetadata);
- } else {
- logger.error("TieredMessageStore#deleteTopics: delete topic {} failed, can not obtain metadata", topic);
- }
- } catch (Exception e) {
- logger.error("TieredMessageStore#deleteTopics: delete topic {} failed", topic, e);
- }
+ this.destroyCompositeFlatFile(topic);
}
-
return next.deleteTopics(deleteTopics);
}
- public void destroyCompositeFlatFile(TopicMetadata topicMetadata) {
- String topic = topicMetadata.getTopic();
- metadataStore.iterateQueue(topic, queueMetadata -> {
- MessageQueue mq = queueMetadata.getQueue();
- CompositeFlatFile flatFile = flatFileManager.getFlatFile(mq);
- if (flatFile != null) {
- flatFileManager.destroyCompositeFile(mq);
- try {
- metadataStore.deleteQueue(mq);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- logger.info("TieredMessageStore#destroyCompositeFlatFile: " +
- "destroy flatFile success: topic: {}, queueId: {}", mq.getTopic(), mq.getQueueId());
+ public void destroyCompositeFlatFile(String topic) {
+ try {
+ if (StringUtils.isBlank(topic)) {
+ return;
}
- });
- metadataStore.deleteTopic(topicMetadata.getTopic());
+ metadataStore.iterateQueue(topic, queueMetadata -> {
+ flatFileManager.destroyCompositeFile(queueMetadata.getQueue());
+ });
+ // delete topic metadata
+ metadataStore.deleteTopic(topic);
+ logger.info("Destroy composite flat file in message store, topic={}", topic);
+ } catch (Exception e) {
+ logger.error("Destroy composite flat file in message store failed, topic={}", topic, e);
+ }
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index 1a2f65c00c..5fe511f689 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -265,12 +265,19 @@ public class TieredFlatFileManager {
}
public void destroyCompositeFile(MessageQueue mq) {
+ if (mq == null) {
+ return;
+ }
+
+ // delete memory reference
CompositeQueueFlatFile flatFile = queueFlatFileMap.remove(mq);
if (flatFile != null) {
MessageQueue messageQueue = flatFile.getMessageQueue();
logger.info("TieredFlatFileManager#destroyCompositeFile: " +
"try to destroy composite flat file: topic: {}, queueId: {}",
messageQueue.getTopic(), messageQueue.getQueueId());
+
+ // delete queue metadata
flatFile.destroy();
}
}