MinsuJo created KAFKA-10039:
-------------------------------

             Summary: A Kafka broker is gracefully shutdown, and incorrect metadata was passed to the Kafka connect client.
                 Key: KAFKA-10039
                 URL: https://issues.apache.org/jira/browse/KAFKA-10039
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 2.3.1
            Reporter: MinsuJo

For server maintenance, one of our 20 brokers was shut down gracefully, but the entire Kafka sink Connect cluster suddenly died with the following NullPointerException.

{code:java}
// error log: kafka distributed sink connect
[2020-05-22 15:16:20,433] ERROR [Worker clientId=connect-1, groupId=dc2-log-hyper-connector] Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253)
java.lang.NullPointerException
	at java.util.Objects.requireNonNull(Objects.java:203)
	at org.apache.kafka.common.Cluster.<init>(Cluster.java:134)
	at org.apache.kafka.common.Cluster.<init>(Cluster.java:89)
	at org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:120)
	at org.apache.kafka.clients.MetadataCache.<init>(MetadataCache.java:82)
	at org.apache.kafka.clients.MetadataCache.<init>(MetadataCache.java:58)
	at org.apache.kafka.clients.Metadata.handleMetadataResponse(Metadata.java:325)
	at org.apache.kafka.clients.Metadata.update(Metadata.java:252)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1059)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:845)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:548)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.poll(WorkerCoordinator.java:154)
	at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.poll(WorkerGroupMember.java:166)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:355)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:245)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}

Every topic had a replication factor greater than 2; there were 50 topics and 200 partitions.

After checking the error against the Kafka client source code, it seems that the error occurred while the Connect DistributedHerder (the client) was caching the metadata received from the broker, which includes the set of broker node IDs and the set of partition infos. The requireNonNull call in the loop below appears to be the one in the stack trace, which would mean that a partition named a leader whose node ID was missing from the broker list in the same metadata response.

{code:java}
// source code: org.apache.kafka.common.Cluster (v2.3.1)

// index the partition infos by topic, topic+partition, and node
// note that this code is performance sensitive if there are a large number of partitions so we are careful
// to avoid unnecessary work
Map<TopicPartition, PartitionInfo> tmpPartitionsByTopicPartition = new HashMap<>(partitions.size());
Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new HashMap<>();
for (PartitionInfo p : partitions) {
    tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
    List<PartitionInfo> partitionsForTopic = tmpPartitionsByTopic.get(p.topic());
    if (partitionsForTopic == null) {
        partitionsForTopic = new ArrayList<>();
        tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
    }
    partitionsForTopic.add(p);
    if (p.leader() != null) {
        // The broker guarantees that if a partition has a non-null leader, it is one of the brokers returned
        // in the metadata response
        List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
        partitionsForNode.add(p);
    }
}
{code}

How can this happen, and how should we deal with it in the future? (Both the broker and the client are version 2.3.1.)
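To make the suspected failure mode concrete, here is a minimal sketch that reproduces the same kind of NullPointerException outside Kafka. It is not the real client code: `LeaderIndexSketch`, `Partition`, `indexByLeader`, and `failsWithNpe` are hypothetical stand-ins, and it assumes (the quoted snippet does not show this) that `tmpPartitionsByNode` holds one entry per broker listed in the metadata response.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Simplified stand-ins for illustration only; these are NOT the real Kafka classes.
final class LeaderIndexSketch {

    // Hypothetical, reduced form of PartitionInfo; leaderId is null when there is no leader.
    static final class Partition {
        final String topic;
        final int partition;
        final Integer leaderId;

        Partition(String topic, int partition, Integer leaderId) {
            this.topic = topic;
            this.partition = partition;
            this.leaderId = leaderId;
        }
    }

    // Mirrors the indexing step quoted from Cluster: every listed broker gets a bucket
    // (assumption), then each partition with a leader is added to its leader's bucket.
    static Map<Integer, List<Partition>> indexByLeader(List<Integer> brokerIds,
                                                       List<Partition> partitions) {
        Map<Integer, List<Partition>> byNode = new HashMap<>();
        for (Integer id : brokerIds) {
            byNode.put(id, new ArrayList<>());
        }
        for (Partition p : partitions) {
            if (p.leaderId != null) {
                // Throws NullPointerException when the leader id is not among brokerIds.
                List<Partition> forNode = Objects.requireNonNull(byNode.get(p.leaderId));
                forNode.add(p);
            }
        }
        return byNode;
    }

    // Returns true if indexing this metadata view throws the NullPointerException.
    static boolean failsWithNpe(List<Integer> brokerIds, List<Partition> partitions) {
        try {
            indexByLeader(brokerIds, partitions);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Partition> partitions = Arrays.asList(
                new Partition("logs", 0, 1),
                new Partition("logs", 1, 2),
                new Partition("logs", 2, null)); // leaderless partitions are skipped

        // Consistent view: every leader is in the broker list.
        System.out.println(failsWithNpe(Arrays.asList(1, 2, 3), partitions)); // prints false

        // Inconsistent view: broker 2 is gone from the list but is still named as a leader.
        System.out.println(failsWithNpe(Arrays.asList(1, 3), partitions)); // prints true
    }
}
```

If this matches what happened, a partition with no leader at all would have been harmless; only a leader ID that is absent from the broker list triggers the exception.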