This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 83c47dddc [ISSUE #5823] Add delete topic in message store interface
(#5824)
83c47dddc is described below
commit 83c47dddc98f7a74b3ef956a85e0343fc8ab6725
Author: lizhimins <[email protected]>
AuthorDate: Thu Jan 5 15:28:46 2023 +0800
[ISSUE #5823] Add delete topic in message store interface (#5824)
Co-authored-by: 斜阳 <[email protected]>
---
.../broker/processor/AdminBrokerProcessor.java | 4 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 79 +++++++++++++++-------
.../org/apache/rocketmq/store/MessageStore.java | 15 +++-
.../store/plugin/AbstractPluginMessageStore.java | 9 ++-
4 files changed, 77 insertions(+), 30 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 24162022c..1723923d3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
@@ -524,8 +525,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic());
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
- this.brokerController.getMessageStore()
-
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
+
this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic()));
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats())
{
this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 3cf8efdfa..2b829637a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store;
+import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.opentelemetry.api.common.AttributesBuilder;
@@ -35,11 +36,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -1295,35 +1294,69 @@ public class DefaultMessageStore implements
MessageStore {
return this.systemClock.now();
}
+ /**
+ * Lazy clean queue offset table.
+ * If offset table is cleaned, and old messages are dispatching after the
old consume queue is cleaned,
+ * consume queue will be created with old offset, then later message with
new offset table can not be
+ * dispatched to consume queue.
+ */
@Override
- public int cleanUnusedTopic(Set<String> topics) {
- Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>>
it = this.getConsumeQueueTable().entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next
= it.next();
- String topic = next.getKey();
+ public int deleteTopics(final Set<String> deleteTopics) {
+ if (deleteTopics == null || deleteTopics.isEmpty()) {
+ return 0;
+ }
- if (!topics.contains(topic) &&
!TopicValidator.isSystemTopic(topic) && !MixAll.isLmq(topic)) {
- ConcurrentMap<Integer, ConsumeQueueInterface> queueTable =
next.getValue();
- for (ConsumeQueueInterface cq : queueTable.values()) {
- this.consumeQueueStore.destroy(cq);
- LOGGER.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
- cq.getTopic(),
- cq.getQueueId()
- );
+ int deleteCount = 0;
+ for (String topic : deleteTopics) {
+ ConcurrentMap<Integer, ConsumeQueueInterface> queueTable =
+ this.consumeQueueStore.getConsumeQueueTable().get(topic);
-
this.consumeQueueStore.removeTopicQueueTable(cq.getTopic(), cq.getQueueId());
- }
- it.remove();
+ if (queueTable == null || queueTable.isEmpty()) {
+ continue;
+ }
- if (this.brokerConfig.isAutoDeleteUnusedStats()) {
- this.brokerStatsManager.onTopicDeleted(topic);
- }
+ for (ConsumeQueueInterface cq : queueTable.values()) {
+ this.consumeQueueStore.destroy(cq);
+ LOGGER.info("DeleteTopic: ConsumeQueue has been cleaned,
topic={}, queueId={}",
+ cq.getTopic(), cq.getQueueId());
+ this.consumeQueueStore.removeTopicQueueTable(cq.getTopic(),
cq.getQueueId());
+ }
- LOGGER.info("cleanUnusedTopic: {},topic destroyed", topic);
+ if (this.brokerConfig.isAutoDeleteUnusedStats()) {
+ this.brokerStatsManager.onTopicDeleted(topic);
}
+
+ // destroy consume queue dir
+ String consumeQueueDir =
StorePathConfigHelper.getStorePathConsumeQueue(
+ this.messageStoreConfig.getStorePathRootDir()) +
File.separator + topic;
+ String consumeQueueExtDir =
StorePathConfigHelper.getStorePathConsumeQueue(
+ this.messageStoreConfig.getStorePathRootDir()) +
File.separator + topic;
+ String batchConsumeQueueDir =
StorePathConfigHelper.getStorePathBatchConsumeQueue(
+ this.messageStoreConfig.getStorePathRootDir()) +
File.separator + topic;
+
+ UtilAll.deleteEmptyDirectory(new File(consumeQueueDir));
+ UtilAll.deleteEmptyDirectory(new File(consumeQueueExtDir));
+ UtilAll.deleteEmptyDirectory(new File(batchConsumeQueueDir));
+
+ LOGGER.info("DeleteTopic: Topic has been destroyed, topic={}",
topic);
+ deleteCount++;
}
+ return deleteCount;
+ }
- return 0;
+ @Override
+ public int cleanUnusedTopic(final Set<String> retainTopics) {
+ Set<String> consumeQueueTopicSet =
this.getConsumeQueueTable().keySet();
+ int deleteCount = 0;
+ for (String topicName : Sets.difference(retainTopics,
consumeQueueTopicSet)) {
+ if (retainTopics.contains(topicName) ||
+ TopicValidator.isSystemTopic(topicName) ||
+ MixAll.isLmq(topicName)) {
+ continue;
+ }
+ deleteCount += this.deleteTopics(Sets.newHashSet(topicName));
+ }
+ return deleteCount;
}
@Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index bb596c844..bbf2056cc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -436,12 +436,21 @@ public interface MessageStore {
long now();
/**
- * Clean unused topics.
+ * Delete topic's consume queue file and unused stats.
+ * This interface allows user delete system topic.
*
- * @param topics all valid topics.
+ * @param deleteTopics unused topic name set
+ * @return the number of the topics which has been deleted.
+ */
+ int deleteTopics(final Set<String> deleteTopics);
+
+ /**
+ * Clean unused topics which not in retain topic name set.
+ *
+ * @param retainTopics all valid topics.
* @return number of the topics deleted.
*/
- int cleanUnusedTopic(final Set<String> topics);
+ int cleanUnusedTopic(final Set<String> retainTopics);
/**
* Clean expired consume queues.
diff --git
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index db752919b..47416a873 100644
---
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -257,8 +257,13 @@ public abstract class AbstractPluginMessageStore
implements MessageStore {
}
@Override
- public int cleanUnusedTopic(Set<String> topics) {
- return next.cleanUnusedTopic(topics);
+ public int deleteTopics(final Set<String> deleteTopics) {
+ return next.deleteTopics(deleteTopics);
+ }
+
+ @Override
+ public int cleanUnusedTopic(final Set<String> retainTopics) {
+ return next.cleanUnusedTopic(retainTopics);
}
@Override