ableegoldman commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r444555724



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -98,9 +98,10 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         int remainingRetries = retries;
         Set<String> topicsNotReady = new HashSet<>(topics.keySet());
         final Set<String> newlyCreatedTopics = new HashSet<>();
+        final HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       Can we give this a more descriptive name? It might be obvious to you, 
but I think someone reading this code for the first time would not realize 
that it actually holds topics that may or may not already exist.
   That said, I'm struggling to think of a good alternative... maybe 
`possiblyCreatedTopics` or `unknownTopics`? Any better ideas?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##########
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult describeTopics(Collection<String> topic
                 future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
                 topicDescriptions.put(requestedTopic, future);
             }
+            // try to simulate the leader not available situation when topic name is "LeaderNotAvailableTopic"
+            if (requestedTopic.equals("LeaderNotAvailableTopic")) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new LeaderNotAvailableException("The leader of Topic " + requestedTopic + " is not available."));

Review comment:
       Is it possible to use `EasyMock` instead of adding this to the actual 
`MockAdminClient`? I know it's kind of a pain to set up, but I think it'll make 
the test a lot clearer. I did something similar in 
`StreamsPartitionAssignorTest` to mock the results of the `listOffsets` request.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -157,16 +161,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
             }
 
 
-            if (!topicsNotReady.isEmpty()) {
-                log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, retries);
+            if (isNeedRetry(topicsNotReady)) {
+                log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, remainingRetries);

Review comment:
       Good catch

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -242,11 +256,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
                     log.error(errorMsg);
                     throw new StreamsException(errorMsg);
                 }
-            } else {
+            } else if (!leaderNotAvailableTopics.contains(topicName)) {
                 topicsToCreate.add(topicName);
             }
         }
 
         return topicsToCreate;
     }
+
+    private boolean shouldRetry(final Set<String> topicsNotReady, final HashSet<String> leaderNotAvailableTopics) {
+        // If there's topic with LeaderNotAvailableException, we still need retry
+        return !topicsNotReady.isEmpty() || leaderNotAvailableTopics.size() > 0;

Review comment:
       Can we just use `!isEmpty` for both sets?
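
   Something along these lines, as a rough standalone sketch rather than the 
actual change. The class name `RetryCheck` is made up for illustration, and 
widening the second parameter from `HashSet<String>` to `Set<String>` is my 
own assumption, not something the PR does:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical standalone class for illustration; not part of InternalTopicManager.
class RetryCheck {

    // Retry while either set still holds topics.
    // Assumption: the second parameter is typed as Set<String> here,
    // whereas the PR declares it as HashSet<String>.
    static boolean shouldRetry(final Set<String> topicsNotReady,
                               final Set<String> leaderNotAvailableTopics) {
        return !topicsNotReady.isEmpty() || !leaderNotAvailableTopics.isEmpty();
    }

    public static void main(final String[] args) {
        final Set<String> none = Collections.emptySet();
        final Set<String> pending = new HashSet<>(Collections.singleton("LeaderNotAvailableTopic"));

        System.out.println(shouldRetry(none, none));    // prints "false"
        System.out.println(shouldRetry(none, pending)); // prints "true"
        System.out.println(shouldRetry(pending, none)); // prints "true"
    }
}
```

   Using `isEmpty()` on both sides keeps the two checks symmetric and reads 
the same as `size() > 0`.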



