[ https://issues.apache.org/jira/browse/FLINK-18150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152808#comment-17152808 ]

Aljoscha Krettek commented on FLINK-18150:
------------------------------------------

This is even easier to reproduce with the following:
{code:java}
package org.apache.flink.streaming.connectors.kafka;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * The list of brokers should contain a broker that doesn't exist. Sometimes this example will
 * succeed, sometimes it hangs indefinitely. This depends on which broker from the bootstrap
 * servers the client picks.
 */
public class FetchFromMissingBroker {

   private static final Logger LOG = LoggerFactory.getLogger(FetchFromMissingBroker.class);

   public static void main(String[] args) {
      LOG.info("Starting...");

      Properties kafkaProperties = new Properties();
      kafkaProperties.setProperty("bootstrap.servers", "localhost:9092,localhost:9093");
      kafkaProperties.setProperty("group.id", "joebs");
      kafkaProperties.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
      kafkaProperties.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties);

      // This metadata fetch sometimes hangs indefinitely, depending on which
      // bootstrap broker the client picks first.
      consumer.listTopics();
   }
}
{code}
With trace logging enabled on {{org.apache.kafka.clients.NetworkClient}} you can see what happens in the different cases (see the logging config sketch after this list):
 # good case: the client picks {{localhost:9093}} -> all good
 # semi-good case: the client picks {{localhost:9092}}, then tries again with {{localhost:9093}} -> all good
 # bad case: the client picks {{localhost:9092}} and then tries with that again after a timeout -> bad
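
For reference, here is a minimal logging configuration sketch that surfaces this. It assumes the example runs with slf4j backed by log4j 1.x; the backend and the pattern are my assumptions, not part of the original setup:
{code:none}
# log4j.properties -- minimal sketch, assuming an slf4j + log4j 1.x classpath
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
# %r prints milliseconds since startup, matching the timestamps in the excerpt below
log4j.appender.console.layout.ConversionPattern=%r [%t] %-5p %c %x - %m%n
# the logger whose TRACE output shows the node-selection behaviour
log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE
{code}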

This is an interesting excerpt from a log where we see the client repeatedly pick the missing broker before finally reaching the good one:
{code:none}
991  [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initializing the Kafka consumer
1111 [main] INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.4.1
1111 [main] INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: c57222ae8cd7866b
1111 [main] INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1594132735452
1112 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Kafka consumer initialized
1334 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded node localhost:9092 (id: -1 rack: null) with no active connection
1338 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1
1358 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
1364 [main] DEBUG org.apache.kafka.common.network.Selector [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1
1365 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Completed connection to node -1. Fetching API versions.
1365 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating API versions fetch from node -1.
1365 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] No version information found when sending API_VERSIONS with correlation id 1 to node -1. Assuming version 3.
1426 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Sending API_VERSIONS {client_software_name=apache-kafka-java,client_software_version=2.4.1,_tagged_fields={}} with correlation id 1 to node -1
1429 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
1434 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
4434 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
7438 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
10442 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
13445 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
16449 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
19452 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
22454 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
25455 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
28458 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9092 (id: -1 rack: null)
31462 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Disconnecting from node -1 due to request timeout.
31465 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Cancelled request API_VERSIONS {client_software_name=apache-kafka-java,client_software_version=2.4.1,_tagged_fields={}} with correlation id 1 due to node -1 being disconnected
31465 [main] WARN  org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
31469 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Cancelled request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-joebs-1, correlationId=0) due to node -1 being disconnected
31572 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded node localhost:9093 (id: -2 rack: null) with no active connection
31573 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1
31574 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9093 (id: -2 rack: null)
31576 [main] DEBUG org.apache.kafka.common.network.Selector [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -2
31577 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Completed connection to node -2. Fetching API versions.
31577 [main] DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Initiating API versions fetch from node -2.
31577 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] No version information found when sending API_VERSIONS with correlation id 3 to node -2. Assuming version 3.
31577 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Sending API_VERSIONS {client_software_name=apache-kafka-java,client_software_version=2.4.1,_tagged_fields={}} with correlation id 3 to node -2
31577 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9093 (id: -2 rack: null)
31578 [main] TRACE org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-joebs-1, groupId=joebs] Found least loaded connecting node localhost:9093 (id: -2 rack: null)
{code}
The reason this problem becomes more prevalent in Flink 1.11 is here: [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L684], which was introduced in Kafka 2.3.x: https://issues.apache.org/jira/browse/KAFKA-8376. The problem with this is that the "broken" node {{localhost:9092}} is listed as a connecting node, and the logic prefers that as the "least loaded node". Flink 1.11.x ships this "fix", while Flink 1.10.x, which uses Kafka 2.2.x, does not.
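
To make that concrete, here is a condensed, hypothetical paraphrase of the selection order in {{NetworkClient.leastLoadedNode()}} after KAFKA-8376. The names are simplified stand-ins, not the actual Kafka code:
{code:java}
import java.util.List;

/** Hypothetical, condensed paraphrase of the post-KAFKA-8376 selection order. */
public class LeastLoadedNodeSketch {

   interface Node {
      boolean isReady();      // connected with capacity for more requests
      boolean isConnecting(); // TCP connect or API_VERSIONS still in flight
      boolean canConnect();   // disconnected and past the reconnect backoff
   }

   static Node leastLoadedNode(List<Node> nodes) {
      Node foundConnecting = null;
      Node foundCanConnect = null;
      for (Node node : nodes) {
         if (node.isReady()) {
            return node; // an established, non-busy connection always wins
         }
         if (node.isConnecting()) {
            foundConnecting = node; // e.g. our stuck localhost:9092
         } else if (node.canConnect()) {
            foundCanConnect = node; // e.g. the healthy localhost:9093
         }
      }
      // A node that is already "connecting" is preferred over opening a new
      // connection, so the client keeps returning the broken bootstrap broker
      // until the connection attempt finally times out.
      return foundConnecting != null ? foundConnecting : foundCanConnect;
   }
}
{code}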

Also, as you found out already, there are two levels of retry. The first one is here: [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L368], where the polling continuously picks our broken broker. When that polling finally fails, we send a completely new request in the next loop iteration, which may or may not pick the good broker; a sketch of the two levels follows.
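
For illustration only, here is a self-contained, hypothetical paraphrase of how those two levels interact; the types and helper names are stand-ins, not the real Kafka internals:
{code:java}
import java.util.function.Supplier;

/** Hypothetical sketch of the two retry levels around Fetcher#getTopicMetadata. */
public class TwoLevelRetrySketch {

   interface MetadataFuture {
      boolean succeeded(); // did the single in-flight metadata request complete?
      String value();      // stand-in for the returned cluster metadata
   }

   interface Timer {
      boolean notExpired(); // overall deadline for the metadata fetch
   }

   static String getTopicMetadata(Supplier<MetadataFuture> sendMetadataRequest, Timer timer) {
      do {
         // Level 1: one request is sent towards leastLoadedNode() and polled on.
         // While the broken broker counts as "connecting", it stays the target
         // until the request itself times out.
         MetadataFuture future = sendMetadataRequest.get();
         if (future.succeeded()) {
            return future.value();
         }
         // Level 2: the request failed, so the next iteration builds a completely
         // new request, which may or may not land on the good broker this time.
      } while (timer.notExpired());
      throw new RuntimeException("Timeout expired while fetching topic metadata");
   }
}
{code}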

> A single failing Kafka broker may cause jobs to fail indefinitely with 
> TimeoutException: Timeout expired while fetching topic metadata
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18150
>                 URL: https://issues.apache.org/jira/browse/FLINK-18150
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.1
>         Environment: It is a bit unclear to me under what circumstances this 
> can be reproduced. I created a "minimum" non-working example at 
> https://github.com/jcaesar/flink-kafka-ha-failure. Note that this deploys the 
> minimum number of Kafka brokers, but the issue reproduces just as well with, 
> e.g., replication factor 3 and 8 brokers.
> I run this with
> {code:bash}
> docker-compose kill; and docker-compose rm -vf; and docker-compose up 
> --abort-on-container-exit --build
> {code}
> The exception should appear on the web UI after 5~6 minutes.
> To make sure that this isn't dependent on my machine, I've also checked 
> reproducibility on an m5a.2xlarge EC2 instance.
> You can verify that the Kafka cluster is running "normally", e.g. with:
> {code:bash}
> kafkacat -b localhost,localhost:9093 -L
> {code}
> So far, I only know that
> * {{flink.partition-discovery.interval-millis}} must be set.
> * The broker that failed must be part of the {{bootstrap.servers}}
> * There need to be a certain number of topics or producers, but I'm unsure 
> which is crucial
> * Changing the values of {{metadata.request.timeout.ms}} or 
> {{flink.partition-discovery.interval-millis}} does not seem to have any 
> effect.
>            Reporter: Julius Michaelis
>            Assignee: Aljoscha Krettek
>            Priority: Major
>
> When a Kafka broker that is listed among the bootstrap servers fails and 
> partition discovery is active, the Flink job reading from that Kafka cluster 
> may enter a failing loop.
> At first, the job seems to react normally, with only a short latency spike 
> when Kafka switches leaders.
> Then, it fails with a
> {code:none}
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>         at 
> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>         at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:821)
>         at 
> org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
>         at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
>         at 
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> It recovers, but processes fewer than the expected number of records.
> Finally, the job fails with
> {code:none}
> 2020-06-05 13:59:37
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
> fetching topic metadata
> {code}
> and keeps doing so while not processing any records. (The exception comes 
> without any backtrace or other interesting information.)
> I have also observed this behavior with partition-discovery turned off, but 
> only when the Flink job failed (after a broker failure) and had to run 
> checkpoint recovery for some other reason.
> Please see the [Environment] description for information on how to reproduce 
> the issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
