guozhangwang commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r425343094



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1520,38 +1534,6 @@ private static void validateActiveTaskEncoding(final 
List<TopicPartition> partit
         }
     }
 
-    /**
-     * Internal helper function that creates a Kafka topic
-     *
-     * @param topicPartitions Map that contains the topic names to be created 
with the number of partitions
-     */
-    private void prepareTopic(final Map<String, InternalTopicConfig> 
topicPartitions) {
-        log.debug("Starting to validate internal topics {} in partition 
assignor.", topicPartitions);
-
-        // first construct the topics to make ready
-        final Map<String, InternalTopicConfig> topicsToMakeReady = new 
HashMap<>();
-
-        for (final InternalTopicConfig topic : topicPartitions.values()) {
-            final Optional<Integer> numPartitions = topic.numberOfPartitions();
-            if (!numPartitions.isPresent()) {
-                throw new StreamsException(
-                    String.format("%sTopic [%s] number of partitions not 
defined",
-                                  logPrefix, topic.name())
-                );
-            }
-            if (!topic.hasEnforcedNumberOfPartitions()) {
-                topic.setNumberOfPartitions(numPartitions.get());
-            }
-            topicsToMakeReady.put(topic.name(), topic);
-        }

Review comment:
       The logic indeed seem redundant to me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to