redlsz opened a new issue, #9253: URL: https://github.com/apache/rocketmq/issues/9253
### Before Creating the Enhancement Request

- [x] I have confirmed that this should be classified as an enhancement rather than a bug/feature.

### Summary

In the proxy's receiveMessage process, when adding the message renewal task fails because the client has disconnected, the proxy can call changeInvisibleTime to make the message visible earlier. This reduces the delay before the next consumption retry.

### Motivation

Reproduction code:

```
// Note: dateFormat is not defined in this snippet; it is assumed to be a
// static date formatter (e.g. SimpleDateFormat) declared elsewhere in the class.
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
    String topic = "topic";
    String consumerGroup = "group";
    String accessKey = "xxx";
    String secretKey = "xxx";
    String endpoints = "127.0.0.1:8080";

    SessionCredentialsProvider sessionCredentialsProvider =
        new StaticSessionCredentialsProvider(accessKey, secretKey);
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .enableSsl(false)
        .setCredentialProvider(sessionCredentialsProvider)
        .build();
    ClientServiceProvider provider = ClientServiceProvider.loadService();

    PushConsumerBuilder pushConsumerBuilder = provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        .setConsumerGroup(consumerGroup)
        .setSubscriptionExpressions(Collections.singletonMap(topic, new FilterExpression("*", FilterExpressionType.TAG)))
        .setMessageListener(messageView -> {
            System.out.printf("%s [consumer] receive %s attempts=%d%n", dateFormat.format(new Date()),
                messageView.getMessageId(), messageView.getDeliveryAttempt());
            return ConsumeResult.SUCCESS;
        });

    // Start consumer
    AtomicReference<PushConsumer> consumerRef = new AtomicReference<>(pushConsumerBuilder.build());
    System.out.printf("%s [consumer] started %n", dateFormat.format(new Date()));

    // Wait for a while
    TimeUnit.SECONDS.sleep(3);

    new Thread(() -> {
        try {
            // Restart consumer
            consumerRef.get().close();
            System.out.printf("%s [consumer] closed %n", dateFormat.format(new Date()));
            consumerRef.set(pushConsumerBuilder.build());
            System.out.printf("%s [consumer] restarted %n", dateFormat.format(new Date()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();

    // Wait for a while
    TimeUnit.SECONDS.sleep(1);

    // Then send one message
    Producer producer = provider.newProducerBuilder()
        .setClientConfiguration(clientConfiguration)
        .setTopics(topic)
        .build();
    System.out.printf("%s [producer] started %n", dateFormat.format(new Date()));
    MessageBuilder messageBuilder = provider.newMessageBuilder()
        .setTopic(topic)
        .setBody("BODY".getBytes());
    SendReceipt receipt = producer.send(messageBuilder.build());
    System.out.printf("%s [producer] send %s %n", dateFormat.format(new Date()), receipt.getMessageId());

    TimeUnit.MINUTES.sleep(10);
    producer.close();
    consumerRef.get().close();
}
```

Output:

<img width="799" alt="Image" src="https://github.com/user-attachments/assets/aaac9e0d-45e1-4d52-aa51-155c20557fe3" />

Proxy NullPointerException logs:

> 2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - clear handle of this client when client unregister. group:group, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, lastUpdateTimestamp=1742178611780]
> 2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - remove grpc channel when client unregister. group:group, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, lastUpdateTimestamp=1742178611780], removed:false
> 2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - remove remoting channel when client unregister. clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, lastUpdateTimestamp=1742178611780]
> 2025-03-17 10:30:12 ERROR ConsumerProcessorExecutor-9 - internal server error
> java.lang.NullPointerException: null
>     at org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey.getChannelId(ReceiptHandleGroupKey.java:34)
>     at org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey.hashCode(ReceiptHandleGroupKey.java:59)
>     at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>     at org.apache.rocketmq.common.utils.ConcurrentHashMapUtils.computeIfAbsent(ConcurrentHashMapUtils.java:48)
>     at org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager.addReceiptHandle(DefaultReceiptHandleManager.java:128)
>     at org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor.addReceiptHandle(ReceiptHandleProcessor.java:61)
>     at org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor.addReceiptHandle(DefaultMessagingProcessor.java:362)
>     at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.lambda$receiveMessage$0(ReceiveMessageActivity.java:145)
>     at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>     at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>     at org.apache.rocketmq.common.utils.FutureUtils.lambda$appendNextFuture$0(FutureUtils.java:31)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)

### Describe the Solution You'd Like

When adding the renewal task fails this way, call changeInvisibleTime to make the message visible earlier.

### Describe Alternatives You've Considered

/

### Additional Context

_No response_
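
The proposed behavior can be illustrated with a minimal, self-contained sketch. It is not the proxy's actual code: `RenewRegistry`, `InvisibleTimeChanger`, `registerOrRelease`, and the `retryDelay` parameter are hypothetical stand-ins for the receipt-handle bookkeeping and the broker call, and the choice of a zero delay in the demo is an assumption. The idea shown is only this: if registering the receipt handle for renewal fails because the client channel is gone, fall back to shortening the message's invisible time instead of letting the error propagate.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class EarlyVisibilitySketch {

    /** Hypothetical stand-in for the proxy's receipt-handle renewal bookkeeping. */
    interface RenewRegistry {
        // Expected to throw when the client channel no longer exists
        // (in the issue this surfaces as a NullPointerException).
        void addReceiptHandle(String clientId, String receiptHandle);
    }

    /** Hypothetical stand-in for the broker call that changes a message's invisible time. */
    interface InvisibleTimeChanger {
        CompletableFuture<Void> changeInvisibleTime(String receiptHandle, Duration invisibleTime);
    }

    /**
     * Tries to register the handle for renewal. If registration fails,
     * requests a shorter invisible time so the message can be redelivered sooner.
     * Returns true when the handle was registered, false when the fallback ran.
     */
    static boolean registerOrRelease(RenewRegistry registry, InvisibleTimeChanger changer,
                                     String clientId, String receiptHandle, Duration retryDelay) {
        try {
            registry.addReceiptHandle(clientId, receiptHandle);
            return true;
        } catch (RuntimeException e) {
            // Best effort: a failure here only means the message waits for its original timeout.
            changer.changeInvisibleTime(receiptHandle, retryDelay)
                .exceptionally(t -> {
                    System.err.println("changeInvisibleTime failed: " + t);
                    return null;
                });
            return false;
        }
    }

    public static void main(String[] args) {
        Set<String> connected = new HashSet<>();
        connected.add("client-a");

        // Fake registry: rejects clients that are not connected.
        RenewRegistry registry = (clientId, handle) -> {
            if (!connected.contains(clientId)) {
                throw new IllegalStateException("client channel not found: " + clientId);
            }
        };

        // Fake broker call: records which handles were released and with what delay.
        List<String> released = new ArrayList<>();
        InvisibleTimeChanger changer = (handle, delay) -> {
            released.add(handle + "@" + delay.toMillis() + "ms");
            return CompletableFuture.completedFuture(null);
        };

        System.out.println(registerOrRelease(registry, changer, "client-a", "handle-1", Duration.ZERO)); // true
        System.out.println(registerOrRelease(registry, changer, "client-b", "handle-2", Duration.ZERO)); // false
        System.out.println(released); // [handle-2@0ms]
    }
}
```

Catching the failure at the registration call keeps the fallback next to the point where the proxy learns the client is gone. Whether the real fix should use a zero or a small positive invisible time is left open here.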