This is an automated email from the ASF dual-hosted git repository.

ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3ae0139aa0 [ISSUE #8968] Introduce the clearRetryTopicWhenDeleteTopic option to enable precise external deletion of topics (#8969)
3ae0139aa0 is described below

commit 3ae0139aa0cd75fe52d583e69f0c974d5ce2639c
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Wed Nov 27 16:19:06 2024 +0800

    [ISSUE #8968] Introduce the clearRetryTopicWhenDeleteTopic option to enable precise external deletion of topics (#8969)
    
    * Add the clearRetryTopicWhenDeleteTopic option to allow precise deletion of topics externally without the need to traverse consumerOffset
    
    * Fix check style
---
 .../apache/rocketmq/broker/BrokerController.java   |  7 ++-
 .../broker/processor/AdminBrokerProcessor.java     | 71 ++++++++++++----------
 .../org/apache/rocketmq/common/BrokerConfig.java   |  9 +++
 3 files changed, 55 insertions(+), 32 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index b907489bbf..99e5b85d2e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -191,7 +191,7 @@ public class BrokerController {
     private final NettyClientConfig nettyClientConfig;
     protected final MessageStoreConfig messageStoreConfig;
     private final AuthConfig authConfig;
-    protected final ConsumerOffsetManager consumerOffsetManager;
+    protected ConsumerOffsetManager consumerOffsetManager;
     protected final BroadcastOffsetManager broadcastOffsetManager;
     protected final ConsumerManager consumerManager;
     protected final ConsumerFilterManager consumerFilterManager;
@@ -1313,6 +1313,11 @@ public class BrokerController {
         return consumerOffsetManager;
     }
 
+    public void setConsumerOffsetManager(ConsumerOffsetManager 
consumerOffsetManager) {
+        this.consumerOffsetManager = consumerOffsetManager;
+    }
+
+
     public BroadcastOffsetManager getBroadcastOffsetManager() {
         return broadcastOffsetManager;
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index ac882e94ab..cc70e69a46 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -490,6 +490,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         response.setBody(JSON.toJSONBytes(result));
         return response;
     }
+
     @Override
     public boolean rejectRequest() {
         return false;
@@ -559,18 +560,17 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(e.getMessage());
             return response;
-        }
-        finally {
+        } finally {
             executionTime = System.currentTimeMillis() - startTime;
             InvocationStatus status = response.getCode() == 
ResponseCode.SUCCESS ?
-                    InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
+                InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
             Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
-                    .put(LABEL_INVOCATION_STATUS, status.getName())
-                    .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
-                    .build();
+                .put(LABEL_INVOCATION_STATUS, status.getName())
+                .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
+                .build();
             BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, 
attributes);
         }
-        LOGGER.info("executionTime of create topic:{} is {} ms" , topic, 
executionTime);
+        LOGGER.info("executionTime of create topic:{} is {} ms", topic, 
executionTime);
         return response;
     }
 
@@ -637,8 +637,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(e.getMessage());
             return response;
-        }
-        finally {
+        } finally {
             executionTime = System.currentTimeMillis() - startTime;
             InvocationStatus status = response.getCode() == 
ResponseCode.SUCCESS ?
                 InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
@@ -648,7 +647,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
                 .build();
             BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, 
attributes);
         }
-        LOGGER.info("executionTime of all topics:{} is {} ms" , topicNames, 
executionTime);
+        LOGGER.info("executionTime of all topics:{} is {} ms", topicNames, 
executionTime);
         return response;
     }
 
@@ -725,21 +724,28 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             }
         }
 
-        final Set<String> groups = 
this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
-        // delete pop retry topics first
-        try {
+        List<String> topicsToClean = new ArrayList<>();
+        topicsToClean.add(topic);
+
+        if 
(brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) {
+            final Set<String> groups = 
this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
             for (String group : groups) {
                 final String popRetryTopicV2 = 
KeyBuilder.buildPopRetryTopic(topic, group, true);
                 if 
(brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) != 
null) {
-                    deleteTopicInBroker(popRetryTopicV2);
+                    topicsToClean.add(popRetryTopicV2);
                 }
                 final String popRetryTopicV1 = 
KeyBuilder.buildPopRetryTopicV1(topic, group);
                 if 
(brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != 
null) {
-                    deleteTopicInBroker(popRetryTopicV1);
+                    topicsToClean.add(popRetryTopicV1);
                 }
             }
-            // delete topic
-            deleteTopicInBroker(topic);
+        }
+
+        try {
+            for (String topicToClean : topicsToClean) {
+                // delete topic
+                deleteTopicInBroker(topicToClean);
+            }
         } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, 
t.getMessage());
         }
@@ -982,10 +988,10 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
                             String consumerGroup = String.valueOf(key);
                             Long threshold = 
Long.valueOf(String.valueOf(value));
                             this.brokerController.getColdDataCgCtrService()
-                                    .addOrUpdateGroupConfig(consumerGroup, 
threshold);
+                                .addOrUpdateGroupConfig(consumerGroup, 
threshold);
                         } catch (Exception e) {
                             LOGGER.error("updateColdDataFlowCtrGroupConfig 
properties on entry error, key: {}, val: {}",
-                                    key, value, e);
+                                key, value, e);
                         }
                     });
                 } else {
@@ -1598,12 +1604,12 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         long executionTime = System.currentTimeMillis() - startTime;
-        LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms" 
,config.getGroupName() ,executionTime);
+        LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms", 
config.getGroupName(), executionTime);
         InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
-                InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
+            InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
         Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
-                .put(LABEL_INVOCATION_STATUS, status.getName())
-                .build();
+            .put(LABEL_INVOCATION_STATUS, status.getName())
+            .build();
         
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, 
attributes);
         return response;
     }
@@ -2083,13 +2089,13 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
     /**
      * Reset consumer offset.
      *
-     * @param topic     Required, not null.
-     * @param group     Required, not null.
-     * @param queueId   if target queue ID is negative, all message queues 
will be reset; otherwise, only the target queue
-     *                  would get reset.
+     * @param topic Required, not null.
+     * @param group Required, not null.
+     * @param queueId if target queue ID is negative, all message queues will 
be reset; otherwise, only the target queue
+     * would get reset.
      * @param timestamp if timestamp is negative, offset would be reset to 
broker offset at the time being; otherwise,
-     *                  binary search is performed to locate target offset.
-     * @param offset    Target offset to reset to if target queue ID is 
properly provided.
+     * binary search is performed to locate target offset.
+     * @param offset Target offset to reset to if target queue ID is properly 
provided.
      * @return Affected queues and their new offset
      */
     private RemotingCommand resetOffsetInner(String topic, String group, int 
queueId, long timestamp, Long offset) {
@@ -3371,7 +3377,8 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return false;
     }
 
-    private CheckRocksdbCqWriteResult 
doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand 
request) throws RemotingCommandException {
+    private CheckRocksdbCqWriteResult 
doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
         CheckRocksdbCqWriteProgressRequestHeader requestHeader = 
request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
         String requestTopic = requestHeader.getTopic();
         MessageStore messageStore = brokerController.getMessageStore();
@@ -3428,7 +3435,9 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return result;
     }
 
-    private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, 
ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore 
rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long 
checkpointByStoreTime) {
+    private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, 
ConsumeQueueInterface> queueMap, String topic,
+        RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, 
boolean printDetail,
+        long checkpointByStoreTime) {
         boolean processResult = true;
         for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : 
queueMap.entrySet()) {
             Integer queueId = queueEntry.getKey();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 9d8d913521..c0b557dfa1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -435,6 +435,7 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean appendCkAsync = false;
 
+    private boolean clearRetryTopicWhenDeleteTopic = true;
 
     private boolean enableLmqStats = false;
 
@@ -1908,6 +1909,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.appendCkAsync = appendCkAsync;
     }
 
+    public boolean isClearRetryTopicWhenDeleteTopic() {
+        return clearRetryTopicWhenDeleteTopic;
+    }
+
+    public void setClearRetryTopicWhenDeleteTopic(boolean 
clearRetryTopicWhenDeleteTopic) {
+        this.clearRetryTopicWhenDeleteTopic = clearRetryTopicWhenDeleteTopic;
+    }
+
     public boolean isEnableLmqStats() {
         return enableLmqStats;
     }

Reply via email to