This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 3677e36 Add RPC timeout tolerance for long-polling (#101)
3677e36 is described below
commit 3677e36a75d2f7c5e47b3019483ee53ba217a4ba
Author: Aaron Ai <[email protected]>
AuthorDate: Fri Jul 29 20:34:50 2022 +0800
Add RPC timeout tolerance for long-polling (#101)
---
.../org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 2b962ed..fbc6191 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -65,7 +65,6 @@ import org.slf4j.LoggerFactory;
abstract class ConsumerImpl extends ClientImpl {
static final Pattern CONSUMER_GROUP_PATTERN =
Pattern.compile("^[%a-zA-Z0-9_-]+$");
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerImpl.class);
- private static final Duration LONG_POLLING_TOLERANCE =
Duration.ofMillis(500);
private final String consumerGroup;
ConsumerImpl(ClientConfiguration clientConfiguration, String
consumerGroup, Set<String> topics) {
@@ -80,7 +79,8 @@ abstract class ConsumerImpl extends ClientImpl {
try {
Metadata metadata = sign();
final Endpoints endpoints = mq.getBroker().getEndpoints();
- final Duration timeout = Duration.ofNanos(awaitDuration.toNanos()
+ LONG_POLLING_TOLERANCE.toNanos());
+ final Duration tolerance = clientConfiguration.getRequestTimeout();
+ final Duration timeout = Duration.ofNanos(awaitDuration.toNanos()
+ tolerance.toNanos());
final
ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
clientManager.receiveMessage(endpoints, metadata, request,
timeout);
return Futures.transformAsync(future, context -> {