This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 0b687a9dd8 [ISSUE #8997] Ensure there is an opportunity to send a retry message when broker no response (#9137)
0b687a9dd8 is described below

commit 0b687a9dd81075c8d811f1c49fdd1c59502db1c9
Author: gaoyf <ga...@users.noreply.github.com>
AuthorDate: Mon Mar 10 16:03:10 2025 +0800

    [ISSUE #8997] Ensure there is an opportunity to send a retry message when broker no response (#9137)
---
 .../client/impl/producer/DefaultMQProducerImpl.java         | 12 ++++++++++--
 .../apache/rocketmq/client/producer/DefaultMQProducer.java  | 13 +++++++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 15264f0e50..4aa605821f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -777,8 +777,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                                 callTimeout = true;
                                 break;
                             }
-
-                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
+                            long curTimeout = timeout - costTime;
+                            // Get the maximum timeout allowed per request
+                            long maxSendTimeoutPerRequest = defaultMQProducer.getSendMsgMaxTimeoutPerRequest();
+                            // Determine if retries are still possible
+                            boolean canRetryAgain = times + 1 < timesTotal;
+                            // If retries are possible, and the current timeout exceeds the max allowed timeout, set the current timeout to the max allowed
+                            if (maxSendTimeoutPerRequest > -1 && canRetryAgain && curTimeout > maxSendTimeoutPerRequest) {
+                                curTimeout = maxSendTimeoutPerRequest;
+                            }
+                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, curTimeout);
                             endTimestamp = System.currentTimeMillis();
                             this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
                             switch (communicationMode) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index e3f81ad968..11edcaa441 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -115,6 +115,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      */
     private int sendMsgTimeout = 3000;
 
+    /**
+     * Max timeout for sending messages per request.
+     */
+    private int sendMsgMaxTimeoutPerRequest = -1;
+
     /**
      * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
      */
@@ -1259,6 +1264,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         this.sendMsgTimeout = sendMsgTimeout;
     }
 
+    public int getSendMsgMaxTimeoutPerRequest() {
+        return sendMsgMaxTimeoutPerRequest;
+    }
+
+    public void setSendMsgMaxTimeoutPerRequest(int sendMsgMaxTimeoutPerRequest) {
+        this.sendMsgMaxTimeoutPerRequest = sendMsgMaxTimeoutPerRequest;
+    }
+
     public int getCompressMsgBodyOverHowmuch() {
         return compressMsgBodyOverHowmuch;
     }