This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 804847e877 test (#9010) 804847e877 is described below commit 804847e87765f835afa147887f9507b8a41ae08c Author: qianye <wuxingcan....@alibaba-inc.com> AuthorDate: Fri Nov 29 17:18:49 2024 +0800 test (#9010) --- .../client/consumer/DefaultMQPullConsumer.java | 29 ++++++++++++++++++++++ .../impl/consumer/DefaultMQPullConsumerImpl.java | 9 +++++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index 9e7a86d9b4..38841e4128 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -16,9 +16,11 @@ */ package org.apache.rocketmq.client.consumer; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; @@ -33,7 +35,9 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.NamespaceUtil; +import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; /** * @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation @@ -77,6 +81,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume * Topic set you want to register */ private Set<String> registerTopics = new HashSet<>(); + + private final Set<SubscriptionData> registerSubscriptions = Collections.newSetFromMap(new ConcurrentHashMap<>()); /** * Queue allocation algorithm */ @@ -255,6 +261,29 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume this.registerTopics = withNamespace(registerTopics); } + public Set<SubscriptionData> getRegisterSubscriptions() { + return registerSubscriptions; + } + + public void addRegisterSubscriptions(String topic, MessageSelector messageSelector) throws MQClientException { + try { + if (messageSelector == null) { + messageSelector = MessageSelector.byTag(SubscriptionData.SUB_ALL); + } + + SubscriptionData subscriptionData = FilterAPI.build(withNamespace(topic), + messageSelector.getExpression(), messageSelector.getExpressionType()); + + this.registerSubscriptions.add(subscriptionData); + } catch (Exception e) { + throw new MQClientException("add subscription exception", e); + } + } + + public void clearRegisterSubscriptions() { + this.registerSubscriptions.clear(); + } + /** * This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so * please do not use this method. diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 371a4a0dbd..9d46e28f5d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -54,6 +54,8 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -63,8 +65,6 @@ import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; /** * This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumerImpl} is recommend to use @@ -356,6 +356,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { @Override public Set<SubscriptionData> subscriptions() { + Set<SubscriptionData> registerSubscriptions = defaultMQPullConsumer.getRegisterSubscriptions(); + if (registerSubscriptions != null && !registerSubscriptions.isEmpty()) { + return registerSubscriptions; + } + Set<SubscriptionData> result = new HashSet<>(); Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();