This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 19393e0475 [ISSUE #8974] Add feature switch of recalling, disable by default (#9067)
19393e0475 is described below

commit 19393e047515db3d65e898bb254c1f16d62ffcd3
Author: imzs <i...@foxmail.com>
AuthorDate: Fri Dec 27 13:56:09 2024 +0800

    [ISSUE #8974] Add feature switch of recalling, disable by default (#9067)
---
 .../rocketmq/broker/processor/RecallMessageProcessor.java      |  6 ++++++
 .../rocketmq/broker/processor/RecallMessageProcessorTest.java  |  9 +++++++++
 .../src/main/java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++
 .../org/apache/rocketmq/test/base/IntegrationTestBase.java     |  1 +
 4 files changed, 26 insertions(+)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
index 7a652f4315..372db0d36e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java
@@ -57,6 +57,12 @@ public class RecallMessageProcessor implements NettyRequestProcessor {
         final RecallMessageRequestHeader requestHeader =
             request.decodeCommandCustomHeader(RecallMessageRequestHeader.class);
 
+        if (!brokerController.getBrokerConfig().isRecallMessageEnable()) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("recall failed, operation is forbidden");
+            return response;
+        }
+
         if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
             response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
             response.setRemark("recall failed, broker service not available");
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
index 7bd260cc2c..d28eb2f1df 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java
@@ -89,6 +89,7 @@ public class RecallMessageProcessorTest {
         when(brokerController.getMessageStore()).thenReturn(messageStore);
         when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         when(brokerConfig.getBrokerName()).thenReturn(BROKER_NAME);
+        when(brokerConfig.isRecallMessageEnable()).thenReturn(true);
         when(brokerController.getBrokerStatsManager()).thenReturn(brokerStatsManager);
         when(handlerContext.channel()).thenReturn(channel);
         recallMessageProcessor = new RecallMessageProcessor(brokerController);
@@ -134,6 +135,14 @@ public class RecallMessageProcessorTest {
         }
     }
 
+    @Test
+    public void testProcessRequest_notEnable() throws RemotingCommandException {
+        when(brokerConfig.isRecallMessageEnable()).thenReturn(false);
+        RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME);
+        RemotingCommand response = recallMessageProcessor.processRequest(handlerContext, request);
+        Assert.assertEquals(ResponseCode.NO_PERMISSION, response.getCode());
+    }
+
     @Test
     public void testProcessRequest_invalidStatus() throws RemotingCommandException {
         RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index b5dc1899e9..dd34544935 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -453,6 +453,8 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean allowRecallWhenBrokerNotWriteable = true;
 
+    private boolean recallMessageEnable = false;
+
     public String getConfigBlackList() {
         return configBlackList;
     }
@@ -1996,4 +1998,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setAllowRecallWhenBrokerNotWriteable(boolean allowRecallWhenBrokerNotWriteable) {
         this.allowRecallWhenBrokerNotWriteable = allowRecallWhenBrokerNotWriteable;
     }
+
+    public boolean isRecallMessageEnable() {
+        return recallMessageEnable;
+    }
+
+    public void setRecallMessageEnable(boolean recallMessageEnable) {
+        this.recallMessageEnable = recallMessageEnable;
+    }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index fde991ad13..287e54d561 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -138,6 +138,7 @@ public class IntegrationTestBase {
         brokerConfig.setEnableCalcFilterBitMap(true);
         brokerConfig.setAppendAckAsync(true);
         brokerConfig.setAppendCkAsync(true);
+        brokerConfig.setRecallMessageEnable(true);
         storeConfig.setEnableConsumeQueueExt(true);
         brokerConfig.setLoadBalancePollNameServerInterval(500);
         storeConfig.setStorePathRootDir(baseDir);

Reply via email to