This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0b687a9dd8 [ISSUE #8997] Ensure there is an opportunity to send a retry message when broker no response (#9137)
0b687a9dd8 is described below

commit 0b687a9dd81075c8d811f1c49fdd1c59502db1c9
Author: gaoyf <ga...@users.noreply.github.com>
AuthorDate: Mon Mar 10 16:03:10 2025 +0800

    [ISSUE #8997] Ensure there is an opportunity to send a retry message when broker no response (#9137)
---
 .../client/impl/producer/DefaultMQProducerImpl.java         | 12 ++++++++++--
 .../apache/rocketmq/client/producer/DefaultMQProducer.java  | 13 +++++++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 15264f0e50..4aa605821f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -777,8 +777,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                             callTimeout = true;
                             break;
                         }
-
-                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
+                        long curTimeout = timeout - costTime;
+                        // Get the maximum timeout allowed per request
+                        long maxSendTimeoutPerRequest = defaultMQProducer.getSendMsgMaxTimeoutPerRequest();
+                        // Determine if retries are still possible
+                        boolean canRetryAgain = times + 1 < timesTotal;
+                        // If retries are possible, and the current timeout exceeds the max allowed timeout, set the current timeout to the max allowed
+                        if (maxSendTimeoutPerRequest > -1 && canRetryAgain && curTimeout > maxSendTimeoutPerRequest) {
+                            curTimeout = maxSendTimeoutPerRequest;
+                        }
+                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, curTimeout);
                         endTimestamp = System.currentTimeMillis();
                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
                         switch (communicationMode) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index e3f81ad968..11edcaa441 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -115,6 +115,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      */
     private int sendMsgTimeout = 3000;
 
+    /**
+     * Max timeout for sending messages per request.
+     */
+    private int sendMsgMaxTimeoutPerRequest = -1;
+
     /**
      * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
      */
@@ -1259,6 +1264,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         this.sendMsgTimeout = sendMsgTimeout;
     }
 
+    public int getSendMsgMaxTimeoutPerRequest() {
+        return sendMsgMaxTimeoutPerRequest;
+    }
+
+    public void setSendMsgMaxTimeoutPerRequest(int sendMsgMaxTimeoutPerRequest) {
+        this.sendMsgMaxTimeoutPerRequest = sendMsgMaxTimeoutPerRequest;
+    }
+
     public int getCompressMsgBodyOverHowmuch() {
         return compressMsgBodyOverHowmuch;
     }

Reply via email to