This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 1c35adb3dc [ISSUE #9075]Avoid message type validate in message sync scenario. (#9076) 1c35adb3dc is described below commit 1c35adb3dc3824ef39f39d1375cf224428fdc4fb Author: dingshuangxi888 <dingshuangxi...@gmail.com> AuthorDate: Wed Dec 25 17:57:47 2024 +0800 [ISSUE #9075]Avoid message type validate in message sync scenario. (#9076) * Avoid message type validate in message sync scenario. --- .../java/org/apache/rocketmq/common/message/Message.java | 7 +++++++ .../apache/rocketmq/proxy/processor/ProducerProcessor.java | 6 +++++- .../proxy/remoting/activity/SendMessageActivity.java | 14 ++++++++++---- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java index c7997c4731..acd4df96d2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java @@ -108,6 +108,13 @@ public class Message implements Serializable { return this.properties.get(name); } + public boolean hasProperty(final String name) { + if (null == this.properties) { + return false; + } + return this.properties.containsKey(name); + } + public String getTopic() { return topic; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java index 43e16ddd2d..17a2f27fa7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java @@ -74,7 +74,7 @@ public class ProducerProcessor extends AbstractProcessor { try { Message message = messageList.get(0); String topic = message.getTopic(); - if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) { + if (isNeedCheckTopicMessageType(message)) { if (topicMessageTypeValidator != null) { // Do not check retry or dlq topic if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) { @@ -261,4 +261,8 @@ public class ProducerProcessor extends AbstractProcessor { return FutureUtils.addExecutor(future, this.executor); } + private boolean isNeedCheckTopicMessageType(Message message) { + return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck() + && !message.hasProperty(MessageConst.PROPERTY_TRANSFER_FLAG); + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java index 17af0fdcb3..22d9efd934 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java @@ -21,17 +21,18 @@ import io.netty.channel.ChannelHandlerContext; import java.time.Duration; import java.util.Map; import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.remoting.protocol.NamespaceUtil; -import org.apache.rocketmq.remoting.protocol.RequestCode; -import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator; import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.protocol.NamespaceUtil; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; public class SendMessageActivity extends AbstractRemotingActivity { TopicMessageTypeValidator topicMessageTypeValidator; @@ -66,7 +67,7 @@ public class SendMessageActivity extends AbstractRemotingActivity { String topic = requestHeader.getTopic(); Map<String, String> property = MessageDecoder.string2messageProperties(requestHeader.getProperties()); TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(property); - if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) { + if (isNeedCheckTopicMessageType(property)) { if (topicMessageTypeValidator != null) { // Do not check retry or dlq topic if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) { @@ -87,4 +88,9 @@ public class SendMessageActivity extends AbstractRemotingActivity { ProxyContext context) throws Exception { return request(ctx, request, context, Duration.ofSeconds(3).toMillis()); } + + private boolean isNeedCheckTopicMessageType(Map<String, String> property) { + return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck() + && !property.containsKey(MessageConst.PROPERTY_TRANSFER_FLAG); + } }