This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 83c4a8f3f Support long polling in rocketmq proxy in the protocol (#5788)
83c4a8f3f is described below
commit 83c4a8f3f0284135d999c39a9fdc580b3d76d80e
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Dec 30 22:31:01 2022 +0800
Support long polling in rocketmq proxy in the protocol (#5788)
* Add long polling
* Change rocketmq-proto version to 2.0.2
* fix checkstyle
* Fix rocketmq-proto version
Signed-off-by: Li Zhanhui <[email protected]>
* Change pollTime to timeRemaining
* fix test
Signed-off-by: Li Zhanhui <[email protected]>
Co-authored-by: Li Zhanhui <[email protected]>
---
WORKSPACE | 2 +-
pom.xml | 2 +-
.../apache/rocketmq/proxy/config/ProxyConfig.java | 19 +++--
.../grpc/v2/common/GrpcClientSettingsManager.java | 2 +-
.../grpc/v2/consumer/ReceiveMessageActivity.java | 81 +++++++++++++---------
.../v2/consumer/ReceiveMessageActivityTest.java | 2 +
.../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java | 1 +
7 files changed, 68 insertions(+), 41 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index 394f8766e..267959878 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -67,7 +67,7 @@ maven_install(
"org.bouncycastle:bcpkix-jdk15on:1.69",
"com.google.code.gson:gson:2.8.9",
"com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
- "org.apache.rocketmq:rocketmq-proto:2.0.1",
+ "org.apache.rocketmq:rocketmq-proto:2.0.2",
"com.google.protobuf:protobuf-java:3.20.1",
"com.google.protobuf:protobuf-java-util:3.20.1",
"com.conversantmedia:disruptor:1.2.10",
diff --git a/pom.xml b/pom.xml
index e4324a5b0..040f8c5b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,7 @@
<annotations-api.version>6.0.53</annotations-api.version>
<extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
<concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
- <rocketmq-proto.version>2.0.1</rocketmq-proto.version>
+ <rocketmq-proto.version>2.0.2</rocketmq-proto.version>
<grpc.version>1.50.0</grpc.version>
<protobuf.version>3.20.1</protobuf.version>
<disruptor.version>1.2.10</disruptor.version>
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 9c833ba8a..ef8d4ad30 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -121,7 +121,8 @@ public class ProxyConfig implements ConfigFile {
private long grpcClientProducerBackoffInitialMillis = 10;
private long grpcClientProducerBackoffMaxMillis = 1000;
private int grpcClientProducerBackoffMultiplier = 2;
- private long grpcClientConsumerLongPollingTimeoutMillis =
Duration.ofSeconds(30).toMillis();
+ private long grpcClientConsumerMinLongPollingTimeoutMillis =
Duration.ofSeconds(5).toMillis();
+ private long grpcClientConsumerMaxLongPollingTimeoutMillis =
Duration.ofSeconds(20).toMillis();
private int grpcClientConsumerLongPollingBatchSize = 32;
private long grpcClientIdleTimeMills = Duration.ofSeconds(120).toMillis();
@@ -598,12 +599,20 @@ public class ProxyConfig implements ConfigFile {
this.grpcClientProducerBackoffMultiplier =
grpcClientProducerBackoffMultiplier;
}
- public long getGrpcClientConsumerLongPollingTimeoutMillis() {
- return grpcClientConsumerLongPollingTimeoutMillis;
+ public long getGrpcClientConsumerMinLongPollingTimeoutMillis() {
+ return grpcClientConsumerMinLongPollingTimeoutMillis;
}
- public void setGrpcClientConsumerLongPollingTimeoutMillis(long
grpcClientConsumerLongPollingTimeoutMillis) {
- this.grpcClientConsumerLongPollingTimeoutMillis =
grpcClientConsumerLongPollingTimeoutMillis;
+ public void setGrpcClientConsumerMinLongPollingTimeoutMillis(long
grpcClientConsumerMinLongPollingTimeoutMillis) {
+ this.grpcClientConsumerMinLongPollingTimeoutMillis =
grpcClientConsumerMinLongPollingTimeoutMillis;
+ }
+
+ public long getGrpcClientConsumerMaxLongPollingTimeoutMillis() {
+ return grpcClientConsumerMaxLongPollingTimeoutMillis;
+ }
+
+ public void setGrpcClientConsumerMaxLongPollingTimeoutMillis(long
grpcClientConsumerMaxLongPollingTimeoutMillis) {
+ this.grpcClientConsumerMaxLongPollingTimeoutMillis =
grpcClientConsumerMaxLongPollingTimeoutMillis;
}
public int getGrpcClientConsumerLongPollingBatchSize() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
index b5b82fbdc..dcb619416 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
@@ -143,7 +143,7 @@ public class GrpcClientSettingsManager extends ServiceThread implements StartAnd
resultSettingsBuilder.getSubscriptionBuilder()
.setReceiveBatchSize(config.getGrpcClientConsumerLongPollingBatchSize())
- .setLongPollingTimeout(Durations.fromMillis(config.getGrpcClientConsumerLongPollingTimeoutMillis()))
+ .setLongPollingTimeout(Durations.fromMillis(config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()))
.setFifo(groupConfig.isConsumeMessageOrderly());
resultSettingsBuilder.getBackoffPolicyBuilder().setMaxAttempts(groupConfig.getRetryMaxTimes()
+ 1);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index f653858de..31b841132 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -68,12 +68,27 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
ProxyConfig config = ConfigurationManager.getProxyConfig();
Long timeRemaining = ctx.getRemainingMs();
- long pollTime = timeRemaining -
Durations.toMillis(settings.getRequestTimeout()) / 2;
- if (pollTime < 0) {
- pollTime = 0;
+ long pollingTime;
+ if (request.hasLongPollingTimeout()) {
+ pollingTime =
Durations.toMillis(request.getLongPollingTimeout());
+ } else {
+ pollingTime = timeRemaining -
Durations.toMillis(settings.getRequestTimeout()) / 2;
+ }
+ if (pollingTime <
config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
+ pollingTime =
config.getGrpcClientConsumerMinLongPollingTimeoutMillis();
+ }
+ if (pollingTime >
config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()) {
+ pollingTime =
config.getGrpcClientConsumerMaxLongPollingTimeoutMillis();
}
- if (pollTime >
config.getGrpcClientConsumerLongPollingTimeoutMillis()) {
- pollTime =
config.getGrpcClientConsumerLongPollingTimeoutMillis();
+
+ if (pollingTime > timeRemaining) {
+ if (timeRemaining >=
config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
+ pollingTime = timeRemaining;
+ } else {
+ writer.writeAndComplete(ctx, Code.ILLEGAL_POLLING_TIME,
"The deadline time remaining is not enough" +
+ " for polling, please check network condition");
+ return;
+ }
}
validateTopicAndConsumerGroup(request.getMessageQueue().getTopic(),
request.getGroup());
@@ -100,37 +115,37 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
}
this.messagingProcessor.popMessage(
- ctx,
- new ReceiveMessageQueueSelector(
- request.getMessageQueue().getBroker().getName()
- ),
- group,
- topic,
- request.getBatchSize(),
- actualInvisibleTime,
- pollTime,
- ConsumeInitMode.MAX,
- subscriptionData,
- fifo,
- new PopMessageResultFilterImpl(maxAttempts),
- timeRemaining
- ).thenAccept(popResult -> {
- if (proxyConfig.isEnableProxyAutoRenew() &&
request.getAutoRenew()) {
- if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
- List<MessageExt> messageExtList =
popResult.getMsgFoundList();
- for (MessageExt messageExt : messageExtList) {
- String receiptHandle =
messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
- if (receiptHandle != null) {
- MessageReceiptHandle messageReceiptHandle =
- new MessageReceiptHandle(group, topic,
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
- messageExt.getQueueOffset(),
messageExt.getReconsumeTimes());
- receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
+ ctx,
+ new ReceiveMessageQueueSelector(
+ request.getMessageQueue().getBroker().getName()
+ ),
+ group,
+ topic,
+ request.getBatchSize(),
+ actualInvisibleTime,
+ pollingTime,
+ ConsumeInitMode.MAX,
+ subscriptionData,
+ fifo,
+ new PopMessageResultFilterImpl(maxAttempts),
+ timeRemaining
+ ).thenAccept(popResult -> {
+ if (proxyConfig.isEnableProxyAutoRenew() &&
request.getAutoRenew()) {
+ if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
+ List<MessageExt> messageExtList =
popResult.getMsgFoundList();
+ for (MessageExt messageExt : messageExtList) {
+ String receiptHandle =
messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
+ if (receiptHandle != null) {
+ MessageReceiptHandle messageReceiptHandle =
+ new MessageReceiptHandle(group, topic,
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
+ messageExt.getQueueOffset(),
messageExt.getReconsumeTimes());
+ receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
+ }
}
}
}
- }
- writer.writeAndComplete(ctx, request, popResult);
- })
+ writer.writeAndComplete(ctx, request, popResult);
+ })
.exceptionally(t -> {
writer.writeAndComplete(ctx, request, t);
return null;
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 771830de9..4c2f7bd1c 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
@@ -71,6 +72,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
@Before
public void before() throws Throwable {
super.before();
+ ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0);
this.receiveMessageActivity = new
ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor,
grpcClientSettingsManager, grpcChannelManager);
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
index 95810b97c..243c72dec 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
@@ -158,6 +158,7 @@ public class GrpcBaseIT extends BaseConf {
ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
ConfigurationManager.getProxyConfig().setHeartbeatSyncerTopicClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
ConfigurationManager.getProxyConfig().setMinInvisibleTimeMillsForRecv(3);
+ ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0);
}
protected MessagingServiceGrpc.MessagingServiceStub createStub(Channel
channel) {