[ 
https://issues.apache.org/jira/browse/FLINK-25293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17463569#comment-17463569
 ] 

Qingsheng Ren commented on FLINK-25293:
---------------------------------------

Thanks for the ticket [~rero]!

Flink doesn't rely on committed offsets for fault tolerance, so we don't 
treat a failed commit as a severe failure, since it doesn't break the 
correctness of the Flink job.

I'm a little curious about your usage of KafkaSource. Are there any cases 
where an offset commit failure can only be recovered by restarting the Flink 
job? If the purpose of failing the Flink job is just to alert the user about 
the offset commit failure, I think a better approach is to set up a metrics 
system and create an alarm on the "commitsFailed" metric.
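For illustration, such an alarm could be a Prometheus rule along these lines (a sketch only; the exported metric name, the time windows, and the severity label are assumptions that depend on your metrics reporter configuration):

```yaml
# Hypothetical Prometheus alerting rule on KafkaSource's commitsFailed counter.
# The exported name below assumes Flink's PrometheusReporter with default scope
# formats; check your reporter configuration for the actual metric name.
groups:
  - name: flink-kafka-source
    rules:
      - alert: KafkaOffsetCommitsFailing
        # Fires if commit failures keep increasing over a sustained period.
        expr: increase(flink_taskmanager_job_task_operator_KafkaSourceReader_commitsFailed[15m]) > 0
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "KafkaSource offset commits have been failing for 30 minutes"
```

An operator can then decide whether to restart the job manually or wire the alert into existing automation, without Flink itself failing the task.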

> Option to let fail if KafkaSource keeps failing to commit offset
> ----------------------------------------------------------------
>
>                 Key: FLINK-25293
>                 URL: https://issues.apache.org/jira/browse/FLINK-25293
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>         Environment: Flink 1.14.0
>            Reporter: rerorero
>            Priority: Major
>
> Is it possible to let KafkaSource fail if it keeps failing to commit offsets?
>  
> I faced an issue where KafkaSource keeps failing and never recovers, while 
> logging messages like these:
> {code:java}
> 2021-12-08 22:18:34,155 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - 
> [Consumer clientId=dbz-cg-1, groupId=dbz-cg] Group coordinator 
> b4-pkc-xxxxx.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack: 
> null) is unavailable or invalid due to cause: null.isDisconnected: true. 
> Rediscovery will be attempted.
> 2021-12-08 22:18:34,157 WARN  
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed 
> to commit consumer offsets for checkpoint 13 {code}
> This is happening not just once, but a couple of times a week (it happens 
> when the Kafka brokers perform a rolling restart). It can be recovered by 
> restarting the Flink job.
> I found other people reporting a similar issue: 
> [https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2]. This 
> could possibly be a problem with the Kafka client, and if so, the problem 
> should of course be fixed on the Kafka side.
> However, the Flink Kafka connector doesn't provide an automatic way to 
> recover from this situation. KafkaSource keeps retrying forever when it sees 
> a retriable error, even if the error is not actually retriable: 
> [https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]
> Since the connector exposes a metric for the number of failed commits, 
> recovery could be automated by monitoring that metric and restarting the 
> job, but that would mean a new process that needs to be managed.
> Does it make sense to give KafkaSource an option to let the source task 
> fail if it fails to commit an offset more than X times?
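The behavior requested in the quoted proposal might look roughly like the following counter, which resets on a successful commit and fails the task once consecutive failures exceed a threshold. This is a hypothetical sketch: the class name, the threshold, and any wiring into KafkaSourceReader are assumptions, not existing Flink API.

```java
// Hypothetical sketch of the requested option: fail the source task after
// more than N consecutive offset-commit failures. Not part of Flink's API.
public class CommitFailureTracker {
    private final int maxConsecutiveFailures;
    private int consecutiveFailures = 0;

    public CommitFailureTracker(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Call once per commit attempt; throws after the limit is exceeded. */
    public void onCommitResult(boolean succeeded) {
        if (succeeded) {
            // Any successful commit resets the failure streak.
            consecutiveFailures = 0;
        } else if (++consecutiveFailures > maxConsecutiveFailures) {
            // Surfacing an exception from the reader would fail the task
            // and trigger Flink's normal restart/failover handling.
            throw new IllegalStateException(
                "Offset commit failed " + consecutiveFailures
                    + " times in a row; failing the source task");
        }
    }
}
```

Because a commit failure is harmless to correctness, such an option would purely trade availability for operability: the job restart (which, per the report above, clears the stuck coordinator state) happens automatically instead of via an external monitor.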



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
