mjsax commented on code in PR #20326:
URL: https://github.com/apache/kafka/pull/20326#discussion_r2278384006


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -113,7 +113,7 @@ public InternalTopicManager(final Time time,
         }
     }
 
-    static class ValidationResult {
+    public static class ValidationResult {

Review Comment:
   Why do we need to make this class `public`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +466,119 @@ public Set<String> makeReady(final Map<String, 
InternalTopicConfig> topics) {
         // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-        long currentWallClockMs = time.milliseconds();
+        final long currentWallClockMs = time.milliseconds();
         final long deadlineMs = currentWallClockMs + retryTimeoutMs;
 
-        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
-        final Set<String> newlyCreatedTopics = new HashSet<>();
+        final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+        final Set<String> newTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty()) {
-            final Set<String> tempUnknownTopics = new HashSet<>();
-            topicsNotReady = validateTopics(topicsNotReady, topics, 
tempUnknownTopics);
-            newlyCreatedTopics.addAll(topicsNotReady);
-
+            final Set<NewTopic> topicsToCreate = computeTopicsToCreate(topics, 
topicsNotReady, newTopics);
+            if (!topicsToCreate.isEmpty()) {
+                readyTopics(topicsToCreate, topicsNotReady);
+            }
             if (!topicsNotReady.isEmpty()) {
-                final Set<NewTopic> newTopics = new HashSet<>();
+                maybeThrowTimeout(new TimeoutContext(
+                    Collections.singleton("makeReadyCheck"), // dummy 
collection just to trigger if `topicsNotReady` is non-empty
+                    deadlineMs,
+                    "MakeReady timeout",
+                    String.format("Could not create topics within %d 
milliseconds. This can happen if the Kafka cluster is temporarily not 
available.", retryTimeoutMs),
+                    null
+                ));
 
-                for (final String topicName : topicsNotReady) {
-                    if (tempUnknownTopics.contains(topicName)) {
-                        // for the tempUnknownTopics, don't create topic for 
them
-                        // we'll check again later if remaining retries > 0
-                        continue;
-                    }
-                    final InternalTopicConfig internalTopicConfig = 
Objects.requireNonNull(topics.get(topicName));
-                    final Map<String, String> topicConfig = 
internalTopicConfig.properties(defaultTopicConfigs, 
windowChangeLogAdditionalRetention);
+            }
+        }
+        log.debug("Completed validating internal topics and created {}", 
newTopics);
 
-                    log.debug("Going to create topic {} with {} partitions and 
config {}.",
-                        internalTopicConfig.name(),
-                        internalTopicConfig.numberOfPartitions(),
-                        topicConfig);
+        return newTopics;
+    }
+
+    private Set<NewTopic> computeTopicsToCreate(final Map<String, 
InternalTopicConfig> topics,
+                                                      final Set<String> 
topicsNotReady,

Review Comment:
   nit: fix indentation



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +466,119 @@ public Set<String> makeReady(final Map<String, 
InternalTopicConfig> topics) {
         // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-        long currentWallClockMs = time.milliseconds();
+        final long currentWallClockMs = time.milliseconds();
         final long deadlineMs = currentWallClockMs + retryTimeoutMs;
 
-        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
-        final Set<String> newlyCreatedTopics = new HashSet<>();
+        final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+        final Set<String> newTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty()) {
-            final Set<String> tempUnknownTopics = new HashSet<>();
-            topicsNotReady = validateTopics(topicsNotReady, topics, 
tempUnknownTopics);
-            newlyCreatedTopics.addAll(topicsNotReady);
-
+            final Set<NewTopic> topicsToCreate = computeTopicsToCreate(topics, 
topicsNotReady, newTopics);
+            if (!topicsToCreate.isEmpty()) {
+                readyTopics(topicsToCreate, topicsNotReady);
+            }
             if (!topicsNotReady.isEmpty()) {
-                final Set<NewTopic> newTopics = new HashSet<>();
+                maybeThrowTimeout(new TimeoutContext(
+                    Collections.singleton("makeReadyCheck"), // dummy 
collection just to trigger if `topicsNotReady` is non-empty
+                    deadlineMs,
+                    "MakeReady timeout",
+                    String.format("Could not create topics within %d 
milliseconds. This can happen if the Kafka cluster is temporarily not 
available.", retryTimeoutMs),
+                    null
+                ));
 

Review Comment:
   Cf. my other comment further below: I think we need to add the 
`retryBackOffMs` sleep here, if we were not able to create all topics yet but 
have not hit the timeout.
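
   A minimal sketch of such a loop, not the actual `InternalTopicManager` code: 
the `createAttempt`, `clockMs`, and `sleeper` parameters are hypothetical 
stand-ins (for the create call, `time.milliseconds()`, and `Utils.sleep(...)`), 
`IllegalStateException` stands in for the timeout exception, and `printf` 
stands in for the INFO log.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

final class MakeReadyBackoffSketch {

    // Retries until all topics are ready or the deadline passes.
    // Returns the number of create attempts that were made.
    static int makeReady(final Set<String> topicsNotReady,
                         final Consumer<Set<String>> createAttempt, // hypothetical: removes topics that became ready
                         final LongSupplier clockMs,                // hypothetical clock
                         final LongConsumer sleeper,                // hypothetical sleep
                         final long retryTimeoutMs,
                         final long retryBackOffMs) {
        final long deadlineMs = clockMs.getAsLong() + retryTimeoutMs;
        int attempts = 0;

        while (!topicsNotReady.isEmpty()) {
            createAttempt.accept(topicsNotReady);
            attempts++;

            if (!topicsNotReady.isEmpty()) {
                final long nowMs = clockMs.getAsLong();
                if (nowMs >= deadlineMs) {
                    // stand-in for the timeout exception
                    throw new IllegalStateException(
                        String.format("Could not create topics within %d milliseconds.", retryTimeoutMs));
                }
                // stand-in for the INFO log that precedes the sleep
                System.out.printf("Topics %s not ready; retrying in %d ms, %d ms remaining%n",
                    topicsNotReady, retryBackOffMs, deadlineMs - nowMs);
                // back off before the next round instead of busy-looping
                sleeper.accept(retryBackOffMs);
            }
        }
        return attempts;
    }

    public static void main(final String[] args) {
        final long[] nowMs = {0L}; // simulated clock, advanced only by the sleeper
        final Set<String> notReady = new HashSet<>(Arrays.asList("topic-a", "topic-b"));
        // each simulated attempt makes exactly one topic ready
        final int attempts = makeReady(notReady, s -> s.remove(s.iterator().next()),
            () -> nowMs[0], ms -> nowMs[0] += ms, 1000L, 100L);
        System.out.println("attempts=" + attempts + ", simulated elapsed ms=" + nowMs[0]);
    }
}
```

   The point is only the ordering: check the deadline first, then log and 
sleep, so the loop does not spin against the brokers while topics are still 
pending.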



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +466,119 @@ public Set<String> makeReady(final Map<String, 
InternalTopicConfig> topics) {
         // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-        long currentWallClockMs = time.milliseconds();
+        final long currentWallClockMs = time.milliseconds();
         final long deadlineMs = currentWallClockMs + retryTimeoutMs;
 
-        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
-        final Set<String> newlyCreatedTopics = new HashSet<>();
+        final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+        final Set<String> newTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty()) {
-            final Set<String> tempUnknownTopics = new HashSet<>();
-            topicsNotReady = validateTopics(topicsNotReady, topics, 
tempUnknownTopics);
-            newlyCreatedTopics.addAll(topicsNotReady);
-
+            final Set<NewTopic> topicsToCreate = computeTopicsToCreate(topics, 
topicsNotReady, newTopics);
+            if (!topicsToCreate.isEmpty()) {
+                readyTopics(topicsToCreate, topicsNotReady);
+            }
             if (!topicsNotReady.isEmpty()) {
-                final Set<NewTopic> newTopics = new HashSet<>();
+                maybeThrowTimeout(new TimeoutContext(
+                    Collections.singleton("makeReadyCheck"), // dummy 
collection just to trigger if `topicsNotReady` is non-empty
+                    deadlineMs,
+                    "MakeReady timeout",
+                    String.format("Could not create topics within %d 
milliseconds. This can happen if the Kafka cluster is temporarily not 
available.", retryTimeoutMs),
+                    null
+                ));
 
-                for (final String topicName : topicsNotReady) {
-                    if (tempUnknownTopics.contains(topicName)) {
-                        // for the tempUnknownTopics, don't create topic for 
them
-                        // we'll check again later if remaining retries > 0
-                        continue;
-                    }
-                    final InternalTopicConfig internalTopicConfig = 
Objects.requireNonNull(topics.get(topicName));
-                    final Map<String, String> topicConfig = 
internalTopicConfig.properties(defaultTopicConfigs, 
windowChangeLogAdditionalRetention);
+            }
+        }
+        log.debug("Completed validating internal topics and created {}", 
newTopics);
 
-                    log.debug("Going to create topic {} with {} partitions and 
config {}.",
-                        internalTopicConfig.name(),
-                        internalTopicConfig.numberOfPartitions(),
-                        topicConfig);
+        return newTopics;
+    }
+
+    private Set<NewTopic> computeTopicsToCreate(final Map<String, 
InternalTopicConfig> topics,
+                                                      final Set<String> 
topicsNotReady,
+                                                      final Set<String> 
newTopics) {
+        final Set<String> tempUnknownTopics = new HashSet<>();
+        final Set<String> validatedTopics = validateTopics(topicsNotReady, 
topics, tempUnknownTopics);
+
+        newTopics.addAll(validatedTopics);
 
-                    newTopics.add(
-                        new NewTopic(
+        final Set<NewTopic> topicsToCreate = new HashSet<>();
+
+        for (final String topicName : topicsNotReady) {
+            if (tempUnknownTopics.contains(topicName)) {
+                // for the tempUnknownTopics, don't create topic for them
+                // we'll check again later if remaining retries > 0
+                continue;
+            }
+            final InternalTopicConfig internalTopicConfig = 
Objects.requireNonNull(topics.get(topicName));
+            final Map<String, String> topicConfig = 
internalTopicConfig.properties(defaultTopicConfigs, 
windowChangeLogAdditionalRetention);
+
+            log.debug("Going to create topic {} with {} partitions and config 
{}.",
+                    internalTopicConfig.name(),
+                    internalTopicConfig.numberOfPartitions(),
+                    topicConfig);
+
+            topicsToCreate.add(
+                    new NewTopic(
                             internalTopicConfig.name(),
                             internalTopicConfig.numberOfPartitions(),
                             Optional.of(replicationFactor))
                             .configs(topicConfig));
-                }
+        }
+        return topicsToCreate;
+    }
 
-                // it's possible that although some topics are not ready yet 
because they
-                // are temporarily not available, not that they do not exist; 
in this case
-                // the new topics to create may be empty and hence we can skip 
here
-                if (!newTopics.isEmpty()) {
-                    final CreateTopicsResult createTopicsResult = 
adminClient.createTopics(newTopics);
+    private void readyTopics(final Set<NewTopic> topicsToCreate,
+                             final Set<String> topicsNotReady) {
+        final CreateTopicsResult createTopicsResult = 
adminClient.createTopics(topicsToCreate);
 
-                    for (final Map.Entry<String, KafkaFuture<Void>> 
createTopicResult : createTopicsResult.values().entrySet()) {
-                        final String topicName = createTopicResult.getKey();
-                        try {
-                            createTopicResult.getValue().get();
-                            topicsNotReady.remove(topicName);
-                        } catch (final InterruptedException fatalException) {
-                            // this should not happen; if it ever happens it 
indicate a bug
-                            Thread.currentThread().interrupt();
-                            log.error(INTERRUPTED_ERROR_MESSAGE, 
fatalException);
-                            throw new 
IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                        } catch (final ExecutionException executionException) {
-                            final Throwable cause = 
executionException.getCause();
-                            if (cause instanceof TopicExistsException) {
-                                // This topic didn't exist earlier or its 
leader not known before; just retain it for next round of validation.
-                                log.info(
-                                        "Could not create topic {}. Topic is 
probably marked for deletion (number of partitions is unknown).\n"
-                                                +
-                                                "Will retry to create this 
topic in {} ms (to let broker finish async delete operation first).\n"
-                                                +
-                                                "Error message was: {}", 
topicName, retryBackOffMs,
-                                        cause.toString());
-                            } else {
-                                log.error("Unexpected error during topic 
creation for {}.\n" +
-                                        "Error message was: {}", topicName, 
cause.toString());
-
-                                if (cause instanceof 
UnsupportedVersionException) {
-                                    final String errorMessage = 
cause.getMessage();
-                                    if (errorMessage != null &&
-                                            errorMessage.startsWith("Creating 
topics with default partitions/replication factor are only supported in 
CreateTopicRequest version 4+")) {
-
-                                        throw new 
StreamsException(String.format(
-                                                "Could not create topic %s, 
because brokers don't support configuration replication.factor=-1."
-                                                        + " You can change the 
replication.factor config or upgrade your brokers to version 2.4 or newer to 
avoid this error.",
-                                                topicName)
-                                        );
-                                    }
-                                } else if (cause instanceof TimeoutException) {
-                                    log.error("Creating topic {} timed out.\n" 
+
-                                            "Error message was: {}", 
topicName, cause.toString());
-                                } else {
-                                    throw new StreamsException(
-                                            String.format("Could not create 
topic %s.", topicName),
-                                            cause
-                                    );
-                                }
-                            }
-                        }
-                    }
-                }
-            }
+        for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : 
createTopicsResult.values().entrySet()) {
+            final String topicName = createTopicResult.getKey();
+            try {
+                createTopicResult.getValue().get();
+                topicsNotReady.remove(topicName);
+            } catch (final InterruptedException fatalException) {
+                // this should not happen; if it ever happens it indicate a bug
+                Thread.currentThread().interrupt();
+                log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
+                throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, 
fatalException);
+            } catch (final ExecutionException executionException) {
+                final Throwable cause = executionException.getCause();
+                if (cause instanceof TopicExistsException) {
+                    // This topic didn't exist earlier or its leader not known 
before; just retain it for next round of validation.
+                    log.info(
+                            "Could not create topic {}. Topic is probably 
marked for deletion (number of partitions is unknown).\n"
+                                    +
+                                    "Will retry to create this topic in {} ms 
(to let broker finish async delete operation first).\n"
+                                    +
+                                    "Error message was: {}", topicName, 
retryBackOffMs,
+                            cause.toString());
+                } else {
+                    log.error("Unexpected error during topic creation for 
{}.\n" +
+                            "Error message was: {}", topicName, 
cause.toString());
 
-            if (!topicsNotReady.isEmpty()) {
-                currentWallClockMs = time.milliseconds();
+                    if (cause instanceof UnsupportedVersionException) {
+                        final String errorMessage = cause.getMessage();
+                        if (errorMessage != null &&
+                                errorMessage.startsWith("Creating topics with 
default partitions/replication factor are only supported in CreateTopicRequest 
version 4+")) {
 
-                if (currentWallClockMs >= deadlineMs) {
-                    final String timeoutError = String.format("Could not 
create topics within %d milliseconds. " +
-                        "This can happen if the Kafka cluster is temporarily 
not available.", retryTimeoutMs);
-                    log.error(timeoutError);
-                    throw new TimeoutException(timeoutError);
+                            throw new StreamsException(String.format(
+                                    "Could not create topic %s, because 
brokers don't support configuration replication.factor=-1."
+                                            + " You can change the 
replication.factor config or upgrade your brokers to version 2.4 or newer to 
avoid this error.",
+                                    topicName)
+                            );
+                        }
+                    } else if (cause instanceof TimeoutException) {
+                        log.error("Creating topic {} timed out.\n" +
+                                "Error message was: {}", topicName, 
cause.toString());
+                    } else {
+                        throw new StreamsException(
+                                String.format("Could not create topic %s.", 
topicName),
+                                cause
+                        );
+                    }
                 }
-                log.info(
-                    "Topics {} could not be made ready. Will retry in {} 
milliseconds. Remaining time in milliseconds: {}",
-                    topicsNotReady,
-                    retryBackOffMs,
-                    deadlineMs - currentWallClockMs
-                );
-                Utils.sleep(retryBackOffMs);

Review Comment:
   It seems this `retryBackOffMs` sleep got lost? (Including the INFO log?)



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +466,119 @@ public Set<String> makeReady(final Map<String, 
InternalTopicConfig> topics) {
         // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-        long currentWallClockMs = time.milliseconds();
+        final long currentWallClockMs = time.milliseconds();
         final long deadlineMs = currentWallClockMs + retryTimeoutMs;
 
-        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
-        final Set<String> newlyCreatedTopics = new HashSet<>();
+        final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+        final Set<String> newTopics = new HashSet<>();

Review Comment:
   Why did you change the name? To me, `newlyCreatedTopics` sounds like a very 
good name.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +466,119 @@ public Set<String> makeReady(final Map<String, 
InternalTopicConfig> topics) {
         // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-        long currentWallClockMs = time.milliseconds();
+        final long currentWallClockMs = time.milliseconds();
         final long deadlineMs = currentWallClockMs + retryTimeoutMs;
 
-        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
-        final Set<String> newlyCreatedTopics = new HashSet<>();
+        final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+        final Set<String> newTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty()) {
-            final Set<String> tempUnknownTopics = new HashSet<>();
-            topicsNotReady = validateTopics(topicsNotReady, topics, 
tempUnknownTopics);
-            newlyCreatedTopics.addAll(topicsNotReady);
-
+            final Set<NewTopic> topicsToCreate = computeTopicsToCreate(topics, 
topicsNotReady, newTopics);
+            if (!topicsToCreate.isEmpty()) {
+                readyTopics(topicsToCreate, topicsNotReady);

Review Comment:
   Maybe rename `readyTopics(...)` to `createTopics(...)`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +466,119 @@ public Set<String> makeReady(final Map<String, 
InternalTopicConfig> topics) {
         // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-        long currentWallClockMs = time.milliseconds();
+        final long currentWallClockMs = time.milliseconds();
         final long deadlineMs = currentWallClockMs + retryTimeoutMs;
 
-        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
-        final Set<String> newlyCreatedTopics = new HashSet<>();
+        final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+        final Set<String> newTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty()) {
-            final Set<String> tempUnknownTopics = new HashSet<>();
-            topicsNotReady = validateTopics(topicsNotReady, topics, 
tempUnknownTopics);
-            newlyCreatedTopics.addAll(topicsNotReady);
-
+            final Set<NewTopic> topicsToCreate = computeTopicsToCreate(topics, 
topicsNotReady, newTopics);
+            if (!topicsToCreate.isEmpty()) {
+                readyTopics(topicsToCreate, topicsNotReady);
+            }
             if (!topicsNotReady.isEmpty()) {
-                final Set<NewTopic> newTopics = new HashSet<>();
+                maybeThrowTimeout(new TimeoutContext(
+                    Collections.singleton("makeReadyCheck"), // dummy 
collection just to trigger if `topicsNotReady` is non-empty
+                    deadlineMs,
+                    "MakeReady timeout",
+                    String.format("Could not create topics within %d 
milliseconds. This can happen if the Kafka cluster is temporarily not 
available.", retryTimeoutMs),
+                    null
+                ));
 
-                for (final String topicName : topicsNotReady) {
-                    if (tempUnknownTopics.contains(topicName)) {
-                        // for the tempUnknownTopics, don't create topic for 
them
-                        // we'll check again later if remaining retries > 0
-                        continue;
-                    }
-                    final InternalTopicConfig internalTopicConfig = 
Objects.requireNonNull(topics.get(topicName));
-                    final Map<String, String> topicConfig = 
internalTopicConfig.properties(defaultTopicConfigs, 
windowChangeLogAdditionalRetention);
+            }
+        }
+        log.debug("Completed validating internal topics and created {}", 
newTopics);
 
-                    log.debug("Going to create topic {} with {} partitions and 
config {}.",
-                        internalTopicConfig.name(),
-                        internalTopicConfig.numberOfPartitions(),
-                        topicConfig);
+        return newTopics;
+    }
+
+    private Set<NewTopic> computeTopicsToCreate(final Map<String, 
InternalTopicConfig> topics,
+                                                      final Set<String> 
topicsNotReady,
+                                                      final Set<String> 
newTopics) {
+        final Set<String> tempUnknownTopics = new HashSet<>();
+        final Set<String> validatedTopics = validateTopics(topicsNotReady, 
topics, tempUnknownTopics);

Review Comment:
   Doesn't `validateTopics` return the set of topics it could _not_ validate, 
i.e., the topics that still need to be created?
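
   To illustrate the naming concern, a small sketch under the assumption that 
the return value means "topics still to create". `topicsStillToCreate` is a 
hypothetical stand-in and does not reproduce the real `validateTopics` logic.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

final class ValidateReturnSketch {

    // Hypothetical stand-in for validateTopics(...): returns the requested
    // topics that were NOT found and therefore still need to be created.
    static Set<String> topicsStillToCreate(final Set<String> requested,
                                           final Set<String> existing) {
        final Set<String> missing = new TreeSet<>(requested);
        missing.removeAll(existing);
        return missing;
    }

    public static void main(final String[] args) {
        final Set<String> requested = new TreeSet<>(Arrays.asList("a", "b", "c"));
        final Set<String> existing = new TreeSet<>(Arrays.asList("a"));

        // Under this assumption, a name like `validatedTopics` would say the
        // opposite of what the set contains; `topicsToCreate` describes it.
        final Set<String> topicsToCreate = topicsStillToCreate(requested, existing);
        System.out.println("still to create: " + topicsToCreate);
    }
}
```

   If that reading is right, only the returned set should be turned into 
create requests, not the full input set.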



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +466,119 @@ public Set<String> makeReady(final Map<String, 
InternalTopicConfig> topics) {
         // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-        long currentWallClockMs = time.milliseconds();
+        final long currentWallClockMs = time.milliseconds();
         final long deadlineMs = currentWallClockMs + retryTimeoutMs;
 
-        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
-        final Set<String> newlyCreatedTopics = new HashSet<>();
+        final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+        final Set<String> newTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty()) {
-            final Set<String> tempUnknownTopics = new HashSet<>();
-            topicsNotReady = validateTopics(topicsNotReady, topics, 
tempUnknownTopics);
-            newlyCreatedTopics.addAll(topicsNotReady);
-
+            final Set<NewTopic> topicsToCreate = computeTopicsToCreate(topics, 
topicsNotReady, newTopics);
+            if (!topicsToCreate.isEmpty()) {
+                readyTopics(topicsToCreate, topicsNotReady);
+            }
             if (!topicsNotReady.isEmpty()) {
-                final Set<NewTopic> newTopics = new HashSet<>();
+                maybeThrowTimeout(new TimeoutContext(
+                    Collections.singleton("makeReadyCheck"), // dummy 
collection just to trigger if `topicsNotReady` is non-empty
+                    deadlineMs,
+                    "MakeReady timeout",
+                    String.format("Could not create topics within %d 
milliseconds. This can happen if the Kafka cluster is temporarily not 
available.", retryTimeoutMs),
+                    null
+                ));
 
-                for (final String topicName : topicsNotReady) {
-                    if (tempUnknownTopics.contains(topicName)) {
-                        // for the tempUnknownTopics, don't create topic for 
them
-                        // we'll check again later if remaining retries > 0
-                        continue;
-                    }
-                    final InternalTopicConfig internalTopicConfig = 
Objects.requireNonNull(topics.get(topicName));
-                    final Map<String, String> topicConfig = 
internalTopicConfig.properties(defaultTopicConfigs, 
windowChangeLogAdditionalRetention);
+            }
+        }
+        log.debug("Completed validating internal topics and created {}", 
newTopics);
 
-                    log.debug("Going to create topic {} with {} partitions and 
config {}.",
-                        internalTopicConfig.name(),
-                        internalTopicConfig.numberOfPartitions(),
-                        topicConfig);
+        return newTopics;
+    }
+
+    private Set<NewTopic> computeTopicsToCreate(final Map<String, 
InternalTopicConfig> topics,
+                                                      final Set<String> 
topicsNotReady,
+                                                      final Set<String> 
newTopics) {
+        final Set<String> tempUnknownTopics = new HashSet<>();
+        final Set<String> validatedTopics = validateTopics(topicsNotReady, 
topics, tempUnknownTopics);
+
+        newTopics.addAll(validatedTopics);

Review Comment:
   Cf. comment above: I think this line would currently add topics which are 
not ready to `newTopics`, which seems incorrect?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.