Julius Michaelis created FLINK-18150:
----------------------------------------

             Summary: A single failing Kafka broker may cause jobs to fail 
indefinitely with TimeoutException: Timeout expired while fetching topic 
metadata
                 Key: FLINK-18150
                 URL: https://issues.apache.org/jira/browse/FLINK-18150
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.10.1
         Environment: It is a bit unclear to me under what circumstances this
can be reproduced. I created a "minimum" non-working example at
https://github.com/jcaesar/flink-kafka-ha-failure. Note that this deploys the
minimum number of Kafka brokers, but the failure reproduces just as well with,
e.g., replication factor 3 and 8 brokers.

I run this with (fish shell syntax):
{code:bash}
docker-compose kill; and docker-compose rm -vf; and docker-compose up 
--abort-on-container-exit --build
{code}
The exception should appear on the web UI after 5-6 minutes.

You can verify that the Kafka cluster is running "normally", e.g. with:
{code:bash}
kafkacat -b localhost,localhost:9093 -L
{code}

So far, I only know that
* {{flink.partition-discovery.interval-millis}} must be set (see the
configuration sketch below).
* The broker that failed must be part of {{bootstrap.servers}}.
* There needs to be a certain number of topics or producers, but I'm unsure
which of the two is the decisive factor.
* Changing the values of {{metadata.request.timeout.ms}} or
{{flink.partition-discovery.interval-millis}} does not seem to have any effect.
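For illustration, a minimal sketch of a consumer configuration matching these
conditions (not the exact code from the linked repository; topic name, group
id, brokers, and interval values are placeholders):
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class BrokerFailureRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The broker that gets killed (localhost:9092 here) is listed in bootstrap.servers.
        props.setProperty("bootstrap.servers", "localhost:9092,localhost:9093");
        props.setProperty("group.id", "repro-group");
        // Partition discovery must be enabled; the concrete interval does not seem to matter.
        props.setProperty("flink.partition-discovery.interval-millis", "10000");
        // Tuning this does not seem to have any effect either.
        props.setProperty("metadata.request.timeout.ms", "5000");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props))
           .print();
        env.execute("kafka-broker-failure-repro");
    }
}
{code}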
            Reporter: Julius Michaelis


When a Kafka broker that is listed among the bootstrap servers fails and
partition discovery is active, the Flink job reading from that Kafka cluster
may enter a failure loop.

At first, the job seems to handle the broker failure normally, with only a
short latency spike while Kafka switches partition leaders.
Then, it fails with a
{code:plain}
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
        at 
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:821)
        at 
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
        at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
        at java.lang.Thread.run(Thread.java:748)
{code}
It recovers, but processes fewer records than expected.

Finally, the job fails with
{code:plain}
2020-06-05 13:59:37
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching 
topic metadata
{code}
and keeps failing with this error repeatedly, without processing any records.


I have also observed this behavior with partition discovery turned off, but
only when the Flink job had failed (after a broker failure) and had to recover
from a checkpoint for some other reason.



