This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f7dbc94ad7 Add expression filtering capability to the 
pullBlockIfNotFound method of pull consumer (#8024)
f7dbc94ad7 is described below

commit f7dbc94ad715143ad610e026f73a3d60a01204d6
Author: rongtong <jinrongto...@163.com>
AuthorDate: Fri Apr 19 13:41:43 2024 +0800

    Add expression filtering capability to the pullBlockIfNotFound method of 
pull consumer (#8024)
---
 .../client/consumer/DefaultMQPullConsumer.java     | 18 ++++++++++++++++--
 .../rocketmq/client/consumer/MQPullConsumer.java   | 22 +++++++++++++++++++---
 .../impl/consumer/DefaultMQPullConsumerImpl.java   | 15 +++++++++++++++
 3 files changed, 50 insertions(+), 5 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index b4ca6ab3b3..089fd39b3e 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -36,8 +36,8 @@ import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 
 /**
- * @deprecated Default pulling consumer. This class will be removed in 2022, 
and a better implementation {@link
- * DefaultLitePullConsumer} is recommend to use in the scenario of actively 
pulling messages.
+ * @deprecated Default pulling consumer. This class will be removed in 2022, 
and a better implementation
+ * {@link DefaultLitePullConsumer} is recommend to use in the scenario of 
actively pulling messages.
  */
 @Deprecated
 public class DefaultMQPullConsumer extends ClientConfig implements 
MQPullConsumer {
@@ -375,6 +375,20 @@ public class DefaultMQPullConsumer extends ClientConfig 
implements MQPullConsume
         
this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), 
subExpression, offset, maxNums, pullCallback);
     }
 
+    @Override
+    public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, 
MessageSelector selector,
+        long offset, int maxNums,
+        PullCallback pullCallback) throws MQClientException, 
RemotingException, InterruptedException {
+        
this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq, 
selector, offset, maxNums, pullCallback);
+    }
+
+    @Override
+    public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, 
MessageSelector selector,
+        long offset,
+        int maxNums) throws MQClientException, RemotingException, 
MQBrokerException, InterruptedException {
+        return 
this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq, 
selector, offset, maxNums);
+    }
+
     @Override
     public void updateConsumeOffset(MessageQueue mq, long offset) throws 
MQClientException {
         
this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), 
offset);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 868ee93ff8..ee77b12bbc 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -47,8 +47,7 @@ public interface MQPullConsumer extends MQConsumer {
      *
      * @param mq from which message queue
      * @param subExpression subscription expression.it only support or 
operation such as "tag1 || tag2 || tag3" <br> if
-     * null or * expression,meaning subscribe
-     * all
+     * null or * expression,meaning subscribe all
      * @param offset from where to pull
      * @param maxNums max pulling numbers
      * @return The resulting {@code PullRequest}
@@ -121,7 +120,7 @@ public interface MQPullConsumer extends MQConsumer {
         InterruptedException;
 
     /**
-     * Pulling the messages in a async. way. Support message selection
+     * Pulling the messages in a async way. Support message selection
      */
     void pull(final MessageQueue mq, final MessageSelector selector, final 
long offset, final int maxNums,
         final PullCallback pullCallback) throws MQClientException, 
RemotingException,
@@ -150,6 +149,23 @@ public interface MQPullConsumer extends MQConsumer {
         final int maxNums, final PullCallback pullCallback) throws 
MQClientException, RemotingException,
         InterruptedException;
 
+    /**
+     * Pulling the messages through callback function,if no message 
arrival,blocking. Support message selection
+     */
+    void pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, final 
MessageSelector selector,
+        final long offset, final int maxNums,
+        final PullCallback pullCallback) throws MQClientException, 
RemotingException,
+        InterruptedException;
+
+    /**
+     * Pulling the messages,if no message arrival,blocking some time. Support 
message selection
+     *
+     * @return The resulting {@code PullRequest}
+     */
+    PullResult pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, 
final MessageSelector selector,
+        final long offset, final int maxNums) throws MQClientException, 
RemotingException,
+        MQBrokerException, InterruptedException;
+
     /**
      * Update the offset
      */
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 91d72989ca..c877ccc070 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -589,6 +589,21 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
             this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
+    public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, 
MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, 
messageSelector);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, 
pullCallback, true,
+            this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+    }
+
+    public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, 
MessageSelector messageSelector, long offset, int maxNums)
+        throws MQClientException, RemotingException, InterruptedException, 
MQBrokerException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, 
messageSelector);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, 
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+    }
+
+
     public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin, long end)
         throws MQClientException, InterruptedException {
         this.isRunning();

Reply via email to