This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 988c826921 [ISSUE #9196] Broker return pop stats when receive notification (#9197)
988c826921 is described below

commit 988c826921de5ed7eda3d88c22c705c6dc470e24
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Mon Mar 3 11:35:03 2025 +0800

    [ISSUE #9196] Broker return pop stats when receive notification (#9197)
---
 .../broker/processor/NotificationProcessor.java    |  5 +++-
 .../rocketmq/client/consumer/NotifyResult.java     | 29 +++++++++++++---------
 .../client/impl/mqclient/MQClientAPIExt.java       | 14 +++++++++--
 .../header/NotificationResponseHeader.java         | 10 ++++++++
 4 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index b95055efba..2fe3464943 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -161,8 +161,11 @@ public class NotificationProcessor implements NettyRequestProcessor {
         }
 
         if (!hasMsg) {
-            if (popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader)) == PollingResult.POLLING_SUC) {
+            PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader));
+            if (pollingResult == PollingResult.POLLING_SUC) {
                 return null;
+            } else if (pollingResult == PollingResult.POLLING_FULL) {
+                responseHeader.setPollingFull(true);
             }
         }
         response.setCode(ResponseCode.SUCCESS);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java b/client/src/main/java/org/apache/rocketmq/client/consumer/NotifyResult.java
similarity index 66%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/NotifyResult.java
index cbab597401..4bd8b28175 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/NotifyResult.java
@@ -14,27 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting.protocol.header;
+package org.apache.rocketmq.client.consumer;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class NotificationResponseHeader implements CommandCustomHeader {
-
-
-    @CFNotNull
-    private boolean hasMsg = false;
+public class NotifyResult {
+    private boolean hasMsg;
+    private boolean pollingFull;
 
     public boolean isHasMsg() {
         return hasMsg;
     }
 
+    public boolean isPollingFull() {
+        return pollingFull;
+    }
+
     public void setHasMsg(boolean hasMsg) {
         this.hasMsg = hasMsg;
     }
 
-    @Override
-    public void checkFields() throws RemotingCommandException {
+    public void setPollingFull(boolean pollingFull) {
+        this.pollingFull = pollingFull;
+    }
+
+    @Override public String toString() {
+        return "NotifyResult{" +
+            "hasMsg=" + hasMsg +
+            ", pollingFull=" + pollingFull +
+            '}';
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index c22f453477..9089503407 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.AckCallback;
 import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.NotifyResult;
 import org.apache.rocketmq.client.consumer.PopCallback;
 import org.apache.rocketmq.client.consumer.PopResult;
 import org.apache.rocketmq.client.consumer.PullCallback;
@@ -620,14 +621,23 @@ public class MQClientAPIExt extends MQClientAPIImpl {
     }
 
     public CompletableFuture<Boolean> notification(String brokerAddr, NotificationRequestHeader requestHeader,
+        long timeoutMillis) {
+        return notificationWithPollingStats(brokerAddr, requestHeader, timeoutMillis).thenApply(NotifyResult::isHasMsg);
+    }
+
+    public CompletableFuture<NotifyResult> notificationWithPollingStats(String brokerAddr,
+        NotificationRequestHeader requestHeader,
         long timeoutMillis) {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFICATION, requestHeader);
         return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> {
-            CompletableFuture<Boolean> future0 = new CompletableFuture<>();
+            CompletableFuture<NotifyResult> future0 = new CompletableFuture<>();
             if (response.getCode() == ResponseCode.SUCCESS) {
                 try {
                     NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.decodeCommandCustomHeader(NotificationResponseHeader.class);
-                    future0.complete(responseHeader.isHasMsg());
+                    NotifyResult notifyResult = new NotifyResult();
+                    notifyResult.setHasMsg(responseHeader.isHasMsg());
+                    notifyResult.setPollingFull(responseHeader.isPollingFull());
+                    future0.complete(notifyResult);
                 } catch (Throwable t) {
                     future0.completeExceptionally(t);
                 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java
index cbab597401..027717e006 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java
@@ -26,10 +26,20 @@ public class NotificationResponseHeader implements CommandCustomHeader {
     @CFNotNull
     private boolean hasMsg = false;
 
+    private boolean pollingFull = false;
+
     public boolean isHasMsg() {
         return hasMsg;
     }
 
+    public boolean isPollingFull() {
+        return pollingFull;
+    }
+
+    public void setPollingFull(boolean pollingFull) {
+        this.pollingFull = pollingFull;
+    }
+
     public void setHasMsg(boolean hasMsg) {
         this.hasMsg = hasMsg;
     }

Reply via email to