[ https://issues.apache.org/jira/browse/FLINK-18150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146138#comment-17146138 ]
Julius Michaelis edited comment on FLINK-18150 at 6/26/20, 9:15 AM:
--------------------------------------------------------------------
Uh. It probably doesn't happen without producers. But I'm not entirely sure, since I have two ways of running into this bug. Either:
 * Turn on {{flink.partition-discovery.interval-millis}}, in which case it does not happen unless there is a certain number of producers, or
 * Cause the job to run recovery for some other, unrelated reason (e.g. restart a taskmanager) after failing the Kafka node.

Other (non-)similarities to FLINK-17327 are:
 * I am seeing {{Exceeded checkpoint tolerable failure threshold}}
 ** I've seen this appear on a job that had {{setFailTaskOnCheckpointError(false)}}
 * I am seeing the {{Handover$ClosedException}}
 * I am not seeing {{Increase kafka producers pool size or decrease number of concurrent checkpoints.}}

(Sketches of the consumer and checkpoint settings mentioned above are appended after the quoted issue description below.)

If it's already fixed on master and release-1.11, wouldn't it be easy to try whether it's reproducible with that? (Or so I thought. I just wasted the better part of a day trying to get the dependencies and build right. Couldn't.) If there's an easy way of getting a docker image (to use in my [reproducing setup|https://github.com/jcaesar/flink-kafka-ha-failure]) and a matching set of maven dependencies for a snapshot, please let me know...

> A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18150
>                 URL: https://issues.apache.org/jira/browse/FLINK-18150
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.1
>        Environment: It is a bit unclear to me under what circumstances this can be reproduced. I created a "minimum" non-working example at https://github.com/jcaesar/flink-kafka-ha-failure. Note that this deploys the minimum number of Kafka brokers, but it works just as well with, e.g., replication factor 3 and 8 brokers.
> I run this with
> {code:bash}
> docker-compose kill; and docker-compose rm -vf; and docker-compose up --abort-on-container-exit --build
> {code}
> The exception should appear on the webui after 5~6 minutes.
> To make sure that this isn't dependent on my machine, I've also checked reproducibility on an m5a.2xlarge EC2 instance.
> You can verify that the Kafka cluster is running "normally", e.g. with:
> {code:bash}
> kafkacat -b localhost,localhost:9093 -L
> {code}
> So far, I only know that
> * {{flink.partition-discovery.interval-millis}} must be set.
> * The broker that failed must be part of the {{bootstrap.servers}}.
> * There needs to be a certain number of topics or producers, but I'm unsure which is crucial.
> * Changing the values of {{metadata.request.timeout.ms}} or {{flink.partition-discovery.interval-millis}} does not seem to have any effect.
>             Reporter: Julius Michaelis
>             Priority: Major
>
> When a Kafka broker that is listed among the bootstrap servers fails while partition discovery is active, the Flink job reading from that Kafka cluster may enter a failing loop.
> At first, the job seems to react normally, without failure, with only a short latency spike when switching Kafka leaders.
> Then, it fails with a
> {code:none}
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>     at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:821)
>     at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
>     at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> It recovers, but processes fewer records than expected.
> Finally, the job fails with
> {code:none}
> 2020-06-05 13:59:37
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
> {code}
> and keeps doing so without processing any records. (The exception comes without a backtrace or any other useful information.)
> I have also observed this behavior with partition discovery turned off, but only when the Flink job failed (after a broker failure) and had to run checkpoint recovery for some other reason.
> Please see the [Environment] description for information on how to reproduce the issue.
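
For reference, here is a minimal sketch of a consumer configured with the conditions listed in the quoted environment section: partition discovery enabled via {{flink.partition-discovery.interval-millis}}, and the later-failing broker included in {{bootstrap.servers}}. This is not taken from the reporter's job or the linked repo; topic name, group id, broker addresses and intervals are illustrative assumptions.

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class BrokerFailureReproSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is enabled so that the checkpoint-failure code path can be exercised.
        env.enableCheckpointing(10_000);

        Properties props = new Properties();
        // The broker that is later killed must appear in this list (illustrative host names).
        props.setProperty("bootstrap.servers", "kafka-0:9092,kafka-1:9093");
        props.setProperty("group.id", "broker-failure-repro");
        // One of the two reported triggers: periodic partition discovery (illustrative interval).
        props.setProperty("flink.partition-discovery.interval-millis", "10000");

        env.addSource(new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), props))
           .print();
        env.execute("kafka-broker-failure-repro");
    }
}
{code}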
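
Similarly, a sketch of the checkpoint-failure settings the comment refers to: the deprecated {{setFailTaskOnCheckpointError}} flag (assumed here to live on {{ExecutionConfig}}, as in the 1.10 API) and its replacement, {{CheckpointConfig#setTolerableCheckpointFailureNumber}}, whose threshold is what produces the {{Exceeded checkpoint tolerable failure threshold}} message. Values are illustrative, not a recommendation.

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFailureSettingsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);

        // Deprecated flag mentioned in the comment: do not fail the task on checkpoint errors.
        env.getConfig().setFailTaskOnCheckpointError(false);

        // Replacement setting: how many checkpoint failures are tolerated before the job fails
        // with "Exceeded checkpoint tolerable failure threshold." (illustrative value).
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}
{code}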