This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8859093a1d make ctx constructed in scheduleRenewTask (#8556)
8859093a1d is described below

commit 8859093a1d345dc98a119fd2ae6fc2b14faa76ee
Author: 吴星灿 <37405937+qianye1...@users.noreply.github.com>
AuthorDate: Wed Aug 21 17:29:00 2024 +0800

    make ctx constructed in scheduleRenewTask (#8556)
---
 .../proxy/service/receipt/DefaultReceiptHandleManager.java     | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index 3948824a39..0cb519306e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -159,7 +159,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
                     if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
                         return;
                     }
-                    renewalWorkerService.submit(() -> renewMessage(key, group, msgID, handleStr));
+                    renewalWorkerService.submit(() -> renewMessage(createContext("RenewMessage"), key, group,
+                        msgID, handleStr));
                 });
             }
         } catch (Exception e) {
@@ -169,15 +170,15 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
         log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
     }
 
-    protected void renewMessage(ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) {
+    protected void renewMessage(ProxyContext context, ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) {
         try {
-            group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(key, messageReceiptHandle));
+            group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(context, key, messageReceiptHandle));
         } catch (Exception e) {
             log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
         }
     }
 
-    protected CompletableFuture<MessageReceiptHandle> startRenewMessage(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) {
+    protected CompletableFuture<MessageReceiptHandle> startRenewMessage(ProxyContext context, ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) {
         CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         long current = System.currentTimeMillis();
@@ -209,7 +210,6 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
                     }
                 });
             } else {
-                ProxyContext context = createContext("RenewMessage");
                 SubscriptionGroupConfig subscriptionGroupConfig =
                     metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup());
                 if (subscriptionGroupConfig == null) {

Reply via email to