This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 8859093a1d make ctx constructed in scheduleRenewTask (#8556) 8859093a1d is described below commit 8859093a1d345dc98a119fd2ae6fc2b14faa76ee Author: 吴星灿 <37405937+qianye1...@users.noreply.github.com> AuthorDate: Wed Aug 21 17:29:00 2024 +0800 make ctx constructed in scheduleRenewTask (#8556) --- .../proxy/service/receipt/DefaultReceiptHandleManager.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java index 3948824a39..0cb519306e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java @@ -159,7 +159,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { return; } - renewalWorkerService.submit(() -> renewMessage(key, group, msgID, handleStr)); + renewalWorkerService.submit(() -> renewMessage(createContext("RenewMessage"), key, group, + msgID, handleStr)); }); } } catch (Exception e) { @@ -169,15 +170,15 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); } - protected void renewMessage(ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) { + protected void renewMessage(ProxyContext context, ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) { try { - group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(key, messageReceiptHandle)); + group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(context, key, messageReceiptHandle)); } catch (Exception e) { log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); } } - protected CompletableFuture<MessageReceiptHandle> startRenewMessage(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) { + protected CompletableFuture<MessageReceiptHandle> startRenewMessage(ProxyContext context, ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) { CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>(); ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); long current = System.currentTimeMillis(); @@ -209,7 +210,6 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem } }); } else { - ProxyContext context = createContext("RenewMessage"); SubscriptionGroupConfig subscriptionGroupConfig = metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup()); if (subscriptionGroupConfig == null) {