This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 83c4a8f3f Support long polling in rocketmq proxy in the protocol (#5788)
83c4a8f3f is described below

commit 83c4a8f3f0284135d999c39a9fdc580b3d76d80e
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Dec 30 22:31:01 2022 +0800

    Support long polling in rocketmq proxy in the protocol (#5788)
    
    * Add long polling
    
    * Change rocketmq-proto version to 2.0.2
    
    * fix checkstyle
    
    * Fix rocketmq-proto version
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    * Change pollTime to timeRemaining
    
    * fix test
    
    Signed-off-by: Li Zhanhui <[email protected]>
    Co-authored-by: Li Zhanhui <[email protected]>
---
 WORKSPACE                                          |  2 +-
 pom.xml                                            |  2 +-
 .../apache/rocketmq/proxy/config/ProxyConfig.java  | 19 +++--
 .../grpc/v2/common/GrpcClientSettingsManager.java  |  2 +-
 .../grpc/v2/consumer/ReceiveMessageActivity.java   | 81 +++++++++++++---------
 .../v2/consumer/ReceiveMessageActivityTest.java    |  2 +
 .../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java   |  1 +
 7 files changed, 68 insertions(+), 41 deletions(-)

diff --git a/WORKSPACE b/WORKSPACE
index 394f8766e..267959878 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -67,7 +67,7 @@ maven_install(
         "org.bouncycastle:bcpkix-jdk15on:1.69",
         "com.google.code.gson:gson:2.8.9",
         
"com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
-        "org.apache.rocketmq:rocketmq-proto:2.0.1",
+        "org.apache.rocketmq:rocketmq-proto:2.0.2",
         "com.google.protobuf:protobuf-java:3.20.1",
         "com.google.protobuf:protobuf-java-util:3.20.1",
         "com.conversantmedia:disruptor:1.2.10",
diff --git a/pom.xml b/pom.xml
index e4324a5b0..040f8c5b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,7 @@
         <annotations-api.version>6.0.53</annotations-api.version>
         <extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
         
<concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
-        <rocketmq-proto.version>2.0.1</rocketmq-proto.version>
+        <rocketmq-proto.version>2.0.2</rocketmq-proto.version>
         <grpc.version>1.50.0</grpc.version>
         <protobuf.version>3.20.1</protobuf.version>
         <disruptor.version>1.2.10</disruptor.version>
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 9c833ba8a..ef8d4ad30 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -121,7 +121,8 @@ public class ProxyConfig implements ConfigFile {
     private long grpcClientProducerBackoffInitialMillis = 10;
     private long grpcClientProducerBackoffMaxMillis = 1000;
     private int grpcClientProducerBackoffMultiplier = 2;
-    private long grpcClientConsumerLongPollingTimeoutMillis = 
Duration.ofSeconds(30).toMillis();
+    private long grpcClientConsumerMinLongPollingTimeoutMillis = 
Duration.ofSeconds(5).toMillis();
+    private long grpcClientConsumerMaxLongPollingTimeoutMillis = 
Duration.ofSeconds(20).toMillis();
     private int grpcClientConsumerLongPollingBatchSize = 32;
     private long grpcClientIdleTimeMills = Duration.ofSeconds(120).toMillis();
 
@@ -598,12 +599,20 @@ public class ProxyConfig implements ConfigFile {
         this.grpcClientProducerBackoffMultiplier = 
grpcClientProducerBackoffMultiplier;
     }
 
-    public long getGrpcClientConsumerLongPollingTimeoutMillis() {
-        return grpcClientConsumerLongPollingTimeoutMillis;
+    public long getGrpcClientConsumerMinLongPollingTimeoutMillis() {
+        return grpcClientConsumerMinLongPollingTimeoutMillis;
     }
 
-    public void setGrpcClientConsumerLongPollingTimeoutMillis(long 
grpcClientConsumerLongPollingTimeoutMillis) {
-        this.grpcClientConsumerLongPollingTimeoutMillis = 
grpcClientConsumerLongPollingTimeoutMillis;
+    public void setGrpcClientConsumerMinLongPollingTimeoutMillis(long 
grpcClientConsumerMinLongPollingTimeoutMillis) {
+        this.grpcClientConsumerMinLongPollingTimeoutMillis = 
grpcClientConsumerMinLongPollingTimeoutMillis;
+    }
+
+    public long getGrpcClientConsumerMaxLongPollingTimeoutMillis() {
+        return grpcClientConsumerMaxLongPollingTimeoutMillis;
+    }
+
+    public void setGrpcClientConsumerMaxLongPollingTimeoutMillis(long 
grpcClientConsumerMaxLongPollingTimeoutMillis) {
+        this.grpcClientConsumerMaxLongPollingTimeoutMillis = 
grpcClientConsumerMaxLongPollingTimeoutMillis;
     }
 
     public int getGrpcClientConsumerLongPollingBatchSize() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
index b5b82fbdc..dcb619416 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
@@ -143,7 +143,7 @@ public class GrpcClientSettingsManager extends ServiceThread implements StartAnd
 
         resultSettingsBuilder.getSubscriptionBuilder()
             
.setReceiveBatchSize(config.getGrpcClientConsumerLongPollingBatchSize())
-            
.setLongPollingTimeout(Durations.fromMillis(config.getGrpcClientConsumerLongPollingTimeoutMillis()))
+            
.setLongPollingTimeout(Durations.fromMillis(config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()))
             .setFifo(groupConfig.isConsumeMessageOrderly());
 
         
resultSettingsBuilder.getBackoffPolicyBuilder().setMaxAttempts(groupConfig.getRetryMaxTimes()
 + 1);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index f653858de..31b841132 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -68,12 +68,27 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
             ProxyConfig config = ConfigurationManager.getProxyConfig();
 
             Long timeRemaining = ctx.getRemainingMs();
-            long pollTime = timeRemaining - 
Durations.toMillis(settings.getRequestTimeout()) / 2;
-            if (pollTime < 0) {
-                pollTime = 0;
+            long pollingTime;
+            if (request.hasLongPollingTimeout()) {
+                pollingTime = 
Durations.toMillis(request.getLongPollingTimeout());
+            } else {
+                pollingTime = timeRemaining - 
Durations.toMillis(settings.getRequestTimeout()) / 2;
+            }
+            if (pollingTime < 
config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
+                pollingTime = 
config.getGrpcClientConsumerMinLongPollingTimeoutMillis();
+            }
+            if (pollingTime > 
config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()) {
+                pollingTime = 
config.getGrpcClientConsumerMaxLongPollingTimeoutMillis();
             }
-            if (pollTime > 
config.getGrpcClientConsumerLongPollingTimeoutMillis()) {
-                pollTime = 
config.getGrpcClientConsumerLongPollingTimeoutMillis();
+
+            if (pollingTime > timeRemaining) {
+                if (timeRemaining >= 
config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
+                    pollingTime = timeRemaining;
+                } else {
+                    writer.writeAndComplete(ctx, Code.ILLEGAL_POLLING_TIME, 
"The deadline time remaining is not enough" +
+                        " for polling, please check network condition");
+                    return;
+                }
             }
 
             
validateTopicAndConsumerGroup(request.getMessageQueue().getTopic(), 
request.getGroup());
@@ -100,37 +115,37 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
             }
 
             this.messagingProcessor.popMessage(
-                ctx,
-                new ReceiveMessageQueueSelector(
-                    request.getMessageQueue().getBroker().getName()
-                ),
-                group,
-                topic,
-                request.getBatchSize(),
-                actualInvisibleTime,
-                pollTime,
-                ConsumeInitMode.MAX,
-                subscriptionData,
-                fifo,
-                new PopMessageResultFilterImpl(maxAttempts),
-                timeRemaining
-            ).thenAccept(popResult -> {
-                if (proxyConfig.isEnableProxyAutoRenew() && 
request.getAutoRenew()) {
-                    if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
-                        List<MessageExt> messageExtList = 
popResult.getMsgFoundList();
-                        for (MessageExt messageExt : messageExtList) {
-                            String receiptHandle = 
messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
-                            if (receiptHandle != null) {
-                                MessageReceiptHandle messageReceiptHandle =
-                                    new MessageReceiptHandle(group, topic, 
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
-                                        messageExt.getQueueOffset(), 
messageExt.getReconsumeTimes());
-                                
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
 group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
+                    ctx,
+                    new ReceiveMessageQueueSelector(
+                        request.getMessageQueue().getBroker().getName()
+                    ),
+                    group,
+                    topic,
+                    request.getBatchSize(),
+                    actualInvisibleTime,
+                    pollingTime,
+                    ConsumeInitMode.MAX,
+                    subscriptionData,
+                    fifo,
+                    new PopMessageResultFilterImpl(maxAttempts),
+                    timeRemaining
+                ).thenAccept(popResult -> {
+                    if (proxyConfig.isEnableProxyAutoRenew() && 
request.getAutoRenew()) {
+                        if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
+                            List<MessageExt> messageExtList = 
popResult.getMsgFoundList();
+                            for (MessageExt messageExt : messageExtList) {
+                                String receiptHandle = 
messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
+                                if (receiptHandle != null) {
+                                    MessageReceiptHandle messageReceiptHandle =
+                                        new MessageReceiptHandle(group, topic, 
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
+                                            messageExt.getQueueOffset(), 
messageExt.getReconsumeTimes());
+                                    
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
 group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
+                                }
                             }
                         }
                     }
-                }
-                writer.writeAndComplete(ctx, request, popResult);
-            })
+                    writer.writeAndComplete(ctx, request, popResult);
+                })
                 .exceptionally(t -> {
                     writer.writeAndComplete(ctx, request, t);
                     return null;
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 771830de9..4c2f7bd1c 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.client.consumer.PopStatus;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
 import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
 import org.apache.rocketmq.proxy.service.route.MessageQueueView;
@@ -71,6 +72,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
     @Before
     public void before() throws Throwable {
         super.before();
+        
ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0);
         this.receiveMessageActivity = new 
ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor,
             grpcClientSettingsManager, grpcChannelManager);
     }
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
index 95810b97c..243c72dec 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
@@ -158,6 +158,7 @@ public class GrpcBaseIT extends BaseConf {
         
ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
         
ConfigurationManager.getProxyConfig().setHeartbeatSyncerTopicClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
         
ConfigurationManager.getProxyConfig().setMinInvisibleTimeMillsForRecv(3);
+        
ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0);
     }
 
     protected MessagingServiceGrpc.MessagingServiceStub createStub(Channel 
channel) {

Reply via email to