This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 3677e36  Add RPC timeout tolerance for long-polling (#101)
3677e36 is described below

commit 3677e36a75d2f7c5e47b3019483ee53ba217a4ba
Author: Aaron Ai <[email protected]>
AuthorDate: Fri Jul 29 20:34:50 2022 +0800

    Add RPC timeout tolerance for long-polling (#101)
---
 .../org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 2b962ed..fbc6191 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -65,7 +65,6 @@ import org.slf4j.LoggerFactory;
 abstract class ConsumerImpl extends ClientImpl {
     static final Pattern CONSUMER_GROUP_PATTERN = 
Pattern.compile("^[%a-zA-Z0-9_-]+$");
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerImpl.class);
-    private static final Duration LONG_POLLING_TOLERANCE = 
Duration.ofMillis(500);
     private final String consumerGroup;
 
     ConsumerImpl(ClientConfiguration clientConfiguration, String 
consumerGroup, Set<String> topics) {
@@ -80,7 +79,8 @@ abstract class ConsumerImpl extends ClientImpl {
         try {
             Metadata metadata = sign();
             final Endpoints endpoints = mq.getBroker().getEndpoints();
-            final Duration timeout = Duration.ofNanos(awaitDuration.toNanos() 
+ LONG_POLLING_TOLERANCE.toNanos());
+            final Duration tolerance = clientConfiguration.getRequestTimeout();
+            final Duration timeout = Duration.ofNanos(awaitDuration.toNanos() 
+ tolerance.toNanos());
             final 
ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
                 clientManager.receiveMessage(endpoints, metadata, request, 
timeout);
             return Futures.transformAsync(future, context -> {

Reply via email to