brandboat commented on code in PR #19803:
URL: https://github.com/apache/kafka/pull/19803#discussion_r2106238196


##########
metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java:
##########
@@ -148,56 +147,53 @@ DescribeTopicPartitionsResponseData describeTopicResponse(
         boolean ignoreTopicsWithExceptions);
 
     static Cluster toCluster(String clusterId, MetadataImage image) {
-        Map<Integer, List<Node>> brokerToNodes = new HashMap<>();
-        image.cluster().brokers().values().stream()
-            .filter(broker -> !broker.fenced())
-            .forEach(broker -> brokerToNodes.put(broker.id(), broker.nodes()));
+        Map<Integer, Node> nodesById = image.cluster().brokers().values().stream()
+            .collect(Collectors.toMap(BrokerRegistration::id, broker -> broker.nodes().get(0)));

Review Comment:
   Pardon me, I meant "replicas", not "leader".
   
   Maybe an example will help clarify things:
   Let’s say we have 3 brokers (broker0, broker1, and broker2) and a topic called my-topic. Partition 0 of this topic has 3 replicas, one on each broker. Now suppose broker2 unexpectedly shuts down and the controller fences it.
   
   Here’s what happens next:
   1. DynamicTopicClusterQuotaPublisher#onMetadataUpdate gets triggered
   2. That leads to MetadataCache#toCluster being called
   3. In toCluster, nodesById gets constructed without the fenced broker (broker2 is filtered out)
   4. Then MetadataCache#toArray uses this nodesById to resolve replica ids, but in the TopicsImage the PartitionRegistration (my-topic partition-0) still has replicas [0, 1, 2]. Since broker2 isn’t in nodesById, we get a NullPointerException. Kaboom!
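   A minimal, self-contained sketch of steps 3 and 4, under simplifying assumptions: `Broker`, `SimpleNode`, `nodesById`, and `replicaHosts` are hypothetical stand-ins, not Kafka's `BrokerRegistration`, `Node`, `MetadataCache#toCluster`, or `MetadataCache#toArray`. It only illustrates that looking up a fenced replica id in a map built from unfenced brokers yields `null`. In this sketch the NPE comes from dereferencing that `null`; exactly where it surfaces in the real code path may differ.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FencedReplicaLookup {
    // Hypothetical stand-ins for BrokerRegistration and Node (not Kafka's real classes).
    record Broker(int id, String host, boolean fenced) {}
    record SimpleNode(int id, String host) {}

    // The three brokers from the example; broker2 has shut down and been fenced.
    static List<Broker> exampleBrokers() {
        return List.of(
            new Broker(0, "broker0", false),
            new Broker(1, "broker1", false),
            new Broker(2, "broker2", true));
    }

    // Step 3: only unfenced brokers end up in the map, so broker2 is missing.
    static Map<Integer, SimpleNode> nodesById(List<Broker> brokers) {
        return brokers.stream()
            .filter(b -> !b.fenced())
            .collect(Collectors.toMap(Broker::id, b -> new SimpleNode(b.id(), b.host())));
    }

    // Step 4: every replica id of the partition is looked up, fenced or not.
    // Map.get returns null for an absent id, and calling host() on null throws an NPE.
    static String[] replicaHosts(int[] replicas, Map<Integer, SimpleNode> nodesById) {
        String[] hosts = new String[replicas.length];
        for (int i = 0; i < replicas.length; i++) {
            hosts[i] = nodesById.get(replicas[i]).host();
        }
        return hosts;
    }

    public static void main(String[] args) {
        Map<Integer, SimpleNode> nodes = nodesById(exampleBrokers());
        int[] replicas = {0, 1, 2}; // my-topic partition-0 still lists broker2
        try {
            replicaHosts(replicas, nodes);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException");
        }
    }
}
```

   Running `main` prints `NullPointerException`: replicas 0 and 1 resolve, but the lookup for replica 2 returns `null`.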



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.