This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 988c826921 [ISSUE #9196] Broker return pop stats when receive notification (#9197) 988c826921 is described below commit 988c826921de5ed7eda3d88c22c705c6dc470e24 Author: qianye <wuxingcan....@alibaba-inc.com> AuthorDate: Mon Mar 3 11:35:03 2025 +0800 [ISSUE #9196] Broker return pop stats when receive notification (#9197) --- .../broker/processor/NotificationProcessor.java | 5 +++- .../rocketmq/client/consumer/NotifyResult.java | 29 +++++++++++++--------- .../client/impl/mqclient/MQClientAPIExt.java | 14 +++++++++-- .../header/NotificationResponseHeader.java | 10 ++++++++ 4 files changed, 43 insertions(+), 15 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index b95055efba..2fe3464943 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -161,8 +161,11 @@ public class NotificationProcessor implements NettyRequestProcessor { } if (!hasMsg) { - if (popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader)) == PollingResult.POLLING_SUC) { + PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader)); + if (pollingResult == PollingResult.POLLING_SUC) { return null; + } else if (pollingResult == PollingResult.POLLING_FULL) { + responseHeader.setPollingFull(true); } } response.setCode(ResponseCode.SUCCESS); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java b/client/src/main/java/org/apache/rocketmq/client/consumer/NotifyResult.java similarity index 66% copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java copy to client/src/main/java/org/apache/rocketmq/client/consumer/NotifyResult.java index cbab597401..4bd8b28175 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/NotifyResult.java @@ -14,27 +14,32 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.remoting.protocol.header; +package org.apache.rocketmq.client.consumer; -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.annotation.CFNotNull; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -public class NotificationResponseHeader implements CommandCustomHeader { - - - @CFNotNull - private boolean hasMsg = false; +public class NotifyResult { + private boolean hasMsg; + private boolean pollingFull; public boolean isHasMsg() { return hasMsg; } + public boolean isPollingFull() { + return pollingFull; + } + public void setHasMsg(boolean hasMsg) { this.hasMsg = hasMsg; } - @Override - public void checkFields() throws RemotingCommandException { + public void setPollingFull(boolean pollingFull) { + this.pollingFull = pollingFull; + } + + @Override public String toString() { + return "NotifyResult{" + + "hasMsg=" + hasMsg + + ", pollingFull=" + pollingFull + + '}'; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java index c22f453477..9089503407 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.AckCallback; import org.apache.rocketmq.client.consumer.AckResult; +import org.apache.rocketmq.client.consumer.NotifyResult; import org.apache.rocketmq.client.consumer.PopCallback; import org.apache.rocketmq.client.consumer.PopResult; import org.apache.rocketmq.client.consumer.PullCallback; @@ -620,14 +621,23 @@ public class MQClientAPIExt extends MQClientAPIImpl { } public CompletableFuture<Boolean> notification(String brokerAddr, NotificationRequestHeader requestHeader, + long timeoutMillis) { + return notificationWithPollingStats(brokerAddr, requestHeader, timeoutMillis).thenApply(NotifyResult::isHasMsg); + } + + public CompletableFuture<NotifyResult> notificationWithPollingStats(String brokerAddr, + NotificationRequestHeader requestHeader, long timeoutMillis) { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFICATION, requestHeader); return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { - CompletableFuture<Boolean> future0 = new CompletableFuture<>(); + CompletableFuture<NotifyResult> future0 = new CompletableFuture<>(); if (response.getCode() == ResponseCode.SUCCESS) { try { NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.decodeCommandCustomHeader(NotificationResponseHeader.class); - future0.complete(responseHeader.isHasMsg()); + NotifyResult notifyResult = new NotifyResult(); + notifyResult.setHasMsg(responseHeader.isHasMsg()); + notifyResult.setPollingFull(responseHeader.isPollingFull()); + future0.complete(notifyResult); } catch (Throwable t) { future0.completeExceptionally(t); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java index cbab597401..027717e006 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java @@ -26,10 +26,20 @@ public class NotificationResponseHeader implements CommandCustomHeader { @CFNotNull private boolean hasMsg = false; + private boolean pollingFull = false; + public boolean isHasMsg() { return hasMsg; } + public boolean isPollingFull() { + return pollingFull; + } + + public void setPollingFull(boolean pollingFull) { + this.pollingFull = pollingFull; + } + public void setHasMsg(boolean hasMsg) { this.hasMsg = hasMsg; }