Christian Lorenz created FLINK-28060:
----------------------------------------

             Summary: Kafka Commit on checkpointing fails repeatedly after a 
broker restart
                 Key: FLINK-28060
                 URL: https://issues.apache.org/jira/browse/FLINK-28060
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Connectors / Kafka
    Affects Versions: 1.15.0
         Environment: Reproduced on macOS and Linux.

Using Java 8, Flink 1.15.0, Kafka 2.8.1.
            Reporter: Christian Lorenz
         Attachments: flink-kafka-testjob.zip

When Kafka offset committing is enabled and performed on Flink's checkpoints, 
an error may occur if a Kafka broker is shut down and that broker happens to be 
the leader of the relevant partition of Kafka's internal __consumer_offsets 
topic.
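
For context, the setup described above corresponds roughly to the consumer settings sketched below. This is only an illustration and is not taken from the attached job: the class name, broker address and group id are hypothetical placeholders. `bootstrap.servers` and `group.id` are standard Kafka consumer properties, and `commit.offsets.on.checkpoint` is the option of Flink's KafkaSource that controls committing on checkpoints.

```java
import java.util.Properties;

// Sketch only: class name, broker address and group id are hypothetical placeholders.
final class OffsetCommitConfig {

    /** Builds consumer properties that ask Flink's KafkaSource to commit offsets on checkpoints. */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Standard Kafka consumer setting: where to find the brokers.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A group id is needed so that committed offsets can be stored in __consumer_offsets.
        props.setProperty("group.id", groupId);
        // Flink KafkaSource option: commit offsets back to Kafka when a checkpoint completes.
        props.setProperty("commit.offsets.on.checkpoint", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "flink-kafka-testjob");
        // In a real job these would be handed to the KafkaSource builder via setProperties(props).
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With settings like these, the offsets are committed only as part of a checkpoint, which is why a failed commit shows up once per checkpoint in the log.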

This is expected behaviour. But once the broker has started up again, the next 
checkpoint issued by Flink should commit the offsets processed in the meantime 
back to Kafka. In Flink 1.15.0 this no longer seems to happen reliably, and 
offset committing stays broken. A warning like the following is logged on each 
checkpoint:

{code}
[info] 14:33:13.684 WARN  [Source Data Fetcher for Source: input-kafka-source 
-> Sink: output-stdout-sink (1/1)#1] o.a.f.c.k.s.reader.KafkaSourceReader - 
Failed to commit consumer offsets for checkpoint 35
[info] org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
commit failed with a retriable exception. You should retry committing the 
latest consumed offsets.
[info] Caused by: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
{code}

To reproduce this, I've attached a small Flink job. Running it requires Java 8, 
Scala sbt, and docker / docker-compose. See readme.md for more details.
The job can be run with `sbt run`; the Kafka cluster is started with 
`docker-compose up`. If the Kafka brokers are then restarted gracefully one 
after another, e.g. with `docker-compose stop kafka1` and `docker-compose start 
kafka1`, followed by the same for kafka2 and kafka3, this warning occurs and no 
offsets are committed to Kafka.

This is not reproducible in Flink 1.14.4.





--
This message was sent by Atlassian Jira
(v8.20.7#820007)
