This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 2d44ec897c [ISSUE #8261] Avoid unnecessary waiting when a response is successfully returned (#8272) 2d44ec897c is described below commit 2d44ec897c5ff9e1ce46a8ac8765c5cf493c7ac6 Author: hqbfz <125714719+3424672...@users.noreply.github.com> AuthorDate: Mon Jul 29 18:55:55 2024 +0800 [ISSUE #8261] Avoid unnecessary waiting when a response is successfully returned (#8272) --- .../apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java | 5 ++++- .../org/apache/rocketmq/client/producer/RequestResponseFuture.java | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 7ef3402513..0e70ee2595 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -969,7 +969,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. - //Clone new message using commpressed message body and recover origin massage. + //Clone new message using compressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; @@ -1538,6 +1538,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Override public void onSuccess(SendResult sendResult) { requestResponseFuture.setSendRequestOk(true); + requestResponseFuture.acquireCountDownLatch(); } @Override @@ -1595,6 +1596,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Override public void onSuccess(SendResult sendResult) { requestResponseFuture.setSendRequestOk(true); + requestResponseFuture.acquireCountDownLatch(); } @Override @@ -1652,6 +1654,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Override public void onSuccess(SendResult sendResult) { requestResponseFuture.setSendRequestOk(true); + requestResponseFuture.acquireCountDownLatch(); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java index e66c08fdc5..203f92907a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java @@ -107,6 +107,10 @@ public class RequestResponseFuture { this.sendRequestOk = sendRequestOk; } + public void acquireCountDownLatch() { + this.countDownLatch.countDown(); + } + public Message getRequestMsg() { return requestMsg; }