This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2614b4a36a4 Expose more params for rocketmq component (#18527)
2614b4a36a4 is described below

commit 2614b4a36a4c7590a91dceaae6d54c05de002262
Author: phateffect <[email protected]>
AuthorDate: Fri Jul 4 01:28:13 2025 +0800

    Expose more params for rocketmq component (#18527)
    
    * expose namespace, messageSelectorType, enableTrace and accessChannel
    
    * set default value for subscribeSql
    
    * add generated files
    
    ---------
    
    Co-authored-by: ShiWei <[email protected]>
---
 .../rocketmq/RocketMQComponentConfigurer.java      | 27 ++++++++
 .../rocketmq/RocketMQEndpointConfigurer.java       | 27 ++++++++
 .../rocketmq/RocketMQEndpointUriFactory.java       |  7 +-
 .../apache/camel/component/rocketmq/rocketmq.json  | 72 +++++++++++---------
 .../component/rocketmq/RocketMQComponent.java      | 76 ++++++++++++++++++++++
 .../camel/component/rocketmq/RocketMQConsumer.java | 22 ++++++-
 .../camel/component/rocketmq/RocketMQEndpoint.java | 66 +++++++++++++++++++
 .../camel/component/rocketmq/RocketMQProducer.java |  6 +-
 8 files changed, 268 insertions(+), 35 deletions(-)

diff --git a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQComponentConfigurer.java b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQComponentConfigurer.java
index ee1f607ff56..e219929cacc 100644
--- a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQComponentConfigurer.java
+++ b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQComponentConfigurer.java
@@ -23,6 +23,8 @@ public class RocketMQComponentConfigurer extends 
PropertyConfigurerSupport imple
     public boolean configure(CamelContext camelContext, Object obj, String 
name, Object value, boolean ignoreCase) {
         RocketMQComponent target = (RocketMQComponent) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesschannel":
+        case "accessChannel": target.setAccessChannel(property(camelContext, 
java.lang.String.class, value)); return true;
         case "accesskey":
         case "accessKey": target.setAccessKey(property(camelContext, 
java.lang.String.class, value)); return true;
         case "autowiredenabled":
@@ -31,8 +33,13 @@ public class RocketMQComponentConfigurer extends 
PropertyConfigurerSupport imple
         case "bridgeErrorHandler": 
target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); 
return true;
         case "consumergroup":
         case "consumerGroup": target.setConsumerGroup(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "enabletrace":
+        case "enableTrace": target.setEnableTrace(property(camelContext, 
boolean.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": 
target.setLazyStartProducer(property(camelContext, boolean.class, value)); 
return true;
+        case "messageselectortype":
+        case "messageSelectorType": 
target.setMessageSelectorType(property(camelContext, java.lang.String.class, 
value)); return true;
+        case "namespace": target.setNamespace(property(camelContext, 
java.lang.String.class, value)); return true;
         case "namesrvaddr":
         case "namesrvAddr": target.setNamesrvAddr(property(camelContext, 
java.lang.String.class, value)); return true;
         case "producergroup":
@@ -49,6 +56,8 @@ public class RocketMQComponentConfigurer extends 
PropertyConfigurerSupport imple
         case "secretKey": target.setSecretKey(property(camelContext, 
java.lang.String.class, value)); return true;
         case "sendtag":
         case "sendTag": target.setSendTag(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "subscribesql":
+        case "subscribeSql": target.setSubscribeSql(property(camelContext, 
java.lang.String.class, value)); return true;
         case "subscribetags":
         case "subscribeTags": target.setSubscribeTags(property(camelContext, 
java.lang.String.class, value)); return true;
         case "waitforsendresult":
@@ -60,6 +69,8 @@ public class RocketMQComponentConfigurer extends 
PropertyConfigurerSupport imple
     @Override
     public Class<?> getOptionType(String name, boolean ignoreCase) {
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesschannel":
+        case "accessChannel": return java.lang.String.class;
         case "accesskey":
         case "accessKey": return java.lang.String.class;
         case "autowiredenabled":
@@ -68,8 +79,13 @@ public class RocketMQComponentConfigurer extends 
PropertyConfigurerSupport imple
         case "bridgeErrorHandler": return boolean.class;
         case "consumergroup":
         case "consumerGroup": return java.lang.String.class;
+        case "enabletrace":
+        case "enableTrace": return boolean.class;
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
+        case "messageselectortype":
+        case "messageSelectorType": return java.lang.String.class;
+        case "namespace": return java.lang.String.class;
         case "namesrvaddr":
         case "namesrvAddr": return java.lang.String.class;
         case "producergroup":
@@ -86,6 +102,8 @@ public class RocketMQComponentConfigurer extends 
PropertyConfigurerSupport imple
         case "secretKey": return java.lang.String.class;
         case "sendtag":
         case "sendTag": return java.lang.String.class;
+        case "subscribesql":
+        case "subscribeSql": return java.lang.String.class;
         case "subscribetags":
         case "subscribeTags": return java.lang.String.class;
         case "waitforsendresult":
@@ -98,6 +116,8 @@ public class RocketMQComponentConfigurer extends 
PropertyConfigurerSupport imple
     public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
         RocketMQComponent target = (RocketMQComponent) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesschannel":
+        case "accessChannel": return target.getAccessChannel();
         case "accesskey":
         case "accessKey": return target.getAccessKey();
         case "autowiredenabled":
@@ -106,8 +126,13 @@ public class RocketMQComponentConfigurer extends 
PropertyConfigurerSupport imple
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "consumergroup":
         case "consumerGroup": return target.getConsumerGroup();
+        case "enabletrace":
+        case "enableTrace": return target.isEnableTrace();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
+        case "messageselectortype":
+        case "messageSelectorType": return target.getMessageSelectorType();
+        case "namespace": return target.getNamespace();
         case "namesrvaddr":
         case "namesrvAddr": return target.getNamesrvAddr();
         case "producergroup":
@@ -124,6 +149,8 @@ public class RocketMQComponentConfigurer extends 
PropertyConfigurerSupport imple
         case "secretKey": return target.getSecretKey();
         case "sendtag":
         case "sendTag": return target.getSendTag();
+        case "subscribesql":
+        case "subscribeSql": return target.getSubscribeSql();
         case "subscribetags":
         case "subscribeTags": return target.getSubscribeTags();
         case "waitforsendresult":
diff --git a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointConfigurer.java b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointConfigurer.java
index 3a4ce64b22b..7890a8f6a08 100644
--- a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointConfigurer.java
+++ b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointConfigurer.java
@@ -23,18 +23,25 @@ public class RocketMQEndpointConfigurer extends 
PropertyConfigurerSupport implem
     public boolean configure(CamelContext camelContext, Object obj, String 
name, Object value, boolean ignoreCase) {
         RocketMQEndpoint target = (RocketMQEndpoint) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesschannel":
+        case "accessChannel": target.setAccessChannel(property(camelContext, 
java.lang.String.class, value)); return true;
         case "accesskey":
         case "accessKey": target.setAccessKey(property(camelContext, 
java.lang.String.class, value)); return true;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": 
target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); 
return true;
         case "consumergroup":
         case "consumerGroup": target.setConsumerGroup(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "enabletrace":
+        case "enableTrace": target.setEnableTrace(property(camelContext, 
boolean.class, value)); return true;
         case "exceptionhandler":
         case "exceptionHandler": 
target.setExceptionHandler(property(camelContext, 
org.apache.camel.spi.ExceptionHandler.class, value)); return true;
         case "exchangepattern":
         case "exchangePattern": 
target.setExchangePattern(property(camelContext, 
org.apache.camel.ExchangePattern.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": 
target.setLazyStartProducer(property(camelContext, boolean.class, value)); 
return true;
+        case "messageselectortype":
+        case "messageSelectorType": 
target.setMessageSelectorType(property(camelContext, java.lang.String.class, 
value)); return true;
+        case "namespace": target.setNamespace(property(camelContext, 
java.lang.String.class, value)); return true;
         case "namesrvaddr":
         case "namesrvAddr": target.setNamesrvAddr(property(camelContext, 
java.lang.String.class, value)); return true;
         case "producergroup":
@@ -51,6 +58,8 @@ public class RocketMQEndpointConfigurer extends 
PropertyConfigurerSupport implem
         case "secretKey": target.setSecretKey(property(camelContext, 
java.lang.String.class, value)); return true;
         case "sendtag":
         case "sendTag": target.setSendTag(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "subscribesql":
+        case "subscribeSql": target.setSubscribeSql(property(camelContext, 
java.lang.String.class, value)); return true;
         case "subscribetags":
         case "subscribeTags": target.setSubscribeTags(property(camelContext, 
java.lang.String.class, value)); return true;
         case "waitforsendresult":
@@ -62,18 +71,25 @@ public class RocketMQEndpointConfigurer extends 
PropertyConfigurerSupport implem
     @Override
     public Class<?> getOptionType(String name, boolean ignoreCase) {
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesschannel":
+        case "accessChannel": return java.lang.String.class;
         case "accesskey":
         case "accessKey": return java.lang.String.class;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return boolean.class;
         case "consumergroup":
         case "consumerGroup": return java.lang.String.class;
+        case "enabletrace":
+        case "enableTrace": return boolean.class;
         case "exceptionhandler":
         case "exceptionHandler": return 
org.apache.camel.spi.ExceptionHandler.class;
         case "exchangepattern":
         case "exchangePattern": return org.apache.camel.ExchangePattern.class;
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
+        case "messageselectortype":
+        case "messageSelectorType": return java.lang.String.class;
+        case "namespace": return java.lang.String.class;
         case "namesrvaddr":
         case "namesrvAddr": return java.lang.String.class;
         case "producergroup":
@@ -90,6 +106,8 @@ public class RocketMQEndpointConfigurer extends 
PropertyConfigurerSupport implem
         case "secretKey": return java.lang.String.class;
         case "sendtag":
         case "sendTag": return java.lang.String.class;
+        case "subscribesql":
+        case "subscribeSql": return java.lang.String.class;
         case "subscribetags":
         case "subscribeTags": return java.lang.String.class;
         case "waitforsendresult":
@@ -102,18 +120,25 @@ public class RocketMQEndpointConfigurer extends 
PropertyConfigurerSupport implem
     public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
         RocketMQEndpoint target = (RocketMQEndpoint) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesschannel":
+        case "accessChannel": return target.getAccessChannel();
         case "accesskey":
         case "accessKey": return target.getAccessKey();
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "consumergroup":
         case "consumerGroup": return target.getConsumerGroup();
+        case "enabletrace":
+        case "enableTrace": return target.isEnableTrace();
         case "exceptionhandler":
         case "exceptionHandler": return target.getExceptionHandler();
         case "exchangepattern":
         case "exchangePattern": return target.getExchangePattern();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
+        case "messageselectortype":
+        case "messageSelectorType": return target.getMessageSelectorType();
+        case "namespace": return target.getNamespace();
         case "namesrvaddr":
         case "namesrvAddr": return target.getNamesrvAddr();
         case "producergroup":
@@ -130,6 +155,8 @@ public class RocketMQEndpointConfigurer extends 
PropertyConfigurerSupport implem
         case "secretKey": return target.getSecretKey();
         case "sendtag":
         case "sendTag": return target.getSendTag();
+        case "subscribesql":
+        case "subscribeSql": return target.getSubscribeSql();
         case "subscribetags":
         case "subscribeTags": return target.getSubscribeTags();
         case "waitforsendresult":
diff --git a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointUriFactory.java b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointUriFactory.java
index a7ac8457bf4..95fe5d6ea1d 100644
--- a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointUriFactory.java
+++ b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointUriFactory.java
@@ -23,13 +23,17 @@ public class RocketMQEndpointUriFactory extends 
org.apache.camel.support.compone
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(17);
+        Set<String> props = new HashSet<>(22);
+        props.add("accessChannel");
         props.add("accessKey");
         props.add("bridgeErrorHandler");
         props.add("consumerGroup");
+        props.add("enableTrace");
         props.add("exceptionHandler");
         props.add("exchangePattern");
         props.add("lazyStartProducer");
+        props.add("messageSelectorType");
+        props.add("namespace");
         props.add("namesrvAddr");
         props.add("producerGroup");
         props.add("replyToConsumerGroup");
@@ -38,6 +42,7 @@ public class RocketMQEndpointUriFactory extends 
org.apache.camel.support.compone
         props.add("requestTimeoutMillis");
         props.add("secretKey");
         props.add("sendTag");
+        props.add("subscribeSql");
         props.add("subscribeTags");
         props.add("topicName");
         props.add("waitForSendResult");
diff --git a/components/camel-rocketmq/src/generated/resources/META-INF/org/apache/camel/component/rocketmq/rocketmq.json b/components/camel-rocketmq/src/generated/resources/META-INF/org/apache/camel/component/rocketmq/rocketmq.json
index c41e534f5c2..e051a9be94e 100644
--- a/components/camel-rocketmq/src/generated/resources/META-INF/org/apache/camel/component/rocketmq/rocketmq.json
+++ b/components/camel-rocketmq/src/generated/resources/META-INF/org/apache/camel/component/rocketmq/rocketmq.json
@@ -24,21 +24,26 @@
     "remote": true
   },
   "componentProperties": {
-    "namesrvAddr": { "index": 0, "kind": "property", "displayName": "Namesrv 
Addr", "group": "common", "label": "common", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "localhost:9876", "description": "Name 
server address of RocketMQ cluster." },
-    "sendTag": { "index": 1, "kind": "property", "displayName": "Send Tag", 
"group": "common", "label": "common", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Each message would be sent with this tag." },
-    "bridgeErrorHandler": { "index": 2, "kind": "property", "displayName": 
"Bridge Error Handler", "group": "consumer", "label": "consumer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Allows for bridging the consumer to the Camel routing Error Handler, which 
mean any exceptions (if possible) occurred while the Camel consumer is trying 
to pickup incoming messages, or the like [...]
-    "consumerGroup": { "index": 3, "kind": "property", "displayName": 
"Consumer Group", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Consumer group name." },
-    "subscribeTags": { "index": 4, "kind": "property", "displayName": 
"Subscribe Tags", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "*", "description": 
"Subscribe tags of consumer. Multiple tags could be split by , such as 
TagATagB" },
-    "lazyStartProducer": { "index": 5, "kind": "property", "displayName": 
"Lazy Start Producer", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether the producer should be started lazy (on the first message). By 
starting lazy you can use this to allow CamelContext and routes to startup in 
situations where a producer may otherwise fail [...]
-    "producerGroup": { "index": 6, "kind": "property", "displayName": 
"Producer Group", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Producer group name." },
-    "replyToConsumerGroup": { "index": 7, "kind": "property", "displayName": 
"Reply To Consumer Group", "group": "producer", "label": "producer", 
"required": false, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Consumer group name used for receiving response." },
-    "replyToTopic": { "index": 8, "kind": "property", "displayName": "Reply To 
Topic", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "description": "Topic used for receiving response when 
using in-out pattern." },
-    "waitForSendResult": { "index": 9, "kind": "property", "displayName": 
"Wait For Send Result", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether waiting for send result before routing to next endpoint." },
-    "autowiredEnabled": { "index": 10, "kind": "property", "displayName": 
"Autowired Enabled", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether autowiring is enabled. This is used for automatic autowiring options 
(the option must be marked as autowired) by looking up in the registry to find 
if there is a single instance of matching  [...]
-    "requestTimeoutCheckerIntervalMillis": { "index": 11, "kind": "property", 
"displayName": "Request Timeout Checker Interval Millis", "group": "advanced", 
"label": "advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, 
"description": "Check interval milliseconds of request timeout." },
-    "requestTimeoutMillis": { "index": 12, "kind": "property", "displayName": 
"Request Timeout Millis", "group": "advanced", "label": "advanced", "required": 
false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 10000, "description": "Timeout 
milliseconds of receiving response when using in-out pattern." },
-    "accessKey": { "index": 13, "kind": "property", "displayName": "Access 
Key", "group": "secret", "label": "secret", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "description": "Access key for RocketMQ ACL." },
-    "secretKey": { "index": 14, "kind": "property", "displayName": "Secret 
Key", "group": "secret", "label": "secret", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "description": "Secret key for RocketMQ ACL." }
+    "accessChannel": { "index": 0, "kind": "property", "displayName": "Access 
Channel", "group": "common", "label": "common", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "LOCAL", "description": "Access channel 
of RocketMQ cluster. LOCAL or CLOUD, LOCAL by default" },
+    "enableTrace": { "index": 1, "kind": "property", "displayName": "Enable 
Trace", "group": "common", "label": "common", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether to enable 
trace." },
+    "namespace": { "index": 2, "kind": "property", "displayName": "Namespace", 
"group": "common", "label": "common", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Namespace of RocketMQ cluster. You need to 
specify this if you are using serverless version of RocketMQ." },
+    "namesrvAddr": { "index": 3, "kind": "property", "displayName": "Namesrv 
Addr", "group": "common", "label": "common", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "localhost:9876", "description": "Name 
server address of RocketMQ cluster." },
+    "sendTag": { "index": 4, "kind": "property", "displayName": "Send Tag", 
"group": "common", "label": "common", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Each message would be sent with this tag." },
+    "bridgeErrorHandler": { "index": 5, "kind": "property", "displayName": 
"Bridge Error Handler", "group": "consumer", "label": "consumer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Allows for bridging the consumer to the Camel routing Error Handler, which 
mean any exceptions (if possible) occurred while the Camel consumer is trying 
to pickup incoming messages, or the like [...]
+    "consumerGroup": { "index": 6, "kind": "property", "displayName": 
"Consumer Group", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Consumer group name." },
+    "messageSelectorType": { "index": 7, "kind": "property", "displayName": 
"Message Selector Type", "group": "consumer", "label": "consumer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "tag", "description": 
"Message Selector Type, TAG or SQL TAG by default" },
+    "subscribeSql": { "index": 8, "kind": "property", "displayName": 
"Subscribe Sql", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "1 = 1", "description": 
"Subscribe SQL of consumer. See 
https:\/\/rocketmq.apache.org\/docs\/featureBehavior\/07messagefilter\/#attribute-based-sql-filtering
 for more details." },
+    "subscribeTags": { "index": 9, "kind": "property", "displayName": 
"Subscribe Tags", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "*", "description": 
"Subscribe tags of consumer. Multiple tags could be split by , such as 
TagATagB" },
+    "lazyStartProducer": { "index": 10, "kind": "property", "displayName": 
"Lazy Start Producer", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether the producer should be started lazy (on the first message). By 
starting lazy you can use this to allow CamelContext and routes to startup in 
situations where a producer may otherwise fai [...]
+    "producerGroup": { "index": 11, "kind": "property", "displayName": 
"Producer Group", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Producer group name." },
+    "replyToConsumerGroup": { "index": 12, "kind": "property", "displayName": 
"Reply To Consumer Group", "group": "producer", "label": "producer", 
"required": false, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Consumer group name used for receiving response." },
+    "replyToTopic": { "index": 13, "kind": "property", "displayName": "Reply 
To Topic", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "description": "Topic used for receiving response when 
using in-out pattern." },
+    "waitForSendResult": { "index": 14, "kind": "property", "displayName": 
"Wait For Send Result", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether waiting for send result before routing to next endpoint." },
+    "autowiredEnabled": { "index": 15, "kind": "property", "displayName": 
"Autowired Enabled", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether autowiring is enabled. This is used for automatic autowiring options 
(the option must be marked as autowired) by looking up in the registry to find 
if there is a single instance of matching  [...]
+    "requestTimeoutCheckerIntervalMillis": { "index": 16, "kind": "property", 
"displayName": "Request Timeout Checker Interval Millis", "group": "advanced", 
"label": "advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, 
"description": "Check interval milliseconds of request timeout." },
+    "requestTimeoutMillis": { "index": 17, "kind": "property", "displayName": 
"Request Timeout Millis", "group": "advanced", "label": "advanced", "required": 
false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 10000, "description": "Timeout 
milliseconds of receiving response when using in-out pattern." },
+    "accessKey": { "index": 18, "kind": "property", "displayName": "Access 
Key", "group": "secret", "label": "secret", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "description": "Access key for RocketMQ ACL." },
+    "secretKey": { "index": 19, "kind": "property", "displayName": "Secret 
Key", "group": "secret", "label": "secret", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "description": "Secret key for RocketMQ ACL." }
   },
   "headers": {
     "CamelRockerMQTopic": { "index": 0, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Topic of message", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#TOPIC" },
@@ -64,21 +69,26 @@
   },
   "properties": {
     "topicName": { "index": 0, "kind": "path", "displayName": "Topic Name", 
"group": "common", "label": "", "required": true, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Topic name of this endpoint." },
-    "namesrvAddr": { "index": 1, "kind": "parameter", "displayName": "Namesrv 
Addr", "group": "common", "label": "common", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "localhost:9876", "description": "Name 
server address of RocketMQ cluster." },
-    "consumerGroup": { "index": 2, "kind": "parameter", "displayName": 
"Consumer Group", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Consumer group name." },
-    "subscribeTags": { "index": 3, "kind": "parameter", "displayName": 
"Subscribe Tags", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "*", "description": 
"Subscribe tags of consumer. Multiple tags could be split by , such as 
TagATagB" },
-    "bridgeErrorHandler": { "index": 4, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming  [...]
-    "exceptionHandler": { "index": 5, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By def [...]
-    "exchangePattern": { "index": 6, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
-    "producerGroup": { "index": 7, "kind": "parameter", "displayName": 
"Producer Group", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Producer group name." },
-    "replyToConsumerGroup": { "index": 8, "kind": "parameter", "displayName": 
"Reply To Consumer Group", "group": "producer", "label": "producer", 
"required": false, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Consumer group name used for receiving response." },
-    "replyToTopic": { "index": 9, "kind": "parameter", "displayName": "Reply 
To Topic", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "description": "Topic used for receiving response when 
using in-out pattern." },
-    "sendTag": { "index": 10, "kind": "parameter", "displayName": "Send Tag", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Each message would be sent with this tag." },
-    "waitForSendResult": { "index": 11, "kind": "parameter", "displayName": 
"Wait For Send Result", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether waiting for send result before routing to next endpoint." },
-    "lazyStartProducer": { "index": 12, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
-    "requestTimeoutCheckerIntervalMillis": { "index": 13, "kind": "parameter", 
"displayName": "Request Timeout Checker Interval Millis", "group": "advanced", 
"label": "advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, 
"description": "Check interval milliseconds of request timeout." },
-    "requestTimeoutMillis": { "index": 14, "kind": "parameter", "displayName": 
"Request Timeout Millis", "group": "advanced", "label": "advanced", "required": 
false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 10000, "description": "Timeout 
milliseconds of receiving response when using in-out pattern." },
-    "accessKey": { "index": 15, "kind": "parameter", "displayName": "Access 
Key", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "description": "Access key for RocketMQ ACL." },
-    "secretKey": { "index": 16, "kind": "parameter", "displayName": "Secret 
Key", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "description": "Secret key for RocketMQ ACL." }
+    "accessChannel": { "index": 1, "kind": "parameter", "displayName": "Access 
Channel", "group": "common", "label": "common", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "LOCAL", "description": "Access channel 
of RocketMQ cluster. LOCAL or CLOUD, LOCAL by default" },
+    "enableTrace": { "index": 2, "kind": "parameter", "displayName": "Enable 
Trace", "group": "common", "label": "common", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether to enable 
trace." },
+    "namespace": { "index": 3, "kind": "parameter", "displayName": 
"Namespace", "group": "common", "label": "common", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "description": "Namespace of RocketMQ cluster. You need 
to specify this if you are using serverless version of RocketMQ." },
+    "namesrvAddr": { "index": 4, "kind": "parameter", "displayName": "Namesrv 
Addr", "group": "common", "label": "common", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "localhost:9876", "description": "Name 
server address of RocketMQ cluster." },
+    "consumerGroup": { "index": 5, "kind": "parameter", "displayName": 
"Consumer Group", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Consumer group name." },
+    "messageSelectorType": { "index": 6, "kind": "parameter", "displayName": 
"Message Selector Type", "group": "consumer", "label": "consumer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "tag", "description": 
"Message Selector Type, TAG or SQL; TAG by default" },
+    "subscribeSql": { "index": 7, "kind": "parameter", "displayName": 
"Subscribe Sql", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "1 = 1", "description": 
"Subscribe SQL of consumer. See 
https:\/\/rocketmq.apache.org\/docs\/featureBehavior\/07messagefilter\/#attribute-based-sql-filtering
 for more details." },
+    "subscribeTags": { "index": 8, "kind": "parameter", "displayName": 
"Subscribe Tags", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "*", "description": 
"Subscribe tags of consumer. Multiple tags could be split by ||, such as 
TagA||TagB" },
+    "bridgeErrorHandler": { "index": 9, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming  [...]
+    "exceptionHandler": { "index": 10, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
+    "exchangePattern": { "index": 11, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
+    "producerGroup": { "index": 12, "kind": "parameter", "displayName": 
"Producer Group", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Producer group name." },
+    "replyToConsumerGroup": { "index": 13, "kind": "parameter", "displayName": 
"Reply To Consumer Group", "group": "producer", "label": "producer", 
"required": false, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Consumer group name used for receiving response." },
+    "replyToTopic": { "index": 14, "kind": "parameter", "displayName": "Reply 
To Topic", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "description": "Topic used for receiving response when 
using in-out pattern." },
+    "sendTag": { "index": 15, "kind": "parameter", "displayName": "Send Tag", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Each message would be sent with this tag." },
+    "waitForSendResult": { "index": 16, "kind": "parameter", "displayName": 
"Wait For Send Result", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether waiting for send result before routing to next endpoint." },
+    "lazyStartProducer": { "index": 17, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
+    "requestTimeoutCheckerIntervalMillis": { "index": 18, "kind": "parameter", 
"displayName": "Request Timeout Checker Interval Millis", "group": "advanced", 
"label": "advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, 
"description": "Check interval milliseconds of request timeout." },
+    "requestTimeoutMillis": { "index": 19, "kind": "parameter", "displayName": 
"Request Timeout Millis", "group": "advanced", "label": "advanced", "required": 
false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 10000, "description": "Timeout 
milliseconds of receiving response when using in-out pattern." },
+    "accessKey": { "index": 20, "kind": "parameter", "displayName": "Access 
Key", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "description": "Access key for RocketMQ ACL." },
+    "secretKey": { "index": 21, "kind": "parameter", "displayName": "Secret 
Key", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "description": "Secret key for RocketMQ ACL." }
   }
 }
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java
index 08063c8a152..737ee42dd58 100644
--- 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java
@@ -32,15 +32,30 @@ public class RocketMQComponent extends DefaultComponent {
     @Metadata(label = "consumer")
     private String consumerGroup;
 
+    @Metadata(label = "consumer", defaultValue = "tag")
+    private String messageSelectorType = "tag";
+
     @Metadata(label = "consumer", defaultValue = "*")
     private String subscribeTags = "*";
 
+    @Metadata(label = "consumer", defaultValue = "1 = 1")
+    private String subscribeSql = "1 = 1";
+
     @Metadata(label = "common")
     private String sendTag = "";
 
     @Metadata(label = "common", defaultValue = "localhost:9876")
     private String namesrvAddr = "localhost:9876";
 
+    @Metadata(label = "common")
+    private String namespace;
+
+    @Metadata(label = "common", defaultValue = "false")
+    private boolean enableTrace = false;
+
+    @Metadata(label = "common", defaultValue = "LOCAL")
+    private String accessChannel = "LOCAL";
+
     @Metadata(label = "producer")
     private String replyToTopic;
 
@@ -67,8 +82,13 @@ public class RocketMQComponent extends DefaultComponent {
         RocketMQEndpoint endpoint = new RocketMQEndpoint(uri, this);
         endpoint.setProducerGroup(getProducerGroup());
         endpoint.setConsumerGroup(getConsumerGroup());
+        endpoint.setMessageSelectorType(getMessageSelectorType());
+        endpoint.setSubscribeSql(getSubscribeSql());
         endpoint.setSubscribeTags(getSubscribeTags());
         endpoint.setNamesrvAddr(getNamesrvAddr());
+        endpoint.setNamespace(getNamespace());
+        endpoint.setEnableTrace(isEnableTrace());
+        endpoint.setAccessChannel(getAccessChannel());
         endpoint.setSendTag(getSendTag());
         endpoint.setReplyToTopic(getReplyToTopic());
         endpoint.setReplyToConsumerGroup(getReplyToConsumerGroup());
@@ -82,6 +102,29 @@ public class RocketMQComponent extends DefaultComponent {
         return endpoint;
     }
 
+    public String getMessageSelectorType() {
+        return messageSelectorType;
+    }
+
+    /**
+     * Message selector type: TAG or SQL (matched case-insensitively). TAG by default.
+     */
+    public void setMessageSelectorType(String messageSelectorType) {
+        this.messageSelectorType = messageSelectorType;
+    }
+
+    public String getSubscribeSql() {
+        return subscribeSql;
+    }
+
+    /**
+     * Subscribe SQL of consumer. See
+     * 
https://rocketmq.apache.org/docs/featureBehavior/07messagefilter/#attribute-based-sql-filtering
 for more details.
+     */
+    public void setSubscribeSql(String subscribeSql) {
+        this.subscribeSql = subscribeSql;
+    }
+
     public String getSubscribeTags() {
         return subscribeTags;
     }
@@ -115,6 +158,39 @@ public class RocketMQComponent extends DefaultComponent {
         this.namesrvAddr = namesrvAddr;
     }
 
+    public String getNamespace() {
+        return namespace;
+    }
+
+    /**
+     * Namespace of RocketMQ cluster. You need to specify this if you are 
using serverless version of RocketMQ.
+     */
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public boolean isEnableTrace() {
+        return enableTrace;
+    }
+
+    /**
+     * Whether to enable trace.
+     */
+    public void setEnableTrace(boolean enableTrace) {
+        this.enableTrace = enableTrace;
+    }
+
+    public String getAccessChannel() {
+        return accessChannel;
+    }
+
+    /**
+     * Access channel of the RocketMQ cluster: LOCAL or CLOUD (exact upper-case name). LOCAL by default.
+     */
+    public void setAccessChannel(String accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
     public String getProducerGroup() {
         return producerGroup;
     }
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java
index 6cfa996b861..c26dd2e10cb 100644
--- 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java
@@ -21,7 +21,9 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Suspendable;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -40,10 +42,26 @@ public class RocketMQConsumer extends DefaultConsumer 
implements Suspendable {
 
     private void startConsumer() throws MQClientException {
         mqPushConsumer = new DefaultMQPushConsumer(
-                null, endpoint.getConsumerGroup(),
+                endpoint.getConsumerGroup(),
                 RocketMQAclUtils.getAclRPCHook(getEndpoint().getAccessKey(), 
getEndpoint().getSecretKey()));
         mqPushConsumer.setNamesrvAddr(endpoint.getNamesrvAddr());
-        mqPushConsumer.subscribe(endpoint.getTopicName(), 
endpoint.getSubscribeTags());
+        mqPushConsumer.setNamespaceV2(endpoint.getNamespace());
+
+        MessageSelector messageSelector;
+        switch (endpoint.getMessageSelectorType().toLowerCase()) {
+            case "tag":
+                messageSelector = 
MessageSelector.byTag(endpoint.getSubscribeTags());
+                break;
+            case "sql":
+                messageSelector = 
MessageSelector.bySql(endpoint.getSubscribeSql());
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown selector type: " + 
endpoint.getMessageSelectorType());
+        }
+        mqPushConsumer.setEnableTrace(endpoint.isEnableTrace());
+        
mqPushConsumer.setAccessChannel(AccessChannel.valueOf(endpoint.getAccessChannel()));
+        mqPushConsumer.subscribe(endpoint.getTopicName(), messageSelector);
+
         mqPushConsumer.registerMessageListener((MessageListenerConcurrently) 
(msgs, context) -> {
             MessageExt messageExt = msgs.get(0);
             Exchange exchange = 
endpoint.createRocketExchange(messageExt.getBody());
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java
index 575645e1f0e..8e25b7a1b33 100644
--- 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java
@@ -45,8 +45,12 @@ public class RocketMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint,
     private String producerGroup;
     @UriParam(label = "consumer")
     private String consumerGroup;
+    @UriParam(label = "consumer", defaultValue = "tag")
+    private String messageSelectorType = "tag";
     @UriParam(label = "consumer", defaultValue = "*")
     private String subscribeTags = "*";
+    @UriParam(label = "consumer", defaultValue = "1 = 1")
+    private String subscribeSql = "1 = 1";
     @UriParam(label = "producer")
     private String sendTag = "";
     @UriParam(label = "producer")
@@ -55,6 +59,12 @@ public class RocketMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint,
     private String replyToConsumerGroup;
     @UriParam(label = "common", defaultValue = "localhost:9876")
     private String namesrvAddr = "localhost:9876";
+    @UriParam(label = "common")
+    private String namespace;
+    @UriParam(label = "common", defaultValue = "false")
+    private boolean enableTrace = false;
+    @UriParam(label = "common", defaultValue = "LOCAL")
+    private String accessChannel = "LOCAL";
     @UriParam(label = "advanced", defaultValue = "10000")
     private long requestTimeoutMillis = 10000L;
     @UriParam(label = "advanced", defaultValue = "1000")
@@ -114,6 +124,29 @@ public class RocketMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint,
         this.topicName = topicName;
     }
 
+    public String getMessageSelectorType() {
+        return messageSelectorType;
+    }
+
+    /**
+     * Message selector type: TAG or SQL (matched case-insensitively). TAG by default.
+     */
+    public void setMessageSelectorType(String messageSelectorType) {
+        this.messageSelectorType = messageSelectorType;
+    }
+
+    public String getSubscribeSql() {
+        return subscribeSql;
+    }
+
+    /**
+     * Subscribe SQL of consumer. See
+     * 
https://rocketmq.apache.org/docs/featureBehavior/07messagefilter/#attribute-based-sql-filtering
 for more details.
+     */
+    public void setSubscribeSql(String subscribeSql) {
+        this.subscribeSql = subscribeSql;
+    }
+
     public String getSubscribeTags() {
         return subscribeTags;
     }
@@ -147,6 +180,39 @@ public class RocketMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint,
         this.namesrvAddr = namesrvAddr;
     }
 
+    public String getNamespace() {
+        return namespace;
+    }
+
+    /**
+     * Namespace of RocketMQ cluster. You need to specify this if you are 
using serverless version of RocketMQ.
+     */
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public boolean isEnableTrace() {
+        return enableTrace;
+    }
+
+    /**
+     * Whether to enable trace.
+     */
+    public void setEnableTrace(boolean enableTrace) {
+        this.enableTrace = enableTrace;
+    }
+
+    public String getAccessChannel() {
+        return accessChannel;
+    }
+
+    /**
+     * Access channel of the RocketMQ cluster: LOCAL or CLOUD (exact upper-case name). LOCAL by default.
+     */
+    public void setAccessChannel(String accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
     public String getProducerGroup() {
         return producerGroup;
     }
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
index defd5ea0ebd..0fd0f529c69 100644
--- 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
@@ -32,6 +32,7 @@ import org.apache.camel.component.rocketmq.reply.ReplyManager;
 import org.apache.camel.component.rocketmq.reply.RocketMQReplyManagerSupport;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.service.ServiceHelper;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
@@ -228,9 +229,12 @@ public class RocketMQProducer extends DefaultAsyncProducer 
{
     @Override
     protected void doStart() throws Exception {
         this.mqProducer = new DefaultMQProducer(
-                null, getEndpoint().getProducerGroup(),
+                getEndpoint().getProducerGroup(),
                 RocketMQAclUtils.getAclRPCHook(getEndpoint().getAccessKey(), 
getEndpoint().getSecretKey()));
         this.mqProducer.setNamesrvAddr(getEndpoint().getNamesrvAddr());
+        this.mqProducer.setNamespaceV2(getEndpoint().getNamespace());
+        this.mqProducer.setEnableTrace(getEndpoint().isEnableTrace());
+        
this.mqProducer.setAccessChannel(AccessChannel.valueOf(getEndpoint().getAccessChannel()));
         this.mqProducer.start();
     }
 

Reply via email to