This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

The following commit(s) were added to refs/heads/develop by this push:
     new f7dbc94ad7 Add expression filtering capability to the pullBlockIfNotFound method of pull consumer (#8024)
f7dbc94ad7 is described below

commit f7dbc94ad715143ad610e026f73a3d60a01204d6
Author: rongtong <jinrongto...@163.com>
AuthorDate: Fri Apr 19 13:41:43 2024 +0800

    Add expression filtering capability to the pullBlockIfNotFound method of pull consumer (#8024)
---
 .../client/consumer/DefaultMQPullConsumer.java | 18 ++++++++++++++++--
 .../rocketmq/client/consumer/MQPullConsumer.java | 22 +++++++++++++++++++---
 .../impl/consumer/DefaultMQPullConsumerImpl.java | 15 +++++++++++++++
 3 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index b4ca6ab3b3..089fd39b3e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -36,8 +36,8 @@ import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 
 /**
- * @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation {@link
- * DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
+ * @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation
+ * {@link DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
  */
 @Deprecated
 public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
@@ -375,6 +375,20 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
         this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback);
     }
 
+    @Override
+    public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector selector,
+        long offset, int maxNums,
+        PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq, selector, offset, maxNums, pullCallback);
+    }
+
+    @Override
+    public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector selector,
+        long offset,
+        int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq, selector, offset, maxNums);
+    }
+
     @Override
     public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
         this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 868ee93ff8..ee77b12bbc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -47,8 +47,7 @@ public interface MQPullConsumer extends MQConsumer {
      *
      * @param mq from which message queue
      * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
-     * null or * expression,meaning subscribe
-     * all
+     * null or * expression,meaning subscribe all
      * @param offset from where to pull
      * @param maxNums max pulling numbers
      * @return The resulting {@code PullRequest}
@@ -121,7 +120,7 @@ public interface MQPullConsumer extends MQConsumer {
         InterruptedException;
 
     /**
-     * Pulling the messages in a async. way. Support message selection
+     * Pulling the messages in a async way. Support message selection
      */
     void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
         final PullCallback pullCallback) throws MQClientException, RemotingException,
@@ -150,6 +149,23 @@ public interface MQPullConsumer extends MQConsumer {
         final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException,
         InterruptedException;
 
+    /**
+     * Pulling the messages through callback function,if no message arrival,blocking. Support message selection
+     */
+    void pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, final MessageSelector selector,
+        final long offset, final int maxNums,
+        final PullCallback pullCallback) throws MQClientException, RemotingException,
+        InterruptedException;
+
+    /**
+     * Pulling the messages,if no message arrival,blocking some time. Support message selection
+     *
+     * @return The resulting {@code PullRequest}
+     */
+    PullResult pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, final MessageSelector selector,
+        final long offset, final int maxNums) throws MQClientException, RemotingException,
+        MQBrokerException, InterruptedException;
+
     /**
      * Update the offset
      */
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 91d72989ca..c877ccc070 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -589,6 +589,21 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
+    public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true,
+            this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+    }
+
+    public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
+        throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+    }
+
+
     public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
         throws MQClientException, InterruptedException {
         this.isRunning();