showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r446126644



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -125,35 +131,37 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
 
                 final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
 
-                for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
-                    final String topicName = createTopicResult.getKey();
-                    try {
-                        createTopicResult.getValue().get();
-                        topicsNotReady.remove(topicName);
-                    } catch (final InterruptedException fatalException) {
-                        // this should not happen; if it ever happens it indicate a bug
-                        Thread.currentThread().interrupt();
-                        log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                        throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                    } catch (final ExecutionException executionException) {
-                        final Throwable cause = executionException.getCause();
-                        if (cause instanceof TopicExistsException) {
-                            // This topic didn't exist earlier or its leader not known before; just retain it for next round of validation.
-                            log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
-                                "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
-                                "Error message was: {}", topicName, retryBackOffMs, cause.toString());
-                        } else {
-                            log.error("Unexpected error during topic creation for {}.\n" +
-                                "Error message was: {}", topicName, cause.toString());
-                            throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
+                if (createTopicsResult != null) {
+                    for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {

Review comment:
       We need the null check on `createTopicsResult` here because `newTopics` might be empty.
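
       For illustration only, here is a minimal, self-contained sketch of the guard pattern. It is not the actual `InternalTopicManager` code: `FakeCreateResult`, `createTopics`, and `awaitCreated` are hypothetical stand-ins, `CompletableFuture` takes the place of `KafkaFuture`, and the sketch assumes the result is `null` when there is nothing to create.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class NullGuardSketch {

    // Hypothetical stand-in for CreateTopicsResult; NOT the Kafka class.
    static final class FakeCreateResult {
        private final Map<String, CompletableFuture<Void>> futures;

        FakeCreateResult(final Map<String, CompletableFuture<Void>> futures) {
            this.futures = futures;
        }

        Map<String, CompletableFuture<Void>> values() {
            return futures;
        }
    }

    // Hypothetical helper. ASSUMPTION: returns null when newTopics is empty,
    // which is the situation the null check is meant to cover.
    static FakeCreateResult createTopics(final List<String> newTopics) {
        if (newTopics.isEmpty()) {
            return null;
        }
        final Map<String, CompletableFuture<Void>> futures = new LinkedHashMap<>();
        for (final String topic : newTopics) {
            futures.put(topic, CompletableFuture.completedFuture(null));
        }
        return new FakeCreateResult(futures);
    }

    // Waits on each per-topic future, skipping the loop entirely when the result is null.
    static List<String> awaitCreated(final FakeCreateResult result) {
        final List<String> created = new ArrayList<>();
        if (result != null) {
            for (final Map.Entry<String, CompletableFuture<Void>> entry : result.values().entrySet()) {
                try {
                    entry.getValue().get();
                    created.add(entry.getKey());
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for topic creation", e);
                } catch (final ExecutionException e) {
                    throw new IllegalStateException("Could not create topic " + entry.getKey(), e.getCause());
                }
            }
        }
        return created;
    }

    public static void main(final String[] args) {
        // Empty input: no result object, so the guarded loop is skipped.
        System.out.println(awaitCreated(createTopics(Collections.<String>emptyList())));
        // Non-empty input: every future is awaited and its topic recorded.
        System.out.println(awaitCreated(createTopics(Arrays.asList("topic-a", "topic-b"))));
    }
}
```

       Without the `result != null` guard, the empty-input call would throw a `NullPointerException` at `result.values()`.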




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

