bruce256 opened a new issue, #8900: URL: https://github.com/apache/rocketmq/issues/8900
### Before Creating the Bug Report

- [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions).
- [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate.
- [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

### Runtime platform environment

CentOS 7.9

### RocketMQ version

rocketmq 5.3.0
rocketmq-client 5.0.6

### JDK Version

jdk 1.8

### Describe the Bug

The consumer processes the message successfully on the client side, but the broker keeps redelivering it until it ends up in the dead-letter queue. At the same time, proxy.log reports the exception stack traces below. Network connectivity from the client to the proxy port is fine. Everything worked when the cluster was first deployed; the problem only appeared recently.

```
2024-11-07 16:12:31 INFO pool-4-thread-1 - clear handle of this client when client unregister. group:test, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=V-LVSHENG1@10572@1@x8pkjaacj4, remoteAddress=10.43.165.207:65031, localAddress=10.0.82.170:8081}, clientId=V-LVSHENG1@10572@1@x8pkjaacj4, language=JAVA, version=473, lastUpdateTimestamp=1730967025698]
2024-11-07 16:12:31 INFO pool-4-thread-1 - remove grpc channel when client unregister. group:test, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=V-LVSHENG1@10572@1@x8pkjaacj4, remoteAddress=10.43.165.207:65031, localAddress=10.0.82.170:8081}, clientId=V-LVSHENG1@10572@1@x8pkjaacj4, language=JAVA, version=473, lastUpdateTimestamp=1730967025698], removed:true
2024-11-07 16:12:31 INFO pool-4-thread-1 - remove remoting channel when client unregister. clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=V-LVSHENG1@10572@1@x8pkjaacj4, remoteAddress=10.43.165.207:65031, localAddress=10.0.82.170:8081}, clientId=V-LVSHENG1@10572@1@x8pkjaacj4, language=JAVA, version=473, lastUpdateTimestamp=1730967025698]
2024-11-07 16:12:32 INFO GrpcClientSettingsManagerCleaner - remove unused grpc client settings.
group:org.apache.rocketmq.broker.client.ConsumerGroupInfo@4f8c3973, settings:client_type: PUSH_CONSUMER access_point { scheme: IPv4 addresses { host: "10.0.82.170" port: 8081 } } backoff_policy { max_attempts: 17 customized_backoff { next { seconds: 1 } next { seconds: 5 } next { seconds: 10 } next { seconds: 30 } next { seconds: 60 } next { seconds: 120 } next { seconds: 180 } next { seconds: 240 } next { seconds: 300 } next { seconds: 360 } next { seconds: 420 } next { seconds: 480 } next { seconds: 540 } next { seconds: 600 } next { seconds: 1200 } next { seconds: 1800 } next { seconds: 3600 } next { seconds: 7200 } } } request_timeout { seconds: 3 } subscription { group { name: "test" } subscriptions { topic { name: "test" } expression { type: TAG expression: "*" } } fifo: false receive_batch_size: 32 long_polling_timeout { seconds: 20 } } user_agent { language: JAVA version: "5.0.6" platform: "Windows 10 10.0" hostname: "V-LVSHENG1" }
2024-11-07 16:12:58 ERROR GrpcRequestExecutorThread-12 - telemetry on error
io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    at io.grpc.Status.asRuntimeException(Status.java:530)
    at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor$1.onCancel(GlobalExceptionInterceptor.java:65)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365)
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
2024-11-07 16:13:12 ERROR ConsumerProcessorExecutor-0 - internal server error
io.grpc.StatusRuntimeException: CANCELLED: call already cancelled.
Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
    at io.grpc.Status.asRuntimeException(Status.java:530)
    at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366)
    at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageResponseStreamWriter.writeAndComplete(ReceiveMessageResponseStreamWriter.java:95)
    at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.lambda$receiveMessage$0(ReceiveMessageActivity.java:150)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.rocketmq.common.utils.FutureUtils.lambda$appendNextFuture$0(FutureUtils.java:31)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
2024-11-07 16:13:12 WARN ConsumerProcessorExecutor-0 - client has cancelled the request. response to write: status { code: INTERNAL_SERVER_ERROR message: "CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception. StatusRuntimeException. io.grpc.Status.asRuntimeException(Status.java:530)" }
2024-11-07 16:13:12 WARN ConsumerProcessorExecutor-0 - client has cancelled the request.
response to write: delivery_timestamp { seconds: 1730967192 nanos: 766000000 }
```

The client sends heartbeats normally:

```
2024-11-07 16:17:11.603 INFO [12364] [RocketmqClientAsyncWorker-1-16] [o.a.r.c.j.i.ClientImpl#?:?] - Send heartbeat successfully, endpoints=ipv4:10.0.82.170:8081, clientId=V-LVSHENG1@12364@1@x8qeno6tr0
```

The client steadily receives the redelivered message about every 80 s:

```
2024-11-07 16:35:56.614 INFO 30944 --- [onsumption-1-25] c.b.b.e.f.r.consumer.ConsumerClient : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=7, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:35:56.618 INFO 30944 --- [onsumption-1-25] c.b.b.e.f.r.consumer.ConsumerClient : message body: 唐宗宋祖 唐宗宋祖
2024-11-07 16:37:16.501 INFO 30944 --- [onsumption-1-26] c.b.b.e.f.r.consumer.ConsumerClient : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=8, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:37:16.502 INFO 30944 --- [onsumption-1-26] c.b.b.e.f.r.consumer.ConsumerClient : message body: 唐宗宋祖 唐宗宋祖
2024-11-07 16:38:36.387 INFO 30944 --- [onsumption-1-27] c.b.b.e.f.r.consumer.ConsumerClient : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=9, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:38:36.387 INFO 30944 --- [onsumption-1-27] c.b.b.e.f.r.consumer.ConsumerClient : message body: 唐宗宋祖 唐宗宋祖
2024-11-07 16:39:56.286 INFO 30944 --- [onsumption-1-28] c.b.b.e.f.r.consumer.ConsumerClient : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170,
bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=10, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:39:56.286 INFO 30944 --- [onsumption-1-28] c.b.b.e.f.r.consumer.ConsumerClient : message body: 唐宗宋祖 唐宗宋祖
2024-11-07 16:41:16.154 INFO 30944 --- [onsumption-1-29] c.b.b.e.f.r.consumer.ConsumerClient : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=11, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:41:16.155 INFO 30944 --- [onsumption-1-29] c.b.b.e.f.r.consumer.ConsumerClient : message body: 唐宗宋祖 唐宗宋祖
2024-11-07 16:42:36.044 INFO 30944 --- [onsumption-1-30] c.b.b.e.f.r.consumer.ConsumerClient : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=12, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:42:36.044 INFO 30944 --- [onsumption-1-30] c.b.b.e.f.r.consumer.ConsumerClient : message body: 唐宗宋祖 唐宗宋祖
2024-11-07 16:43:55.932 INFO 30944 --- [onsumption-1-31] c.b.b.e.f.r.consumer.ConsumerClient : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=13, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:43:55.933 INFO 30944 --- [onsumption-1-31] c.b.b.e.f.r.consumer.ConsumerClient : message body: 唐宗宋祖 唐宗宋祖
2024-11-07 16:45:15.817 INFO 30944 --- [onsumption-1-32] c.b.b.e.f.r.consumer.ConsumerClient : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417,
endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=14, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:45:15.817 INFO 30944 --- [onsumption-1-32] c.b.b.e.f.r.consumer.ConsumerClient : message body: 唐宗宋祖 唐宗宋祖
```

### Steps to Reproduce

1. Send a message from the console.
2. Let the consumer consume it. The consumer logic is:

```java
public class MyConsumer extends AbstractMqConsumer<String> {

    @Override
    public String getTopic() {
        return "test";
    }

    @Override
    public ConsumeResult process(String message) {
        System.out.println(message);
        try {
            // Simulate our business logic, which takes about 3 minutes
            Thread.sleep(200 * 1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return ConsumeResult.SUCCESS;
    }
}
```

3. Although consumption has not exceeded 15 minutes, the message is redelivered after about 80 s. This reproduces consistently.

### What Did You Expect to See?

The message should be redelivered only when consumption times out (15 minutes by default) or fails.

### What Did You See Instead?

The consumer had not timed out, yet it received the redelivered message after about 80 s of processing.

### Additional Context

_No response_

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
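
The "about every 80 s" redelivery interval reported in this issue can be checked directly against the consumer log. The sketch below is only an illustration: the class name `RedeliveryIntervals` and the method `gapsMillis` are hypothetical helpers, not part of RocketMQ, and the timestamps are the `Consume message=` entries for `deliveryAttempt` 7 through 14 copied from the log above. It measures the gaps and makes no claim about why the broker redelivers.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RedeliveryIntervals {
    // Timestamp layout used by the consumer log lines in this report.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Returns the gap in milliseconds between each pair of consecutive timestamps. */
    public static List<Long> gapsMillis(List<String> timestamps) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime previous = LocalDateTime.parse(timestamps.get(i - 1), LOG_TIME);
            LocalDateTime current = LocalDateTime.parse(timestamps.get(i), LOG_TIME);
            gaps.add(Duration.between(previous, current).toMillis());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // "Consume message=" timestamps for deliveryAttempt 7 through 14, copied from the log.
        List<String> deliveries = Arrays.asList(
                "2024-11-07 16:35:56.614",
                "2024-11-07 16:37:16.501",
                "2024-11-07 16:38:36.387",
                "2024-11-07 16:39:56.286",
                "2024-11-07 16:41:16.154",
                "2024-11-07 16:42:36.044",
                "2024-11-07 16:43:55.932",
                "2024-11-07 16:45:15.817");
        // Print each gap between consecutive deliveries in seconds.
        for (long gap : gapsMillis(deliveries)) {
            System.out.printf("%.3f s%n", gap / 1000.0);
        }
    }
}
```

For these eight deliveries the seven gaps all fall between 79.868 s and 79.899 s, well below both the 200 s sleep in `process` and the 15-minute consumption timeout mentioned in the report.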