This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new cea5343e Implement the reentrant PushConsumer message receiving (#547)
cea5343e is described below
commit cea5343ea8a2d2fee2614be31164bf7844ef8930
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Jun 21 10:55:47 2023 +0800
Implement the reentrant PushConsumer message receiving (#547)
---
.../client/java/impl/consumer/ConsumerImpl.java | 6 ++-
.../java/impl/consumer/ProcessQueueImpl.java | 48 ++++++++++++++++------
.../java/impl/consumer/ConsumerImplTest.java | 3 +-
.../java/impl/consumer/ProcessQueueImplTest.java | 3 +-
4 files changed, 43 insertions(+), 17 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 5175b4dd..a807fd28 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
@@ -242,11 +243,12 @@ abstract class ConsumerImpl extends ClientImpl {
}
ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize,
MessageQueueImpl mq,
- FilterExpression filterExpression, Duration longPollingTimeout) {
+ FilterExpression filterExpression, Duration longPollingTimeout, String
attemptId) {
+ attemptId = null == attemptId ? UUID.randomUUID().toString() :
attemptId;
return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
- .setBatchSize(batchSize).setAutoRenew(true).build();
+
.setBatchSize(batchSize).setAutoRenew(true).setAttemptId(attemptId).build();
}
ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize,
MessageQueueImpl mq,
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index a80f90a2..c43b4a10 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -32,10 +32,12 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
+import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -175,29 +177,37 @@ class ProcessQueueImpl implements ProcessQueue {
*
* <p> Make sure that no exception will be thrown.
*/
- public void onReceiveMessageException(Throwable t) {
+ public void onReceiveMessageException(Throwable t, String attemptId) {
Duration delay = t instanceof TooManyRequestsException ?
RECEIVING_FLOW_CONTROL_BACKOFF_DELAY :
RECEIVING_FAILURE_BACKOFF_DELAY;
- receiveMessageLater(delay);
+ receiveMessageLater(delay, attemptId);
}
- private void receiveMessageLater(Duration delay) {
+ private void receiveMessageLater(Duration delay, String attemptId) {
final ClientId clientId = consumer.getClientId();
final ScheduledExecutorService scheduler = consumer.getScheduler();
try {
log.info("Try to receive message later, mq={}, delay={},
clientId={}", mq, delay, clientId);
- scheduler.schedule(this::receiveMessage, delay.toNanos(),
TimeUnit.NANOSECONDS);
+ scheduler.schedule(() -> receiveMessage(attemptId),
delay.toNanos(), TimeUnit.NANOSECONDS);
} catch (Throwable t) {
if (scheduler.isShutdown()) {
return;
}
// Should never reach here.
log.error("[Bug] Failed to schedule message receiving request,
mq={}, clientId={}", mq, clientId, t);
- onReceiveMessageException(t);
+ onReceiveMessageException(t, attemptId);
}
}
+ private String generateAttemptId() {
+ return UUID.randomUUID().toString();
+ }
+
public void receiveMessage() {
+ receiveMessage(this.generateAttemptId());
+ }
+
+ public void receiveMessage(String attemptId) {
final ClientId clientId = consumer.getClientId();
if (dropped) {
log.info("Process queue has been dropped, no longer receive
message, mq={}, clientId={}", mq, clientId);
@@ -205,13 +215,17 @@ class ProcessQueueImpl implements ProcessQueue {
}
if (this.isCacheFull()) {
log.warn("Process queue cache is full, would receive message
later, mq={}, clientId={}", mq, clientId);
- receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL);
+ receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL,
attemptId);
return;
}
- receiveMessageImmediately();
+ receiveMessageImmediately(attemptId);
}
private void receiveMessageImmediately() {
+ receiveMessageImmediately(this.generateAttemptId());
+ }
+
+ private void receiveMessageImmediately(String attemptId) {
final ClientId clientId = consumer.getClientId();
if (!consumer.isRunning()) {
log.info("Stop to receive message because consumer is not running,
mq={}, clientId={}", mq, clientId);
@@ -222,7 +236,7 @@ class ProcessQueueImpl implements ProcessQueue {
final int batchSize = this.getReceptionBatchSize();
final Duration longPollingTimeout =
consumer.getPushConsumerSettings().getLongPollingTimeout();
final ReceiveMessageRequest request =
consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
- longPollingTimeout);
+ longPollingTimeout, attemptId);
activityNanoTime = System.nanoTime();
// Intercept before message reception.
@@ -248,27 +262,35 @@ class ProcessQueueImpl implements ProcessQueue {
// Should never reach here.
log.error("[Bug] Exception raised while handling
receive result, mq={}, endpoints={}, "
+ "clientId={}", mq, endpoints, clientId, t);
- onReceiveMessageException(t);
+ onReceiveMessageException(t, attemptId);
}
}
@Override
public void onFailure(Throwable t) {
+ String nextAttemptId = null;
+ if (t instanceof StatusRuntimeException) {
+ StatusRuntimeException exception =
(StatusRuntimeException) t;
+ if
(io.grpc.Status.DEADLINE_EXCEEDED.equals(exception.getStatus())) {
+ nextAttemptId = request.getAttemptId();
+ }
+ }
// Intercept after message reception.
final MessageInterceptorContextImpl context0 =
new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
consumer.doAfter(context0, Collections.emptyList());
- log.error("Exception raised during message reception,
mq={}, endpoints={}, clientId={}", mq,
- endpoints, clientId, t);
- onReceiveMessageException(t);
+ log.error("Exception raised during message reception,
mq={}, endpoints={}, attemptId={}, " +
+ "nextAttemptId={}, clientId={}", mq, endpoints,
request.getAttemptId(), nextAttemptId,
+ clientId, t);
+ onReceiveMessageException(t, nextAttemptId);
}
}, MoreExecutors.directExecutor());
receptionTimes.getAndIncrement();
consumer.getReceptionTimes().getAndIncrement();
} catch (Throwable t) {
log.error("Exception raised during message reception, mq={},
clientId={}", mq, clientId, t);
- onReceiveMessageException(t);
+ onReceiveMessageException(t, attemptId);
}
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
index 619cd7d2..c7a12140 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
@@ -71,7 +72,7 @@ public class ConsumerImplTest extends TestBase {
any(ReceiveMessageRequest.class), any(Duration.class));
final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
final ReceiveMessageRequest request =
pushConsumer.wrapReceiveMessageRequest(1,
- mq, new FilterExpression(), Duration.ofSeconds(15));
+ mq, new FilterExpression(), Duration.ofSeconds(15),
UUID.randomUUID().toString());
final ListenableFuture<ReceiveMessageResult> future0 =
pushConsumer.receiveMessage(request, mq, Duration.ofSeconds(15));
final ReceiveMessageResult receiveMessageResult = future0.get();
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index b4a29717..d9aa61fb 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -134,7 +135,7 @@ public class ProcessQueueImplTest extends TestBase {
when(pushSubscriptionSettings.getReceiveBatchSize()).thenReturn(32);
ReceiveMessageRequest request =
ReceiveMessageRequest.newBuilder().build();
when(pushConsumer.wrapReceiveMessageRequest(anyInt(),
any(MessageQueueImpl.class),
- any(FilterExpression.class),
any(Duration.class))).thenReturn(request);
+ any(FilterExpression.class), any(Duration.class),
nullable(String.class))).thenReturn(request);
processQueue.fetchMessageImmediately();
await().atMost(Duration.ofSeconds(3))
.untilAsserted(() -> verify(pushConsumer,
times(cachedMessagesCountThresholdPerQueue))