brandboat commented on code in PR #19803:
URL: https://github.com/apache/kafka/pull/19803#discussion_r2106238196

##########
metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java:
##########
@@ -148,56 +147,53 @@ DescribeTopicPartitionsResponseData describeTopicResponse(
         boolean ignoreTopicsWithExceptions);

     static Cluster toCluster(String clusterId, MetadataImage image) {
-        Map<Integer, List<Node>> brokerToNodes = new HashMap<>();
-        image.cluster().brokers().values().stream()
-            .filter(broker -> !broker.fenced())
-            .forEach(broker -> brokerToNodes.put(broker.id(), broker.nodes()));
+        Map<Integer, Node> nodesById = image.cluster().brokers().values().stream()
+            .collect(Collectors.toMap(BrokerRegistration::id, broker -> broker.nodes().get(0)));

Review Comment:
   Pardon me, I meant "replicas", not leader. Maybe an example will help clarify things:

   Let's say we have 3 brokers (broker0, broker1, and broker2) and a topic called my-topic. Partition 0 of this topic has 3 replicas, one on each broker.

   Now suppose broker2 unexpectedly shuts down. Here's what happens next:
   1. DynamicTopicClusterQuotaPublisher#onMetadataUpdate gets triggered.
   2. That leads to MetadataCache#toCluster being called.
   3. In toCluster, nodesById gets constructed without the fenced broker (broker2 is filtered out).
   4. Then MetadataCache#toArray uses this nodesById, but in the TopicsImage the PartitionRegistration (my-topic partition-0) still has replicas [0, 1, 2]. Since broker2 isn't in nodesById, we get a NullPointerException, kaboom!
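
   The steps above can be reproduced in isolation. The following is only a minimal sketch and not the actual Kafka code: `FencedReplicaSketch`, its nested `Node` class, and the helpers `nodesById`, `replicaHosts`, and `lookupFails` are hypothetical stand-ins, and it assumes the caller dereferences each looked-up node (which is where the NullPointerException would surface).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class FencedReplicaSketch {

    // Hypothetical stand-in for org.apache.kafka.common.Node.
    static final class Node {
        final int id;
        final String host;

        Node(int id, String host) {
            this.id = id;
            this.host = host;
        }
    }

    // Builds the broker id -> node lookup. When filterFenced is true,
    // fenced brokers are left out of the map.
    static Map<Integer, Node> nodesById(int[] brokerIds, Set<Integer> fenced, boolean filterFenced) {
        Map<Integer, Node> result = new HashMap<>();
        for (int id : brokerIds) {
            if (filterFenced && fenced.contains(id)) {
                continue;
            }
            result.put(id, new Node(id, "broker" + id));
        }
        return result;
    }

    // Resolves each replica id to a host name. Assumption: the looked-up node
    // is dereferenced, so a missing entry throws a NullPointerException.
    static String[] replicaHosts(int[] replicas, Map<Integer, Node> nodesById) {
        String[] hosts = new String[replicas.length];
        for (int i = 0; i < replicas.length; i++) {
            hosts[i] = nodesById.get(replicas[i]).host;
        }
        return hosts;
    }

    // Returns true if resolving replicas [0, 1, 2] throws a
    // NullPointerException while broker2 is fenced.
    static boolean lookupFails(boolean filterFenced) {
        int[] brokerIds = {0, 1, 2};
        int[] replicas = {0, 1, 2};
        Set<Integer> fenced = Set.of(2);
        try {
            replicaHosts(replicas, nodesById(brokerIds, fenced, filterFenced));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("filter fenced brokers -> NPE: " + lookupFails(true));
        System.out.println("keep fenced brokers   -> NPE: " + lookupFails(false));
    }
}
```

   In this sketch the partition's replica list is independent of broker liveness, so any lookup keyed by replica id has to either keep fenced brokers in the map or handle a missing entry explicitly.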