
Julius Michaelis edited comment on FLINK-18150 at 6/26/20, 9:12 AM:

Uh. It probably doesn't happen without producers. But I'm not entirely sure, 
since I have two ways of running into this bug:
 * Turn on {{flink.partition-discovery.interval-millis}}, which requires 
producers, or
 * Cause the job to run recovery for some other, unrelated reason, e.g. 
restart a taskmanager

Other (non-)similarities to FLINK-17327 are:
 * I am seeing {{Exceeded checkpoint tolerable failure threshold}}
 ** I've seen this appear on a job that had 
 * I am seeing the {{Handover$ClosedException}}
 * I am not seeing {{Increase kafka producers pool size or decrease number of 
concurrent checkpoints.}}

If it's already fixed on master and release-1.11, wouldn't it be easy to check 
whether it's still reproducible there? (Or so I thought. I just wasted the better 
part of a day trying to get the dependencies and build right.) If there's an 
easy way of getting a Docker image (to use in my [reproducing 
setup|https://github.com/jcaesar/flink-kafka-ha-failure]) and a matching set of 
Maven dependencies for a snapshot, please let me know...


> A single failing Kafka broker may cause jobs to fail indefinitely with 
> TimeoutException: Timeout expired while fetching topic metadata
> --------------------------------------------------------------------------------------------------------------------------------------
>                 Key: FLINK-18150
>                 URL: https://issues.apache.org/jira/browse/FLINK-18150
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.1
>         Environment: It is a bit unclear to me under what circumstances this 
> can be reproduced. I created a "minimum" non-working example at 
> https://github.com/jcaesar/flink-kafka-ha-failure. Note that this deploys the 
> minimum number of Kafka brokers, but the failure reproduces just as well 
> with, e.g., replication factor 3 and 8 brokers.
> I run this with
> {code:bash}
> docker-compose kill && docker-compose rm -vf && docker-compose up --abort-on-container-exit --build
> {code}
> The exception should appear on the webui after 5~6 minutes.
> To make sure that this isn't dependent on my machine, I've also checked 
> reproducibility on a m5a.2xlarge EC2 instance.
> You can verify that the Kafka cluster is running "normally", e.g. with:
> {code:bash}
> kafkacat -b localhost,localhost:9093 -L
> {code}
> So far, I only know that
> * {{flink.partition-discovery.interval-millis}} must be set.
> * The broker that failed must be part of the {{bootstrap.servers}}.
> * There needs to be a certain number of topics or producers, but I'm unsure 
> which of the two is crucial.
> * Changing the values of {{metadata.request.timeout.ms}} or 
> {{flink.partition-discovery.interval-millis}} does not seem to have any 
> effect.
>            Reporter: Julius Michaelis
>            Priority: Major
> When a Kafka broker that is listed among the bootstrap servers fails and 
> partition discovery is active, the Flink job reading from that Kafka cluster 
> may enter a failing loop.
> At first, the job seems to react normally without failure with only a short 
> latency spike when switching Kafka leaders.
> Then, it fails with a
> {code:none}
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>         at 
> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>         at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:821)
>         at 
> org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
>         at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
>         at 
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> It recovers, but processes fewer records than expected.
> Finally, the job fails with
> {code:none}
> 2020-06-05 13:59:37
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
> fetching topic metadata
> {code}
> and keeps failing that way without processing any records. (The exception 
> comes without any backtrace or otherwise interesting information.)
> I have also observed this behavior with partition-discovery turned off, but 
> only when the Flink job failed (after a broker failure) and had to run 
> checkpoint recovery for some other reason.
> Please see the [Environment] description for information on how to reproduce 
> the issue.
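
For readers who want to see the triggering configuration in one place, the conditions listed under Environment could be sketched roughly as below. This is only an illustration, not the code from the linked repository: the class name, the helper method, the group id, the broker addresses, and the 10-second interval are all assumptions. Only the two keys {{bootstrap.servers}} and {{flink.partition-discovery.interval-millis}} come from the report. In a real job, properties like these would typically be handed to the Flink Kafka consumer.

```java
import java.util.Properties;

// Hypothetical helper; not part of Flink or of the reproducing setup.
final class KafkaSourceConfigSketch {

    // Key named in the report; setting it turns on periodic partition discovery.
    static final String DISCOVERY_KEY = "flink.partition-discovery.interval-millis";

    static Properties buildConsumerProperties(String bootstrapServers,
                                              long discoveryIntervalMillis) {
        Properties props = new Properties();
        // The broker that later fails has to be one of these entries.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumed group id, chosen only for illustration.
        props.setProperty("group.id", "repro-group");
        props.setProperty(DISCOVERY_KEY, Long.toString(discoveryIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        // Assumed addresses and interval; adjust to the actual cluster.
        Properties props =
                buildConsumerProperties("localhost:9092,localhost:9093", 10_000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Per the report, changing the interval value does not seem to matter; what matters is that the key is set at all and that the failed broker appears in the bootstrap list.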

This message was sent by Atlassian Jira
