[ https://issues.apache.org/jira/browse/FLINK-25293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464297#comment-17464297 ]

rerorero commented on FLINK-25293:
----------------------------------

Thank you for taking a look.

 

> since it doesn't break the correctness of Flink job. 

I got it.

 

>  Are there any cases where the offset commit failure can only be recovered by 
> restarting the Flink job

I guess my case is an issue that is not supposed to happen, so it might be a 
bug. I use KafkaSource with Confluent Cloud for a simple streaming job, which 
just does some simple transforms and routes messages to another storage system; 
no window processing is used either. But this problem happens once or twice a 
week. I contacted the Confluent support team and found that a rolling restart 
of the brokers was happening every time I faced the issue.

According to the logs, after the client is disconnected from the broker, the 
group coordinator never recovers and remains unavailable. This seems like a 
problem that should be resolved by client-side retries, but I've had no luck 
in my case. All the properties I specified for the Kafka consumer are the 
following:

 
{code:java}
    // props is a java.util.Properties instance handed to the Kafka consumer;
    // bootstrapServers, consumerGroup, username and password are defined elsewhere.
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", consumerGroup)
    props.setProperty("request.timeout.ms", "20000")
    props.setProperty("retry.backoff.ms", "500")
    props.setProperty("partition.discovery.interval.ms", "10000")
    props.setProperty("ssl.endpoint.identification.algorithm", "https")
    props.setProperty("security.protocol", "SASL_SSL")
    props.setProperty("sasl.mechanism", "PLAIN")
    props.setProperty(
      "sasl.jaas.config",
      s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$username" password="$password";"""
    )
{code}
I'm not arguing about whether this is a bug or not; I'd just like to ask 
whether it makes sense to give developers more options when KafkaSource keeps 
failing to commit. At least I'm facing this unexpected case...
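
Just to illustrate the kind of option I have in mind, here is a minimal sketch 
that counts consecutive offset commit failures and gives up once a threshold is 
exceeded. It is written against the plain Kafka consumer API, not against the 
actual KafkaSourceReader code, and the class name and threshold are made up:
{code:java}
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/** Sketch only: give up after N consecutive async offset-commit failures instead of retrying forever. */
public class FailAfterRepeatedCommitFailures {

    // Hypothetical threshold; as a KafkaSource option this would be configurable.
    private static final int MAX_CONSECUTIVE_COMMIT_FAILURES = 10;

    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);

    public void commit(KafkaConsumer<byte[], byte[]> consumer,
                       Map<TopicPartition, OffsetAndMetadata> offsets) {
        consumer.commitAsync(offsets, (committedOffsets, exception) -> {
            if (exception == null) {
                // A successful commit resets the failure streak.
                consecutiveFailures.set(0);
            } else {
                consecutiveFailures.incrementAndGet();
            }
        });

        // The callback above fires asynchronously, so this check sees the streak
        // accumulated by earlier commits. Once it exceeds the threshold, fail hard
        // so the job restarts and rebuilds the Kafka client.
        if (consecutiveFailures.get() > MAX_CONSECUTIVE_COMMIT_FAILURES) {
            throw new IllegalStateException(
                    "Offset commit failed more than "
                            + MAX_CONSECUTIVE_COMMIT_FAILURES + " times in a row");
        }
    }
}
{code}
Whether exceeding the threshold should fail the job or only raise an alert 
could of course itself be configurable.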

> Option to let fail if KafkaSource keeps failing to commit offset
> ----------------------------------------------------------------
>
>                 Key: FLINK-25293
>                 URL: https://issues.apache.org/jira/browse/FLINK-25293
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>         Environment: Flink 1.14.0
>            Reporter: rerorero
>            Priority: Major
>
> Is it possible to let KafkaSource fail if it keeps failing to commit offsets?
>  
> I faced an issue where KafkaSource keeps failing to commit and never 
> recovers, while logging messages like these:
> {code:java}
> 2021-12-08 22:18:34,155 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - 
> [Consumer clientId=dbz-cg-1, groupId=dbz-cg] Group coordinator 
> b4-pkc-xxxxx.asia-northeast1.gcp.confluent.cloud:9092 (id: 2147483643 rack: 
> null) is unavailable or invalid due to cause: null.isDisconnected: true. 
> Rediscovery will be attempted.
> 2021-12-08 22:18:34,157 WARN  
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed 
> to commit consumer offsets for checkpoint 13 {code}
> This is happening not just once, but a couple of times a week (it happens 
> when the Kafka brokers perform a rolling restart). It can be recovered by 
> restarting the Flink job.
> I found other people reporting a similar issue: 
> [https://lists.apache.org/thread/8l4f2yb4qwysdn1cj1wjk99tfb79kgs2]. This 
> could possibly be a problem with the Kafka client, and of course the problem 
> should be fixed on the Kafka side if so.
> However, the Flink Kafka connector doesn't provide an automatic way to 
> recover from this situation. KafkaSource keeps retrying forever when a 
> retriable error occurs, even if it is not actually retriable: 
> [https://github.com/apache/flink/blob/afb29d92c4e76ec6a453459c3d8a08304efec549/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L144-L148]
> Since the source exposes a metric for the number of failed commits, recovery 
> could be automated by monitoring that metric and restarting the job (a rough 
> watchdog sketch follows this description), but that would mean running and 
> managing an extra external process.
> Does it make sense for KafkaSource to have an option that lets the source 
> task fail if it keeps failing to commit offsets more than X times?
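
To make the monitoring workaround mentioned above concrete, here is a rough, 
hypothetical watchdog sketch. It assumes the commitsFailed counter of the 
KafkaSourceReader metric group and Flink's /jobs/<jobId>/vertices/<vertexId>/metrics 
REST endpoint; the job ID, vertex ID and exact metric name are placeholders that 
have to be checked against the actual deployment, and the restart action itself 
is left out:
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Rough sketch of an external watchdog polling a commit-failure counter via the Flink REST API. */
public class CommitFailureWatchdog {

    // Placeholders: point these at the real JobManager, job and Kafka source vertex.
    private static final String REST_BASE = "http://localhost:8081";
    private static final String JOB_ID = "<job-id>";
    private static final String VERTEX_ID = "<kafka-source-vertex-id>";
    // Placeholder metric id; list .../metrics without ?get= to find the exact name in your setup.
    private static final String METRIC = "KafkaSourceReader.commitsFailed";

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        long previous = -1;

        while (true) {
            String url = REST_BASE + "/jobs/" + JOB_ID + "/vertices/" + VERTEX_ID
                    + "/metrics?get=" + METRIC;
            HttpResponse<String> response = client.send(
                    HttpRequest.newBuilder(URI.create(url)).GET().build(),
                    HttpResponse.BodyHandlers.ofString());

            // Crude extraction of the counter value; a real watchdog would parse the JSON properly.
            Matcher m = Pattern.compile("\"value\"\\s*:\\s*\"?(\\d+)").matcher(response.body());
            if (m.find()) {
                long current = Long.parseLong(m.group(1));
                if (previous >= 0 && current > previous) {
                    // The counter is still growing: offset commits keep failing.
                    // This is where one would alert, or cancel and resubmit the job.
                    System.err.println("Offset commits are still failing, counter = " + current);
                }
                previous = current;
            }
            Thread.sleep(60_000);
        }
    }
}
{code}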


