[ https://issues.apache.org/jira/browse/FLINK-25293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
rerorero updated FLINK-25293:
-----------------------------
    Description:

Is it possible to let KafkaSource fail if it keeps failing to commit offsets?

I faced an issue where KafkaSource keeps failing to commit offsets and never recovers, while logging messages like these:
{code:java}
2021-12-08 22:18:34,155 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=dbz-mercari-contact-tool-jp-cg-1, groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2021-12-08 22:18:34,157 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 13
{code}
This happens not just once, but a couple of times a week. I found [other people reporting the same thing](https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2). It is possibly a problem with the Kafka client, and it can be resolved by restarting the Flink job.

However, the Flink Kafka connector doesn't provide an automatic way to recover from this situation: KafkaSource [keeps retrying forever when a retriable error occurs](https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148), even if the error is not actually retriable. Since the reader exposes a metric for the number of failed commits, recovery could be automated by monitoring that metric and restarting the job externally, but that would mean running yet another process.

Does it make sense to give KafkaSource an option that lets the source task fail if it keeps failing to commit offsets more than X times?
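For illustration, here is a minimal sketch of the kind of failure counting such an option could enable. The option name ({{commit.offsets.max-consecutive-failures}}) and the helper class below are hypothetical, not an existing KafkaSource API; the idea is that the source reader would drive it from its offset-commit callback:
{code:java}
/**
 * Hypothetical helper illustrating the proposal: count consecutive
 * offset-commit failures and fail the source task once a configurable limit
 * (e.g. a setting like "commit.offsets.max-consecutive-failures") is exceeded.
 * A successful commit resets the counter, so only persistent failures would
 * fail the task and trigger the job's normal restart strategy.
 */
public class OffsetCommitFailureTracker {

    private final int maxConsecutiveFailures;
    private int consecutiveFailures;

    public OffsetCommitFailureTracker(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Invoked when an offset commit succeeds; resets the counter. */
    public synchronized void onCommitSuccess() {
        consecutiveFailures = 0;
    }

    /** Invoked when an offset commit fails; throws once the limit is exceeded. */
    public synchronized void onCommitFailure(Throwable cause) {
        consecutiveFailures++;
        if (consecutiveFailures > maxConsecutiveFailures) {
            throw new RuntimeException(
                    "Offset commit failed " + consecutiveFailures
                            + " consecutive times, exceeding the configured limit of "
                            + maxConsecutiveFailures,
                    cause);
        }
    }
}
{code}
With a limit of, say, 5, occasional coordinator hiccups would still be tolerated, while a permanently stuck consumer like the one above would eventually fail the task and let the configured restart strategy bring up a fresh Kafka client.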
was:

Is it possible to let KafkaSource fail if it keeps failing to commit offset?

I faced an issue where KafkaSource keeps failing and never recover, while it's logging like these logs:
{code:java}
2021-12-08 22:18:34,155 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=dbz-mercari-contact-tool-jp-cg-1, groupId=dbz-mercari-contact-tool-jp-cg] Group coordinator b4-pkc-xmj7g.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2021-12-08 22:18:34,157 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 13
{code}
This is happening not just once, but a couple of times a week. I found [other people reporting the same thing](https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2). This could possibly be a problem with the Kafka client. It can be resolved by restarting the Flink Job.

However, Flink Kafka connector doesn't provide an automatic way to save this situation. KafkaSource [keeps retrying forever when a retriable error occurs](https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148), even if it is not retriable actually.

Since it sends metrics of the number of times a commit fails, it could be automated by monitoring it and restarting the job, but that would mean we need to have a new process to manage.

Does it make sense to have KafkaSource have the option like, let the source task fail if it keeps failing to commit an offset more than X times?


> Option to let fail if KafkaSource keeps failing to commit offset
> ----------------------------------------------------------------
>
>                 Key: FLINK-25293
>                 URL: https://issues.apache.org/jira/browse/FLINK-25293
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>        Environment: Flink 1.14.0
>            Reporter: rerorero
>            Priority: Major
>

--
This message was sent by Atlassian Jira (v8.20.1#820001)