This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 804847e877 test (#9010)
804847e877 is described below

commit 804847e87765f835afa147887f9507b8a41ae08c
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Fri Nov 29 17:18:49 2024 +0800

    test (#9010)
---
 .../client/consumer/DefaultMQPullConsumer.java     | 29 ++++++++++++++++++++++
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |  9 +++++--
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 9e7a86d9b4..38841e4128 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -16,9 +16,11 @@
  */
 package org.apache.rocketmq.client.consumer;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
@@ -33,7 +35,9 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 
 /**
  * @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation
@@ -77,6 +81,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
      * Topic set you want to register
      */
     private Set<String> registerTopics = new HashSet<>();
+
+    private final Set<SubscriptionData> registerSubscriptions = Collections.newSetFromMap(new ConcurrentHashMap<>());
     /**
      * Queue allocation algorithm
      */
@@ -255,6 +261,29 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
         this.registerTopics = withNamespace(registerTopics);
     }
 
+    public Set<SubscriptionData> getRegisterSubscriptions() {
+        return registerSubscriptions;
+    }
+
+    public void addRegisterSubscriptions(String topic, MessageSelector messageSelector) throws MQClientException {
+        try {
+            if (messageSelector == null) {
+                messageSelector = MessageSelector.byTag(SubscriptionData.SUB_ALL);
+            }
+
+            SubscriptionData subscriptionData = FilterAPI.build(withNamespace(topic),
+                messageSelector.getExpression(), messageSelector.getExpressionType());
+
+            this.registerSubscriptions.add(subscriptionData);
+        } catch (Exception e) {
+            throw new MQClientException("add subscription exception", e);
+        }
+    }
+
+    public void clearRegisterSubscriptions() {
+        this.registerSubscriptions.clear();
+    }
+
     /**
      * This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
      * please do not use this method.
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 371a4a0dbd..9d46e28f5d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -54,6 +54,8 @@ import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -63,8 +65,6 @@ import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 /**
  * This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumerImpl} is recommend to use
@@ -356,6 +356,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     @Override
     public Set<SubscriptionData> subscriptions() {
+        Set<SubscriptionData> registerSubscriptions = defaultMQPullConsumer.getRegisterSubscriptions();
+        if (registerSubscriptions != null && !registerSubscriptions.isEmpty()) {
+            return registerSubscriptions;
+        }
+
         Set<SubscriptionData> result = new HashSet<>();
 
         Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();

Reply via email to