showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r445625112



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
      * Topics that were not able to get its description will simply not be returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
-
-        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                    final HashSet<String> tempUnknownTopics,
+                                                    final int remainingRetries) {
+        final Set<String> allTopicsToDescribe = new HashSet<>(topics);
+        allTopicsToDescribe.addAll(tempUnknownTopics);

Review comment:
       `allTopicsToDescribe` exists so that the topics in `tempUnknownTopics` get
another chance to be described on the retries in the `makeReady` method. In
`makeReady`, we want to know which topics already exist and need to be
validated, and which topics do not exist and need to be created. But for topics
that returned `LeaderNotAvailable`, we can't tell whether they exist or not. So
we need to merge `topics` (the topics to validate) with `tempUnknownTopics`
here, so that all of them are described (again).
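
   To make the reasoning above concrete, here is a minimal, self-contained sketch of the merge-and-retry idea. It is not the code in this PR: the class `DescribeRetrySketch`, the `existingPartitions` map and the `leaderUnavailable` set are hypothetical stand-ins for the admin client's responses, `remainingRetries` is left out, and clearing `tempUnknownTopics` before each attempt is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified illustration; not the actual InternalTopicManager code.
class DescribeRetrySketch {

    /**
     * Describes `topics` plus every topic left unknown by the previous attempt.
     *
     * existingPartitions and leaderUnavailable are fake inputs that stand in for
     * what a real describe call would report (assumption of this sketch).
     * Topics that are neither existing nor leaderless are treated as "not found"
     * and are simply omitted from the result, so the caller can create them.
     */
    static Map<String, Integer> getNumPartitions(final Set<String> topics,
                                                 final Set<String> tempUnknownTopics,
                                                 final Map<String, Integer> existingPartitions,
                                                 final Set<String> leaderUnavailable) {
        // Merge the topics to validate with the topics whose state is still unknown.
        final Set<String> allTopicsToDescribe = new HashSet<>(topics);
        allTopicsToDescribe.addAll(tempUnknownTopics);

        // Assumption: the unknown set is rebuilt from scratch on every attempt.
        tempUnknownTopics.clear();

        final Map<String, Integer> described = new HashMap<>();
        for (final String topic : allTopicsToDescribe) {
            if (leaderUnavailable.contains(topic)) {
                // We cannot tell whether the topic exists; try again on the next retry.
                tempUnknownTopics.add(topic);
            } else if (existingPartitions.containsKey(topic)) {
                described.put(topic, existingPartitions.get(topic));
            }
        }
        return described;
    }

    public static void main(final String[] args) {
        final Set<String> tempUnknownTopics = new HashSet<>();
        final Map<String, Integer> existing = new HashMap<>();
        existing.put("a", 3);

        // First attempt: "b" has no leader yet, so its state is unknown.
        final Set<String> leaderless = new HashSet<>(Arrays.asList("b"));
        System.out.println(getNumPartitions(
            new HashSet<>(Arrays.asList("a", "b")), tempUnknownTopics, existing, leaderless));
        System.out.println("still unknown: " + tempUnknownTopics);

        // Retry: "b" now has a leader; it is described only because it was merged in.
        existing.put("b", 2);
        System.out.println(getNumPartitions(
            new HashSet<>(), tempUnknownTopics, existing, new HashSet<>()));
    }
}
```

   In the retry call the `topics` argument is empty, so without the merge `b` would never be described again and the caller could neither validate nor create it.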
   
   Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org