This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 57c5935b Bump rocketmq-proto to 2.0.2 (#364)
57c5935b is described below

commit 57c5935bc5a820b3e66429075d375a9bdf72b4be
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Feb 23 19:50:28 2023 +0800

    Bump rocketmq-proto to 2.0.2 (#364)
---
 .../org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java | 6 ++++--
 .../apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java | 6 ++++--
 .../rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java      | 2 +-
 .../rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java    | 2 +-
 java/pom.xml                                                        | 2 +-
 5 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index c4ee9751..3efcb540 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -242,17 +242,19 @@ abstract class ConsumerImpl extends ClientImpl {
     }
 
     ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
-        FilterExpression filterExpression) {
+        FilterExpression filterExpression, Duration longPollingTimeout) {
         return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
             .setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
+            .setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
             .setBatchSize(batchSize).setAutoRenew(true).build();
     }
 
     ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
-        FilterExpression filterExpression, Duration invisibleDuration) {
+        FilterExpression filterExpression, Duration invisibleDuration, Duration longPollingTimeout) {
         final com.google.protobuf.Duration duration = Durations.fromNanos(invisibleDuration.toNanos());
         return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
             .setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
+            .setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
             .setBatchSize(batchSize).setAutoRenew(false).setInvisibleDuration(duration).build();
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 8443c293..ff4a13dc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -219,7 +219,9 @@ class ProcessQueueImpl implements ProcessQueue {
         try {
             final Endpoints endpoints = mq.getBroker().getEndpoints();
             final int batchSize = this.getReceptionBatchSize();
-            final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);
+            final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
+            final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
+                longPollingTimeout);
             activityNanoTime = System.nanoTime();
 
             // Intercept before message reception.
@@ -227,7 +229,7 @@ class ProcessQueueImpl implements ProcessQueue {
             consumer.doBefore(context, Collections.emptyList());
 
             final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
-                consumer.getPushConsumerSettings().getLongPollingTimeout());
+                longPollingTimeout);
             Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
                 @Override
                 public void onSuccess(ReceiveMessageResult result) {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 4b7ddb13..5d6092a8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -202,7 +202,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
             final MessageQueueImpl mq = result.takeMessageQueue();
             final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
-                invisibleDuration);
+                invisibleDuration, awaitDuration);
             return receiveMessage(request, mq, awaitDuration);
         }, MoreExecutors.directExecutor());
         return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 4e497cd9..b4a29717 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -134,7 +134,7 @@ public class ProcessQueueImplTest extends TestBase {
         when(pushSubscriptionSettings.getReceiveBatchSize()).thenReturn(32);
         ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder().build();
         when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class),
-            any(FilterExpression.class))).thenReturn(request);
+            any(FilterExpression.class), any(Duration.class))).thenReturn(request);
         processQueue.fetchMessageImmediately();
         await().atMost(Duration.ofSeconds(3))
             .untilAsserted(() -> verify(pushConsumer, times(cachedMessagesCountThresholdPerQueue))
diff --git a/java/pom.xml b/java/pom.xml
index 689b3df1..b97290e6 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -47,7 +47,7 @@
            ~  1. Whether it is essential, because the current shaded jar is fat enough.
            ~  2. Make sure that it is compatible with Java 8.
          -->
-        <rocketmq-proto.version>2.0.1</rocketmq-proto.version>
+        <rocketmq-proto.version>2.0.2</rocketmq-proto.version>
         <annotations-api.version>6.0.53</annotations-api.version>
         <protobuf.version>3.21.7</protobuf.version>
         <grpc.version>1.50.0</grpc.version>

Reply via email to