[ https://issues.apache.org/jira/browse/FLINK-25293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464410#comment-17464410 ]
Qingsheng Ren commented on FLINK-25293:
---------------------------------------

Thanks for the clarification, [~rero].

From your description it's hard to tell the root cause of the group coordinator
disconnection problem. Logs from both the broker and the consumer side would
help.

Regarding the proposal to fail the task if the offset commit fails several
times, I still think this would introduce instability into a Flink job that
could otherwise have kept running without a failover. Failing the Flink job
does not help much with a problem that is actually Kafka-related.

> Option to let fail if KafkaSource keeps failing to commit offset
> ----------------------------------------------------------------
>
>                 Key: FLINK-25293
>                 URL: https://issues.apache.org/jira/browse/FLINK-25293
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>         Environment: Flink 1.14.0
>            Reporter: rerorero
>            Priority: Major
>
> Is it possible to let KafkaSource fail if it keeps failing to commit offsets?
>
> I faced an issue where KafkaSource keeps failing to commit and never
> recovers, while logging messages like these:
> {code:java}
> 2021-12-08 22:18:34,155 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=dbz-cg-1, groupId=dbz-cg] Group coordinator b4-pkc-xxxxx.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
> 2021-12-08 22:18:34,157 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 13
> {code}
> This happens not just once but a couple of times a week (whenever the Kafka
> broker performs a rolling restart). It can be recovered by restarting the
> Flink job.
> I found other people reporting a similar issue:
> [https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2].
> This could be a problem with the Kafka client, and if so it should of course
> be fixed on the Kafka side.
> However, the Flink Kafka connector doesn't provide an automatic way to
> recover from this situation. KafkaSource keeps retrying forever when a
> retriable error occurs, even if the error is not actually retriable:
> [https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]
> Since it reports a metric for the number of failed commits, recovery could be
> automated by monitoring that metric and restarting the job, but that would
> mean one more process to manage.
> Would it make sense for KafkaSource to offer an option such as: let the
> source task fail if it keeps failing to commit offsets more than X times?

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
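
For illustration only, the "fail after more than X failed commits" option proposed in the issue could be sketched as a small counter that a commit callback drives. The class name `CommitFailureTracker` and its methods are hypothetical and are not part of Flink or the Kafka connector. The sketch assumes "keeps failing" means consecutive failures, and that all calls come from a single thread.

```java
// Hypothetical helper, not part of Flink: tracks consecutive offset-commit
// failures and throws once more than the configured number occur in a row.
// Assumption: all calls come from one thread, so no synchronization is used.
final class CommitFailureTracker {
    private final int maxConsecutiveFailures; // the "X" from the proposal
    private int consecutiveFailures;

    CommitFailureTracker(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 1");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    // Call when an offset commit succeeds; a success resets the streak.
    void onCommitSuccess() {
        consecutiveFailures = 0;
    }

    // Call when an offset commit fails; throws after more than X failures in a row.
    void onCommitFailure(Throwable cause) {
        consecutiveFailures++;
        if (consecutiveFailures > maxConsecutiveFailures) {
            throw new IllegalStateException(
                    "Offset commit failed " + consecutiveFailures + " times in a row",
                    cause);
        }
    }

    int consecutiveFailures() {
        return consecutiveFailures;
    }
}
```

Resetting the counter on success means transient failures, such as those during a broker rolling restart, would not accumulate toward the limit. Whether throwing from the commit path is acceptable at all is the open question raised in the comment above.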