redlsz opened a new issue, #9253:
URL: https://github.com/apache/rocketmq/issues/9253

   ### Before Creating the Enhancement Request
   
   - [x] I have confirmed that this should be classified as an enhancement 
rather than a bug/feature.
   
   
   ### Summary
   
   In proxy receiveMessage process, when adding message renew task fails due to 
client disconnection, we can make the message visible earlier by calling 
changeInvisibleTime, to reduce delay of the next consumption retry.
   
   ### Motivation
   
   Reproduce code:
   
   ```
   public static void main(String[] args) throws ClientException, IOException, 
InterruptedException {
           String topic = "topic";
           String consumerGroup = "group";
           String accessKey = "xxx";
           String secretKey = "xxx";
           String endpoints = "127.0.0.1:8080";
   
           SessionCredentialsProvider sessionCredentialsProvider = new 
StaticSessionCredentialsProvider(accessKey, secretKey);
           ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
               .setEndpoints(endpoints)
               .enableSsl(false)
               .setCredentialProvider(sessionCredentialsProvider)
               .build();
   
           ClientServiceProvider provider = ClientServiceProvider.loadService();
           PushConsumerBuilder pushConsumerBuilder = 
provider.newPushConsumerBuilder()
               .setClientConfiguration(clientConfiguration)
               .setConsumerGroup(consumerGroup)
               .setSubscriptionExpressions(Collections.singletonMap(topic, new 
FilterExpression("*", FilterExpressionType.TAG)))
               .setMessageListener(messageView -> {
                   System.out.printf("%s [consumer] receive %s attempts=%d%n",
                       dateFormat.format(new Date()), 
messageView.getMessageId(), messageView.getDeliveryAttempt());
                   return ConsumeResult.SUCCESS;
               });
   
           // Start consumer
           AtomicReference<PushConsumer> consumerRef = new 
AtomicReference<>(pushConsumerBuilder.build());
           System.out.printf("%s [consumer] started %n", dateFormat.format(new 
Date()));
   
           // Wait for a while
           TimeUnit.SECONDS.sleep(3);
   
           new Thread(() -> {
               try {
                   // Restart consumer
                   consumerRef.get().close();
                   System.out.printf("%s [consumer] closed %n", 
dateFormat.format(new Date()));
                   consumerRef.set(pushConsumerBuilder.build());
                   System.out.printf("%s [consumer] restarted %n", 
dateFormat.format(new Date()));
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }).start();
   
           // Wait for a while
           TimeUnit.SECONDS.sleep(1);
   
           // Then send one message
           Producer producer = provider.newProducerBuilder()
               .setClientConfiguration(clientConfiguration)
               .setTopics(topic)
               .build();
           System.out.printf("%s [producer] started %n", dateFormat.format(new 
Date()));
           MessageBuilder messageBuilder = provider.newMessageBuilder()
               .setTopic(topic)
               .setBody("BODY".getBytes());
           SendReceipt receipt = producer.send(messageBuilder.build());
           System.out.printf("%s [producer] send %s %n", dateFormat.format(new 
Date()), receipt.getMessageId());
   
           TimeUnit.MINUTES.sleep(10);
           producer.close();
           consumerRef.get().close();
       }
   ```
   
   Output:
   <img width="799" alt="Image" 
src="https://github.com/user-attachments/assets/aaac9e0d-45e1-4d52-aa51-155c20557fe3";
 />
   
   Proxy NullPointerException logs:
   > 2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - clear handle of 
this client when client unregister. group:group, 
clientChannelInfo:ClientChannelInfo 
[channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, 
remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, 
clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, 
lastUpdateTimestamp=1742178611780]
   2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - remove grpc channel 
when client unregister. group:group, clientChannelInfo:ClientChannelInfo 
[channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, 
remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, 
clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, 
lastUpdateTimestamp=1742178611780], removed:false
   2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - remove remoting 
channel when client unregister. clientChannelInfo:ClientChannelInfo 
[channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, 
remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, 
clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, 
lastUpdateTimestamp=1742178611780]
   2025-03-17 10:30:12 ERROR ConsumerProcessorExecutor-9 - internal server error
   java.lang.NullPointerException: null
        at 
org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey.getChannelId(ReceiptHandleGroupKey.java:34)
        at 
org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey.hashCode(ReceiptHandleGroupKey.java:59)
        at 
java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
        at 
org.apache.rocketmq.common.utils.ConcurrentHashMapUtils.computeIfAbsent(ConcurrentHashMapUtils.java:48)
        at 
org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager.addReceiptHandle(DefaultReceiptHandleManager.java:128)
        at 
org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor.addReceiptHandle(ReceiptHandleProcessor.java:61)
        at 
org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor.addReceiptHandle(DefaultMessagingProcessor.java:362)
        at 
org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.lambda$receiveMessage$0(ReceiveMessageActivity.java:145)
        at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.rocketmq.common.utils.FutureUtils.lambda$appendNextFuture$0(FutureUtils.java:31)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   ### Describe the Solution You'd Like
   
   Call changeInvisibleTime  to make message visible earlier when this happens.
   
   ### Describe Alternatives You've Considered
   
   /
   
   ### Additional Context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to