This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 19393e0475 [ISSUE #8974] Add feature switch of recalling, disable by default (#9067) 19393e0475 is described below commit 19393e047515db3d65e898bb254c1f16d62ffcd3 Author: imzs <i...@foxmail.com> AuthorDate: Fri Dec 27 13:56:09 2024 +0800 [ISSUE #8974] Add feature switch of recalling, disable by default (#9067) --- .../rocketmq/broker/processor/RecallMessageProcessor.java | 6 ++++++ .../rocketmq/broker/processor/RecallMessageProcessorTest.java | 9 +++++++++ .../src/main/java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++ .../org/apache/rocketmq/test/base/IntegrationTestBase.java | 1 + 4 files changed, 26 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java index 7a652f4315..372db0d36e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java @@ -57,6 +57,12 @@ public class RecallMessageProcessor implements NettyRequestProcessor { final RecallMessageRequestHeader requestHeader = request.decodeCommandCustomHeader(RecallMessageRequestHeader.class); + if (!brokerController.getBrokerConfig().isRecallMessageEnable()) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("recall failed, operation is forbidden"); + return response; + } + if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) { response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); response.setRemark("recall failed, broker service not available"); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java index 7bd260cc2c..d28eb2f1df 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java @@ -89,6 +89,7 @@ public class RecallMessageProcessorTest { when(brokerController.getMessageStore()).thenReturn(messageStore); when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); when(brokerConfig.getBrokerName()).thenReturn(BROKER_NAME); + when(brokerConfig.isRecallMessageEnable()).thenReturn(true); when(brokerController.getBrokerStatsManager()).thenReturn(brokerStatsManager); when(handlerContext.channel()).thenReturn(channel); recallMessageProcessor = new RecallMessageProcessor(brokerController); @@ -134,6 +135,14 @@ public class RecallMessageProcessorTest { } } + @Test + public void testProcessRequest_notEnable() throws RemotingCommandException { + when(brokerConfig.isRecallMessageEnable()).thenReturn(false); + RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME); + RemotingCommand response = recallMessageProcessor.processRequest(handlerContext, request); + Assert.assertEquals(ResponseCode.NO_PERMISSION, response.getCode()); + } + @Test public void testProcessRequest_invalidStatus() throws RemotingCommandException { RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index b5dc1899e9..dd34544935 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -453,6 +453,8 @@ public class BrokerConfig extends BrokerIdentity { private boolean allowRecallWhenBrokerNotWriteable = true; + private boolean recallMessageEnable = false; + public String getConfigBlackList() { return configBlackList; } @@ -1996,4 +1998,12 @@ public class BrokerConfig extends BrokerIdentity { public void setAllowRecallWhenBrokerNotWriteable(boolean allowRecallWhenBrokerNotWriteable) { this.allowRecallWhenBrokerNotWriteable = allowRecallWhenBrokerNotWriteable; } + + public boolean isRecallMessageEnable() { + return recallMessageEnable; + } + + public void setRecallMessageEnable(boolean recallMessageEnable) { + this.recallMessageEnable = recallMessageEnable; + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index fde991ad13..287e54d561 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -138,6 +138,7 @@ public class IntegrationTestBase { brokerConfig.setEnableCalcFilterBitMap(true); brokerConfig.setAppendAckAsync(true); brokerConfig.setAppendCkAsync(true); + brokerConfig.setRecallMessageEnable(true); storeConfig.setEnableConsumeQueueExt(true); brokerConfig.setLoadBalancePollNameServerInterval(500); storeConfig.setStorePathRootDir(baseDir);