This is an automated email from the ASF dual-hosted git repository.

fuyou pushed a commit to branch enh-9692-1
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit bc88d8a7c831f3fc48cc9746a5913a6ae9e3c367
Author: fuchong <[email protected]>
AuthorDate: Fri Sep 12 14:18:57 2025 +0800

    to #9692 Support async delete for topics
    
    Signed-off-by: fuchong <[email protected]>
    
    Signed-off-by: fuchong <[email protected]>
    
    Signed-off-by: fuchong <[email protected]>
---
 .../broker/processor/AdminBrokerProcessor.java     | 40 +++++++++++++++++++---
 .../rocketmq/broker/topic/TopicConfigManager.java  | 10 ++++--
 .../broker/topic/TopicQueueMappingManager.java     | 10 ++++--
 .../protocol/header/DeleteTopicRequestHeader.java  |  9 +++++
 .../apache/rocketmq/store/DefaultMessageStore.java |  2 +-
 .../rocketmq/store/stats/BrokerStatsManager.java   |  5 +--
 6 files changed, 64 insertions(+), 12 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 298e239086..df694cad6c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -242,7 +242,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     protected final BrokerController brokerController;
     protected Set<String> configBlackList = new HashSet<>();
-    private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+    private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS,
+        new SynchronousQueue<>());
+
 
     public AdminBrokerProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
@@ -778,6 +780,25 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             }
         }
 
+        final Boolean syncDelete = requestHeader.getSyncDelete();
+        if (Boolean.TRUE.equals(syncDelete)) {
+            return doDeleteTopic(topic, true);
+        } else {
+            asyncExecuteWorker.execute(() -> {
+                try {
+                    doDeleteTopic(topic, syncDelete);
+                } catch (Exception e) {
+                    LOGGER.error(String.format("delete topic %s failed for ", topic), e);
+                }
+            });
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+    }
+
+    private RemotingCommand  doDeleteTopic(String topic,boolean isSyncDelete) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         List<String> topicsToClean = new ArrayList<>();
         topicsToClean.add(topic);
 
@@ -798,7 +819,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         try {
             for (String topicToClean : topicsToClean) {
                 // delete topic
-                deleteTopicInBroker(topicToClean);
+                deleteTopicInBroker(topicToClean,isSyncDelete);
+            }
+            if (!isSyncDelete) {
+                batchSyncMetaData();
             }
         } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
@@ -808,15 +832,21 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return response;
     }
 
-    private void deleteTopicInBroker(String topic) {
-        this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
-        this.brokerController.getTopicQueueMappingManager().delete(topic);
+    private void deleteTopicInBroker(String topic, boolean isSyncDelete) {
+        this.brokerController.getTopicConfigManager().deleteTopicConfig(topic, isSyncDelete);
+        this.brokerController.getTopicQueueMappingManager().delete(topic, isSyncDelete);
         this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
         this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
         this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic));
         this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().removeTimingCount(topic);
     }
 
+    private void batchSyncMetaData() {
+        this.brokerController.getTopicConfigManager().persist();
+        this.brokerController.getTopicQueueMappingManager().persist();
+        this.brokerController.getConsumerOffsetManager().persist();
+    }
+
     private RemotingCommand getUnknownCmdResponse(ChannelHandlerContext ctx, RemotingCommand request) {
         String error = " request type " + request.getCode() + " not supported";
         final RemotingCommand response =
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index ed46dfdc49..46095ce499 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -595,17 +595,23 @@ public class TopicConfigManager extends ConfigManager {
         }
     }
 
-    public void deleteTopicConfig(final String topic) {
+    public void deleteTopicConfig(final String topic, boolean isSync) {
         TopicConfig old = removeTopicConfig(topic);
         if (old != null) {
             log.info("delete topic config OK, topic: {}", old);
             updateDataVersion();
-            this.persist();
+            if (isSync) {
+                this.persist();
+            }
         } else {
             log.warn("delete topic config failed, topic: {} not exists", topic);
         }
     }
 
+    public void deleteTopicConfig(final String topic) {
+       deleteTopicConfig(topic, true);
+    }
+
     public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
         TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
         topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 4b0714decb..6bd696f08d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -136,17 +136,23 @@ public class TopicQueueMappingManager extends ConfigManager {
 
     }
 
-    public void delete(final String topic) {
+    public void delete(final String topic, boolean isSync) {
         TopicQueueMappingDetail old = this.topicQueueMappingTable.remove(topic);
         if (old != null) {
             log.info("delete topic queue mapping OK, static topic queue mapping: {}", old);
             this.dataVersion.nextVersion();
-            this.persist();
+            if (isSync) {
+                this.persist();
+            }
         } else {
             log.warn("delete topic queue mapping failed, static topic: {} not exists", topic);
         }
     }
 
+    public void delete(final String topic) {
+        delete(topic,true);
+    }
+
     public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
         return topicQueueMappingTable.get(topic);
     }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java
index ea66ed94c7..4f2738fc57 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java
@@ -34,6 +34,7 @@ public class DeleteTopicRequestHeader extends TopicRequestHeader {
     @CFNotNull
     @RocketMQResource(ResourceType.TOPIC)
     private String topic;
+    private Boolean syncDelete = true;
 
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -46,4 +47,12 @@ public class DeleteTopicRequestHeader extends TopicRequestHeader {
     public void setTopic(String topic) {
         this.topic = topic;
     }
+
+    public Boolean getSyncDelete() {
+        return syncDelete;
+    }
+
+    public void setSyncDelete(Boolean syncDelete) {
+        this.syncDelete = syncDelete;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 4d13acf225..b736123642 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1374,7 +1374,7 @@ public class DefaultMessageStore implements MessageStore {
      * dispatched to consume queue.
      */
     @Override
-    public int deleteTopics(final Set<String> deleteTopics) {
+    public synchronized int deleteTopics(final Set<String> deleteTopics) {
         if (deleteTopics == null || deleteTopics.isEmpty()) {
             return 0;
         }
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index c272a30234..27852f84d2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -16,7 +16,8 @@
  */
 package org.apache.rocketmq.store.stats;
 
-import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -140,7 +141,7 @@ public class BrokerStatsManager {
     private ScheduledExecutorService accountExecutor;
     private ScheduledExecutorService cleanResourceExecutor;
 
-    private final HashMap<String, StatsItemSet> statsTable = new HashMap<>();
+    private final Map<String, StatsItemSet> statsTable = new ConcurrentHashMap<>();
     private final String clusterName;
     private final boolean enableQueueStat;
     private MomentStatsItemSet momentStatsItemSetFallSize;

Reply via email to