Hi,

I'm not sure, but maybe someone here will be able to help you. It sounds like it would be better for you to:
- google something like "Kafka Error sending fetch request TimeoutException" (I see there are quite a lot of results, some of which might be related)
- ask this question on the Kafka mailing list
- ask this question on Stack Overflow as a Kafka question
In short, FlinkKafkaConsumer is a very thin wrapper around the KafkaConsumer class, so what you are observing most likely has very little to do with Flink itself. In other words, if you are observing such a problem, you would most likely be able to reproduce it without Flink.

Best,
Piotrek

On Fri, 9 Jul 2021 at 12:30, Rahul Patwari <rahulpatwari8...@gmail.com> wrote:

> Hi,
>
> We have a Flink 1.11.1 streaming pipeline in production which reads from Kafka.
> Kafka server version is 2.5.0 (Confluent 5.5.0).
> Kafka client version is 2.4.1:
> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka version: 2.4.1","method":"<init>"}
>
> Occasionally (every 6 to 12 hours), we have observed that the Kafka consumption rate went down (NOT to 0) and the following logs were observed. Generally, the consumption rate across all consumers is 4k records/sec; when this issue occurred, the consumption rate dropped to < 50 records/sec.
>
> org.apache.kafka.common.errors.DisconnectException: null
>
> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending fetch request (sessionId=405798138, epoch=5808) to node 8: {}.","method":"handleError"}
>
> org.apache.kafka.common.errors.TimeoutException: Failed
>
> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or invalid, will attempt rediscovery","method":"markCoordinatorUnknown"}
> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group coordinator 100.98.40.16:9092 (id: 2147483623 rack: null)","method":"onSuccess"}
>
> The consumers retried for more than an hour, but the above logs were observed again. The consumers only started pulling data after a manual restart.
>
> No WARN or ERROR logs were observed in Kafka or Zookeeper during this period.
>
> Our observation from this incident is that Kafka consumer retries could not resolve the issue, but a manual restart (or a Flink internal restart via the failure-rate restart policy) does.
>
> Has anyone faced this issue before? Any pointers are appreciated.
>
> Regards,
> Rahul
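As a side note for anyone trying to reproduce this without Flink, as suggested above: the client-side settings that bound the fetch and coordinator timeouts appearing in those logs can be collected in a plain Properties object. This is only an illustrative sketch; the broker address and group id are placeholders (not values from this thread), and the numbers shown are, to my knowledge, the stock kafka-clients 2.4 defaults, which you would tune while experimenting:

```java
import java.util.Properties;

// Hypothetical helper: gathers the KafkaConsumer settings most relevant to
// the "Error sending fetch request ... TimeoutException" and "Group
// coordinator ... is unavailable" messages, so a standalone consumer can be
// configured the same way the Flink job's internal consumer is.
public class ConsumerTimeoutConfig {

    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        props.setProperty("group.id", "test-consumer-group");    // placeholder
        // Upper bound on a single fetch request before the client gives up
        // and logs a TimeoutException for that node:
        props.setProperty("request.timeout.ms", "30000");
        // Coordinator liveness: if heartbeats are not acknowledged within
        // session.timeout.ms, the coordinator is marked unknown and
        // rediscovery is attempted (the markCoordinatorUnknown log above):
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("heartbeat.interval.ms", "3000");
        // Backoff between reconnect attempts to an unavailable broker:
        props.setProperty("reconnect.backoff.ms", "50");
        props.setProperty("reconnect.backoff.max.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // Print the sketch so the effective values can be eyeballed.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The same Properties can then be handed to `new KafkaConsumer<>(props)` in a bare poll loop; if the slowdown reproduces there too, that would confirm it is a Kafka-client or broker issue rather than a Flink one.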