bruce256 opened a new issue, #8900:
URL: https://github.com/apache/rocketmq/issues/8900

   ### Before Creating the Bug Report
   
   - [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate.
   
   - [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   CentOS 7.9
   
   ### RocketMQ version
   
   rocketmq 5.3.0
   rocketmq-client 5.0.6
   
   ### JDK Version
   
   JDK 1.8
   
   ### Describe the Bug
   
   
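   The message is consumed successfully on the client side, but the broker keeps redelivering it until it ends up in the dead-letter queue. At the same time, proxy.log reports the exception stack traces below. Network connectivity from the client to the proxy port is normal. Everything worked right after deployment; the problem only appeared recently.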
   
   ```
   2024-11-07 16:12:31 INFO pool-4-thread-1 - clear handle of this client when client unregister. group:test, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=V-LVSHENG1@10572@1@x8pkjaacj4, remoteAddress=10.43.165.207:65031, localAddress=10.0.82.170:8081}, clientId=V-LVSHENG1@10572@1@x8pkjaacj4, language=JAVA, version=473, lastUpdateTimestamp=1730967025698]
   2024-11-07 16:12:31 INFO pool-4-thread-1 - remove grpc channel when client unregister. group:test, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=V-LVSHENG1@10572@1@x8pkjaacj4, remoteAddress=10.43.165.207:65031, localAddress=10.0.82.170:8081}, clientId=V-LVSHENG1@10572@1@x8pkjaacj4, language=JAVA, version=473, lastUpdateTimestamp=1730967025698], removed:true
   2024-11-07 16:12:31 INFO pool-4-thread-1 - remove remoting channel when client unregister. clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=V-LVSHENG1@10572@1@x8pkjaacj4, remoteAddress=10.43.165.207:65031, localAddress=10.0.82.170:8081}, clientId=V-LVSHENG1@10572@1@x8pkjaacj4, language=JAVA, version=473, lastUpdateTimestamp=1730967025698]
   2024-11-07 16:12:32 INFO GrpcClientSettingsManagerCleaner - remove unused grpc client settings. group:org.apache.rocketmq.broker.client.ConsumerGroupInfo@4f8c3973, settings:client_type: PUSH_CONSUMER
   access_point {
     scheme: IPv4
     addresses {
       host: "10.0.82.170"
       port: 8081
     }
   }
   backoff_policy {
     max_attempts: 17
     customized_backoff {
       next {
         seconds: 1
       }
       next {
         seconds: 5
       }
       next {
         seconds: 10
       }
       next {
         seconds: 30
       }
       next {
         seconds: 60
       }
       next {
         seconds: 120
       }
       next {
         seconds: 180
       }
       next {
         seconds: 240
       }
       next {
         seconds: 300
       }
       next {
         seconds: 360
       }
       next {
         seconds: 420
       }
       next {
         seconds: 480
       }
       next {
         seconds: 540
       }
       next {
         seconds: 600
       }
       next {
         seconds: 1200
       }
       next {
         seconds: 1800
       }
       next {
         seconds: 3600
       }
       next {
         seconds: 7200
       }
     }
   }
   request_timeout {
     seconds: 3
   }
   subscription {
     group {
       name: "test"
     }
     subscriptions {
       topic {
         name: "test"
       }
       expression {
         type: TAG
         expression: "*"
       }
     }
     fifo: false
     receive_batch_size: 32
     long_polling_timeout {
       seconds: 20
     }
   }
   user_agent {
     language: JAVA
     version: "5.0.6"
     platform: "Windows 10 10.0"
     hostname: "V-LVSHENG1"
   }
   
   2024-11-07 16:12:58 ERROR GrpcRequestExecutorThread-12 - telemetry on error
   io.grpc.StatusRuntimeException: CANCELLED: client cancelled
           at io.grpc.Status.asRuntimeException(Status.java:530)
           at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
           at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
           at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
           at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
           at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
           at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
           at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
           at org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor$1.onCancel(GlobalExceptionInterceptor.java:65)
           at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
           at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
           at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
           at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
           at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378)
           at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365)
           at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923)
           at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
           at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   2024-11-07 16:13:12 ERROR ConsumerProcessorExecutor-0 - internal server error
   io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
           at io.grpc.Status.asRuntimeException(Status.java:530)
           at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366)
           at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageResponseStreamWriter.writeAndComplete(ReceiveMessageResponseStreamWriter.java:95)
           at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.lambda$receiveMessage$0(ReceiveMessageActivity.java:150)
           at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
           at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
           at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
           at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
           at org.apache.rocketmq.common.utils.FutureUtils.lambda$appendNextFuture$0(FutureUtils.java:31)
           at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
           at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
           at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   2024-11-07 16:13:12 WARN ConsumerProcessorExecutor-0 - client has cancelled the request. response to write: status {
     code: INTERNAL_SERVER_ERROR
     message: "CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception. StatusRuntimeException. io.grpc.Status.asRuntimeException(Status.java:530)"
   }
   
   2024-11-07 16:13:12 WARN ConsumerProcessorExecutor-0 - client has cancelled the request. response to write: delivery_timestamp {
     seconds: 1730967192
     nanos: 766000000
   }
   
   ```
   
   The client sends heartbeats normally:
   ```
   2024-11-07 16:17:11.603 INFO  [12364] [RocketmqClientAsyncWorker-1-16] [o.a.r.c.j.i.ClientImpl#?:?] - Send heartbeat successfully, endpoints=ipv4:10.0.82.170:8081, clientId=V-LVSHENG1@12364@1@x8qeno6tr0
   ```
   
   The client consistently receives the redelivered message every 80 s:
   ```
   2024-11-07 16:35:56.614  INFO 30944 --- [onsumption-1-25] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=7, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
   2024-11-07 16:35:56.618  INFO 30944 --- [onsumption-1-25] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
   唐宗宋祖
   2024-11-07 16:37:16.501  INFO 30944 --- [onsumption-1-26] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=8, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
   2024-11-07 16:37:16.502  INFO 30944 --- [onsumption-1-26] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
   唐宗宋祖
   2024-11-07 16:38:36.387  INFO 30944 --- [onsumption-1-27] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=9, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
   2024-11-07 16:38:36.387  INFO 30944 --- [onsumption-1-27] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
   唐宗宋祖
   2024-11-07 16:39:56.286  INFO 30944 --- [onsumption-1-28] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=10, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
   2024-11-07 16:39:56.286  INFO 30944 --- [onsumption-1-28] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
   唐宗宋祖
   2024-11-07 16:41:16.154  INFO 30944 --- [onsumption-1-29] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=11, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
   2024-11-07 16:41:16.155  INFO 30944 --- [onsumption-1-29] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
   唐宗宋祖
   2024-11-07 16:42:36.044  INFO 30944 --- [onsumption-1-30] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=12, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
   2024-11-07 16:42:36.044  INFO 30944 --- [onsumption-1-30] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
   唐宗宋祖
   2024-11-07 16:43:55.932  INFO 30944 --- [onsumption-1-31] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=13, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
   2024-11-07 16:43:55.933  INFO 30944 --- [onsumption-1-31] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
   唐宗宋祖
   2024-11-07 16:45:15.817  INFO 30944 --- [onsumption-1-32] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=14, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
   2024-11-07 16:45:15.817  INFO 30944 --- [onsumption-1-32] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
   唐宗宋祖
   
   ```
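The "every 80 s" observation can be checked against the timestamps of the eight `Consume message=` log lines (deliveryAttempt 7 through 14). The following is a minimal sketch that only does the arithmetic on those logged times; the class name `RedeliveryIntervals` and the method `gapsMillis` are hypothetical helpers for illustration and are not part of RocketMQ.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes the gap between consecutive deliveries
// of the same message, using the times copied from the consumer log.
public class RedeliveryIntervals {

    // Times of the "Consume message=..." lines for deliveryAttempt 7..14.
    static final String[] OBSERVED = {
        "16:35:56.614", "16:37:16.501", "16:38:36.387", "16:39:56.286",
        "16:41:16.154", "16:42:36.044", "16:43:55.932", "16:45:15.817"
    };

    // Returns the gap in milliseconds between each pair of adjacent timestamps.
    static List<Long> gapsMillis(String[] timestamps) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < timestamps.length; i++) {
            LocalTime previous = LocalTime.parse(timestamps[i - 1]);
            LocalTime current = LocalTime.parse(timestamps[i]);
            gaps.add(Duration.between(previous, current).toMillis());
        }
        return gaps;
    }

    public static void main(String[] args) {
        for (long gap : gapsMillis(OBSERVED)) {
            System.out.println(gap + " ms");
        }
    }
}
```

All seven gaps fall between 79.8 s and 80.0 s, and 80 s does not appear among the `customized_backoff` values printed in the proxy log, so the interval looks fixed rather than driven by that backoff list. This is only a reading of the logs, not a confirmed root cause.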
   
   ### Steps to Reproduce
   
   1. Send a message from the console.
   2. The consumer consumes it.
   Consumer logic:
   ```java 
   public class MyConsumer extends AbstractMqConsumer<String> {
   
       @Override
       public String getTopic() {
           return "test";
       }
   
       @Override
       public ConsumeResult process(String message) {
           System.out.println(message);
   
           try {
               // Simulate our business logic, which takes about 3 minutes
               Thread.sleep(200 * 1000);
           } catch (InterruptedException e) {
               throw new RuntimeException(e);
           }
           return ConsumeResult.SUCCESS;
       }
   }
   
   ```
   3. Consumption has not exceeded 15 minutes, yet the redelivered message arrives after about 80 s.
   This reproduces consistently.
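To make the timing in the reproduction concrete, here is a small sketch of the arithmetic, assuming the redelivery interval stays constant and the first delivery happens at t = 0. The class `RedeliveryTimeline` and its method are hypothetical names for illustration; the constants come from the report itself (the 200 s `Thread.sleep`, the roughly 80 s interval, and the 15-minute timeout the reporter expects).

```java
// Hypothetical helper: counts how many redeliveries arrive while the
// first invocation of the handler is still running.
public class RedeliveryTimeline {

    static final long HANDLER_SECONDS = 200;              // Thread.sleep(200 * 1000) in the repro
    static final long OBSERVED_INTERVAL_SECONDS = 80;     // interval seen in the consumer log
    static final long EXPECTED_TIMEOUT_SECONDS = 15 * 60; // timeout the reporter expects

    // Number of redeliveries that arrive strictly before a handler that
    // started at t = 0 returns, given a constant redelivery interval.
    // Both arguments are assumed to be positive.
    static long redeliveriesBeforeHandlerReturns(long handlerSeconds, long intervalSeconds) {
        return (handlerSeconds - 1) / intervalSeconds;
    }

    public static void main(String[] args) {
        System.out.println("observed interval: "
            + redeliveriesBeforeHandlerReturns(HANDLER_SECONDS, OBSERVED_INTERVAL_SECONDS));
        System.out.println("expected timeout:  "
            + redeliveriesBeforeHandlerReturns(HANDLER_SECONDS, EXPECTED_TIMEOUT_SECONDS));
    }
}
```

Under these assumptions, two redeliveries (at about 80 s and 160 s) arrive before the first 200 s handler run returns `ConsumeResult.SUCCESS`, whereas with a 15-minute timeout none would.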
   
   ### What Did You Expect to See?
   
   The message should be redelivered only when consumption times out (15 minutes by default) or fails.
   
   ### What Did You See Instead?
   
   The consumer had not timed out; about 80 s into consumption, it received the redelivered message.
   
   ### Additional Context
   
   _No response_

