k-apol commented on code in PR #20063: URL: https://github.com/apache/kafka/pull/20063#discussion_r2247844632
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java: ########## @@ -461,120 +461,125 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) { // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); - long currentWallClockMs = time.milliseconds(); + final long currentWallClockMs = time.milliseconds(); final long deadlineMs = currentWallClockMs + retryTimeoutMs; - Set<String> topicsNotReady = new HashSet<>(topics.keySet()); + final Set<String> topicsNotReady = new HashSet<>(topics.keySet()); final Set<String> newlyCreatedTopics = new HashSet<>(); while (!topicsNotReady.isEmpty()) { - final Set<String> tempUnknownTopics = new HashSet<>(); - topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics); - newlyCreatedTopics.addAll(topicsNotReady); - + final Set<NewTopic> validatedTopicObjects = createValidatedTopicObjects(topics, topicsNotReady, newlyCreatedTopics); + if (!validatedTopicObjects.isEmpty()) { + setupValidatedTopics(validatedTopicObjects, topicsNotReady); + } if (!topicsNotReady.isEmpty()) { - final Set<NewTopic> newTopics = new HashSet<>(); + maybeThrowTimeoutExceptionDuringMakeReady(deadlineMs); + } + } + log.debug("Completed validating internal topics and created {}", newlyCreatedTopics); - for (final String topicName : topicsNotReady) { - if (tempUnknownTopics.contains(topicName)) { - // for the tempUnknownTopics, don't create topic for them - // we'll check again later if remaining retries > 0 - continue; - } - final InternalTopicConfig internalTopicConfig = Objects.requireNonNull(topics.get(topicName)); - final Map<String, String> topicConfig = internalTopicConfig.properties(defaultTopicConfigs, windowChangeLogAdditionalRetention); + return newlyCreatedTopics; + } + + private Set<NewTopic> createValidatedTopicObjects(final Map<String, 
InternalTopicConfig> topics, + Set<String> topicsNotReady, + final Set<String> newlyCreatedTopics) { + final Set<String> tempUnknownTopics = new HashSet<>(); + + topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics); + newlyCreatedTopics.addAll(topicsNotReady); + + final Set<NewTopic> validatedTopicObjects = new HashSet<>(); + + for (final String topicName : topicsNotReady) { + if (tempUnknownTopics.contains(topicName)) { + // for the tempUnknownTopics, don't create topic for them + // we'll check again later if remaining retries > 0 + continue; + } + final InternalTopicConfig internalTopicConfig = Objects.requireNonNull(topics.get(topicName)); + final Map<String, String> topicConfig = internalTopicConfig.properties(defaultTopicConfigs, windowChangeLogAdditionalRetention); - log.debug("Going to create topic {} with {} partitions and config {}.", - internalTopicConfig.name(), - internalTopicConfig.numberOfPartitions(), - topicConfig); + log.debug("Going to create topic {} with {} partitions and config {}.", + internalTopicConfig.name(), + internalTopicConfig.numberOfPartitions(), + topicConfig); - newTopics.add( - new NewTopic( + validatedTopicObjects.add( + new NewTopic( internalTopicConfig.name(), internalTopicConfig.numberOfPartitions(), Optional.of(replicationFactor)) .configs(topicConfig)); - } + } + return validatedTopicObjects; + } - // it's possible that although some topics are not ready yet because they - // are temporarily not available, not that they do not exist; in this case - // the new topics to create may be empty and hence we can skip here - if (!newTopics.isEmpty()) { - final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics); + private void setupValidatedTopics(final Set<NewTopic> validatedTopicObjects, + final Set<String> topicsNotReady) { + final CreateTopicsResult createTopicsResult = adminClient.createTopics(validatedTopicObjects); - for (final Map.Entry<String, KafkaFuture<Void>> 
createTopicResult : createTopicsResult.values().entrySet()) { - final String topicName = createTopicResult.getKey(); - try { - createTopicResult.getValue().get(); - topicsNotReady.remove(topicName); - } catch (final InterruptedException fatalException) { - // this should not happen; if it ever happens it indicate a bug - Thread.currentThread().interrupt(); - log.error(INTERRUPTED_ERROR_MESSAGE, fatalException); - throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException); - } catch (final ExecutionException executionException) { - final Throwable cause = executionException.getCause(); - if (cause instanceof TopicExistsException) { - // This topic didn't exist earlier or its leader not known before; just retain it for next round of validation. - log.info( - "Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" - + - "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" - + - "Error message was: {}", topicName, retryBackOffMs, - cause.toString()); - } else { - log.error("Unexpected error during topic creation for {}.\n" + - "Error message was: {}", topicName, cause.toString()); - - if (cause instanceof UnsupportedVersionException) { - final String errorMessage = cause.getMessage(); - if (errorMessage != null && - errorMessage.startsWith("Creating topics with default partitions/replication factor are only supported in CreateTopicRequest version 4+")) { - - throw new StreamsException(String.format( - "Could not create topic %s, because brokers don't support configuration replication.factor=-1." 
- + " You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error.", - topicName) - ); - } - } else if (cause instanceof TimeoutException) { - log.error("Creating topic {} timed out.\n" + - "Error message was: {}", topicName, cause.toString()); - } else { - throw new StreamsException( - String.format("Could not create topic %s.", topicName), - cause - ); - } - } + for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) { + final String topicName = createTopicResult.getKey(); + try { + createTopicResult.getValue().get(); + topicsNotReady.remove(topicName); + } catch (final InterruptedException fatalException) { + // this should not happen; if it ever happens it indicate a bug + Thread.currentThread().interrupt(); + log.error(INTERRUPTED_ERROR_MESSAGE, fatalException); + throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException); + } catch (final ExecutionException executionException) { + final Throwable cause = executionException.getCause(); + if (cause instanceof TopicExistsException) { + // This topic didn't exist earlier or its leader not known before; just retain it for next round of validation. + log.info( + "Could not create topic {}. 
Topic is probably marked for deletion (number of partitions is unknown).\n" + + + "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" + + + "Error message was: {}", topicName, retryBackOffMs, + cause.toString()); + } else { + log.error("Unexpected error during topic creation for {}.\n" + + "Error message was: {}", topicName, cause.toString()); + + if (cause instanceof UnsupportedVersionException) { + final String errorMessage = cause.getMessage(); + if (errorMessage != null && + errorMessage.startsWith("Creating topics with default partitions/replication factor are only supported in CreateTopicRequest version 4+")) { + + throw new StreamsException(String.format( + "Could not create topic %s, because brokers don't support configuration replication.factor=-1." + + " You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error.", + topicName) + ); } + } else if (cause instanceof TimeoutException) { + log.error("Creating topic {} timed out.\n" + + "Error message was: {}", topicName, cause.toString()); + } else { + throw new StreamsException( + String.format("Could not create topic %s.", topicName), + cause + ); } } } + } + } - if (!topicsNotReady.isEmpty()) { - currentWallClockMs = time.milliseconds(); + private void maybeThrowTimeoutExceptionDuringMakeReady(final long deadlineMs) { + final long currentWallClockMs = time.milliseconds(); + if (currentWallClockMs >= deadlineMs) { + final String timeoutError = String.format("Could not create topics within %d milliseconds. " + + "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs); + log.error(timeoutError); - if (currentWallClockMs >= deadlineMs) { - final String timeoutError = String.format("Could not create topics within %d milliseconds. 
" + - "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs); - log.error(timeoutError); - throw new TimeoutException(timeoutError); - } - log.info( Review Comment: I will add it back where it's needed; see the comment above on refactoring timeouts in this class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org