[ https://issues.apache.org/jira/browse/FLINK-18150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152808#comment-17152808 ]
Aljoscha Krettek commented on FLINK-18150:
------------------------------------------

This is even easier to reproduce with this:
{code:java}
package org.apache.flink.streaming.connectors.kafka;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * The list of brokers should contain a broker that doesn't exist. Sometimes this example will
 * succeed, sometimes it hangs indefinitely. This depends on which broker from the bootstrap servers
 * the client picks.
 */
public class FetchFromMissingBroker {
    private static final Logger LOG = LoggerFactory.getLogger(FetchFromMissingBroker.class);

    public static void main(String[] args) {
        LOG.info("Starting...");

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092,localhost:9093");
        kafkaProperties.setProperty("group.id", "joebs");
        kafkaProperties.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        kafkaProperties.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties);
        consumer.listTopics();
    }
}
{code}
With trace logging enabled on {{org.apache.kafka.clients.NetworkClient}} you can see what happens in the different cases:
# good case: the client picks {{localhost:9093}} -> all good
# semi-good case: the client picks {{localhost:9092}} first, then retries with {{localhost:9093}} -> all good
# bad case: the client picks {{localhost:9092}} and, after a timeout, retries with that same broker -> bad
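For reference, that trace logging can be turned on with a plain logging configuration; the following is a minimal sketch assuming log4j 1.x as the SLF4J backend (the appender and pattern are illustrative, the only line that really matters is the TRACE level on {{NetworkClient}}):
{code:none}
# log4j.properties -- enable TRACE for the Kafka NetworkClient only (sketch).
# %r prints milliseconds since startup, similar to the timestamps in the excerpt below.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%r [%t] %p %c [%x] - %m%n
log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE
{code}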
Here is an interesting excerpt from a log where we can see the client repeatedly picking the missing broker before finally picking the good one:
{code:java}
991 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initializing the Kafka consumer
1111 [main] INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.4.1
1111 [main] INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: c57222ae8cd7866b
1111 [main] INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1594132735452
1112 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Kafka consumer initialized
1334 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded node localhost:9092 (id: -1 rack: null) with no active connection
1338 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1
1358 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
1364 [main] DEBUG org.apache.kafka.common.network.Selector [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1
1365 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Completed connection to node -1. Fetching API versions.
1365 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating API versions fetch from node -1.
1365 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] No version information found when sending API_VERSIONS with correlation id 1 to node -1. Assuming version 3.
1426 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Sending API_VERSIONS {client_software_name=apache-kafka-java,client_software_version=2.4.1,_tagged_fields={}} with correlation id 1 to node -1
1429 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
1434 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
4434 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
7438 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
10442 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
13445 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
16449 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
19452 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
22454 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
25455 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
28458 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
31462 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Disconnecting from node -1 due to request timeout.
31465 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Cancelled request API_VERSIONS {client_software_name=apache-kafka-java,client_software_version=2.4.1,_tagged_fields={}} with correlation id 1 due to node -1 being disconnected
31465 [main] WARN org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
31469 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Cancelled request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-joebs-1, correlationId=0) due to node -1 being disconnected
31572 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded node localhost:9093 (id: -2 rack: null) with no active connection
31573 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1
31574 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9093 (id: -2 rack: null)
31576 [main] DEBUG org.apache.kafka.common.network.Selector [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -2
31577 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Completed connection to node -2. Fetching API versions.
31577 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating API versions fetch from node -2.
31577 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] No version information found when sending API_VERSIONS with correlation id 3 to node -2. Assuming version 3.
31577 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Sending API_VERSIONS {client_software_name=apache-kafka-java,client_software_version=2.4.1,_tagged_fields={}} with correlation id 3 to node -2
31577 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9093 (id: -2 rack: null)
31578 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9093 (id: -2 rack: null)
{code}
The reason why this problem becomes more prevalent in Flink 1.11 is here: [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L684], which was introduced in Kafka 2.3.x by https://issues.apache.org/jira/browse/KAFKA-8376. The problem is that the "broken" node {{localhost:9092}} is listed as a connecting node, and the logic prefers a connecting node as the "least loaded node". Flink 1.11.x ships with this "fix", while Flink 1.10.x, which uses Kafka 2.2.x, does not.
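To see why a stuck broker keeps winning, here is a heavily simplified paraphrase of that selection logic. This is a sketch for illustration only, not the actual Kafka implementation; the {{Node}} and {{State}} types are invented for the example:
{code:java}
import java.util.List;

/**
 * Simplified paraphrase of the "least loaded node" selection that KAFKA-8376
 * introduced in NetworkClient: a node whose connection attempt is still in
 * progress is preferred over opening a connection to an untried node.
 */
public class LeastLoadedNodeSketch {

    enum State { DISCONNECTED, CONNECTING, READY }

    record Node(String host, State state, int inFlightRequests) {}

    static Node leastLoadedNode(List<Node> nodes) {
        Node ready = null;
        Node connecting = null;
        Node canConnect = null;
        for (Node node : nodes) {
            if (node.state() == State.READY
                    && (ready == null || node.inFlightRequests() < ready.inFlightRequests())) {
                ready = node;      // best: an established connection with the fewest in-flight requests
            } else if (node.state() == State.CONNECTING) {
                connecting = node; // second best: a connection attempt is already in progress
            } else if (node.state() == State.DISCONNECTED) {
                canConnect = node; // last resort: we would have to open a brand-new connection
            }
        }
        if (ready != null) {
            return ready;
        }
        // This preference is the trap: a broker that accepted the TCP connection
        // but never answers API_VERSIONS stays CONNECTING and keeps winning here,
        // so the untried good broker is not contacted until a request timeout
        // finally disconnects the stuck node.
        return connecting != null ? connecting : canConnect;
    }

    public static void main(String[] args) {
        // localhost:9092 accepted the TCP connection but is stuck before the
        // API_VERSIONS handshake completes; localhost:9093 has not been tried yet.
        Node picked = leastLoadedNode(List.of(
                new Node("localhost:9092", State.CONNECTING, 0),
                new Node("localhost:9093", State.DISCONNECTED, 0)));
        System.out.println(picked.host()); // prints localhost:9092, the stuck broker
    }
}
{code}
Run against the two bootstrap nodes from the reproducer, this always returns the stuck {{localhost:9092}}, which matches the repeated "Found least loaded connecting node localhost:9092" lines in the excerpt above.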
Also, as you found out already, there are two levels of retry. The first one is here: [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L368], where the polling will continuously pick our broken broker. When that polling finally fails, we send a completely new request in the next loop iteration, which may or may not pick the good broker.

> A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18150
>                 URL: https://issues.apache.org/jira/browse/FLINK-18150
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.1
>         Environment: It is a bit unclear to me under what circumstances this can be reproduced. I created a "minimum" non-working example at https://github.com/jcaesar/flink-kafka-ha-failure. Note that this deploys the minimum number of Kafka brokers, but it works just as well with, e.g., a replication factor of 3 and 8 brokers.
> I run this with
> {code:bash}
> docker-compose kill; and docker-compose rm -vf; and docker-compose up --abort-on-container-exit --build
> {code}
> The exception should appear on the web UI after 5-6 minutes.
> To make sure that this isn't dependent on my machine, I've also checked reproducibility on an m5a.2xlarge EC2 instance.
> You can verify that the Kafka cluster is running "normally", e.g. with:
> {code:bash}
> kafkacat -b localhost,localhost:9093 -L
> {code}
> So far, I only know that:
> * {{flink.partition-discovery.interval-millis}} must be set.
> * The broker that failed must be part of the {{bootstrap.servers}}.
> * There needs to be a certain number of topics or producers, but I'm unsure which is crucial.
> * Changing the values of {{metadata.request.timeout.ms}} or {{flink.partition-discovery.interval-millis}} does not seem to have any effect.
>             Reporter: Julius Michaelis
>             Assignee: Aljoscha Krettek
>             Priority: Major
>
> When a Kafka broker that is listed among the bootstrap servers fails while partition discovery is active, the Flink job reading from that Kafka cluster may enter a failing loop.
> At first, the job seems to react normally, without failure, with only a short latency spike when switching Kafka leaders.
> Then, it fails with a
> {code:none}
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
> 	at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
> 	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:821)
> 	at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
> 	at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> It recovers, but processes fewer than the expected number of records.
> Finally, the job fails with
> {code:none}
> 2020-06-05 13:59:37
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
> {code}
> and repeats doing so while not processing any records.
> (The exception comes without any backtrace or other interesting information.)
> I have also observed this behavior with partition discovery turned off, but only when the Flink job failed (after a broker failure) and had to run checkpoint recovery for some other reason.
> Please see the [Environment] description for information on how to reproduce the issue.