This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1c35adb3dc [ISSUE #9075]Avoid message type validate in message sync scenario. (#9076)
1c35adb3dc is described below

commit 1c35adb3dc3824ef39f39d1375cf224428fdc4fb
Author: dingshuangxi888 <dingshuangxi...@gmail.com>
AuthorDate: Wed Dec 25 17:57:47 2024 +0800

    [ISSUE #9075]Avoid message type validate in message sync scenario. (#9076)
    
    * Avoid message type validate in message sync scenario.
---
 .../java/org/apache/rocketmq/common/message/Message.java   |  7 +++++++
 .../apache/rocketmq/proxy/processor/ProducerProcessor.java |  6 +++++-
 .../proxy/remoting/activity/SendMessageActivity.java       | 14 ++++++++++----
 3 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
index c7997c4731..acd4df96d2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -108,6 +108,13 @@ public class Message implements Serializable {
         return this.properties.get(name);
     }
 
+    public boolean hasProperty(final String name) {
+        if (null == this.properties) {
+            return false;
+        }
+        return this.properties.containsKey(name);
+    }
+
     public String getTopic() {
         return topic;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 43e16ddd2d..17a2f27fa7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -74,7 +74,7 @@ public class ProducerProcessor extends AbstractProcessor {
         try {
             Message message = messageList.get(0);
             String topic = message.getTopic();
-            if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
+            if (isNeedCheckTopicMessageType(message)) {
                 if (topicMessageTypeValidator != null) {
                     // Do not check retry or dlq topic
                     if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
@@ -261,4 +261,8 @@ public class ProducerProcessor extends AbstractProcessor {
         return FutureUtils.addExecutor(future, this.executor);
     }
 
+    private boolean isNeedCheckTopicMessageType(Message message) {
+        return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()
+            && !message.hasProperty(MessageConst.PROPERTY_TRANSFER_FLAG);
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
index 17af0fdcb3..22d9efd934 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
@@ -21,17 +21,18 @@ import io.netty.channel.ChannelHandlerContext;
 import java.time.Duration;
 import java.util.Map;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
-import org.apache.rocketmq.remoting.protocol.RequestCode;
-import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
 import org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator;
 import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator;
 import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
 
 public class SendMessageActivity extends AbstractRemotingActivity {
     TopicMessageTypeValidator topicMessageTypeValidator;
@@ -66,7 +67,7 @@ public class SendMessageActivity extends AbstractRemotingActivity {
         String topic = requestHeader.getTopic();
         Map<String, String> property = MessageDecoder.string2messageProperties(requestHeader.getProperties());
         TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(property);
-        if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
+        if (isNeedCheckTopicMessageType(property)) {
             if (topicMessageTypeValidator != null) {
                 // Do not check retry or dlq topic
                 if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
@@ -87,4 +88,9 @@ public class SendMessageActivity extends AbstractRemotingActivity {
         ProxyContext context) throws Exception {
         return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
     }
+
+    private boolean isNeedCheckTopicMessageType(Map<String, String> property) {
+        return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()
+            && !property.containsKey(MessageConst.PROPERTY_TRANSFER_FLAG);
+    }
 }

Reply via email to