Hi,

I'm not sure, but maybe someone will be able to help you. It sounds like
your best options would be to:
- search for something like "Kafka Error sending fetch request
TimeoutException" (I see there are quite a lot of results, some of which
might be related)
- ask this question on the Kafka mailing list
- ask this question on Stack Overflow, tagged as a Kafka question

In short, FlinkKafkaConsumer is a very thin wrapper around the
KafkaConsumer class, so what you are observing most likely has very
little to do with Flink itself. In other words, if you are seeing such a
problem, you would most likely be able to reproduce it without Flink.
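One way to check this is to run the standalone console consumer that ships with Kafka against the same brokers and topic, completely outside Flink, and see whether the same DisconnectException / TimeoutException messages show up there too. This is just a sketch: the topic name and test group id below are placeholders (use your own topic, and a throwaway group id so you don't steal partitions from the production consumer group), and the bootstrap address is taken from the broker that appears in your logs:

```shell
# Consume from the same cluster with a plain KafkaConsumer, no Flink involved.
# <topic> is a placeholder; the group id is a throwaway test group so the
# production group MFTDataProcessorEventSignatureConsumerGroupV1R1 is untouched.
./bin/kafka-console-consumer.sh \
  --bootstrap-server 100.98.40.16:9092 \
  --topic <topic> \
  --group standalone-repro-test \
  --consumer-property client.id=standalone-repro-test
```

If the fetch/coordinator errors reappear in this consumer's logs after a few hours, that would confirm the problem sits between the Kafka client and the brokers rather than in Flink.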

Best,
Piotrek

On Fri, 9 Jul 2021 at 12:30, Rahul Patwari <rahulpatwari8...@gmail.com>
wrote:

> Hi,
>
> We have a Flink 1.11.1 Version streaming pipeline in production which
> reads from Kafka.
> Kafka Server version is 2.5.0 - confluent 5.5.0
> Kafka Client Version is 2.4.1 - 
> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
> version: 2.4.1","method":"<init>"}
>
> Occasionally (every 6 to 12 hours), we have observed that the Kafka
> consumption rate went down (NOT to 0) and the following logs were observed.
> Generally, the consumption rate across all consumers is 4k records/sec;
> when this issue occurred, it dropped to < 50 records/sec.
>
> org.apache.kafka.common.errors.DisconnectException: null
>
> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending
> fetch request (sessionId=405798138, epoch=5808) to node 8:
> {}.","method":"handleError"}
>
> org.apache.kafka.common.errors.TimeoutException: Failed
>
> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator
> 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or invalid,
> will attempt rediscovery","method":"markCoordinatorUnknown"}
>
> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group
> coordinator 100.98.40.16:9092 (id: 2147483623 rack:
> null)","method":"onSuccess"}
>
> The consumers retried for more than an hour, but the above logs were
> observed again. The consumers started pulling data only after a manual
> restart.
>
> No WARN or ERROR logs were observed in Kafka or Zookeeper during this
> period.
>
> Our observation from this incident is that Kafka consumer retries could
> not resolve the issue, but a manual restart (or a Flink internal restart
> via the failure-rate restart strategy) did.
>
> Has anyone faced this issue before? Any pointers are appreciated.
>
> Regards,
> Rahul
>
